From 4d3bfa5fab37327c8169de5612068019f4791bda Mon Sep 17 00:00:00 2001 From: Michael Suchacz <203725896+ibetitsmike@users.noreply.github.com> Date: Tue, 2 Jun 2026 12:44:45 +0200 Subject: [PATCH] fix(coderd/x/chatd): stabilize advisor stream test (#25781) `TestAdvisorHappyPath_RootChat` could subscribe after the active test server had already processed the chat and published transient advisor deltas, leaving the live delta collector empty. Use a passive chatd test server until the live subscriber and collector are registered, then start processing and wait for the expected advisor deltas before canceling the stream. Closes coder/internal#1548 Generated by Coder Agents.
Implementation notes The failing assertion covered stream-only advisor `ResultDelta` events. `CreateChat` signals the processor, so an already-started server can publish those deltas before `Subscribe` registers its local stream subscriber. The test now creates the chat on a passive server, subscribes, starts the collector, then calls `Start()`.
--- coderd/x/chatd/chatd_test.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/coderd/x/chatd/chatd_test.go b/coderd/x/chatd/chatd_test.go index 9e67c10230..353769dd02 100644 --- a/coderd/x/chatd/chatd_test.go +++ b/coderd/x/chatd/chatd_test.go @@ -26,6 +26,7 @@ import ( mcpserver "github.com/mark3labs/mcp-go/server" "github.com/prometheus/client_golang/prometheus" "github.com/sqlc-dev/pqtype" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/xerrors" @@ -9914,7 +9915,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { MaxUsesPerRun: 3, MaxOutputTokens: 16384, }) - server := newActiveTestServer(t, db, ps) + server := newTestServer(t, db, ps, uuid.New()) chat, err := server.CreateChat(ctx, chatd.CreateOptions{ OrganizationID: org.ID, @@ -9927,13 +9928,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { }) require.NoError(t, err) - // Subscribe before the worker commits any durable messages so we - // observe the advisor tool-result deltas live. Buffered parts are - // claimed by their committed durable message ID at publishMessage - // time and dropped from snapshots of late-connecting subscribers, so - // a post-completion Subscribe() would no longer see streaming - // deltas. Collecting events from the live channel covers the - // streaming UX contract this test exists to verify. + // Advisor deltas are transient; a late subscriber misses them. _, liveEvents, cancelLive, ok := server.Subscribe(ctx, chat.ID, nil, 0) require.True(t, ok) var ( @@ -9969,6 +9964,8 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { } }() + server.Start() + require.Eventually(t, func() bool { got, getErr := db.GetChatByID(ctx, chat.ID) if getErr != nil { @@ -10023,17 +10020,15 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) { require.True(t, parentSawAdvisorResult, "parent must see the advisor reply in its continuation call") - // Stop the live collector and assert it captured the streaming - // advisor deltas during processing. Late subscribers no longer - // see committed parts because publishMessage claims them out of - // new snapshots, so the assertion must use the live collector. + require.EventuallyWithT(t, func(c *assert.CollectT) { + livePartsMu.Lock() + defer livePartsMu.Unlock() + assert.Equal(c, advisorDeltas, liveAdvisorDeltas, + "advisor nested text deltas must stream into the parent tool card") + }, testutil.WaitLong, testutil.IntervalFast) + cancelLive() <-liveCollectorDone - livePartsMu.Lock() - collectedAdvisorDeltas := append([]string(nil), liveAdvisorDeltas...) - livePartsMu.Unlock() - require.Equal(t, advisorDeltas, collectedAdvisorDeltas, - "advisor nested text deltas must stream into the parent tool card") persisted, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ ChatID: chat.ID,