Files
m8sh/database/leveldb/subscriptions.go
T
2026-05-11 12:17:48 +07:00

214 lines
4.4 KiB
Go

package leveldb
import (
"bytes"
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/d1nch8g/mesh/database"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
// counter is the in-memory state kept for a single counter key. Instances are
// stored in subscriptions.counters, keyed by the string form of the database
// key.
type counter struct {
// val is the counter value. flushCounter persists it to the database as
// 8 little-endian bytes.
val uint64
// jobRunning is set by addHot when it starts a flushCounter goroutine for
// this counter and cleared by flushCounter when that goroutine finishes,
// so that at most one flush is pending per counter.
jobRunning bool
// mu guards val and jobRunning.
mu sync.RWMutex
}
// subscriptions stores subscription records and per-user reference counters
// in a LevelDB database.
type subscriptions struct {
// db is the LevelDB handle all reads and writes go through.
db *leveldb.DB
// counters maps the string form of a counter key to its *counter.
counters sync.Map
// flushInterval is how long flushCounter sleeps before writing a counter
// to db.
flushInterval time.Duration
}
// Subscribe records a subscription from user to targetEmail.
//
// Two entries are written in a single batch, both holding sig as their value:
// an outgoing entry under user and an incoming entry under targetEmail. A
// failed write is returned wrapped with both names for context.
func (s *subscriptions) Subscribe(user, targetEmail string, sig []byte) error {
	var (
		outgoing = key(user, prefixSubsOut, targetEmail)
		incoming = key(targetEmail, prefixSubsIn, user)
	)

	b := &leveldb.Batch{}
	defer b.Reset()
	b.Put(outgoing, sig)
	b.Put(incoming, sig)

	err := s.db.Write(b, woSync)
	if err == nil {
		return nil
	}
	return fmt.Errorf("commit subscription %q -> %q: %w", user, targetEmail, err)
}
// Unsubscribe removes the subscription from user to targetEmail.
//
// The outgoing entry is looked up first: if it does not exist the call returns
// database.ErrNotFound, and any other lookup failure is returned wrapped.
// Otherwise the outgoing and incoming entries are deleted together in a single
// batch.
func (s *subscriptions) Unsubscribe(user, targetEmail string) error {
	outgoing := key(user, prefixSubsOut, targetEmail)

	switch _, err := s.db.Get(outgoing, nil); {
	case err == leveldb.ErrNotFound:
		return database.ErrNotFound
	case err != nil:
		return fmt.Errorf("check subscription %q -> %q: %w", user, targetEmail, err)
	}

	b := &leveldb.Batch{}
	defer b.Reset()
	b.Delete(outgoing)
	b.Delete(key(targetEmail, prefixSubsIn, user))

	if err := s.db.Write(b, woSync); err != nil {
		return fmt.Errorf("commit unsubscription %q -> %q: %w", user, targetEmail, err)
	}
	return nil
}
// OfDomestic returns one page of the subscriptions made by user.
//
// Entries are read in key order from the user's outgoing-subscription prefix.
// The first offset entries are skipped and at most limit entries are returned;
// a non-positive limit yields no entries. Each result carries the subscription
// target in User and a private copy of the stored signature.
//
// An error is returned if the underlying iterator fails, in which case no
// partial results are returned.
func (s *subscriptions) OfDomestic(user string, limit, offset int) ([]database.Subscription, error) {
	if limit <= 0 {
		return nil, nil
	}

	iter := s.db.NewIterator(util.BytesPrefix(key(user, prefixSubsOut)), nil)
	defer iter.Release()

	var subs []database.Subscription
	for skipped := 0; iter.Next(); {
		if skipped < offset {
			skipped++
			continue
		}

		// The target is whatever follows the last ':' in the key (the whole
		// key if there is none).
		k := iter.Key()
		target := string(k[bytes.LastIndexByte(k, ':')+1:])

		// The iterator reuses its value buffer between steps, so the
		// signature must be copied before it is retained.
		sig := make([]byte, len(iter.Value()))
		copy(sig, iter.Value())

		subs = append(subs, database.Subscription{
			User:      target,
			Signature: sig,
		})
		if len(subs) >= limit {
			break
		}
	}

	// Next reports false both at the end of the range and on failure; only
	// Error tells the two apart.
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("list subscriptions of %q: %w", user, err)
	}
	return subs, nil
}
// ToForeign returns one page of the subscribers of targetEmail.
//
// Entries are read in key order from the target's incoming-subscription
// prefix. The first offset entries are skipped and at most limit entries are
// returned; a non-positive limit yields no entries. Each result carries the
// subscriber's name in User and a private copy of the stored signature.
//
// An error is returned if the underlying iterator fails, in which case no
// partial results are returned.
func (s *subscriptions) ToForeign(targetEmail string, limit, offset int) ([]database.Subscription, error) {
	if limit <= 0 {
		return nil, nil
	}

	iter := s.db.NewIterator(util.BytesPrefix(key(targetEmail, prefixSubsIn)), nil)
	defer iter.Release()

	var subs []database.Subscription
	for skipped := 0; iter.Next(); {
		if skipped < offset {
			skipped++
			continue
		}

		// The subscriber is whatever follows the last ':' in the key (the
		// whole key if there is none).
		k := iter.Key()
		subscriber := string(k[bytes.LastIndexByte(k, ':')+1:])

		// The iterator reuses its value buffer between steps, so the
		// signature must be copied before it is retained.
		sig := make([]byte, len(iter.Value()))
		copy(sig, iter.Value())

		subs = append(subs, database.Subscription{
			User:      subscriber,
			Signature: sig,
		})
		if len(subs) >= limit {
			break
		}
	}

	// Next reports false both at the end of the range and on failure; only
	// Error tells the two apart.
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("list subscribers of %q: %w", targetEmail, err)
	}
	return subs, nil
}
// UpdateReference applies delta to two of user's counters through addHot: the
// counter for the given domain and the user's total counter. It always
// returns nil.
func (s *subscriptions) UpdateReference(user, domain string, delta int) error {
	targets := [][]byte{
		key(user, prefixSubsRef, domain), // per-domain counter
		key(user, prefixSubsTotal),       // overall counter
	}
	for _, k := range targets {
		s.addHot(k, delta)
	}
	return nil
}
// addHot applies delta to the in-memory counter for k and makes sure a flush
// of that counter to the database is scheduled.
//
// A counter always holds the absolute value for its key, never a pending
// delta: it is seeded from the database the first time the key is touched,
// every update is applied to that value (clamped at zero), and every flush
// writes the value back as is. flushCounter and ListReferences rely on this.
func (s *subscriptions) addHot(k []byte, delta int) {
	cnt := s.acquireCounter(k)
	defer cnt.mu.Unlock()

	cnt.val = uint64(max(int64(cnt.val)+int64(delta), 0))
	if !cnt.jobRunning {
		cnt.jobRunning = true
		go s.flushCounter(k, cnt)
	}
}

