diff --git a/coderd/x/chatd/chatd_test.go b/coderd/x/chatd/chatd_test.go index 9e67c10230..353769dd02 100644 --- a/coderd/x/chatd/chatd_test.go +++ b/coderd/x/chatd/chatd_test.go @@ -26,6 +26,7 @@ import ( mcpserver "github.com/mark3labs/mcp-go/server" "github.com/prometheus/client_golang/prometheus" "github.com/sqlc-dev/pqtype" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/xerrors" @@ -9914,7 +9915,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { MaxUsesPerRun: 3, MaxOutputTokens: 16384, }) - server := newActiveTestServer(t, db, ps) + server := newTestServer(t, db, ps, uuid.New()) chat, err := server.CreateChat(ctx, chatd.CreateOptions{ OrganizationID: org.ID, @@ -9927,13 +9928,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { }) require.NoError(t, err) - // Subscribe before the worker commits any durable messages so we - // observe the advisor tool-result deltas live. Buffered parts are - // claimed by their committed durable message ID at publishMessage - // time and dropped from snapshots of late-connecting subscribers, so - // a post-completion Subscribe() would no longer see streaming - // deltas. Collecting events from the live channel covers the - // streaming UX contract this test exists to verify. + // Advisor deltas are transient; a late subscriber misses them. _, liveEvents, cancelLive, ok := server.Subscribe(ctx, chat.ID, nil, 0) require.True(t, ok) var ( @@ -9969,6 +9964,8 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { } }() + server.Start() + require.Eventually(t, func() bool { got, getErr := db.GetChatByID(ctx, chat.ID) if getErr != nil { @@ -10023,17 +10020,15 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { require.True(t, parentSawAdvisorResult, "parent must see the advisor reply in its continuation call") - // Stop the live collector and assert it captured the streaming - // advisor deltas during processing. Late subscribers no longer - // see committed parts because publishMessage claims them out of - // new snapshots, so the assertion must use the live collector. + require.EventuallyWithT(t, func(c *assert.CollectT) { + livePartsMu.Lock() + defer livePartsMu.Unlock() + assert.Equal(c, advisorDeltas, liveAdvisorDeltas, + "advisor nested text deltas must stream into the parent tool card") + }, testutil.WaitLong, testutil.IntervalFast) + cancelLive() <-liveCollectorDone - livePartsMu.Lock() - collectedAdvisorDeltas := append([]string(nil), liveAdvisorDeltas...) - livePartsMu.Unlock() - require.Equal(t, advisorDeltas, collectedAdvisorDeltas, - "advisor nested text deltas must stream into the parent tool card") persisted, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ ChatID: chat.ID,