From dd49a818f9618c23005eef54407a6474a5be8496 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 29 Apr 2026 13:54:49 +0300 Subject: [PATCH] fix: export chatd.Start to separate server lifecycle (#24761) chatd.New() no longer auto-starts the acquire/wake loop. Callers that want chat processing call server.Start() explicitly. Tests that want a passive server skip Start(); heartbeat, stream janitor, and stale recovery still run. Closes coder/internal#1502 --- coderd/coderd.go | 2 +- coderd/x/chatd/chatd.go | 41 +++++------ coderd/x/chatd/chatd_test.go | 89 ++++++++++++++++++++---- coderd/x/chatd/subagent_internal_test.go | 1 + enterprise/coderd/x/chatd/chatd_test.go | 5 ++ 5 files changed, 103 insertions(+), 35 deletions(-) diff --git a/coderd/coderd.go b/coderd/coderd.go index 2a5b8aca76..c0b9b0d3ce 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -794,7 +794,7 @@ func New(options *Options) *API { WebpushDispatcher: options.WebPushDispatcher, UsageTracker: options.WorkspaceUsageTracker, PrometheusRegistry: options.PrometheusRegistry, - }) + }).Start() gitSyncLogger := options.Logger.Named("gitsync") refresher := gitsync.NewRefresher( api.resolveGitProvider, diff --git a/coderd/x/chatd/chatd.go b/coderd/x/chatd/chatd.go index d044cff7e8..6483b18559 100644 --- a/coderd/x/chatd/chatd.go +++ b/coderd/x/chatd/chatd.go @@ -135,7 +135,8 @@ var ( // Server handles background processing of pending chats. type Server struct { cancel context.CancelFunc - closed chan struct{} + ctx context.Context + wg sync.WaitGroup inflight sync.WaitGroup inflightMu sync.Mutex @@ -3679,7 +3680,6 @@ func New(cfg Config) *Server { p := &Server{ cancel: cancel, - closed: make(chan struct{}), db: cfg.Database, workerID: workerID, logger: cfg.Logger.Named("processor"), @@ -3750,33 +3750,34 @@ func New(cfg Config) *Server { } p.configCacheUnsubscribe = cancelConfigSub } - go p.start(ctx) - return p -} + p.ctx = ctx -func (p *Server) start(ctx context.Context) { - defer close(p.closed) - - // Recover stale chats on startup and periodically thereafter - // to handle chats orphaned by crashed or redeployed workers. - // Use debugService() (not existingDebugService) so the service - // is initialized eagerly on startup. This ensures stale debug - // rows left by a previous crash are finalized even when no - // request has triggered lazy initialization yet. + // Recover stale chats on startup. p.recoverStaleChats(ctx) if debugSvc := p.debugService(); debugSvc != nil { - _, err := debugSvc.FinalizeStale(ctx) - if err != nil { + if _, err := debugSvc.FinalizeStale(ctx); err != nil { p.logger.Warn(ctx, "failed to finalize stale chat debug rows", slog.Error(err)) } } - // Single heartbeat loop for all chats on this replica. - go p.heartbeatLoop(ctx) + // Spawn background goroutines that all servers need. + p.wg.Go(func() { p.heartbeatLoop(ctx) }) + p.wg.Go(func() { p.streamJanitorLoop(ctx) }) - go p.streamJanitorLoop(ctx) + return p +} +// Start runs the background acquire/wake loop that picks up +// pending chats and processes them. Callers that want a passive +// server (e.g. tests) can skip this call; heartbeat, stream +// janitor, and stale recovery still run. +func (p *Server) Start() *Server { + p.wg.Go(func() { p.acquireLoop(p.ctx) }) + return p +} + +func (p *Server) acquireLoop(ctx context.Context) { acquireTicker := p.clock.NewTicker( p.pendingChatAcquireInterval, "chatd", @@ -8111,7 +8112,7 @@ func (p *Server) Close() error { unsub() } p.cancel() - <-p.closed + p.wg.Wait() p.drainInflight() return nil } diff --git a/coderd/x/chatd/chatd_test.go b/coderd/x/chatd/chatd_test.go index f383495553..3509921456 100644 --- a/coderd/x/chatd/chatd_test.go +++ b/coderd/x/chatd/chatd_test.go @@ -2082,7 +2082,7 @@ func TestSendMessageInterruptBehaviorQueuesAndInterruptsWhenBusy(t *testing.T) { t.Parallel() db, ps := dbtestutil.NewDB(t) - replica := newTestServer(t, db, ps, uuid.New()) + replica := newStartedTestServer(t, db, ps, uuid.New()) ctx := testutil.Context(t, testutil.WaitLong) user, org, model := seedChatDependencies(ctx, t, db) @@ -2234,11 +2234,9 @@ func TestEditMessageUpdatesAndTruncatesAndClearsQueue(t *testing.T) { require.NoError(t, err) require.Len(t, queued, 0) - // The wake channel may trigger immediate processing after EditMessage, - // transitioning the chat from pending to running then error before we - // read the DB. Wait for any in-flight processing to settle. - // Note: WaitUntilIdleForTest must be called from the test goroutine - // (not inside require.Eventually) to avoid a WaitGroup Add/Wait race. + // WaitUntilIdleForTest drains the debug-cleanup goroutine + // from EditMessage. Must be called from the test goroutine + // (not inside require.Eventually) to avoid Add/Wait race. chatd.WaitUntilIdleForTest(replica) var chatFromDB database.Chat require.Eventually(t, func() bool { @@ -2433,7 +2431,7 @@ func TestPromoteQueuedAllowsAlreadyQueuedMessageWhenUsageLimitReached(t *testing t.Parallel() db, ps := dbtestutil.NewDB(t) - replica := newTestServer(t, db, ps, uuid.New()) + replica := newStartedTestServer(t, db, ps, uuid.New()) ctx := testutil.Context(t, testutil.WaitLong) user, org, model := seedChatDependencies(ctx, t, db) @@ -3706,6 +3704,7 @@ func TestRecoverStaleChatsPeriodically(t *testing.T) { PendingChatAcquireInterval: testutil.WaitLong, InFlightChatStaleAfter: staleAfter, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -3800,6 +3799,7 @@ func TestRecoverStaleRequiresActionChat(t *testing.T) { PendingChatAcquireInterval: testutil.WaitLong, InFlightChatStaleAfter: staleAfter, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -3895,6 +3895,7 @@ func TestWaitingChatsAreNotRecoveredAsStale(t *testing.T) { PendingChatAcquireInterval: testutil.WaitLong, InFlightChatStaleAfter: 500 * time.Millisecond, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -3991,10 +3992,7 @@ func TestSubscribeSnapshotIncludesStatusEvent(t *testing.T) { require.True(t, ok) t.Cleanup(cancel) - // The first event in the snapshot must be a status event. - // The exact status depends on timing: CreateChat sets - // pending, but the wake signal may trigger processing - // before Subscribe is called. + // Passive server: status is always Pending. require.NotEmpty(t, snapshot) require.Equal(t, codersdk.ChatStreamEventTypeStatus, snapshot[0].Type) require.NotNil(t, snapshot[0].Status) @@ -4787,7 +4785,7 @@ func TestSubscribeNoPubsubNoDuplicateMessageParts(t *testing.T) { // Use nil pubsub to force the no-pubsub path. db, _ := dbtestutil.NewDB(t) - replica := newTestServer(t, db, nil, uuid.New()) + replica := newStartedTestServer(t, db, nil, uuid.New()) ctx := testutil.Context(t, testutil.WaitLong) user, org, model := seedChatDependencies(ctx, t, db) @@ -5533,6 +5531,7 @@ func TestHeartbeatBumpsWorkspaceUsage(t *testing.T) { ChatHeartbeatInterval: 100 * time.Millisecond, UsageTracker: tracker, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -5657,6 +5656,7 @@ func TestHeartbeatNoWorkspaceNoBump(t *testing.T) { InFlightChatStaleAfter: testutil.WaitLong, ChatHeartbeatInterval: 100 * time.Millisecond, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -5724,6 +5724,8 @@ func waitForChatProcessed( chatd.WaitUntilIdleForTest(server) } +// newTestServer creates a passive server that never calls +// processOnce on its own. func newTestServer( t *testing.T, db database.Store, @@ -5746,6 +5748,57 @@ func newTestServer( return server } +func TestPassiveServerDoesNotProcess(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure()) + user, org, model := seedChatDependencies(ctx, t, db) + + server := newTestServer(t, db, ps, uuid.New()) + chat, err := server.CreateChat(ctx, chatd.CreateOptions{ + OrganizationID: org.ID, + OwnerID: user.ID, + Title: "should-stay-pending", + InitialUserContent: []codersdk.ChatMessagePart{{Type: codersdk.ChatMessagePartTypeText, Text: "hello"}}, + ModelConfigID: model.ID, + }) + require.NoError(t, err) + + chatd.WaitUntilIdleForTest(server) + + // Re-read from DB to catch any unexpected state transition. + stored, err := db.GetChatByID(ctx, chat.ID) + require.NoError(t, err) + require.Equal(t, database.ChatStatusPending, stored.Status) +} + +// newStartedTestServer creates a server with Start() called. +// Uses a long acquire interval so processing is triggered by +// wake signals, not polling. +func newStartedTestServer( + t *testing.T, + db database.Store, + ps dbpubsub.Pubsub, + replicaID uuid.UUID, +) *chatd.Server { + t.Helper() + + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}) + server := chatd.New(chatd.Config{ + Logger: logger, + Database: db, + ReplicaID: replicaID, + Pubsub: ps, + PendingChatAcquireInterval: testutil.WaitLong, + }) + server.Start() + t.Cleanup(func() { + require.NoError(t, server.Close()) + }) + return server +} + // newDebugEnabledTestServer creates a passive test server with // AlwaysEnableDebugLogs=true so that IsEnabled(ctx, chatID, ownerID) // always returns true regardless of runtime admin config. This lets @@ -5799,6 +5852,7 @@ func newActiveTestServer( o(&cfg) } server := chatd.New(cfg) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -6065,10 +6119,10 @@ func seedWorkspaceWithAgent( Transition: database.WorkspaceTransitionStart, JobID: pj.ID, }) - agent := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ + dbAgent := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ ResourceID: res.ID, }) - return ws, agent + return ws, dbAgent } func setOpenAIProviderBaseURL( @@ -6137,6 +6191,7 @@ func TestInterruptChatDoesNotSendWebPushNotification(t *testing.T) { InFlightChatStaleAfter: testutil.WaitSuperLong, WebpushDispatcher: mockPush, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -6249,6 +6304,7 @@ func TestSuccessfulChatSendsWebPushWithNavigationData(t *testing.T) { InFlightChatStaleAfter: testutil.WaitSuperLong, WebpushDispatcher: mockPush, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -6334,6 +6390,7 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T) PendingChatAcquireInterval: 10 * time.Millisecond, InFlightChatStaleAfter: testutil.WaitLong, }) + serverA.Start() t.Cleanup(func() { require.NoError(t, serverA.Close()) }) @@ -6388,6 +6445,7 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T) PendingChatAcquireInterval: 10 * time.Millisecond, InFlightChatStaleAfter: testutil.WaitLong, }) + serverB.Start() t.Cleanup(func() { require.NoError(t, serverB.Close()) }) @@ -6439,6 +6497,7 @@ func TestSuccessfulChatSendsWebPushWithSummary(t *testing.T) { InFlightChatStaleAfter: testutil.WaitSuperLong, WebpushDispatcher: mockPush, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -6500,6 +6559,7 @@ func TestSuccessfulChatSendsWebPushFallbackWithoutSummaryForEmptyAssistantText(t InFlightChatStaleAfter: testutil.WaitSuperLong, WebpushDispatcher: mockPush, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -6866,6 +6926,7 @@ func TestInterruptChatPersistsPartialResponse(t *testing.T) { PendingChatAcquireInterval: 10 * time.Millisecond, InFlightChatStaleAfter: testutil.WaitSuperLong, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) diff --git a/coderd/x/chatd/subagent_internal_test.go b/coderd/x/chatd/subagent_internal_test.go index a1a24702b2..708233ee84 100644 --- a/coderd/x/chatd/subagent_internal_test.go +++ b/coderd/x/chatd/subagent_internal_test.go @@ -131,6 +131,7 @@ func newInternalTestServerWithLoggerAndClock( PendingChatAcquireInterval: testutil.WaitLong, ProviderAPIKeys: keys, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) diff --git a/enterprise/coderd/x/chatd/chatd_test.go b/enterprise/coderd/x/chatd/chatd_test.go index ef7f39c5c5..86a10e2ad0 100644 --- a/enterprise/coderd/x/chatd/chatd_test.go +++ b/enterprise/coderd/x/chatd/chatd_test.go @@ -58,6 +58,7 @@ func newTestServer( SubscribeFn: entchatd.NewMultiReplicaSubscribeFn(entchatd.MultiReplicaSubscribeConfig{DialerFn: dialer, Clock: clock}), PendingChatAcquireInterval: testutil.WaitSuperLong, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -80,6 +81,7 @@ func newActiveWorkerServer( PendingChatAcquireInterval: 10 * time.Millisecond, InFlightChatStaleAfter: testutil.WaitSuperLong, }) + server.Start() t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -1308,6 +1310,7 @@ func TestSubscribeRelayDialCanceledOnFastCompletion(t *testing.T) { PendingChatAcquireInterval: time.Hour, InFlightChatStaleAfter: testutil.WaitSuperLong, }) + worker.Start() t.Cleanup(func() { require.NoError(t, worker.Close()) }) @@ -1467,6 +1470,7 @@ func TestSubscribeRelayDrainWithinGraceLeavesBufferRetained(t *testing.T) { InFlightChatStaleAfter: testutil.WaitSuperLong, Clock: workerClock, }) + worker.Start() t.Cleanup(func() { require.NoError(t, worker.Close()) }) @@ -1662,6 +1666,7 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) { PendingChatAcquireInterval: time.Second, InFlightChatStaleAfter: testutil.WaitSuperLong, }) + worker.Start() t.Cleanup(func() { require.NoError(t, worker.Close()) })