implemented subscriptions module

This commit is contained in:
d1nch8g
2026-05-09 01:59:21 +03:00
parent 8acdee4b60
commit 48550ea3a4
5 changed files with 463 additions and 18 deletions
+2 -2
View File
@@ -56,8 +56,8 @@ type Subscriptions interface {
}
type Subscription struct {
Email string `json:"email" gob:"1"`
Signature []byte `json:"signature" gob:"2"`
Email string `json:"email"`
Signature []byte `json:"signature"`
}
type SubscriptionReference struct {
+9 -7
View File
@@ -1,6 +1,8 @@
package pebble
import (
"fmt"
"github.com/cockroachdb/pebble"
"github.com/d1nch8g/mesh/database"
)
@@ -8,8 +10,8 @@ import (
type DB struct {
pebble *pebble.DB
users *users
// subscriptions *subscriptions
users *users
subscriptions *subscriptions
// files *files
// emails *emails
// messages *messages
@@ -21,14 +23,14 @@ type DB struct {
func New(path string) (*DB, error) {
p, err := pebble.Open(path, &pebble.Options{})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to open db: %w", err)
}
return &DB{
pebble: p,
users: &users{db: p},
// subscriptions: &subscriptions{db: p},
users: &users{db: p},
subscriptions: &subscriptions{db: p},
// files: &files{db: p},
// emails: &emails{db: p},
// messages: &messages{db: p},
@@ -57,9 +59,9 @@ func key(user, domain string, args ...string) []byte {
return b
}
func (d *DB) Users() database.Users { return d.users }
func (d *DB) Users() database.Users { return d.users }
func (d *DB) Subscriptions() database.Subscriptions { return d.subscriptions }
// func (d *DB) Subscriptions() database.Subscriptions { return d.subscriptions }
// func (d *DB) Files() database.Files { return d.files }
// func (d *DB) Emails() database.Emails { return d.emails }
// func (d *DB) Messages() database.Messages { return d.messages }
+167
View File
@@ -1,9 +1,176 @@
package pebble
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/d1nch8g/mesh/database"
)
const subsPrefix = "subs"
type subscriptions struct {
db *pebble.DB
}
func (s *subscriptions) Subscribe(user, targetEmail string, sig []byte) error {
k := key(user, subsPrefix, "out", targetEmail)
if err := s.db.Set(k, sig, pebble.NoSync); err != nil {
return fmt.Errorf("store subscription %q -> %q: %w", user, targetEmail, err)
}
return nil
}
func (s *subscriptions) Unsubscribe(user, targetEmail string) error {
k := key(user, subsPrefix, "out", targetEmail)
_, closer, err := s.db.Get(k)
if err != nil {
if err == pebble.ErrNotFound {
return database.ErrNotFound
}
return fmt.Errorf("check subscription %q -> %q: %w", user, targetEmail, err)
}
closer.Close()
if err := s.db.Delete(k, pebble.NoSync); err != nil {
return fmt.Errorf("delete subscription %q -> %q: %w", user, targetEmail, err)
}
return nil
}
func (s *subscriptions) OfUser(user string) ([]database.Subscription, error) {
prefix := key(user, subsPrefix, "out")
iter, err := s.db.NewIter(&pebble.IterOptions{
LowerBound: prefix,
UpperBound: append(prefix, 0xFF),
})
if err != nil {
return nil, fmt.Errorf("scan subscriptions of %q: %w", user, err)
}
defer iter.Close()
var subs []database.Subscription
for iter.First(); iter.Valid(); iter.Next() {
keyParts := bytes.Split(iter.Key(), []byte(":"))
targetEmail := string(keyParts[len(keyParts)-1])
subs = append(subs, database.Subscription{
Email: targetEmail,
Signature: iter.Value(),
})
}
return subs, nil
}
func (s *subscriptions) ToUser(user string) ([]database.Subscription, error) {
prefix := key(user, subsPrefix, "in")
iter, err := s.db.NewIter(&pebble.IterOptions{
LowerBound: prefix,
UpperBound: append(prefix, 0xFF),
})
if err != nil {
return nil, fmt.Errorf("scan subscribers to %q: %w", user, err)
}
defer iter.Close()
var subs []database.Subscription
for iter.First(); iter.Valid(); iter.Next() {
var sub database.Subscription
if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&sub); err != nil {
return nil, fmt.Errorf("decode subscriber to %q: %w", user, err)
}
subs = append(subs, sub)
}
return subs, nil
}
func (s *subscriptions) UpdateReference(user, domain string, delta int) error {
domainKey := key(user, subsPrefix, "ref", domain)
var domainCount int64
data, closer, err := s.db.Get(domainKey)
if err == nil {
domainCount = int64(binary.LittleEndian.Uint64(data))
closer.Close()
} else if err != pebble.ErrNotFound {
return fmt.Errorf("read reference %q -> %q: %w", user, domain, err)
}
newDomainCount := domainCount + int64(delta)
if newDomainCount < 0 {
newDomainCount = 0
}
totalKey := key(user, subsPrefix, "total")
var totalCount int64
data, closer, err = s.db.Get(totalKey)
if err == nil {
totalCount = int64(binary.LittleEndian.Uint64(data))
closer.Close()
} else if err != pebble.ErrNotFound {
return fmt.Errorf("read total for %q: %w", user, err)
}
newTotal := totalCount + int64(delta)
if newTotal < 0 {
newTotal = 0
}
batch := s.db.NewBatch()
defer batch.Close()
if newDomainCount == 0 {
_ = batch.Delete(domainKey, pebble.NoSync)
} else {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], uint64(newDomainCount))
_ = batch.Set(domainKey, buf[:], pebble.NoSync)
}
if newTotal == 0 {
_ = batch.Delete(totalKey, pebble.NoSync)
} else {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], uint64(newTotal))
_ = batch.Set(totalKey, buf[:], pebble.NoSync)
}
if err := batch.Commit(pebble.NoSync); err != nil {
return fmt.Errorf("commit reference update for %q -> %q: %w", user, domain, err)
}
return nil
}
func (s *subscriptions) ListReferences(user string) ([]database.SubscriptionReference, error) {
prefix := key(user, subsPrefix, "ref")
iter, err := s.db.NewIter(&pebble.IterOptions{
LowerBound: prefix,
UpperBound: append(prefix, 0xFF),
})
if err != nil {
return nil, fmt.Errorf("scan references for %q: %w", user, err)
}
defer iter.Close()
var refs []database.SubscriptionReference
for iter.First(); iter.Valid(); iter.Next() {
domain := string(iter.Key()[len(prefix)+1:])
count := binary.LittleEndian.Uint64(iter.Value())
refs = append(refs, database.SubscriptionReference{
Domain: domain,
Count: int(count),
})
}
return refs, nil
}
+256
View File
@@ -0,0 +1,256 @@
package pebble
import (
"testing"
"github.com/d1nch8g/mesh/database"
"github.com/stretchr/testify/require"
)
func TestSubscriptions_Subscribe(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
t.Run("subscribe new target", func(t *testing.T) {
err := subs.Subscribe("masha@d1.com", "bob@d2.io", []byte("sig123"))
require.NoError(t, err)
})
t.Run("subscribe overwrites existing", func(t *testing.T) {
err := subs.Subscribe("masha@d1.com", "bob@d2.io", []byte("sig456"))
require.NoError(t, err)
})
t.Run("subscribe multiple targets", func(t *testing.T) {
require.NoError(t, subs.Subscribe("masha@d1.com", "alice@d3.net", []byte("sig789")))
require.NoError(t, subs.Subscribe("masha@d1.com", "carol@d4.org", []byte("sig000")))
})
t.Run("subscribe different users", func(t *testing.T) {
require.NoError(t, subs.Subscribe("petya@d1.com", "bob@d2.io", []byte("sig111")))
require.NoError(t, subs.Subscribe("petya@d1.com", "alice@d3.net", []byte("sig222")))
})
}
func TestSubscriptions_Unsubscribe(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
require.NoError(t, subs.Subscribe("masha@d1.com", "bob@d2.io", []byte("sig123")))
require.NoError(t, subs.Subscribe("masha@d1.com", "alice@d3.net", []byte("sig456")))
t.Run("unsubscribe existing", func(t *testing.T) {
err := subs.Unsubscribe("masha@d1.com", "bob@d2.io")
require.NoError(t, err)
// Verify removed.
list, err := subs.OfUser("masha@d1.com")
require.NoError(t, err)
for _, sub := range list {
require.NotEqual(t, "bob@d2.io", sub.Email)
}
})
t.Run("unsubscribe non-existing returns ErrNotFound", func(t *testing.T) {
err := subs.Unsubscribe("masha@d1.com", "nonexistent@d5.io")
require.ErrorIs(t, err, database.ErrNotFound)
})
t.Run("unsubscribe already removed returns ErrNotFound", func(t *testing.T) {
err := subs.Unsubscribe("masha@d1.com", "bob@d2.io")
require.ErrorIs(t, err, database.ErrNotFound)
})
}
func TestSubscriptions_OfUser(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
t.Run("empty subscriptions", func(t *testing.T) {
list, err := subs.OfUser("newuser@d1.com")
require.NoError(t, err)
require.Empty(t, list)
})
t.Run("single subscription", func(t *testing.T) {
require.NoError(t, subs.Subscribe("masha@d1.com", "bob@d2.io", []byte("sig123")))
list, err := subs.OfUser("masha@d1.com")
require.NoError(t, err)
require.Len(t, list, 1)
require.Equal(t, "bob@d2.io", list[0].Email)
require.Equal(t, []byte("sig123"), list[0].Signature)
})
t.Run("multiple subscriptions", func(t *testing.T) {
require.NoError(t, subs.Subscribe("masha@d1.com", "alice@d3.net", []byte("sig456")))
list, err := subs.OfUser("masha@d1.com")
require.NoError(t, err)
require.Len(t, list, 2)
})
t.Run("isolation between users", func(t *testing.T) {
require.NoError(t, subs.Subscribe("petya@d1.com", "bob@d2.io", []byte("sig777")))
mashaList, err := subs.OfUser("masha@d1.com")
require.NoError(t, err)
petyaList, err := subs.OfUser("petya@d1.com")
require.NoError(t, err)
require.Len(t, mashaList, 2)
require.Len(t, petyaList, 1)
})
}
func TestSubscriptions_ToUser(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
t.Run("empty subscribers", func(t *testing.T) {
list, err := subs.ToUser("bob@d2.io")
require.NoError(t, err)
require.Empty(t, list)
})
}
func TestSubscriptions_UpdateReference(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
t.Run("increment new domain", func(t *testing.T) {
err := subs.UpdateReference("bob@d2.io", "d1.com", 1)
require.NoError(t, err)
})
t.Run("increment existing domain multiple times", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("bob@d2.io", "d1.com", 2))
require.NoError(t, subs.UpdateReference("bob@d2.io", "d1.com", 3))
})
t.Run("decrement domain partial", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("bob@d2.io", "d1.com", -2))
})
t.Run("decrement to zero deletes key", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("bob@d2.io", "d2.io", 1))
require.NoError(t, subs.UpdateReference("bob@d2.io", "d2.io", -1))
refs, err := subs.ListReferences("bob@d2.io")
require.NoError(t, err)
for _, ref := range refs {
require.NotEqual(t, "d2.io", ref.Domain)
}
})
t.Run("decrement below zero clamps to zero", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("bob@d2.io", "d3.net", 1))
require.NoError(t, subs.UpdateReference("bob@d2.io", "d3.net", -10))
// Key should be deleted.
refs, err := subs.ListReferences("bob@d2.io")
require.NoError(t, err)
for _, ref := range refs {
require.NotEqual(t, "d3.net", ref.Domain)
}
})
t.Run("negative delta only", func(t *testing.T) {
err := subs.UpdateReference("bob@d2.io", "d4.org", -3)
require.NoError(t, err)
// Clamps to zero, key deleted.
})
t.Run("update total counter consistency", func(t *testing.T) {
// Create the user first so Get works.
require.NoError(t, db.Users().Create(database.User{Name: "charlie@d5.io"}))
require.NoError(t, subs.UpdateReference("charlie@d5.io", "d1.com", 5))
require.NoError(t, subs.UpdateReference("charlie@d5.io", "d2.io", 3))
require.NoError(t, subs.UpdateReference("charlie@d5.io", "d1.com", -2))
user, err := db.Users().Get("charlie@d5.io")
require.NoError(t, err)
require.Equal(t, 6, user.Subscribers)
})
}
func TestSubscriptions_ListReferences(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
subs := db.Subscriptions()
t.Run("empty references", func(t *testing.T) {
refs, err := subs.ListReferences("unknown@d1.com")
require.NoError(t, err)
require.Empty(t, refs)
})
t.Run("single reference", func(t *testing.T) {
err := subs.UpdateReference("ref-user@x.com", "d1.com", 5)
require.NoError(t, err)
refs, err := subs.ListReferences("ref-user@x.com")
require.NoError(t, err)
require.Len(t, refs, 1)
require.Equal(t, "d1.com", refs[0].Domain)
require.Equal(t, 5, refs[0].Count)
})
t.Run("multiple references sorted", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("alice@d3.net", "d1.com", 3))
require.NoError(t, subs.UpdateReference("alice@d3.net", "d2.io", 7))
refs, err := subs.ListReferences("alice@d3.net")
require.NoError(t, err)
require.Len(t, refs, 2)
})
t.Run("deleted domain not in list", func(t *testing.T) {
require.NoError(t, subs.UpdateReference("carol@d4.org", "temp.io", 1))
require.NoError(t, subs.UpdateReference("carol@d4.org", "temp.io", -1))
refs, err := subs.ListReferences("carol@d4.org")
require.NoError(t, err)
for _, ref := range refs {
require.NotEqual(t, "temp.io", ref.Domain)
}
})
}
func TestSubscriptions_AfterClosePanics(t *testing.T) {
path := t.TempDir() + "/db"
db, err := New(path)
require.NoError(t, err)
subs := db.Subscriptions()
require.NoError(t, subs.Subscribe("masha@d1.com", "bob@d2.io", []byte("sig")))
require.NoError(t, db.Close())
require.Panics(t, func() {
_, _ = subs.OfUser("masha@d1.com")
})
require.Panics(t, func() {
_ = subs.Subscribe("masha@d1.com", "carol@d4.org", []byte("sig"))
})
require.Panics(t, func() {
_ = subs.Unsubscribe("masha@d1.com", "bob@d2.io")
})
require.Panics(t, func() {
_, _ = subs.ListReferences("masha@d1.com")
})
}
+29 -9
View File
@@ -2,7 +2,9 @@ package pebble
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/d1nch8g/mesh/database"
@@ -25,10 +27,14 @@ func (u *users) Create(user database.User) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(user); err != nil {
return err
return fmt.Errorf("encode user %q: %w", user.Name, err)
}
return u.db.Set(k, buf.Bytes(), pebble.Sync)
if err := u.db.Set(k, buf.Bytes(), pebble.Sync); err != nil {
return fmt.Errorf("store user %q: %w", user.Name, err)
}
return nil
}
func (u *users) Get(name string) (database.User, error) {
@@ -38,13 +44,19 @@ func (u *users) Get(name string) (database.User, error) {
if err == pebble.ErrNotFound {
return database.User{}, database.ErrNotFound
}
return database.User{}, err
return database.User{}, fmt.Errorf("read user %q: %w", name, err)
}
defer closer.Close()
var user database.User
if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&user); err != nil {
return database.User{}, err
return database.User{}, fmt.Errorf("decode user %q: %w", name, err)
}
totalKey := key(name, subsPrefix, "total")
if data, closer, err := u.db.Get(totalKey); err == nil {
user.Subscribers = int(binary.LittleEndian.Uint64(data))
closer.Close()
}
return user, nil
@@ -58,7 +70,7 @@ func (u *users) Update(name string, user database.User) error {
if err == pebble.ErrNotFound {
return database.ErrNotFound
}
return err
return fmt.Errorf("check user %q before update: %w", name, err)
}
closer.Close()
@@ -66,10 +78,14 @@ func (u *users) Update(name string, user database.User) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(user); err != nil {
return err
return fmt.Errorf("encode user %q: %w", name, err)
}
return u.db.Set(k, buf.Bytes(), pebble.Sync)
if err := u.db.Set(k, buf.Bytes(), pebble.Sync); err != nil {
return fmt.Errorf("store user %q: %w", name, err)
}
return nil
}
func (u *users) Delete(name string) error {
@@ -80,11 +96,15 @@ func (u *users) Delete(name string) error {
if err == pebble.ErrNotFound {
return database.ErrNotFound
}
return err
return fmt.Errorf("check user %q before delete: %w", name, err)
}
closer.Close()
return u.db.Delete(k, pebble.NoSync)
if err := u.db.Delete(k, pebble.NoSync); err != nil {
return fmt.Errorf("delete user %q: %w", name, err)
}
return nil
}
func (u *users) CheckExists(name string) bool {