fix(coderd/x/chatd): stabilize advisor stream test (#25781)

`TestAdvisorHappyPath_RootChat` could subscribe after the active test
server had already processed the chat and published transient advisor
deltas, leaving the live delta collector empty.

Use a passive chatd test server until the live subscriber and collector
are registered, then start processing and wait for the expected advisor
deltas before canceling the stream.

Closes coder/internal#1548

Generated by Coder Agents.

<details>
<summary>Implementation notes</summary>

The failing assertion covered stream-only advisor `ResultDelta` events.
`CreateChat` signals the processor, so an already-started server can
publish those deltas before `Subscribe` registers its local stream
subscriber. The test now creates the chat on a passive server,
subscribes, starts the collector, then calls `Start()`.

</details>
This commit is contained in:
Michael Suchacz
2026-06-02 12:44:45 +02:00
committed by GitHub
parent dd22086734
commit 4d3bfa5fab
+12 -17
View File
@@ -26,6 +26,7 @@ import (
mcpserver "github.com/mark3labs/mcp-go/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/sqlc-dev/pqtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/xerrors"
@@ -9914,7 +9915,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) {
MaxUsesPerRun: 3,
MaxOutputTokens: 16384,
})
server := newActiveTestServer(t, db, ps)
server := newTestServer(t, db, ps, uuid.New())
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OrganizationID: org.ID,
@@ -9927,13 +9928,7 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) {
})
require.NoError(t, err)
// Subscribe before the worker commits any durable messages so we
// observe the advisor tool-result deltas live. Buffered parts are
// claimed by their committed durable message ID at publishMessage
// time and dropped from snapshots of late-connecting subscribers, so
// a post-completion Subscribe() would no longer see streaming
// deltas. Collecting events from the live channel covers the
// streaming UX contract this test exists to verify.
// Advisor deltas are transient; a late subscriber misses them.
_, liveEvents, cancelLive, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
var (
@@ -9969,6 +9964,8 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) {
}
}()
server.Start()
require.Eventually(t, func() bool {
got, getErr := db.GetChatByID(ctx, chat.ID)
if getErr != nil {
@@ -10023,17 +10020,15 @@ func TestAdvisorHappyPath_RootChat(t *testing.T) {
require.True(t, parentSawAdvisorResult,
"parent must see the advisor reply in its continuation call")
// Stop the live collector and assert it captured the streaming
// advisor deltas during processing. Late subscribers no longer
// see committed parts because publishMessage claims them out of
// new snapshots, so the assertion must use the live collector.
require.EventuallyWithT(t, func(c *assert.CollectT) {
livePartsMu.Lock()
defer livePartsMu.Unlock()
assert.Equal(c, advisorDeltas, liveAdvisorDeltas,
"advisor nested text deltas must stream into the parent tool card")
}, testutil.WaitLong, testutil.IntervalFast)
cancelLive()
<-liveCollectorDone
livePartsMu.Lock()
collectedAdvisorDeltas := append([]string(nil), liveAdvisorDeltas...)
livePartsMu.Unlock()
require.Equal(t, advisorDeltas, collectedAdvisorDeltas,
"advisor nested text deltas must stream into the parent tool card")
persisted, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{
ChatID: chat.ID,