93 lines
2.2 KiB
Go
93 lines
2.2 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"time"
|
|
|
|
"github.com/d1nch8g/jules/database"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type Notifications struct {
|
|
conn *sql.DB
|
|
}
|
|
|
|
func (n *Notifications) Push(ctx context.Context, notif *database.Notification) error {
|
|
_, err := n.conn.ExecContext(ctx, `
|
|
INSERT INTO notifications (id, user_id, scheduled_at, content)
|
|
VALUES ($1, $2, $3, $4)
|
|
`, notif.ID, notif.UserID, notif.ScheduledAt, notif.Content)
|
|
return err
|
|
}
|
|
|
|
func (n *Notifications) Pop(ctx context.Context, limit int) ([]database.Notification, error) {
|
|
now := time.Now().UTC().Truncate(time.Minute)
|
|
nextMinute := now.Add(time.Minute)
|
|
|
|
rows, err := n.conn.QueryContext(ctx, `
|
|
WITH batch AS (
|
|
SELECT id
|
|
FROM notifications
|
|
WHERE scheduled_at >= $1 AND scheduled_at < $2
|
|
ORDER BY scheduled_at
|
|
LIMIT $3
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
DELETE FROM notifications
|
|
WHERE id IN (SELECT id FROM batch)
|
|
RETURNING id, user_id, scheduled_at, content
|
|
`, now, nextMinute, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var notifs []database.Notification
|
|
for rows.Next() {
|
|
var n database.Notification
|
|
if err := rows.Scan(&n.ID, &n.UserID, &n.ScheduledAt, &n.Content); err != nil {
|
|
return nil, err
|
|
}
|
|
notifs = append(notifs, n)
|
|
}
|
|
return notifs, rows.Err()
|
|
}
|
|
|
|
func (n *Notifications) List(ctx context.Context, userID uuid.UUID) ([]database.Notification, error) {
|
|
rows, err := n.conn.QueryContext(ctx, `
|
|
SELECT id, user_id, scheduled_at, content
|
|
FROM notifications
|
|
WHERE user_id = $1
|
|
ORDER BY scheduled_at
|
|
`, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var notifs []database.Notification
|
|
for rows.Next() {
|
|
var notif database.Notification
|
|
if err := rows.Scan(¬if.ID, ¬if.UserID, ¬if.ScheduledAt, ¬if.Content); err != nil {
|
|
return nil, err
|
|
}
|
|
notifs = append(notifs, notif)
|
|
}
|
|
return notifs, rows.Err()
|
|
}
|
|
|
|
func (n *Notifications) Delete(ctx context.Context, id uuid.UUID) error {
|
|
result, err := n.conn.ExecContext(ctx, `
|
|
DELETE FROM notifications WHERE id = $1
|
|
`, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rows, _ := result.RowsAffected()
|
|
if rows == 0 {
|
|
return database.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|