From fee5cc5e5bd025ad9e887ab55fe9caae9f9e5ed4 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 10 Mar 2026 07:22:39 -0700 Subject: [PATCH] fix(chatd): fix flaky TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica (#22893) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://github.com/coder/internal/issues/1371 ## Root causes Two independent races cause this test to flake at ~2–3/1000: ### 1. Title-generation requests racing with the streaming request counter `maybeGenerateChatTitle` fires in a `context.WithoutCancel` goroutine (line 2130) and makes a **non-streaming** request to the mock OpenAI handler. The test handler was not filtering by request type, so these title requests incremented the `requestCount` atomic — throwing off the coordination logic that uses `requestCount == 1` to identify the first streaming request and hold it open until shutdown. **Fix:** Guard the test handler to return a canned response for non-streaming requests before touching `requestCount`. ### 2. Phantom acquire: `AcquireChat` commits in Postgres but Go sees `context.Canceled` During `Close()`, the main loop's `select` can randomly pick `acquireTicker.C` over `ctx.Done()` (Go spec: when multiple cases are ready, one is chosen uniformly at random). This calls `processOnce(ctx)` with an already-canceled context. In the pq driver, `QueryContext` does **not** check `ctx.Err()` up front. Instead it calls `watchCancel(ctx)` which spawns a goroutine monitoring `ctx.Done()`, then sends the query on the existing connection. When `ctx` is already canceled, a race ensues: - **pq's watchCancel goroutine** immediately sees `<-done`, opens a *new* TCP connection to Postgres, and sends a cancel request. - **The query** is sent concurrently on the existing connection. Because the `AcquireChat` UPDATE is fast (sub-millisecond, single row with `SKIP LOCKED`), it often commits before the cancel arrives via the second connection. Meanwhile in `database/sql`, `initContextClose` spawns an `awaitDone` goroutine that fires immediately (context is already canceled), stores `contextDone`, and calls `rs.close(ctx.Err())` — which races with `Row.Scan` → `rows.Next()`. If `awaitDone` wins, `Next()` sees `contextDone` is set and returns false, causing Scan to return `context.Canceled` (or `ErrNoRows`). **Result:** Postgres committed the UPDATE (chat is now `running` with serverA's worker ID), but Go sees an error and never spawns a goroutine to process it. The chat is stuck as `running` with no worker. If the previous `processChat` cleanup already set the chat back to `pending`, this phantom acquire flips it back to `running` — which is exactly what the debug logs showed: after `Close()` returns, the DB shows `status=running` with serverA's worker ID. **Fix:** Three guards in `processOnce`: 1. Early `ctx.Err()` check — catches the common case where `select` picked the ticker after cancellation. 2. `context.WithoutCancel(ctx)` for `AcquireChat` — prevents the pq `watchCancel` race entirely, ensuring the driver sees the query result if Postgres executed it. 3. Post-acquire `ctx.Err()` check — if the context was canceled while `AcquireChat` ran (or between the early check and the call), immediately release the chat back to `pending`. ## Verification Passes 2000/2000 iterations (previously flaked at ~2–3/1000): ``` go test -run "TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica" \ -count=2000 -timeout 1800s -failfast ./coderd/chatd/ ``` --- coderd/chatd/chatd.go | 49 ++++++++++++++++++++++++++++++++++++-- coderd/chatd/chatd_test.go | 6 +++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/coderd/chatd/chatd.go b/coderd/chatd/chatd.go index 76c447cec9..566cbd0675 100644 --- a/coderd/chatd/chatd.go +++ b/coderd/chatd/chatd.go @@ -1020,8 +1020,30 @@ func (p *Server) start(ctx context.Context) { } func (p *Server) processOnce(ctx context.Context) { - // Try to acquire a pending chat. - chat, err := p.db.AcquireChat(ctx, database.AcquireChatParams{ + // Bail out early if the server is shutting down. The main + // loop's select can randomly pick the ticker over ctx.Done(), + // so we must guard against acquiring a chat we cannot process. + if ctx.Err() != nil { + return + } + + // Try to acquire a pending chat. We detach from the server + // lifetime to prevent a phantom-acquire race: when the server + // context is canceled, the pq driver's watchCancel goroutine + // races with the actual query on the wire. The UPDATE can + // commit in Postgres (setting the chat to "running") before + // the cancel request arrives via a second TCP connection, yet + // the Go driver still returns context.Canceled to the caller + // because the awaitDone goroutine in database/sql closes the + // Rows before Scan reads them. This leaves the chat stuck as + // "running" with no goroutine to process it. Using a context + // that cannot be canceled ensures the driver sees the query + // result if Postgres executed it. + acquireCtx, acquireCancel := context.WithTimeout( + context.WithoutCancel(ctx), 10*time.Second, + ) + defer acquireCancel() + chat, err := p.db.AcquireChat(acquireCtx, database.AcquireChatParams{ StartedAt: time.Now(), WorkerID: p.workerID, }) @@ -1033,6 +1055,29 @@ func (p *Server) processOnce(ctx context.Context) { return } + // If the server context was canceled while we were acquiring, + // release the chat back to pending immediately so another + // replica can pick it up. + if ctx.Err() != nil { + releaseCtx, releaseCancel := context.WithTimeout( + context.WithoutCancel(ctx), 10*time.Second, + ) + defer releaseCancel() + _, updateErr := p.db.UpdateChatStatus(releaseCtx, database.UpdateChatStatusParams{ + ID: chat.ID, + Status: database.ChatStatusPending, + WorkerID: uuid.NullUUID{}, + StartedAt: sql.NullTime{}, + HeartbeatAt: sql.NullTime{}, + LastError: sql.NullString{}, + }) + if updateErr != nil { + p.logger.Error(ctx, "failed to release chat acquired during shutdown", + slog.F("chat_id", chat.ID), slog.Error(updateErr)) + } + return + } + // Process the chat (don't block the main loop). p.inflight.Add(1) go func() { diff --git a/coderd/chatd/chatd_test.go b/coderd/chatd/chatd_test.go index 7fac515ecd..de93e366a9 100644 --- a/coderd/chatd/chatd_test.go +++ b/coderd/chatd/chatd_test.go @@ -1571,6 +1571,12 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T) var requestCount atomic.Int32 streamStarted := make(chan struct{}) openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse { + // Ignore non-streaming requests (e.g. title generation) so + // they don't interfere with the request counter used to + // coordinate the streaming chat flow. + if !req.Stream { + return chattest.OpenAINonStreamingResponse("shutdown-retry") + } if requestCount.Add(1) == 1 { chunks := make(chan chattest.OpenAIChunk, 1) go func() {