finished implementing processing module, not too clean, but at least something

This commit is contained in:
d1nch8g
2026-04-19 22:07:59 +03:00
parent bfc9c1b4f5
commit b1484611bc
6 changed files with 182 additions and 134 deletions
@@ -40,7 +40,7 @@ CREATE TABLE contacts (
CREATE TABLE notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
initiator_id UUID REFERENCES users(id) ON DELETE SET NULL,
initiator_id UUID REFERENCES users(id) ON DELETE CASCADE,
scheduled_at TIMESTAMP NOT NULL,
content TEXT NOT NULL
);
+124 -107
View File
@@ -7,10 +7,9 @@ import (
"slices"
"time"
"github.com/d1nch8g/jules/chat"
"github.com/d1nch8g/jules/database"
"github.com/d1nch8g/jules/engine/actions"
"github.com/d1nch8g/jules/engine/log"
"github.com/d1nch8g/jules/engine/jtime"
"github.com/d1nch8g/jules/engine/prompt"
"github.com/google/uuid"
)
@@ -32,7 +31,7 @@ func (e *Engine) validateActions(ctx context.Context, actionSlice []any, promptC
case actions.AddNotification:
found := slices.ContainsFunc(promptCtx.Contacts, func(c database.Contact) bool {
return c.Name == action.Target
return action.Target == c.Name || action.Target == "self"
})
if !found {
errs = append(errs, fmt.Errorf("contact target %s is invalid, provide exact name", action.Target))
@@ -87,137 +86,155 @@ func (e *Engine) validateActions(ctx context.Context, actionSlice []any, promptC
return errors.Join(errs...)
}
func (e *Engine) executeActions(ctx context.Context, actionSlice []any) error {
func (e *Engine) runActions(ctx context.Context, actionSlice []any, user *database.User, promptCtx *prompt.Context) error {
for _, actionAny := range actionSlice {
switch action := actionAny.(type) {
case actions.BindChat:
// TODO
targetUser, err := e.Database.Users().Get(ctx, uuid.MustParse(action.UUID), database.UserLookupByBindCode)
if err != nil {
return err
}
err = e.Database.Users().Delete(ctx, user.ID)
if err != nil {
return err
}
var errs []error
for _, chat := range promptCtx.Chats {
errs = append(errs, e.Database.Chats().Attach(ctx, targetUser.ID, chat.Platform, chat.Identifier))
}
for _, contact := range promptCtx.Contacts {
errs = append(errs, e.Database.Contacts().Add(ctx, &database.Contact{
OwnerID: targetUser.ID,
TargetID: contact.TargetID,
Name: contact.Name,
}))
}
for _, fact := range promptCtx.Facts {
errs = append(errs, e.Database.Facts().Add(ctx, targetUser.ID, fact.Value))
}
for _, notif := range promptCtx.IncomingNotifications {
if notif.InitiatorID == user.ID {
notif.InitiatorID = targetUser.ID
}
errs = append(errs, e.Database.Notifications().Push(ctx, &database.Notification{
ID: notif.ID,
UserID: targetUser.ID,
InitiatorID: notif.InitiatorID,
ScheduledAt: notif.ScheduledAt,
Content: notif.Content,
}))
}
for _, notif := range promptCtx.OutgoingNotificaions {
errs = append(errs, e.Database.Notifications().Push(ctx, &database.Notification{
ID: notif.ID,
UserID: notif.UserID,
InitiatorID: targetUser.ID,
ScheduledAt: notif.ScheduledAt,
Content: notif.Content,
}))
}
if err = errors.Join(errs...); err != nil {
return err
}
user = targetUser
case actions.Message:
// TODO
var platformID string
for _, chat := range promptCtx.Chats {
if action.Platform == chat.Platform {
platformID = chat.Identifier
}
}
if err := e.Chats[action.Platform].Send(ctx, platformID, action.Text); err != nil {
return err
}
case actions.Wait:
// TODO
time.Sleep(time.Duration(action.Ms) * time.Millisecond)
case actions.UpdateLang:
// TODO
user.Language = action.Lang
if err := e.Database.Users().Update(ctx, user); err != nil {
return err
}
case actions.UpdateTZ:
// TODO
user.Timezone = action.TZ
if err := e.Database.Users().Update(ctx, user); err != nil {
return err
}
case actions.SetChat:
// TODO
user.PreferredChat = action.Chat
if err := e.Database.Users().Update(ctx, user); err != nil {
return err
}
case actions.AddFact:
// TODO
if err := e.Database.Facts().Add(ctx, user.ID, action.Value); err != nil {
return err
}
case actions.RemoveFact:
// TODO
if err := e.Database.Facts().Delete(ctx, user.ID, action.Value); err != nil {
return err
}
case actions.AddContact:
// TODO
contactUser, err := e.Database.Users().Get(ctx, uuid.MustParse(action.UUID), database.UserLookupByContactCode)
if err != nil {
return err
}
err = e.Database.Contacts().Add(ctx, &database.Contact{
OwnerID: user.ID,
TargetID: contactUser.ID,
Name: action.Name,
})
if err != nil {
return err
}
case actions.AddNotification:
// TODO
t, _ := jtime.ToUTC(action.Time, user.Timezone)
initiatorID := user.ID
if action.Target != "self" {
for _, contact := range promptCtx.Contacts {
initiatorID = contact.TargetID
}
}
err := e.Database.Notifications().Push(ctx, &database.Notification{
ID: uuid.New(),
UserID: user.ID,
InitiatorID: initiatorID,
ScheduledAt: t,
Content: action.Content,
})
if err != nil {
return err
}
case actions.RemoveNotification:
// TODO
err := e.Database.Notifications().Delete(ctx, uuid.MustParse(action.UUID))
if err != nil {
return err
}
case actions.Search:
// TODO
// TODO: currently not supported
}
}
return nil
}
func (e *Engine) executeBindChat(ctx context.Context, user *database.User, act actions.BindChat, msg chat.Message) error {
targetUUID, err := uuid.Parse(act.UUID)
if err != nil {
return fmt.Errorf("unable to parse uuid: %w", err)
}
}
func (e *Engine) executeWait(act actions.Wait) {
time.Sleep(time.Duration(act.Ms) * time.Millisecond)
}
func (e *Engine) executeUpdateLang(ctx context.Context, user *database.User, act actions.UpdateLang) error {
user.Language = act.Lang
return e.Database.Users().Update(ctx, user)
}
func (e *Engine) executeUpdateTZ(ctx context.Context, user *database.User, act actions.UpdateTZ) error {
user.Timezone = act.TZ
return e.Database.Users().Update(ctx, user)
}
func (e *Engine) executeSetChat(ctx context.Context, user *database.User, act actions.SetChat) error {
user.PreferredChat = act.Chat
return e.Database.Users().Update(ctx, user)
}
func (e *Engine) executeAddFact(ctx context.Context, user *database.User, act actions.AddFact) error {
return e.Database.Facts().Add(ctx, user.ID, act.Value)
}
func (e *Engine) executeRemoveFact(ctx context.Context, user *database.User, act actions.RemoveFact) error {
return e.Database.Facts().Delete(ctx, user.ID, act.Value)
}
func (e *Engine) executeAddContact(ctx context.Context, user *database.User, act actions.AddContact) error {
targetUUID, err := uuid.Parse(act.UUID)
if err != nil {
return err
}
targetUser, err := e.Database.Users().GetByTempCode(ctx, targetUUID)
if err != nil {
// if errors.Is()
}
return e.Database.Contacts().Add(ctx, &database.Contact{
OwnerID: user.ID,
TargetID: targetUUID,
Name: act.Name,
})
}
func (e *Engine) executeAddNotification(ctx context.Context, user *database.User, act actions.AddNotification) error {
scheduledAt, err := toUTC(act.Time, user.Timezone)
if err != nil {
return err
}
var targetID uuid.UUID
if act.Target == "self" {
targetID = user.ID
} else {
// TODO: resolve contact name to UUID
targetID = user.ID
}
return e.Database.Notifications().Push(ctx, &database.Notification{
ID: uuid.New(),
UserID: targetID,
InitiatorID: user.ID,
ScheduledAt: scheduledAt,
Content: act.Content,
})
}
func (e *Engine) executeRemoveNotification(ctx context.Context, act actions.RemoveNotification) error {
id, err := uuid.Parse(act.UUID)
if err != nil {
return err
}
return e.Database.Notifications().Delete(ctx, id)
}
func (e *Engine) executeSearch(ctx context.Context, user *database.User, act actions.Search, le *log.Event) (string, error) {
return e.Searcher.Search(ctx, act.Query)
}
func (e *Engine) executeMessage(ctx context.Context, user *database.User, act actions.Message, le *log.Event) error {
// Отправка сообщения через нужный мессенджер
return nil
}
+13 -3
View File
@@ -3,6 +3,7 @@ package log
import (
"context"
"log/slog"
"slices"
"time"
"github.com/d1nch8g/jules/chat"
@@ -13,9 +14,7 @@ import (
type Event struct {
ctx context.Context
attrs []slog.Attr
level slog.Level
start time.Time
msg string
}
func FromMessage(ctx context.Context, msg chat.Message) *Event {
@@ -61,7 +60,8 @@ func (e *Event) Context(ctx *prompt.Context) *Event {
slog.String("user_lang", ctx.UserLanguage),
slog.String("user_tz", ctx.UserTimezone),
slog.String("user_chat", ctx.UserPreferredChat),
slog.String("user_tmp_code", ctx.UserTemporaryCode.String()),
slog.String("user_bind_code", ctx.UserBindCode.String()),
slog.String("user_contact_code", ctx.UserContactCode.String()),
)
for _, chat := range ctx.Chats {
@@ -102,6 +102,16 @@ func (e *Event) LLMResponse(content string) *Event {
return e
}
func (e *Event) Actions(actions []any) *Event {
e.attrs = slices.DeleteFunc(e.attrs, func(a slog.Attr) bool {
return a.Key == "action"
})
for _, action := range actions {
e.attrs = append(e.attrs, slog.Any("action", action))
}
return e
}
func (e *Event) Info(msg string) {
e.log(slog.LevelInfo, msg)
}
+17 -2
View File
@@ -9,6 +9,7 @@ import (
"github.com/d1nch8g/jules/chat"
"github.com/d1nch8g/jules/database"
"github.com/d1nch8g/jules/engine/actions"
"github.com/d1nch8g/jules/engine/prompt"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -60,7 +61,8 @@ func TestEvent_Context(t *testing.T) {
UserLanguage: "ru",
UserTimezone: "Europe/Moscow",
UserPreferredChat: "telegram",
UserTemporaryCode: uuid.New(),
UserBindCode: uuid.New(),
UserContactCode: uuid.New(),
Chats: []database.Chat{
{Platform: "telegram"},
{Platform: "whatsapp"},
@@ -85,7 +87,7 @@ func TestEvent_Context(t *testing.T) {
e := FromMessage(context.Background(), chat.Message{}).Context(ctx)
assert.Len(t, e.attrs, 16)
assert.Len(t, e.attrs, 17)
}
func TestEvent_Context_Nil(t *testing.T) {
@@ -176,3 +178,16 @@ func TestEvent_LLMResponse(t *testing.T) {
assert.Equal(t, "llm_response", e.attrs[4].Key)
assert.Equal(t, "second", e.attrs[4].Value.String())
}
func TestEvent_Actions(t *testing.T) {
e := FromMessage(context.Background(), chat.Message{})
actions := []any{
actions.AddFact{Type: "add_fact", Value: "test fact"},
actions.Message{Type: "message", Platform: "telegram", Text: "hello"},
}
e.Actions(actions)
assert.Len(t, e.attrs, 6) // 4 from message + 2 actions
}
+9 -5
View File
@@ -80,8 +80,12 @@ func (e *Engine) process(ctx context.Context, le *log.Event, user *database.User
return
}
//TODO: finish
_ = actions
err = e.runActions(ctx, actions, user, promptContext)
if err != nil {
le.Error("failed to run actions", err)
return
}
le.Info("successfully processed")
}
func (e *Engine) collectPromptContext(ctx context.Context, user *database.User, source, message string) (*prompt.Context, error) {
@@ -153,15 +157,15 @@ func (e *Engine) executePrompt(ctx context.Context, promptCtx *prompt.Context, t
if err != nil {
promptCtx.Error = errors.Join(promptCtx.Error, err)
le.LLMResponse(result)
le.Warn("failed to parse response provided by llm")
le.Warn("failed to parse actions by llm")
continue
}
err = e.validateActions(ctx, actions, *promptCtx)
if err != nil {
promptCtx.Error = errors.Join(promptCtx.Error, err)
le.LLMResponse(result)
le.Warn("failed to validate llm input")
le.Actions(actions)
le.Warn("failed to validate actions")
continue
}
+18 -16
View File
@@ -89,6 +89,7 @@ FORMAT: Keep facts short. One fact per value. "mom name is Irina", "Likes fitnes
- If you received errors - try to figure out wether you can fix them by yourself, or you need to inform user (invalid UUID/non existing user)
- BIND CODE is private and used for "bind_chat". CONTACT CODE is public and used for "add_contact".
- Always send any UUID (bind or contact) in a SEPARATE message, not bundled with other text.
- NEVER use indentation to make responses faster
=== AVAILABLE ACTIONS ===
{"type": "message", "platform": "telegram", "text": "short response"}
@@ -100,13 +101,14 @@ FORMAT: Keep facts short. One fact per value. "mom name is Irina", "Likes fitnes
{"type": "remove_notification", "uuid": "123e4567-e89b-12d3-a456-426614174000"}
{"type": "add_contact", "uuid": "123e4567-e89b-12d3-a456-426614174000", "name": "Contact Name"}
{"type": "bind_chat", "uuid": "123e4567-e89b-12d3-a456-426614174000"}
{"type": "search", "query": "search query"}
{"type": "update_lang", "lang": "ru"}
{"type": "update_tz", "tz": "Europe/Moscow"}
{"type": "set_chat", "chat": "telegram"}
`
// {"type": "search", "query": "search query"} // temporary disabled
type Context struct {
UserLanguage string
UserTimezone string
@@ -132,21 +134,6 @@ func Build(ctx Context) string {
b.WriteString(masterPrompt)
b.WriteString("\n=== USER CONTEXT ===\n")
fmt.Fprintf(&b, "Language: %s\n", ctx.UserLanguage)
currentTime := jtime.CurrentLocalTime(ctx.UserTimezone)
loc, _ := time.LoadLocation(ctx.UserTimezone)
if loc == nil {
loc = time.UTC
}
weekday := time.Now().In(loc).Format("Monday")
fmt.Fprintf(&b, "Time: %s (%s)\n", currentTime, weekday)
fmt.Fprintf(&b, "User chat (selected): %s\n", ctx.UserPreferredChat)
fmt.Fprintf(&b, "Bind code: %s\n", ctx.UserBindCode.String())
fmt.Fprintf(&b, "Contact code: %s\n", ctx.UserContactCode.String())
if len(ctx.Chats) > 0 {
b.WriteString("Connected chats:\n")
for _, chat := range ctx.Chats {
@@ -192,6 +179,21 @@ func Build(ctx Context) string {
}
}
b.WriteString("\n=== USER CONTEXT ===\n")
fmt.Fprintf(&b, "Language: %s\n", ctx.UserLanguage)
currentTime := jtime.CurrentLocalTime(ctx.UserTimezone)
loc, _ := time.LoadLocation(ctx.UserTimezone)
if loc == nil {
loc = time.UTC
}
weekday := time.Now().In(loc).Format("Monday")
fmt.Fprintf(&b, "Time: %s (%s)\n", currentTime, weekday)
fmt.Fprintf(&b, "User chat (selected): %s\n", ctx.UserPreferredChat)
fmt.Fprintf(&b, "Bind code: %s\n", ctx.UserBindCode.String())
fmt.Fprintf(&b, "Contact code: %s\n", ctx.UserContactCode.String())
if ctx.Error != nil {
b.WriteString("\nExecution errors:\n")
b.WriteString(ctx.Error.Error())