partial postgres implementation

This commit is contained in:
d1nch8g
2026-04-14 21:14:01 +03:00
parent 5d1298258e
commit f3ae27de73
16 changed files with 411 additions and 98 deletions
+12
View File
@@ -0,0 +1,12 @@
{
"sqltools.connections": [
{
"ssh": "Disabled",
"previewLimit": 50,
"server": "localhost",
"driver": "PostgreSQL",
"connectString": "postgres://user:password@localhost:5432/db?sslmode=disable",
"name": "jules-local"
}
]
}
+3 -2
View File
@@ -4,8 +4,6 @@ install:
go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.11.4
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
go install github.com/go-bindata/go-bindata/...@latest
go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
gen:
@@ -27,3 +25,6 @@ lint:
test:
go test -timeout=30s -count=1 -cover ./...
clean:
sudo rm -rf /tmp/pg
+2 -7
View File
@@ -7,9 +7,8 @@ import (
type Config struct {
TelegramBotToken string
PostgresConnectionString string
PostgresConnString string
DeepSeekAPIKey string
DeepSeekBaseURL string
BraveAPIKey string
}
@@ -18,16 +17,12 @@ func Load() (*Config, error) {
TelegramBotToken: os.Getenv("TELEGRAM_BOT_TOKEN"),
DeepSeekAPIKey: os.Getenv("DEEPSEEK_API_KEY"),
BraveAPIKey: os.Getenv("BRAVE_API_KEY"),
PostgresConnectionString: getEnv("POSTGRES_CONNECTION", "postgres://user:password@localhost:5432/db?sslmode=disable"),
DeepSeekBaseURL: getEnv("DEEPSEEK_BASE_URL", "https://api.deepseek.com"),
PostgresConnString: getEnv("POSTGRES_CONN_STRING", "postgres://user:password@localhost:5432/db?sslmode=disable"),
}
if cfg.TelegramBotToken == "" {
return nil, errors.New("TELEGRAM_BOT_TOKEN is required")
}
if cfg.DeepSeekAPIKey == "" {
return nil, errors.New("DEEPSEEK_API_KEY is required")
}
if cfg.BraveAPIKey == "" {
return nil, errors.New("BRAVE_API_KEY is required")
}
+24 -15
View File
@@ -30,15 +30,6 @@ func TestLoad(t *testing.T) {
wantErr: true,
errMsg: "TELEGRAM_BOT_TOKEN is required",
},
{
name: "missing deepseek key",
env: map[string]string{
"TELEGRAM_BOT_TOKEN": "tg",
"BRAVE_API_KEY": "br",
},
wantErr: true,
errMsg: "DEEPSEEK_API_KEY is required",
},
{
name: "missing brave key",
env: map[string]string{
@@ -49,7 +40,7 @@ func TestLoad(t *testing.T) {
errMsg: "BRAVE_API_KEY is required",
},
{
name: "defaults applied",
name: "default postgres conn string",
env: map[string]string{
"TELEGRAM_BOT_TOKEN": "tg",
"DEEPSEEK_API_KEY": "ds",
@@ -58,13 +49,12 @@ func TestLoad(t *testing.T) {
wantErr: false,
},
{
name: "override defaults",
name: "override postgres conn string",
env: map[string]string{
"TELEGRAM_BOT_TOKEN": "tg",
"DEEPSEEK_API_KEY": "ds",
"BRAVE_API_KEY": "br",
"POSTGRES_CONNECTION": "pg://custom",
"DEEPSEEK_BASE_URL": "https://custom",
"POSTGRES_CONN_STRING": "pg://custom",
},
wantErr: false,
},
@@ -92,8 +82,27 @@ func TestLoad(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if cfg.TelegramBotToken != tt.env["TELEGRAM_BOT_TOKEN"] {
t.Error("TelegramBotToken mismatch")
expectedTG := tt.env["TELEGRAM_BOT_TOKEN"]
if cfg.TelegramBotToken != expectedTG {
t.Errorf("TelegramBotToken: expected %q, got %q", expectedTG, cfg.TelegramBotToken)
}
expectedDS := tt.env["DEEPSEEK_API_KEY"]
if cfg.DeepSeekAPIKey != expectedDS {
t.Errorf("DeepSeekAPIKey: expected %q, got %q", expectedDS, cfg.DeepSeekAPIKey)
}
expectedBR := tt.env["BRAVE_API_KEY"]
if cfg.BraveAPIKey != expectedBR {
t.Errorf("BraveAPIKey: expected %q, got %q", expectedBR, cfg.BraveAPIKey)
}
expectedPG := tt.env["POSTGRES_CONN_STRING"]
if expectedPG == "" {
expectedPG = "postgres://user:password@localhost:5432/db?sslmode=disable"
}
if cfg.PostgresConnString != expectedPG {
t.Errorf("PostgresConnString: expected %q, got %q", expectedPG, cfg.PostgresConnString)
}
})
}
+51 -39
View File
@@ -1,94 +1,106 @@
package db
package database
import (
"context"
"errors"
"time"
"github.com/google/uuid"
)
var (
ErrNotFound = errors.New("entity not found")
ErrAlreadyExists = errors.New("entity already exists")
)
// User represents a Jules user.
type User struct {
ID string
PrefferedChat string
ID uuid.UUID
PreferredChat string
Language string
Timezone string
}
// Chat links a user to an external messaging platform.
type Chat struct {
UserID string
UserID uuid.UUID
Platform string // "telegram", "email", "whatsapp"
Identifier string // @username, email, phone
}
// Metadata stores facts Jules knows about a user.
type Metadata struct {
UserID string
ID uuid.UUID
UserID uuid.UUID
CreatedAt time.Time
Value string // "mom's name is Irina", "sleeps at 23:30"
Value string
}
// Contact represents a relationship between two Jules users.
type Contact struct {
OwnerID string // User who owns this contact
TargetID string // Target user ID
OwnerID uuid.UUID // User who owns this contact
TargetID uuid.UUID // Target user ID
Name string // "mom", "brother", "Lena"
}
// Notification is a scheduled reminder or check-in.
type Notification struct {
ID string
UserID string
ID uuid.UUID
UserID uuid.UUID
ScheduledAt time.Time
Content string // "call mom", "morning workout"
Content string // "call mom", "morning workout", "make a compliment", "talk to random person"
}
// Action records an interaction between Jules and a user.
type Action struct {
UserID string
ID uuid.UUID
UserID uuid.UUID
Type string // "user_msg", "jules_msg", "call", "ping_contact"
Content string
CreatedAt time.Time
}
// UserRepository manages User persistence.
type UserRepository interface {
// Users manages user persistence.
type Users interface {
Create(ctx context.Context) (*User, error)
Get(ctx context.Context, id string) (*User, error)
Delete(ctx context.Context, id string) error
Get(ctx context.Context, id uuid.UUID) (*User, error)
Update(ctx context.Context, user *User) error
Delete(ctx context.Context, id uuid.UUID) error
}
// ChatRepository manages Chat persistence.
type ChatRepository interface {
Attach(ctx context.Context, userID, platform, identifier string) error
GetUserID(ctx context.Context, platform, identifier string) (string, error)
GetChats(ctx context.Context, userID string) ([]Chat, error)
Detach(ctx context.Context, userID, platform string) error
// Chats manages chat persistence.
type Chats interface {
Attach(ctx context.Context, userID uuid.UUID, platform, identifier string) error
GetUserID(ctx context.Context, platform, identifier string) (uuid.UUID, error)
List(ctx context.Context, userID uuid.UUID) ([]Chat, error)
Detach(ctx context.Context, userID uuid.UUID, platform string) error
}
// MetadataRepository manages Metadata persistence.
type MetadataRepository interface {
Add(ctx context.Context, userID, value string) error
GetAll(ctx context.Context, userID string) ([]Metadata, error)
Delete(ctx context.Context, userID, value string) error
DeleteOlderThan(ctx context.Context, userID string, t time.Time) error
// MetadataStore manages metadata persistence.
type MetadataStore interface {
Add(ctx context.Context, userID uuid.UUID, value string) error
List(ctx context.Context, userID uuid.UUID) ([]Metadata, error)
Delete(ctx context.Context, id uuid.UUID) error
DeleteOlderThan(ctx context.Context, userID uuid.UUID, t time.Time) error
}
// ContactRepository manages Contact persistence.
type ContactRepository interface {
// Contacts manages contact persistence.
type Contacts interface {
Add(ctx context.Context, contact *Contact) error
Get(ctx context.Context, ownerID, name string) (*Contact, error)
GetAll(ctx context.Context, ownerID string) ([]Contact, error)
Delete(ctx context.Context, ownerID, name string) error
Get(ctx context.Context, ownerID uuid.UUID, name string) (*Contact, error)
List(ctx context.Context, ownerID uuid.UUID) ([]Contact, error)
Delete(ctx context.Context, ownerID uuid.UUID, name string) error
}
// NotificationRepository manages the notification queue.
type NotificationRepository interface {
// Notifications manages the notification queue.
type Notifications interface {
Push(ctx context.Context, n *Notification) error
Pop(ctx context.Context, limit int) ([]Notification, error)
}
// ActionRepository manages the action log.
type ActionRepository interface {
// Actions manages the action log.
type Actions interface {
Log(ctx context.Context, a *Action) error
GetRecent(ctx context.Context, userID string, limit int) ([]Action, error)
DeleteOlderThan(ctx context.Context, userID string, t time.Time) error
Recent(ctx context.Context, userID uuid.UUID, limit int) ([]Action, error)
DeleteOlderThan(ctx context.Context, userID uuid.UUID, t time.Time) error
}
+39
View File
@@ -0,0 +1,39 @@
package postgres
import (
"database/sql"
"embed"
"errors"
"fmt"
"io/fs"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/golang-migrate/migrate/v4/source/iofs"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
func runMigrations(conn *sql.DB) error {
files, _ := fs.Sub(migrationsFS, "migrations")
sourceDriver, err := iofs.New(files, ".")
if err != nil {
return fmt.Errorf("create iofs driver: %w", err)
}
driver, err := postgres.WithInstance(conn, &postgres.Config{})
if err != nil {
return fmt.Errorf("create postgres driver: %w", err)
}
m, _ := migrate.NewWithInstance("iofs", sourceDriver, "postgres://", driver)
err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
return fmt.Errorf("run up migrations: %w", err)
}
return nil
}
+53
View File
@@ -0,0 +1,53 @@
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
preferred_chat TEXT NOT NULL DEFAULT 'telegram',
language TEXT NOT NULL DEFAULT 'en',
timezone TEXT NOT NULL DEFAULT 'UTC'
);
CREATE TABLE chats (
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
platform TEXT NOT NULL,
identifier TEXT NOT NULL,
PRIMARY KEY (user_id, platform)
);
CREATE TABLE metadata (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
value TEXT NOT NULL
);
CREATE INDEX idx_metadata_user_id ON metadata(user_id);
CREATE INDEX idx_metadata_created_at ON metadata(created_at);
CREATE TABLE contacts (
owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
target_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
PRIMARY KEY (owner_id, target_id)
);
CREATE INDEX idx_contacts_owner_id ON contacts(owner_id);
CREATE TABLE notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
scheduled_at TIMESTAMPTZ NOT NULL,
content TEXT NOT NULL
);
CREATE INDEX idx_notifications_scheduled_at ON notifications(scheduled_at);
CREATE INDEX idx_notifications_user_id ON notifications(user_id);
CREATE TABLE actions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
type TEXT NOT NULL CHECK (type IN ('user_msg', 'jules_msg', 'call', 'ping_contact')),
content TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_actions_user_id ON actions(user_id);
CREATE INDEX idx_actions_created_at ON actions(created_at);
+112
View File
@@ -0,0 +1,112 @@
package postgres
import (
"context"
"database/sql"
"embed"
"os"
"testing"
_ "github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func getTestConnString() string {
connString := os.Getenv("POSTGRES_CONN_STRING")
if connString == "" {
connString = "postgres://user:password@localhost:5432/db?sslmode=disable"
}
return connString
}
func getTestConn(t *testing.T) *sql.DB {
conn, err := sql.Open("postgres", getTestConnString())
require.NoError(t, err)
ctx := context.Background()
require.NoError(t, conn.PingContext(ctx))
t.Cleanup(func() {
cleanTables(t, conn)
conn.Close()
})
return conn
}
func cleanTables(_ *testing.T, conn *sql.DB) {
tables := []string{"actions", "notifications", "contacts", "metadata", "chats", "users"}
for _, table := range tables {
_, _ = conn.ExecContext(context.Background(), "TRUNCATE TABLE "+table+" CASCADE")
}
}
func dropSchema(_ *testing.T, conn *sql.DB) {
tables := []string{"actions", "notifications", "contacts", "metadata", "chats", "users"}
for _, table := range tables {
_, _ = conn.ExecContext(context.Background(), "DROP TABLE IF EXISTS "+table+" CASCADE")
}
_, _ = conn.ExecContext(context.Background(), "DROP TABLE IF EXISTS schema_migrations")
}
func TestRunMigrations_Success(t *testing.T) {
conn := getTestConn(t)
dropSchema(t, conn)
err := runMigrations(conn)
assert.NoError(t, err)
var count int
err = conn.QueryRowContext(context.Background(), "SELECT COUNT(*) FROM users").Scan(&count)
assert.NoError(t, err)
}
func TestRunMigrations_AlreadyApplied(t *testing.T) {
conn := getTestConn(t)
dropSchema(t, conn)
err := runMigrations(conn)
require.NoError(t, err)
err = runMigrations(conn)
assert.NoError(t, err)
}
func TestRunMigrations_InvalidConn(t *testing.T) {
conn, err := sql.Open("postgres", "postgres://invalid:5432/db")
require.NoError(t, err)
defer conn.Close()
err = runMigrations(conn)
assert.Error(t, err)
}
func TestRunMigrations_FailedCreateIOFSDriver(t *testing.T) {
holder := migrationsFS
migrationsFS = embed.FS{}
conn, err := sql.Open("postgres", "postgres://invalid:5432/db")
require.NoError(t, err)
defer conn.Close()
err = runMigrations(conn)
assert.Error(t, err)
assert.Contains(t, err.Error(), "create iofs driver")
migrationsFS = holder
}
func TestRunMigrations_FailedUp(t *testing.T) {
conn := getTestConn(t)
dropSchema(t, conn)
_, err := conn.ExecContext(context.Background(), `CREATE TABLE users ()`)
require.NoError(t, err)
err = runMigrations(conn)
assert.Error(t, err)
assert.Contains(t, err.Error(), "run up migrations")
dropSchema(t, conn)
}
+40 -1
View File
@@ -1 +1,40 @@
package db
package postgres
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/lib/pq"
)
type DB struct {
conn *sql.DB
}
func New(connString string) (*DB, error) {
conn, _ := sql.Open("postgres", connString)
conn.SetMaxOpenConns(25)
conn.SetMaxIdleConns(5)
conn.SetConnMaxLifetime(5 * time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
conn.Close()
return nil, fmt.Errorf("ping database: %w", err)
}
if err := runMigrations(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("run migrations: %w", err)
}
return &DB{conn: conn}, nil
}
func (db *DB) Close() error {
return db.conn.Close()
}
+42 -1
View File
@@ -1 +1,42 @@
package db
package postgres
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNew_Success(t *testing.T) {
connString := getTestConnString()
db, err := New(connString)
require.NoError(t, err)
assert.NotNil(t, db)
defer db.Close()
}
func TestNew_PingFailed(t *testing.T) {
_, err := New("postgres://user:pass@localhost:54321/db?sslmode=disable")
assert.Error(t, err)
assert.Contains(t, err.Error(), "ping database")
}
func TestNew_RunMigrationsFailed(t *testing.T) {
// Берём отдельное соединение, чтобы сломать схему
conn := getTestConn(t)
dropSchema(t, conn)
// Создаём таблицу users вручную
_, err := conn.ExecContext(context.Background(), `CREATE TABLE users ()`)
require.NoError(t, err)
// Теперь New на другом соединении упадёт на runMigrations
_, err = New(getTestConnString())
assert.Error(t, err)
assert.Contains(t, err.Error(), "run migrations")
// Чистим за собой
dropSchema(t, conn)
}
+1 -12
View File
@@ -11,18 +11,7 @@ services:
- /tmp/pg:/var/lib/postgresql/data
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U p_user -d nf"]
test: ["CMD-SHELL", "pg_isready -U user -d db"]
interval: 30s
timeout: 10s
retries: 3
telegram_api:
image: ilya2ik/tgbotapiserver:latest
ports:
- "8081:8081"
environment:
SERVER_PORT: 8081
SERVER_HOST: 0.0.0.0
BOT_TOKEN: "test_token"
LOG_LEVEL: "info"
+5 -2
View File
@@ -4,11 +4,14 @@ go 1.26.1
require (
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1
github.com/golang-migrate/migrate/v4 v4.19.1
github.com/google/uuid v1.6.0
github.com/lib/pq v1.12.3
github.com/stretchr/testify v1.11.1
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+10 -4
View File
@@ -1,9 +1,15 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/golang-migrate/migrate/v4 v4.19.1 h1:OCyb44lFuQfYXYLx1SCxPZQGU7mcaZ7gH9yH4jSFbBA=
github.com/golang-migrate/migrate/v4 v4.19.1/go.mod h1:CTcgfjxhaUtsLipnLoQRWCrjYXycRz/g5+RWDuYgPrE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.12.3 h1:tTWxr2YLKwIvK90ZXEw8GP7UFHtcbTtty8zsI+YjrfQ=
github.com/lib/pq v1.12.3/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+6 -4
View File
@@ -72,8 +72,9 @@ func (c *Client) Search(ctx context.Context, query string) (string, error) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("status %d: %s", resp.StatusCode, body)
var body []byte
body, err = io.ReadAll(resp.Body)
return "", errors.Join(fmt.Errorf("status %d: %s", resp.StatusCode, body), err)
}
var webResp webSearchResponse
@@ -103,8 +104,9 @@ func (c *Client) Search(ctx context.Context, query string) (string, error) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("summarizer status %d: %s", resp.StatusCode, body)
var body []byte
body, err = io.ReadAll(resp.Body)
return "", errors.Join(fmt.Errorf("status %d: %s", resp.StatusCode, body), err)
}
var sumResp summarizerResponse
+1 -1
View File
@@ -121,7 +121,7 @@ func TestSearch_HTTPError_Summarizer(t *testing.T) {
client := New("key", server.URL)
_, err := client.Search(context.Background(), "query")
assert.Error(t, err)
assert.Contains(t, err.Error(), "summarizer status 500")
assert.Contains(t, err.Error(), "status 500")
}
func TestSearch_InvalidJSON_Summarizer(t *testing.T) {