email leveldb implementation
This commit is contained in:
+12
-12
@@ -19,7 +19,7 @@ type DB interface {
|
||||
Users() Users
|
||||
Subscriptions() Subscriptions
|
||||
Files() Files
|
||||
Emails() Emails
|
||||
Email() Email
|
||||
Messages() Messages
|
||||
Posts() Posts
|
||||
Reactions() Reactions
|
||||
@@ -72,22 +72,22 @@ type Files interface {
|
||||
Size(user string, id uuid.UUID) (int64, error)
|
||||
}
|
||||
|
||||
type Emails interface {
|
||||
Put(user string, letter Letter) (id uuid.UUID, err error)
|
||||
Get(user string, id uuid.UUID) (Letter, error)
|
||||
List(user string, limit, offset int) ([]Letter, error)
|
||||
MarkSeen(user string, id uuid.UUID) error
|
||||
Delete(user string, id uuid.UUID) error
|
||||
type Email interface {
|
||||
Put(user string, letter Letter) error
|
||||
Get(user, correspondent string, date time.Time) (Letter, error)
|
||||
List(user string, limit int, before time.Time) ([]Letter, error)
|
||||
MarkSeen(user, correspondent string, date time.Time) error
|
||||
Delete(user, correspondent string, date time.Time) error
|
||||
}
|
||||
|
||||
type Letter struct {
|
||||
ID uuid.UUID `json:"id" gob:"-"`
|
||||
From string `json:"from" gob:"1"`
|
||||
Subject string `json:"subject" gob:"2"`
|
||||
Date time.Time `json:"date" gob:"-"`
|
||||
To string `json:"to" gob:"2"`
|
||||
Subject string `json:"subject" gob:"3"`
|
||||
Date time.Time `json:"date" gob:"4"`
|
||||
Body []byte `json:"body" gob:"-"`
|
||||
Seen bool `json:"seen" gob:"3"`
|
||||
Attachments []Attachment `json:"attachments" gob:"4"`
|
||||
Seen bool `json:"seen" gob:"5"`
|
||||
Attachments []Attachment `json:"attachments" gob:"6"`
|
||||
}
|
||||
|
||||
type Attachment struct {
|
||||
|
||||
+161
-1
@@ -1,9 +1,169 @@
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/d1nch8g/mesh/database"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
type emails struct {
|
||||
type email struct {
|
||||
db *leveldb.DB
|
||||
}
|
||||
|
||||
func (e *email) Put(user string, letter database.Letter) error {
|
||||
ts := formatTime(letter.Date)
|
||||
|
||||
var correspondent string
|
||||
if letter.To == user {
|
||||
correspondent = letter.From
|
||||
} else {
|
||||
correspondent = letter.To
|
||||
}
|
||||
|
||||
metaKey := key(user, prefixEmailMeta, ts, correspondent)
|
||||
bodyKey := key(user, prefixEmailBody, ts, correspondent)
|
||||
|
||||
var metaBuf bytes.Buffer
|
||||
if err := gob.NewEncoder(&metaBuf).Encode(letter); err != nil {
|
||||
return fmt.Errorf("encode letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
defer batch.Reset()
|
||||
|
||||
batch.Put(metaKey, metaBuf.Bytes())
|
||||
batch.Put(bodyKey, letter.Body)
|
||||
|
||||
if err := e.db.Write(batch, woSync); err != nil {
|
||||
return fmt.Errorf("store letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *email) Get(user, correspondent string, date time.Time) (database.Letter, error) {
|
||||
ts := formatTime(date)
|
||||
|
||||
metaKey := key(user, prefixEmailMeta, ts, correspondent)
|
||||
bodyKey := key(user, prefixEmailBody, ts, correspondent)
|
||||
|
||||
data, err := e.db.Get(metaKey, nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return database.Letter{}, database.ErrNotFound
|
||||
}
|
||||
return database.Letter{}, fmt.Errorf("read letter meta for %q: %w", user, err)
|
||||
}
|
||||
|
||||
var letter database.Letter
|
||||
if err = gob.NewDecoder(bytes.NewReader(data)).Decode(&letter); err != nil {
|
||||
return database.Letter{}, fmt.Errorf("decode letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
body, err := e.db.Get(bodyKey, nil)
|
||||
if err == nil {
|
||||
letter.Body = make([]byte, len(body))
|
||||
copy(letter.Body, body)
|
||||
}
|
||||
|
||||
return letter, nil
|
||||
}
|
||||
|
||||
func (e *email) List(user string, limit int, before time.Time) ([]database.Letter, error) {
|
||||
prefix := key(user, prefixEmailMeta)
|
||||
|
||||
iter := e.db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||
defer iter.Release()
|
||||
|
||||
var letters []database.Letter
|
||||
for iter.Next() {
|
||||
var letter database.Letter
|
||||
if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&letter); err != nil {
|
||||
return nil, fmt.Errorf("decode letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
if !before.IsZero() && !letter.Date.Before(before) {
|
||||
continue
|
||||
}
|
||||
|
||||
letters = append(letters, letter)
|
||||
if len(letters) >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return letters, nil
|
||||
}
|
||||
|
||||
func (e *email) MarkSeen(user, correspondent string, date time.Time) error {
|
||||
ts := formatTime(date)
|
||||
metaKey := key(user, prefixEmailMeta, ts, correspondent)
|
||||
|
||||
data, err := e.db.Get(metaKey, nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return database.ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("read letter meta for %q: %w", user, err)
|
||||
}
|
||||
|
||||
var letter database.Letter
|
||||
if err = gob.NewDecoder(bytes.NewReader(data)).Decode(&letter); err != nil {
|
||||
return fmt.Errorf("decode letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
if letter.Seen {
|
||||
return nil
|
||||
}
|
||||
letter.Seen = true
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err = gob.NewEncoder(&buf).Encode(letter); err != nil {
|
||||
return fmt.Errorf("encode letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
if err = e.db.Put(metaKey, buf.Bytes(), woSync); err != nil {
|
||||
return fmt.Errorf("mark seen for %q: %w", user, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *email) Delete(user, correspondent string, date time.Time) error {
|
||||
ts := formatTime(date)
|
||||
metaKey := key(user, prefixEmailMeta, ts, correspondent)
|
||||
|
||||
_, err := e.db.Get(metaKey, nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return database.ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("check letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
bodyKey := key(user, prefixEmailBody, ts, correspondent)
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
defer batch.Reset()
|
||||
|
||||
batch.Delete(metaKey)
|
||||
batch.Delete(bodyKey)
|
||||
|
||||
if err = e.db.Write(batch, woSync); err != nil {
|
||||
return fmt.Errorf("delete letter for %q: %w", user, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func formatTime(t time.Time) []byte {
|
||||
var buf [8]byte
|
||||
binary.BigEndian.PutUint64(buf[:], uint64(t.UnixNano()))
|
||||
return buf[:]
|
||||
}
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/d1nch8g/mesh/database"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEmail_Put(t *testing.T) {
|
||||
db, cleanup := openDB(t)
|
||||
defer cleanup()
|
||||
|
||||
email := db.Email()
|
||||
user := "masha@d1.com"
|
||||
|
||||
letter := database.Letter{
|
||||
From: "alice@d1.com",
|
||||
To: user,
|
||||
Subject: "Hello",
|
||||
Body: []byte("hello world"),
|
||||
Date: time.Now(),
|
||||
}
|
||||
|
||||
t.Run("put incoming letter", func(t *testing.T) {
|
||||
err := email.Put(user, letter)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("put outgoing letter", func(t *testing.T) {
|
||||
outgoing := database.Letter{
|
||||
From: user,
|
||||
To: "bob@d2.io",
|
||||
Subject: "Reply",
|
||||
Body: []byte("thanks"),
|
||||
Date: time.Now(),
|
||||
}
|
||||
err := email.Put(user, outgoing)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("put letter with empty body", func(t *testing.T) {
|
||||
empty := database.Letter{
|
||||
From: "alice@d1.com",
|
||||
To: user,
|
||||
Date: time.Now(),
|
||||
}
|
||||
err := email.Put(user, empty)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("put letter with attachments", func(t *testing.T) {
|
||||
withAtt := database.Letter{
|
||||
From: "alice@d1.com",
|
||||
To: user,
|
||||
Subject: "Files",
|
||||
Body: []byte("check these"),
|
||||
Date: time.Now(),
|
||||
Attachments: []database.Attachment{
|
||||
{Filename: "doc.pdf", Size: 1024, MimeType: "application/pdf"},
|
||||
},
|
||||
}
|
||||
err := email.Put(user, withAtt)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEmail_Get(t *testing.T) {
|
||||
db, cleanup := openDB(t)
|
||||
defer cleanup()
|
||||
|
||||
email := db.Email()
|
||||
user := "masha@d1.com"
|
||||
now := time.Now()
|
||||
|
||||
letter := database.Letter{
|
||||
From: "alice@d1.com",
|
||||
To: user,
|
||||
Subject: "Test",
|
||||
Body: []byte("test body"),
|
||||
Date: now,
|
||||
}
|
||||
err := email.Put(user, letter)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("get existing letter", func(t *testing.T) {
|
||||
got, err := email.Get(user, "alice@d1.com", now)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, letter.From, got.From)
|
||||
require.Equal(t, letter.To, got.To)
|
||||
require.Equal(t, letter.Subject, got.Subject)
|
||||
require.Equal(t, letter.Body, got.Body)
|
||||
})
|
||||
|
||||
t.Run("get non-existing letter", func(t *testing.T) {
|
||||
_, err := email.Get(user, "nonexistent@d5.io", now)
|
||||
require.ErrorIs(t, err, database.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("get letter empty body", func(t *testing.T) {
|
||||
emptyTime := time.Now()
|
||||
err := email.Put(user, database.Letter{
|
||||
From: "bob@d2.io",
|
||||
To: user,
|
||||
Date: emptyTime,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := email.Get(user, "bob@d2.io", emptyTime)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, got.Body)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEmail_List(t *testing.T) {
|
||||
db, cleanup := openDB(t)
|
||||
defer cleanup()
|
||||
|
||||
email := db.Email()
|
||||
user := "masha@d1.com"
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
err := email.Put(user, database.Letter{
|
||||
From: "sender@d.com",
|
||||
To: user,
|
||||
Subject: "Subject",
|
||||
Body: []byte("body"),
|
||||
Date: time.Now().Add(-time.Duration(i) * time.Second),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("list all", func(t *testing.T) {
|
||||
list, err := email.List(user, 10, time.Time{})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, list, 5)
|
||||
})
|
||||
|
||||
t.Run("list with limit", func(t *testing.T) {
|
||||
list, err := email.List(user, 2, time.Time{})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, list, 2)
|
||||
})
|
||||
|
||||
t.Run("list before time", func(t *testing.T) {
|
||||
pivot := time.Now().Add(-2 * time.Second)
|
||||
list, err := email.List(user, 10, pivot)
|
||||
require.NoError(t, err)
|
||||
for _, l := range list {
|
||||
require.True(t, l.Date.Before(pivot))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("list empty user", func(t *testing.T) {
|
||||
list, err := email.List("unknown@d1.com", 10, time.Time{})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, list)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEmail_MarkSeen(t *testing.T) {
|
||||
db, cleanup := openDB(t)
|
||||
defer cleanup()
|
||||
|
||||
email := db.Email()
|
||||
user := "masha@d1.com"
|
||||
now := time.Now()
|
||||
|
||||
err := email.Put(user, database.Letter{
|
||||
From: "sender@d.com",
|
||||
To: user,
|
||||
Subject: "Unread",
|
||||
Body: []byte("body"),
|
||||
Date: now,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("mark seen", func(t *testing.T) {
|
||||
err := email.MarkSeen(user, "sender@d.com", now)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := email.Get(user, "sender@d.com", now)
|
||||
require.NoError(t, err)
|
||||
require.True(t, got.Seen)
|
||||
})
|
||||
|
||||
t.Run("mark seen twice is idempotent", func(t *testing.T) {
|
||||
err := email.MarkSeen(user, "sender@d.com", now)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("mark seen non-existing returns ErrNotFound", func(t *testing.T) {
|
||||
err := email.MarkSeen(user, "nonexistent@d5.io", now)
|
||||
require.ErrorIs(t, err, database.ErrNotFound)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEmail_Delete(t *testing.T) {
|
||||
db, cleanup := openDB(t)
|
||||
defer cleanup()
|
||||
|
||||
email := db.Email()
|
||||
user := "masha@d1.com"
|
||||
now := time.Now()
|
||||
|
||||
err := email.Put(user, database.Letter{
|
||||
From: "sender@d.com",
|
||||
To: user,
|
||||
Subject: "Delete me",
|
||||
Body: []byte("body"),
|
||||
Date: now,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("delete existing", func(t *testing.T) {
|
||||
err := email.Delete(user, "sender@d.com", now)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = email.Get(user, "sender@d.com", now)
|
||||
require.ErrorIs(t, err, database.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("delete non-existing returns ErrNotFound", func(t *testing.T) {
|
||||
err := email.Delete(user, "nonexistent@d5.io", now)
|
||||
require.ErrorIs(t, err, database.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("delete twice returns ErrNotFound", func(t *testing.T) {
|
||||
err := email.Delete(user, "sender@d.com", now)
|
||||
require.ErrorIs(t, err, database.ErrNotFound)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEmail_AfterCloseReturnsError(t *testing.T) {
|
||||
path := t.TempDir() + "/db"
|
||||
db, err := New(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
email := db.Email()
|
||||
now := time.Now()
|
||||
err = email.Put("masha@d1.com", database.Letter{
|
||||
From: "alice@d1.com",
|
||||
To: "masha@d1.com",
|
||||
Date: now,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
email.Get("masha@d1.com", "alice@d1.com", now)
|
||||
email.List("masha@d1.com", 10, time.Time{})
|
||||
email.MarkSeen("masha@d1.com", "alice@d1.com", now)
|
||||
email.Delete("masha@d1.com", "alice@d1.com", now)
|
||||
email.Put("masha@d1.com", database.Letter{
|
||||
From: "bob@d2.io",
|
||||
To: "masha@d1.com",
|
||||
Date: time.Now(),
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -18,6 +18,8 @@ const (
|
||||
prefixSubsOut = "subsout"
|
||||
prefixSubsIn = "subsin"
|
||||
prefixUsers = "users"
|
||||
prefixEmailMeta = "emailmeta"
|
||||
prefixEmailBody = "emailbody"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -30,7 +32,7 @@ type DB struct {
|
||||
users *users
|
||||
subscriptions *subscriptions
|
||||
files *files
|
||||
// emails *emails
|
||||
email *email
|
||||
// messages *messages
|
||||
// posts *posts
|
||||
// reactions *reactions
|
||||
@@ -49,7 +51,7 @@ func New(path string) (*DB, error) {
|
||||
users: &users{db: db},
|
||||
subscriptions: &subscriptions{db: db, flushInterval: time.Second},
|
||||
files: &files{db: db},
|
||||
// emails: &emails{db: ldb},
|
||||
email: &email{db: db},
|
||||
// messages: &messages{db: ldb},
|
||||
// posts: &posts{db: ldb},
|
||||
// reactions: &reactions{db: ldb},
|
||||
@@ -93,8 +95,8 @@ func key(user, domain string, args ...any) []byte {
|
||||
func (d *DB) Users() database.Users { return d.users }
|
||||
func (d *DB) Subscriptions() database.Subscriptions { return d.subscriptions }
|
||||
func (d *DB) Files() database.Files { return d.files }
|
||||
func (d *DB) Email() database.Email { return d.email }
|
||||
|
||||
// func (d *DB) Emails() database.Emails { return d.emails }
|
||||
// func (d *DB) Messages() database.Messages { return d.messages }
|
||||
// func (d *DB) Posts() database.Posts { return d.posts }
|
||||
// func (d *DB) Reactions() database.Reactions { return d.reactions }
|
||||
|
||||
@@ -32,9 +32,9 @@ func TestGetters(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
require.NotNil(t, db.Users())
|
||||
// require.NotNil(t, db.Subscriptions())
|
||||
require.NotNil(t, db.Subscriptions())
|
||||
require.NotNil(t, db.Files())
|
||||
// require.NotNil(t, db.Emails())
|
||||
require.NotNil(t, db.Email())
|
||||
// require.NotNil(t, db.Messages())
|
||||
// require.NotNil(t, db.Posts())
|
||||
// require.NotNil(t, db.Reactions())
|
||||
|
||||
Reference in New Issue
Block a user