// acquireCounter returns the counter for k with its mutex held for writing,
// creating and seeding it from the database if it does not exist yet. The
// caller must unlock it.
func (s *subscriptions) acquireCounter(k []byte) *counter {
	name := string(k)
	if entry, ok := s.counters.Load(name); ok {
		cnt := entry.(*counter)
		cnt.mu.Lock()
		return cnt
	}

	// Lock the new counter before publishing it, so that no other goroutine
	// can read or update it until the stored value has been loaded.
	fresh := &counter{}
	fresh.mu.Lock()
	if entry, loaded := s.counters.LoadOrStore(name, fresh); loaded {
		// Lost the race to create the entry; fresh was never shared and is
		// simply dropped.
		cnt := entry.(*counter)
		cnt.mu.Lock()
		return cnt
	}

	// A missing, unreadable or malformed record leaves the counter at zero.
	if data, err := s.db.Get(k, nil); err == nil && len(data) == 8 {
		fresh.val = binary.LittleEndian.Uint64(data)
	}
	return fresh
}

// flushCounter waits for flushInterval and then persists cnt under k: a zero
// value deletes the record, any other value is stored as 8 little-endian
// bytes. The in-memory value is kept after the write, because it is the
// absolute value that later updates build on.
func (s *subscriptions) flushCounter(k []byte, cnt *counter) {
	time.Sleep(s.flushInterval)

	cnt.mu.Lock()
	defer cnt.mu.Unlock()
	cnt.jobRunning = false

	// Write errors are deliberately dropped: this goroutine has nobody to
	// report to, and the value stays in memory, so the flush scheduled by the
	// next update writes it again.
	if cnt.val == 0 {
		_ = s.db.Delete(k, woSync)
		return
	}
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], cnt.val)
	_ = s.db.Put(k, buf[:], woSync)
}

// ListReferences returns the non-zero per-domain reference counts stored for
// user.
//
// Domains are discovered by iterating the database, so a domain whose counter
// has never been flushed is not listed until its first flush. For a listed
// domain, the in-memory counter is used when one exists, since it may be newer
// than the stored record. Records that are not exactly 8 bytes are read as
// zero.
//
// An error is returned if the underlying iterator fails, in which case no
// partial results are returned.
func (s *subscriptions) ListReferences(user string) ([]database.SubscriptionReference, error) {
	prefix := key(user, prefixSubsRef)
	iter := s.db.NewIterator(util.BytesPrefix(prefix), nil)
	defer iter.Release()

	var refs []database.SubscriptionReference
	for iter.Next() {
		k := iter.Key()
		if len(k) <= len(prefix) {
			// Nothing follows the prefix, so there is no separator byte and
			// no domain to report.
			continue
		}

		var count uint64
		if v := iter.Value(); len(v) == 8 {
			count = binary.LittleEndian.Uint64(v)
		}
		if entry, ok := s.counters.Load(string(k)); ok {
			cnt := entry.(*counter)
			cnt.mu.RLock()
			count = cnt.val // absolute value; replaces the stored one
			cnt.mu.RUnlock()
		}
		if count == 0 {
			continue
		}

		refs = append(refs, database.SubscriptionReference{
			Domain: string(k[len(prefix)+1:]),
			Count:  count,
		})
	}

	// Next reports false both at the end of the range and on failure; only
	// Error tells the two apart.
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("list references of %q: %w", user, err)
	}
	return refs, nil
}