tests stability of main orchestrator improved, additional edge cases are well-tested
This commit is contained in:
+172
-66
@@ -149,7 +149,6 @@ func TestNew(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Mocks (остаются без изменений)
|
||||
type mockChat struct {
|
||||
sendErr error
|
||||
receiveCh chan chat.Message
|
||||
@@ -276,45 +275,6 @@ func TestRun(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunWorkers_Extended(t *testing.T) {
|
||||
t.Run("worker exits when messages channel closed", func(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification)
|
||||
|
||||
close(messages)
|
||||
close(notifications)
|
||||
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {}
|
||||
|
||||
e.runWorkers(context.Background(), messages, notifications)
|
||||
})
|
||||
|
||||
t.Run("worker stops on context done", func(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
var stopped atomic.Bool
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {}
|
||||
|
||||
go func() {
|
||||
e.runWorkers(ctx, messages, notifications)
|
||||
stopped.Store(true)
|
||||
}()
|
||||
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
assert.True(t, stopped.Load())
|
||||
})
|
||||
}
|
||||
|
||||
func TestConsumeNotifications_Extended(t *testing.T) {
|
||||
t.Run("context done during notification emit", func(t *testing.T) {
|
||||
mockNotifs := &mockNotifications{
|
||||
@@ -385,31 +345,6 @@ func TestConsumeNotifications_Extended(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestConsumeChatMessages_Extended(t *testing.T) {
|
||||
t.Run("context done during message forward", func(t *testing.T) {
|
||||
ch := make(chan chat.Message, 1)
|
||||
ch <- chat.Message{ID: "1", Text: "hello"}
|
||||
|
||||
e := &Engine{
|
||||
Parameters: &Parameters{
|
||||
Chats: []chat.Chat{&mockChat{receiveCh: ch}},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
out := e.consumeChatMessages(ctx)
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case _, ok := <-out:
|
||||
assert.False(t, ok)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("channel not closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestConsumeNotifications_PopError(t *testing.T) {
|
||||
mockNotifs := &mockNotifications{
|
||||
popFunc: func(ctx context.Context, limit int) ([]database.Notification, error) {
|
||||
@@ -429,7 +364,7 @@ func TestConsumeNotifications_PopError(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
defer cancel()
|
||||
|
||||
out := e.consumeNotifications(ctx)
|
||||
@@ -443,3 +378,174 @@ func TestConsumeNotifications_PopError(t *testing.T) {
|
||||
assert.Contains(t, buf.String(), "failed to pop notifications")
|
||||
assert.Contains(t, buf.String(), "db connection lost")
|
||||
}
|
||||
|
||||
func TestRunWorkers_ProcessCalls(t *testing.T) {
|
||||
t.Run("processMessage called for message", func(t *testing.T) {
|
||||
messages := make(chan chat.Message, 1)
|
||||
notifications := make(chan database.Notification)
|
||||
|
||||
var called atomic.Bool
|
||||
ready := make(chan struct{})
|
||||
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {
|
||||
called.Store(true)
|
||||
}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {}
|
||||
|
||||
go func() {
|
||||
ready <- struct{}{}
|
||||
e.runWorkers(context.Background(), messages, notifications)
|
||||
}()
|
||||
|
||||
<-ready
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
messages <- chat.Message{ID: "123", Text: "hello"}
|
||||
close(messages)
|
||||
close(notifications)
|
||||
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
assert.True(t, called.Load())
|
||||
})
|
||||
|
||||
t.Run("processNotification called for notification", func(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification, 1)
|
||||
|
||||
var called atomic.Bool
|
||||
ready := make(chan struct{})
|
||||
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {
|
||||
called.Store(true)
|
||||
}
|
||||
|
||||
go func() {
|
||||
ready <- struct{}{}
|
||||
e.runWorkers(context.Background(), messages, notifications)
|
||||
}()
|
||||
|
||||
<-ready
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
notifications <- database.Notification{ID: uuid.New(), Content: "test"}
|
||||
close(messages)
|
||||
close(notifications)
|
||||
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
assert.True(t, called.Load())
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunWorkers_MessageChannelClosed(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification)
|
||||
close(messages)
|
||||
close(notifications)
|
||||
|
||||
var msgProcessed atomic.Bool
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {
|
||||
msgProcessed.Store(true)
|
||||
}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {}
|
||||
|
||||
e.runWorkers(context.Background(), messages, notifications)
|
||||
assert.False(t, msgProcessed.Load())
|
||||
}
|
||||
|
||||
func TestRunWorkers_NotificationChannelClosed(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification)
|
||||
close(messages)
|
||||
close(notifications)
|
||||
|
||||
var notifProcessed atomic.Bool
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {
|
||||
notifProcessed.Store(true)
|
||||
}
|
||||
|
||||
e.runWorkers(context.Background(), messages, notifications)
|
||||
assert.False(t, notifProcessed.Load())
|
||||
}
|
||||
|
||||
func TestConsumeChatMessages_ContextDoneDuringSend(t *testing.T) {
|
||||
ch := make(chan chat.Message, 1)
|
||||
ch <- chat.Message{ID: "1", Text: "hello"}
|
||||
|
||||
e := &Engine{
|
||||
Parameters: &Parameters{
|
||||
Chats: []chat.Chat{&mockChat{receiveCh: ch}},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
out := e.consumeChatMessages(ctx)
|
||||
|
||||
// Ждём, чтобы горутина точно заблокировалась на отправке
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
// Канал должен закрыться
|
||||
select {
|
||||
case _, ok := <-out:
|
||||
assert.False(t, ok)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("channel not closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunWorkers_ContextDone(t *testing.T) {
|
||||
messages := make(chan chat.Message)
|
||||
notifications := make(chan database.Notification)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
var stopped atomic.Bool
|
||||
ready := make(chan struct{})
|
||||
|
||||
e := &Engine{Parameters: &Parameters{NumWorkers: 1}}
|
||||
e.processMessage = func(ctx context.Context, msg chat.Message) {}
|
||||
e.processNotification = func(ctx context.Context, n database.Notification) {}
|
||||
|
||||
go func() {
|
||||
ready <- struct{}{}
|
||||
e.runWorkers(ctx, messages, notifications)
|
||||
stopped.Store(true)
|
||||
}()
|
||||
|
||||
<-ready
|
||||
time.Sleep(10 * time.Millisecond) // даём воркеру войти в select
|
||||
cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
assert.True(t, stopped.Load())
|
||||
}
|
||||
|
||||
func TestConsumeChatMessages_ContextDone(t *testing.T) {
|
||||
ch := make(chan chat.Message)
|
||||
defer close(ch)
|
||||
|
||||
e := &Engine{
|
||||
Parameters: &Parameters{
|
||||
Chats: []chat.Chat{&mockChat{receiveCh: ch}},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
out := e.consumeChatMessages(ctx)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case _, ok := <-out:
|
||||
assert.False(t, ok)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("channel not closed")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user