fix: export chatd.Start to separate server lifecycle (#24761)

chatd.New() no longer auto-starts the acquire/wake loop.
Callers that want chat processing call server.Start()
explicitly. Tests that want a passive server skip Start();
heartbeat, stream janitor, and stale recovery still run.

Closes coder/internal#1502
This commit is contained in:
Mathias Fredriksson
2026-04-29 13:54:49 +03:00
committed by GitHub
parent f9068c2afa
commit dd49a818f9
5 changed files with 103 additions and 35 deletions
+1 -1
View File
@@ -794,7 +794,7 @@ func New(options *Options) *API {
WebpushDispatcher: options.WebPushDispatcher,
UsageTracker: options.WorkspaceUsageTracker,
PrometheusRegistry: options.PrometheusRegistry,
})
}).Start()
gitSyncLogger := options.Logger.Named("gitsync")
refresher := gitsync.NewRefresher(
api.resolveGitProvider,
+21 -20
View File
@@ -135,7 +135,8 @@ var (
// Server handles background processing of pending chats.
type Server struct {
cancel context.CancelFunc
closed chan struct{}
ctx context.Context
wg sync.WaitGroup
inflight sync.WaitGroup
inflightMu sync.Mutex
@@ -3679,7 +3680,6 @@ func New(cfg Config) *Server {
p := &Server{
cancel: cancel,
closed: make(chan struct{}),
db: cfg.Database,
workerID: workerID,
logger: cfg.Logger.Named("processor"),
@@ -3750,33 +3750,34 @@ func New(cfg Config) *Server {
}
p.configCacheUnsubscribe = cancelConfigSub
}
go p.start(ctx)
return p
}
p.ctx = ctx
func (p *Server) start(ctx context.Context) {
defer close(p.closed)
// Recover stale chats on startup and periodically thereafter
// to handle chats orphaned by crashed or redeployed workers.
// Use debugService() (not existingDebugService) so the service
// is initialized eagerly on startup. This ensures stale debug
// rows left by a previous crash are finalized even when no
// request has triggered lazy initialization yet.
// Recover stale chats on startup.
p.recoverStaleChats(ctx)
if debugSvc := p.debugService(); debugSvc != nil {
_, err := debugSvc.FinalizeStale(ctx)
if err != nil {
if _, err := debugSvc.FinalizeStale(ctx); err != nil {
p.logger.Warn(ctx, "failed to finalize stale chat debug rows", slog.Error(err))
}
}
// Single heartbeat loop for all chats on this replica.
go p.heartbeatLoop(ctx)
// Spawn background goroutines that all servers need.
p.wg.Go(func() { p.heartbeatLoop(ctx) })
p.wg.Go(func() { p.streamJanitorLoop(ctx) })
go p.streamJanitorLoop(ctx)
return p
}
// Start launches the background acquire/wake loop that picks up
// pending chats and processes them, then returns the receiver so
// the call can be chained onto the constructor.
//
// The loop runs on the context stored on the server and is
// registered with p.wg. Callers that want a passive server (e.g.
// tests) can skip this call; heartbeat, stream janitor, and stale
// recovery still run.
//
// NOTE(review): nothing in this method guards against repeated
// calls; each call registers another acquire loop.
func (p *Server) Start() *Server {
	runAcquire := func() {
		p.acquireLoop(p.ctx)
	}
	p.wg.Go(runAcquire)
	return p
}
func (p *Server) acquireLoop(ctx context.Context) {
acquireTicker := p.clock.NewTicker(
p.pendingChatAcquireInterval,
"chatd",
@@ -8111,7 +8112,7 @@ func (p *Server) Close() error {
unsub()
}
p.cancel()
<-p.closed
p.wg.Wait()
p.drainInflight()
return nil
}
+75 -14
View File
@@ -2082,7 +2082,7 @@ func TestSendMessageInterruptBehaviorQueuesAndInterruptsWhenBusy(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
replica := newTestServer(t, db, ps, uuid.New())
replica := newStartedTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, org, model := seedChatDependencies(ctx, t, db)
@@ -2234,11 +2234,9 @@ func TestEditMessageUpdatesAndTruncatesAndClearsQueue(t *testing.T) {
require.NoError(t, err)
require.Len(t, queued, 0)
// The wake channel may trigger immediate processing after EditMessage,
// transitioning the chat from pending to running then error before we
// read the DB. Wait for any in-flight processing to settle.
// Note: WaitUntilIdleForTest must be called from the test goroutine
// (not inside require.Eventually) to avoid a WaitGroup Add/Wait race.
// WaitUntilIdleForTest drains the debug-cleanup goroutine
// from EditMessage. Must be called from the test goroutine
// (not inside require.Eventually) to avoid Add/Wait race.
chatd.WaitUntilIdleForTest(replica)
var chatFromDB database.Chat
require.Eventually(t, func() bool {
@@ -2433,7 +2431,7 @@ func TestPromoteQueuedAllowsAlreadyQueuedMessageWhenUsageLimitReached(t *testing
t.Parallel()
db, ps := dbtestutil.NewDB(t)
replica := newTestServer(t, db, ps, uuid.New())
replica := newStartedTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, org, model := seedChatDependencies(ctx, t, db)
@@ -3706,6 +3704,7 @@ func TestRecoverStaleChatsPeriodically(t *testing.T) {
PendingChatAcquireInterval: testutil.WaitLong,
InFlightChatStaleAfter: staleAfter,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -3800,6 +3799,7 @@ func TestRecoverStaleRequiresActionChat(t *testing.T) {
PendingChatAcquireInterval: testutil.WaitLong,
InFlightChatStaleAfter: staleAfter,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -3895,6 +3895,7 @@ func TestWaitingChatsAreNotRecoveredAsStale(t *testing.T) {
PendingChatAcquireInterval: testutil.WaitLong,
InFlightChatStaleAfter: 500 * time.Millisecond,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -3991,10 +3992,7 @@ func TestSubscribeSnapshotIncludesStatusEvent(t *testing.T) {
require.True(t, ok)
t.Cleanup(cancel)
// The first event in the snapshot must be a status event.
// The exact status depends on timing: CreateChat sets
// pending, but the wake signal may trigger processing
// before Subscribe is called.
// Passive server: status is always Pending.
require.NotEmpty(t, snapshot)
require.Equal(t, codersdk.ChatStreamEventTypeStatus, snapshot[0].Type)
require.NotNil(t, snapshot[0].Status)
@@ -4787,7 +4785,7 @@ func TestSubscribeNoPubsubNoDuplicateMessageParts(t *testing.T) {
// Use nil pubsub to force the no-pubsub path.
db, _ := dbtestutil.NewDB(t)
replica := newTestServer(t, db, nil, uuid.New())
replica := newStartedTestServer(t, db, nil, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, org, model := seedChatDependencies(ctx, t, db)
@@ -5533,6 +5531,7 @@ func TestHeartbeatBumpsWorkspaceUsage(t *testing.T) {
ChatHeartbeatInterval: 100 * time.Millisecond,
UsageTracker: tracker,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -5657,6 +5656,7 @@ func TestHeartbeatNoWorkspaceNoBump(t *testing.T) {
InFlightChatStaleAfter: testutil.WaitLong,
ChatHeartbeatInterval: 100 * time.Millisecond,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -5724,6 +5724,8 @@ func waitForChatProcessed(
chatd.WaitUntilIdleForTest(server)
}
// newTestServer creates a passive server that never calls
// processOnce on its own.
func newTestServer(
t *testing.T,
db database.Store,
@@ -5746,6 +5748,57 @@ func newTestServer(
return server
}
// TestPassiveServerDoesNotProcess creates a chat on a server built
// with newTestServer (described in this file as passive), waits for
// the server to go idle, and asserts that the chat stored in the
// database is still in the pending status.
func TestPassiveServerDoesNotProcess(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitLong)
	db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
	user, org, model := seedChatDependencies(ctx, t, db)
	passive := newTestServer(t, db, ps, uuid.New())

	firstMessage := []codersdk.ChatMessagePart{{Type: codersdk.ChatMessagePartTypeText, Text: "hello"}}
	created, err := passive.CreateChat(ctx, chatd.CreateOptions{
		OrganizationID:     org.ID,
		OwnerID:            user.ID,
		Title:              "should-stay-pending",
		InitialUserContent: firstMessage,
		ModelConfigID:      model.ID,
	})
	require.NoError(t, err)

	// Let any background work spawned by CreateChat finish before
	// inspecting the row.
	chatd.WaitUntilIdleForTest(passive)

	// Read the row back from the database rather than trusting the
	// value returned by CreateChat, so an unexpected status
	// transition would be observed here.
	persisted, err := db.GetChatByID(ctx, created.ID)
	require.NoError(t, err)
	require.Equal(t, database.ChatStatusPending, persisted.Status)
}
// newStartedTestServer builds a chatd server for tests and calls
// Start() on it before returning. PendingChatAcquireInterval is set
// to testutil.WaitLong so that processing is triggered by wake
// signals, not polling. The server is closed through t.Cleanup,
// which fails the test if Close returns an error.
func newStartedTestServer(
	t *testing.T,
	db database.Store,
	ps dbpubsub.Pubsub,
	replicaID uuid.UUID,
) *chatd.Server {
	t.Helper()

	cfg := chatd.Config{
		Logger:                     slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}),
		Database:                   db,
		ReplicaID:                  replicaID,
		Pubsub:                     ps,
		PendingChatAcquireInterval: testutil.WaitLong,
	}

	// Start returns its receiver, so construction and start-up can
	// be chained.
	started := chatd.New(cfg).Start()
	t.Cleanup(func() {
		require.NoError(t, started.Close())
	})
	return started
}
// newDebugEnabledTestServer creates a passive test server with
// AlwaysEnableDebugLogs=true so that IsEnabled(ctx, chatID, ownerID)
// always returns true regardless of runtime admin config. This lets
@@ -5799,6 +5852,7 @@ func newActiveTestServer(
o(&cfg)
}
server := chatd.New(cfg)
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -6065,10 +6119,10 @@ func seedWorkspaceWithAgent(
Transition: database.WorkspaceTransitionStart,
JobID: pj.ID,
})
agent := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{
dbAgent := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{
ResourceID: res.ID,
})
return ws, agent
return ws, dbAgent
}
func setOpenAIProviderBaseURL(
@@ -6137,6 +6191,7 @@ func TestInterruptChatDoesNotSendWebPushNotification(t *testing.T) {
InFlightChatStaleAfter: testutil.WaitSuperLong,
WebpushDispatcher: mockPush,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -6249,6 +6304,7 @@ func TestSuccessfulChatSendsWebPushWithNavigationData(t *testing.T) {
InFlightChatStaleAfter: testutil.WaitSuperLong,
WebpushDispatcher: mockPush,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -6334,6 +6390,7 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T)
PendingChatAcquireInterval: 10 * time.Millisecond,
InFlightChatStaleAfter: testutil.WaitLong,
})
serverA.Start()
t.Cleanup(func() {
require.NoError(t, serverA.Close())
})
@@ -6388,6 +6445,7 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T)
PendingChatAcquireInterval: 10 * time.Millisecond,
InFlightChatStaleAfter: testutil.WaitLong,
})
serverB.Start()
t.Cleanup(func() {
require.NoError(t, serverB.Close())
})
@@ -6439,6 +6497,7 @@ func TestSuccessfulChatSendsWebPushWithSummary(t *testing.T) {
InFlightChatStaleAfter: testutil.WaitSuperLong,
WebpushDispatcher: mockPush,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -6500,6 +6559,7 @@ func TestSuccessfulChatSendsWebPushFallbackWithoutSummaryForEmptyAssistantText(t
InFlightChatStaleAfter: testutil.WaitSuperLong,
WebpushDispatcher: mockPush,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -6866,6 +6926,7 @@ func TestInterruptChatPersistsPartialResponse(t *testing.T) {
PendingChatAcquireInterval: 10 * time.Millisecond,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
+1
View File
@@ -131,6 +131,7 @@ func newInternalTestServerWithLoggerAndClock(
PendingChatAcquireInterval: testutil.WaitLong,
ProviderAPIKeys: keys,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
+5
View File
@@ -58,6 +58,7 @@ func newTestServer(
SubscribeFn: entchatd.NewMultiReplicaSubscribeFn(entchatd.MultiReplicaSubscribeConfig{DialerFn: dialer, Clock: clock}),
PendingChatAcquireInterval: testutil.WaitSuperLong,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -80,6 +81,7 @@ func newActiveWorkerServer(
PendingChatAcquireInterval: 10 * time.Millisecond,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
server.Start()
t.Cleanup(func() {
require.NoError(t, server.Close())
})
@@ -1308,6 +1310,7 @@ func TestSubscribeRelayDialCanceledOnFastCompletion(t *testing.T) {
PendingChatAcquireInterval: time.Hour,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
worker.Start()
t.Cleanup(func() {
require.NoError(t, worker.Close())
})
@@ -1467,6 +1470,7 @@ func TestSubscribeRelayDrainWithinGraceLeavesBufferRetained(t *testing.T) {
InFlightChatStaleAfter: testutil.WaitSuperLong,
Clock: workerClock,
})
worker.Start()
t.Cleanup(func() {
require.NoError(t, worker.Close())
})
@@ -1662,6 +1666,7 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
PendingChatAcquireInterval: time.Second,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
worker.Start()
t.Cleanup(func() {
require.NoError(t, worker.Close())
})