implemented chats storage

This commit is contained in:
d1nch8g
2026-05-11 02:35:33 +07:00
parent f150764f80
commit 6b6b82b5c0
6 changed files with 557 additions and 35 deletions
+22 -13
View File
@@ -20,7 +20,7 @@ type DB interface {
Subscriptions() Subscriptions
Files() Files
Email() Email
Messages() Messages
Chat() Chat
Posts() Posts
Reactions() Reactions
Comments() Comments
@@ -98,21 +98,30 @@ type Attachment struct {
Body uuid.UUID `json:"body" gob:"5"`
}
type Messages interface {
Put(msg Message) (id uuid.UUID, err error)
Get(id uuid.UUID) (Message, error)
List(user string, limit, offset int) ([]Message, error)
Delete(id uuid.UUID) error
type Chat interface {
Put(user string, msg Message) error
Dialogues(user string, limit int, after time.Time) ([]Dialogue, error)
Messages(user, contact string, limit int, after time.Time) ([]Message, error)
MarkSeen(user, contact string, date time.Time) error
Delete(user, contact string, date time.Time) error
}
type Dialogue struct {
Contact string `json:"contact"`
LastDate time.Time `json:"lastDate"`
LastContent []byte `json:"lastContent"`
UnreadCount int `json:"unreadCount"`
}
type Message struct {
ID uuid.UUID `json:"id" gob:"-"`
From string `json:"from" gob:"1"`
To string `json:"to" gob:"2"`
Group []string `json:"group" gob:"3"`
Date time.Time `json:"date" gob:"-"`
Payload []byte `json:"payload" gob:"4"`
Signature []byte `json:"signature" gob:"5"`
From string `json:"from" gob:"1"`
To string `json:"to" gob:"2"`
Group []string `json:"group" gob:"3"`
Date time.Time `json:"date" gob:"-"`
Content []byte `json:"content" gob:"5"`
Attachments []Attachment `json:"attachments" gob:"-"`
Seen bool `json:"seen" gob:"4"`
Signature []byte `json:"signature" gob:"6"`
}
type Posts interface {
+184
View File
@@ -0,0 +1,184 @@
package leveldb
import (
"bytes"
"encoding/gob"
"fmt"
"time"
"github.com/d1nch8g/mesh/database"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
type chat struct {
db *leveldb.DB
}
func (c *chat) Put(user string, msg database.Message) error {
ts := formatTime(msg.Date)
contact := msg.To
if msg.To == user {
contact = msg.From
}
metaKey := key(user, prefixChatMeta, contact, ts)
attachKey := key(user, prefixChatAttach, contact, ts)
dialogKey := key(user, prefixChatDialog, contact)
var metaBuf bytes.Buffer
if err := gob.NewEncoder(&metaBuf).Encode(msg); err != nil {
return fmt.Errorf("encode message for %q: %w", user, err)
}
batch := new(leveldb.Batch)
defer batch.Reset()
batch.Put(metaKey, metaBuf.Bytes())
if len(msg.Attachments) > 0 {
var attachBuf bytes.Buffer
if err := gob.NewEncoder(&attachBuf).Encode(msg.Attachments); err != nil {
return fmt.Errorf("encode attachments for %q: %w", user, err)
}
batch.Put(attachKey, attachBuf.Bytes())
}
dialogue := database.Dialogue{
Contact: contact,
LastDate: msg.Date,
LastContent: msg.Content,
UnreadCount: 1,
}
existing, err := c.db.Get(dialogKey, nil)
if err == nil {
var prev database.Dialogue
if gob.NewDecoder(bytes.NewReader(existing)).Decode(&prev) == nil {
dialogue.UnreadCount = prev.UnreadCount + 1
}
}
var dialogBuf bytes.Buffer
if err := gob.NewEncoder(&dialogBuf).Encode(dialogue); err != nil {
return fmt.Errorf("encode dialogue for %q: %w", user, err)
}
batch.Put(dialogKey, dialogBuf.Bytes())
if err := c.db.Write(batch, woSync); err != nil {
return fmt.Errorf("store message for %q: %w", user, err)
}
return nil
}
func (c *chat) Dialogues(user string, limit int, after time.Time) ([]database.Dialogue, error) {
prefix := key(user, prefixChatDialog)
iter := c.db.NewIterator(util.BytesPrefix(prefix), nil)
defer iter.Release()
var dialogues []database.Dialogue
for iter.Next() {
var dialogue database.Dialogue
if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&dialogue); err != nil {
return nil, fmt.Errorf("decode dialogue for %q: %w", user, err)
}
if !after.IsZero() && !dialogue.LastDate.After(after) {
continue
}
dialogues = append(dialogues, dialogue)
if len(dialogues) >= limit {
break
}
}
return dialogues, nil
}
func (c *chat) Messages(user, contact string, limit int, after time.Time) ([]database.Message, error) {
prefix := key(user, prefixChatMeta, contact)
iter := c.db.NewIterator(util.BytesPrefix(prefix), nil)
defer iter.Release()
var messages []database.Message
for iter.Next() {
var msg database.Message
if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&msg); err != nil {
return nil, fmt.Errorf("decode message for %q: %w", user, err)
}
if !after.IsZero() && !msg.Date.After(after) {
continue
}
messages = append(messages, msg)
if len(messages) >= limit {
break
}
}
return messages, nil
}
func (c *chat) MarkSeen(user, contact string, date time.Time) error {
ts := formatTime(date)
metaKey := key(user, prefixChatMeta, contact, ts)
data, err := c.db.Get(metaKey, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return database.ErrNotFound
}
return fmt.Errorf("read message for %q: %w", user, err)
}
var msg database.Message
if err = gob.NewDecoder(bytes.NewReader(data)).Decode(&msg); err != nil {
return fmt.Errorf("decode message for %q: %w", user, err)
}
if msg.Seen {
return nil
}
msg.Seen = true
var buf bytes.Buffer
if err = gob.NewEncoder(&buf).Encode(msg); err != nil {
return fmt.Errorf("encode message for %q: %w", user, err)
}
if err = c.db.Put(metaKey, buf.Bytes(), woSync); err != nil {
return fmt.Errorf("mark seen for %q: %w", user, err)
}
return nil
}
func (c *chat) Delete(user, contact string, date time.Time) error {
ts := formatTime(date)
metaKey := key(user, prefixChatMeta, contact, ts)
attachKey := key(user, prefixChatAttach, contact, ts)
_, err := c.db.Get(metaKey, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return database.ErrNotFound
}
return fmt.Errorf("check message for %q: %w", user, err)
}
batch := new(leveldb.Batch)
defer batch.Reset()
batch.Delete(metaKey)
batch.Delete(attachKey)
if err = c.db.Write(batch, woSync); err != nil {
return fmt.Errorf("delete message for %q: %w", user, err)
}
return nil
}
+335
View File
@@ -0,0 +1,335 @@
package leveldb
import (
"testing"
"time"
"github.com/d1nch8g/mesh/database"
"github.com/stretchr/testify/require"
)
func TestChat_Put(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
chat := db.Chat()
user := "masha@d1.com"
t.Run("put incoming message", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Content: []byte("hello"),
Date: time.Now(),
})
require.NoError(t, err)
})
t.Run("put outgoing message", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: user,
To: "alice@d3.net",
Content: []byte("hey"),
Date: time.Now(),
})
require.NoError(t, err)
})
t.Run("put message with attachments", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Content: []byte("check this"),
Date: time.Now(),
Attachments: []database.Attachment{
{Filename: "img.png", Size: 512, MimeType: "image/png"},
},
})
require.NoError(t, err)
})
t.Run("put empty content", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Date: time.Now(),
})
require.NoError(t, err)
})
t.Run("put message with signature", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Content: []byte("signed"),
Date: time.Now(),
Signature: []byte("sig123"),
})
require.NoError(t, err)
})
t.Run("put group message", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Group: []string{user, "alice@d3.net", "carol@d4.org"},
Content: []byte("group chat"),
Date: time.Now(),
})
require.NoError(t, err)
})
}
func TestChat_Dialogues(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
chat := db.Chat()
user := "masha@d1.com"
t.Run("empty dialogues", func(t *testing.T) {
list, err := chat.Dialogues(user, 10, time.Time{})
require.NoError(t, err)
require.Empty(t, list)
})
t.Run("single dialogue", func(t *testing.T) {
now := time.Now()
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Content: []byte("hello"),
Date: now,
})
require.NoError(t, err)
list, err := chat.Dialogues(user, 10, time.Time{})
require.NoError(t, err)
require.Len(t, list, 1)
require.Equal(t, "bob@d2.io", list[0].Contact)
require.Equal(t, []byte("hello"), list[0].LastContent)
require.Equal(t, 1, list[0].UnreadCount)
})
t.Run("multiple messages increment unread", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: "bob@d2.io",
To: user,
Content: []byte("second"),
Date: time.Now(),
})
require.NoError(t, err)
list, err := chat.Dialogues(user, 10, time.Time{})
require.NoError(t, err)
require.Len(t, list, 1)
require.Equal(t, 2, list[0].UnreadCount)
})
t.Run("filter after time", func(t *testing.T) {
past := time.Now().Add(-time.Hour)
err := chat.Put(user, database.Message{
From: "carol@d4.org",
To: user,
Content: []byte("old"),
Date: past,
})
require.NoError(t, err)
list, err := chat.Dialogues(user, 10, time.Now().Add(-time.Minute))
require.NoError(t, err)
for _, d := range list {
require.NotEqual(t, "carol@d4.org", d.Contact)
}
})
t.Run("limit respected", func(t *testing.T) {
list, err := chat.Dialogues(user, 1, time.Time{})
require.NoError(t, err)
require.Len(t, list, 1)
})
t.Run("user isolation", func(t *testing.T) {
list, err := chat.Dialogues("other@d5.io", 10, time.Time{})
require.NoError(t, err)
require.Empty(t, list)
})
}
func TestChat_Messages(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
chat := db.Chat()
user := "masha@d1.com"
contact := "bob@d2.io"
t.Run("empty messages", func(t *testing.T) {
list, err := chat.Messages(user, contact, 10, time.Time{})
require.NoError(t, err)
require.Empty(t, list)
})
t.Run("messages for contact", func(t *testing.T) {
err := chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("msg1"),
Date: time.Now(),
})
require.NoError(t, err)
list, err := chat.Messages(user, contact, 10, time.Time{})
require.NoError(t, err)
require.Len(t, list, 1)
require.Equal(t, []byte("msg1"), list[0].Content)
})
t.Run("filter after time", func(t *testing.T) {
contact := "filter-test@x.com"
now := time.Now()
err := chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("recent"),
Date: now,
})
require.NoError(t, err)
past := now.Add(-time.Hour)
err = chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("old"),
Date: past,
})
require.NoError(t, err)
list, err := chat.Messages(user, contact, 10, now.Add(-time.Minute))
require.NoError(t, err)
require.Len(t, list, 1)
require.Equal(t, []byte("recent"), list[0].Content)
})
t.Run("limit respected", func(t *testing.T) {
for i := 0; i < 5; i++ {
err := chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("msg"),
Date: time.Now().Add(time.Duration(i) * time.Second),
})
require.NoError(t, err)
}
list, err := chat.Messages(user, contact, 2, time.Time{})
require.NoError(t, err)
require.Len(t, list, 2)
})
t.Run("contact isolation", func(t *testing.T) {
list, err := chat.Messages(user, "unknown@d5.io", 10, time.Time{})
require.NoError(t, err)
require.Empty(t, list)
})
}
func TestChat_MarkSeen(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
chat := db.Chat()
user := "masha@d1.com"
contact := "bob@d2.io"
now := time.Now()
err := chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("unread"),
Date: now,
})
require.NoError(t, err)
t.Run("mark seen", func(t *testing.T) {
err := chat.MarkSeen(user, contact, now)
require.NoError(t, err)
list, err := chat.Messages(user, contact, 1, time.Time{})
require.NoError(t, err)
require.True(t, list[0].Seen)
})
t.Run("mark seen twice is idempotent", func(t *testing.T) {
err := chat.MarkSeen(user, contact, now)
require.NoError(t, err)
})
t.Run("mark seen non-existing returns ErrNotFound", func(t *testing.T) {
err := chat.MarkSeen(user, "unknown@d5.io", now)
require.ErrorIs(t, err, database.ErrNotFound)
})
}
func TestChat_Delete(t *testing.T) {
db, cleanup := openDB(t)
defer cleanup()
chat := db.Chat()
user := "masha@d1.com"
contact := "bob@d2.io"
now := time.Now()
err := chat.Put(user, database.Message{
From: contact,
To: user,
Content: []byte("delete me"),
Date: now,
})
require.NoError(t, err)
t.Run("delete existing", func(t *testing.T) {
err := chat.Delete(user, contact, now)
require.NoError(t, err)
list, err := chat.Messages(user, contact, 10, time.Time{})
require.NoError(t, err)
require.Empty(t, list)
})
t.Run("delete non-existing returns ErrNotFound", func(t *testing.T) {
err := chat.Delete(user, "unknown@d5.io", now)
require.ErrorIs(t, err, database.ErrNotFound)
})
t.Run("delete twice returns ErrNotFound", func(t *testing.T) {
err := chat.Delete(user, contact, now)
require.ErrorIs(t, err, database.ErrNotFound)
})
}
func TestChat_AfterClose(t *testing.T) {
path := t.TempDir() + "/db"
db, err := New(path)
require.NoError(t, err)
chat := db.Chat()
err = chat.Put("masha@d1.com", database.Message{
From: "bob@d2.io",
To: "masha@d1.com",
Content: []byte("data"),
Date: time.Now(),
})
require.NoError(t, err)
require.NoError(t, db.Close())
require.NotPanics(t, func() {
chat.Put("masha@d1.com", database.Message{From: "x", To: "y", Date: time.Now()})
chat.Dialogues("masha@d1.com", 10, time.Time{})
chat.Messages("masha@d1.com", "bob@d2.io", 10, time.Time{})
chat.MarkSeen("masha@d1.com", "bob@d2.io", time.Now())
chat.Delete("masha@d1.com", "bob@d2.io", time.Now())
})
}
+15 -12
View File
@@ -11,15 +11,18 @@ import (
)
const (
prefixFileMeta = "filemeta"
prefixFileChunk = "filechunk"
prefixSubsTotal = "substotal"
prefixSubsRef = "subsref"
prefixSubsOut = "subsout"
prefixSubsIn = "subsin"
prefixUsers = "users"
prefixEmailMeta = "emailmeta"
prefixEmailBody = "emailbody"
prefixFileMeta = "filemeta"
prefixFileChunk = "filechunk"
prefixSubsTotal = "substotal"
prefixSubsRef = "subsref"
prefixSubsOut = "subsout"
prefixSubsIn = "subsin"
prefixUsers = "users"
prefixEmailMeta = "emailmeta"
prefixEmailBody = "emailbody"
prefixChatMeta = "chatmeta"
prefixChatAttach = "chatattach"
prefixChatDialog = "chatdialog"
)
var (
@@ -33,7 +36,7 @@ type DB struct {
subscriptions *subscriptions
files *files
email *email
// messages *messages
chat *chat
// posts *posts
// reactions *reactions
// comments *comments
@@ -52,7 +55,7 @@ func New(path string) (*DB, error) {
subscriptions: &subscriptions{db: db, flushInterval: time.Second},
files: &files{db: db},
email: &email{db: db},
// messages: &messages{db: ldb},
chat: &chat{db: db},
// posts: &posts{db: ldb},
// reactions: &reactions{db: ldb},
// comments: &comments{db: ldb},
@@ -96,8 +99,8 @@ func (d *DB) Users() database.Users { return d.users }
func (d *DB) Subscriptions() database.Subscriptions { return d.subscriptions }
func (d *DB) Files() database.Files { return d.files }
func (d *DB) Email() database.Email { return d.email }
func (d *DB) Chat() database.Chat { return d.chat }
// func (d *DB) Messages() database.Messages { return d.messages }
// func (d *DB) Posts() database.Posts { return d.posts }
// func (d *DB) Reactions() database.Reactions { return d.reactions }
// func (d *DB) Comments() database.Comments { return d.comments }
+1 -1
View File
@@ -35,7 +35,7 @@ func TestGetters(t *testing.T) {
require.NotNil(t, db.Subscriptions())
require.NotNil(t, db.Files())
require.NotNil(t, db.Email())
// require.NotNil(t, db.Messages())
require.NotNil(t, db.Chat())
// require.NotNil(t, db.Posts())
// require.NotNil(t, db.Reactions())
// require.NotNil(t, db.Comments())
-9
View File
@@ -1,9 +0,0 @@
package leveldb
import (
"github.com/syndtr/goleveldb/leveldb"
)
type messages struct {
db *leveldb.DB
}