261 lines
6.0 KiB
Go
261 lines
6.0 KiB
Go
package leveldb
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
"m8sh.su/x/m8sh/database"
|
|
)
|
|
|
|
type reactions struct {
|
|
db *leveldb.DB
|
|
counters sync.Map
|
|
flushInterval time.Duration
|
|
}
|
|
|
|
func (r *reactions) SetAllowed(owner string, date time.Time, emojis []string) error {
|
|
k := key(owner, prefixReactionsAllowed, formatTime(date))
|
|
|
|
var buf bytes.Buffer
|
|
if err := gob.NewEncoder(&buf).Encode(emojis); err != nil {
|
|
return fmt.Errorf("encode allowed reactions for %q: %w", owner, err)
|
|
}
|
|
|
|
if err := r.db.Put(k, buf.Bytes(), woSync); err != nil {
|
|
return fmt.Errorf("store allowed reactions for %q: %w", owner, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *reactions) GetAllowed(owner string, date time.Time) ([]string, error) {
|
|
k := key(owner, prefixReactionsAllowed, formatTime(date))
|
|
|
|
data, err := r.db.Get(k, nil)
|
|
if err != nil {
|
|
if err == leveldb.ErrNotFound {
|
|
return nil, database.ErrNotFound
|
|
}
|
|
return nil, fmt.Errorf("read allowed reactions for %q: %w", owner, err)
|
|
}
|
|
|
|
var emojis []string
|
|
if err = gob.NewDecoder(bytes.NewReader(data)).Decode(&emojis); err != nil {
|
|
return nil, fmt.Errorf("decode allowed reactions for %q: %w", owner, err)
|
|
}
|
|
|
|
return emojis, nil
|
|
}
|
|
|
|
func (r *reactions) Add(user, author, domain string, date time.Time, emoji string, sig []byte) error {
|
|
ts := formatTime(date)
|
|
|
|
domesticKey := key(user, prefixReactionsOut, author, domain, ts, emoji)
|
|
foreignKey := key(domain, prefixReactionsIn, author, ts, user, emoji)
|
|
|
|
batch := new(leveldb.Batch)
|
|
defer batch.Reset()
|
|
|
|
batch.Put(domesticKey, sig)
|
|
batch.Put(foreignKey, sig)
|
|
|
|
if err := r.db.Write(batch, woSync); err != nil {
|
|
return fmt.Errorf("store reaction for %q: %w", user, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *reactions) Remove(user, author, domain string, date time.Time, emoji string) error {
|
|
ts := formatTime(date)
|
|
|
|
domesticKey := key(user, prefixReactionsOut, author, domain, ts, emoji)
|
|
foreignKey := key(domain, prefixReactionsIn, author, ts, user, emoji)
|
|
|
|
_, err := r.db.Get(domesticKey, nil)
|
|
if err != nil {
|
|
if err == leveldb.ErrNotFound {
|
|
return database.ErrNotFound
|
|
}
|
|
return fmt.Errorf("check reaction for %q: %w", user, err)
|
|
}
|
|
|
|
batch := new(leveldb.Batch)
|
|
defer batch.Reset()
|
|
|
|
batch.Delete(domesticKey)
|
|
batch.Delete(foreignKey)
|
|
|
|
if err := r.db.Write(batch, woSync); err != nil {
|
|
return fmt.Errorf("delete reaction for %q: %w", user, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *reactions) OfUser(user string, limit, offset int) ([]database.ReactionToForeign, error) {
|
|
prefix := key(user, prefixReactionsOut)
|
|
|
|
iter := r.db.NewIterator(util.BytesPrefix(prefix), nil)
|
|
defer iter.Release()
|
|
|
|
var reactions []database.ReactionToForeign
|
|
skipped := 0
|
|
for iter.Next() {
|
|
if skipped < offset {
|
|
skipped++
|
|
continue
|
|
}
|
|
|
|
parts := bytes.Split(iter.Key(), []byte(":"))
|
|
author := string(parts[len(parts)-4])
|
|
domain := string(parts[len(parts)-3])
|
|
dateBytes := parts[len(parts)-2]
|
|
emoji := string(parts[len(parts)-1])
|
|
|
|
ts := binary.BigEndian.Uint64(dateBytes)
|
|
date := time.Unix(0, int64(ts))
|
|
|
|
reactions = append(reactions, database.ReactionToForeign{
|
|
Domain: domain,
|
|
Author: author,
|
|
Date: date,
|
|
Emoji: emoji,
|
|
})
|
|
|
|
if len(reactions) >= limit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return reactions, nil
|
|
}
|
|
|
|
func (r *reactions) ToContent(author, domain string, date time.Time, limit, offset int) ([]database.ReactionOfDomestic, error) {
|
|
prefix := key(domain, prefixReactionsIn, author, formatTime(date))
|
|
|
|
iter := r.db.NewIterator(util.BytesPrefix(prefix), nil)
|
|
defer iter.Release()
|
|
|
|
var reactions []database.ReactionOfDomestic
|
|
skipped := 0
|
|
for iter.Next() {
|
|
if skipped < offset {
|
|
skipped++
|
|
continue
|
|
}
|
|
|
|
parts := bytes.Split(iter.Key(), []byte(":"))
|
|
user := string(parts[len(parts)-2])
|
|
emoji := string(parts[len(parts)-1])
|
|
|
|
sig := make([]byte, len(iter.Value()))
|
|
copy(sig, iter.Value())
|
|
|
|
reactions = append(reactions, database.ReactionOfDomestic{
|
|
User: user,
|
|
Emoji: emoji,
|
|
Signature: sig,
|
|
})
|
|
|
|
if len(reactions) >= limit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return reactions, nil
|
|
}
|
|
|
|
func (r *reactions) UpdateReference(owner string, date time.Time, domain, emoji string, delta int) error {
|
|
ts := formatTime(date)
|
|
domainKey := key(owner, prefixReactionsRef, ts, "perdomain", domain, emoji)
|
|
totalKey := key(owner, prefixReactionsRef, ts, "total", emoji)
|
|
|
|
r.addHot(domainKey, delta)
|
|
r.addHot(totalKey, delta)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *reactions) addHot(k []byte, delta int) {
|
|
cnt := &counter{}
|
|
|
|
entry, ok := r.counters.LoadOrStore(string(k), cnt)
|
|
if !ok {
|
|
cnt.mu.Lock()
|
|
data, err := r.db.Get(k, nil)
|
|
if err == nil && len(data) == 8 {
|
|
cnt.val = binary.LittleEndian.Uint64(data)
|
|
}
|
|
cnt.mu.Unlock()
|
|
} else {
|
|
cnt = entry.(*counter)
|
|
}
|
|
|
|
cnt.mu.Lock()
|
|
val := max(int64(cnt.val)+int64(delta), 0)
|
|
cnt.val = uint64(val)
|
|
|
|
if !cnt.jobRunning {
|
|
cnt.jobRunning = true
|
|
go r.flushCounter(k, cnt)
|
|
}
|
|
cnt.mu.Unlock()
|
|
}
|
|
|
|
func (r *reactions) flushCounter(k []byte, ctrl *counter) {
|
|
time.Sleep(r.flushInterval)
|
|
|
|
ctrl.mu.Lock()
|
|
defer ctrl.mu.Unlock()
|
|
|
|
if ctrl.val == 0 {
|
|
r.db.Delete(k, woSync)
|
|
} else {
|
|
var buf [8]byte
|
|
binary.LittleEndian.PutUint64(buf[:], ctrl.val)
|
|
r.db.Put(k, buf[:], woSync)
|
|
ctrl.val = 0
|
|
}
|
|
|
|
ctrl.jobRunning = false
|
|
}
|
|
|
|
func (r *reactions) ListReactions(owner string, date time.Time) ([]database.ReactionCounter, error) {
|
|
prefix := key(owner, prefixReactionsRef, formatTime(date), "total")
|
|
|
|
iter := r.db.NewIterator(util.BytesPrefix(prefix), nil)
|
|
defer iter.Release()
|
|
|
|
var result []database.ReactionCounter
|
|
for iter.Next() {
|
|
parts := bytes.Split(iter.Key(), []byte(":"))
|
|
emoji := string(parts[len(parts)-1])
|
|
|
|
diskVal := binary.LittleEndian.Uint64(iter.Value())
|
|
count := diskVal
|
|
|
|
if entry, ok := r.counters.Load(string(iter.Key())); ok {
|
|
cnt := entry.(*counter)
|
|
cnt.mu.RLock()
|
|
count += cnt.val
|
|
cnt.mu.RUnlock()
|
|
}
|
|
|
|
if count > 0 {
|
|
result = append(result, database.ReactionCounter{
|
|
Emoji: emoji,
|
|
Count: count,
|
|
})
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|