mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(chatd): fix flaky TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica (#22893)
Fixes https://github.com/coder/internal/issues/1371 ## Root causes Two independent races cause this test to flake at ~2–3/1000: ### 1. Title-generation requests racing with the streaming request counter `maybeGenerateChatTitle` fires in a `context.WithoutCancel` goroutine (line 2130) and makes a **non-streaming** request to the mock OpenAI handler. The test handler was not filtering by request type, so these title requests incremented the `requestCount` atomic — throwing off the coordination logic that uses `requestCount == 1` to identify the first streaming request and hold it open until shutdown. **Fix:** Guard the test handler to return a canned response for non-streaming requests before touching `requestCount`. ### 2. Phantom acquire: `AcquireChat` commits in Postgres but Go sees `context.Canceled` During `Close()`, the main loop's `select` can randomly pick `acquireTicker.C` over `ctx.Done()` (Go spec: when multiple cases are ready, one is chosen uniformly at random). This calls `processOnce(ctx)` with an already-canceled context. In the pq driver, `QueryContext` does **not** check `ctx.Err()` up front. Instead it calls `watchCancel(ctx)` which spawns a goroutine monitoring `ctx.Done()`, then sends the query on the existing connection. When `ctx` is already canceled, a race ensues: - **pq's watchCancel goroutine** immediately sees `<-done`, opens a *new* TCP connection to Postgres, and sends a cancel request. - **The query** is sent concurrently on the existing connection. Because the `AcquireChat` UPDATE is fast (sub-millisecond, single row with `SKIP LOCKED`), it often commits before the cancel arrives via the second connection. Meanwhile in `database/sql`, `initContextClose` spawns an `awaitDone` goroutine that fires immediately (context is already canceled), stores `contextDone`, and calls `rs.close(ctx.Err())` — which races with `Row.Scan` → `rows.Next()`. If `awaitDone` wins, `Next()` sees `contextDone` is set and returns false, causing Scan to return `context.Canceled` (or `ErrNoRows`). **Result:** Postgres committed the UPDATE (chat is now `running` with serverA's worker ID), but Go sees an error and never spawns a goroutine to process it. The chat is stuck as `running` with no worker. If the previous `processChat` cleanup already set the chat back to `pending`, this phantom acquire flips it back to `running` — which is exactly what the debug logs showed: after `Close()` returns, the DB shows `status=running` with serverA's worker ID. **Fix:** Three guards in `processOnce`: 1. Early `ctx.Err()` check — catches the common case where `select` picked the ticker after cancellation. 2. `context.WithoutCancel(ctx)` for `AcquireChat` — prevents the pq `watchCancel` race entirely, ensuring the driver sees the query result if Postgres executed it. 3. Post-acquire `ctx.Err()` check — if the context was canceled while `AcquireChat` ran (or between the early check and the call), immediately release the chat back to `pending`. ## Verification Passes 2000/2000 iterations (previously flaked at ~2–3/1000): ``` go test -run "TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica" \ -count=2000 -timeout 1800s -failfast ./coderd/chatd/ ```
This commit is contained in:
+47
-2
@@ -1020,8 +1020,30 @@ func (p *Server) start(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (p *Server) processOnce(ctx context.Context) {
|
||||
// Try to acquire a pending chat.
|
||||
chat, err := p.db.AcquireChat(ctx, database.AcquireChatParams{
|
||||
// Bail out early if the server is shutting down. The main
|
||||
// loop's select can randomly pick the ticker over ctx.Done(),
|
||||
// so we must guard against acquiring a chat we cannot process.
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Try to acquire a pending chat. We detach from the server
|
||||
// lifetime to prevent a phantom-acquire race: when the server
|
||||
// context is canceled, the pq driver's watchCancel goroutine
|
||||
// races with the actual query on the wire. The UPDATE can
|
||||
// commit in Postgres (setting the chat to "running") before
|
||||
// the cancel request arrives via a second TCP connection, yet
|
||||
// the Go driver still returns context.Canceled to the caller
|
||||
// because the awaitDone goroutine in database/sql closes the
|
||||
// Rows before Scan reads them. This leaves the chat stuck as
|
||||
// "running" with no goroutine to process it. Using a context
|
||||
// that cannot be canceled ensures the driver sees the query
|
||||
// result if Postgres executed it.
|
||||
acquireCtx, acquireCancel := context.WithTimeout(
|
||||
context.WithoutCancel(ctx), 10*time.Second,
|
||||
)
|
||||
defer acquireCancel()
|
||||
chat, err := p.db.AcquireChat(acquireCtx, database.AcquireChatParams{
|
||||
StartedAt: time.Now(),
|
||||
WorkerID: p.workerID,
|
||||
})
|
||||
@@ -1033,6 +1055,29 @@ func (p *Server) processOnce(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// If the server context was canceled while we were acquiring,
|
||||
// release the chat back to pending immediately so another
|
||||
// replica can pick it up.
|
||||
if ctx.Err() != nil {
|
||||
releaseCtx, releaseCancel := context.WithTimeout(
|
||||
context.WithoutCancel(ctx), 10*time.Second,
|
||||
)
|
||||
defer releaseCancel()
|
||||
_, updateErr := p.db.UpdateChatStatus(releaseCtx, database.UpdateChatStatusParams{
|
||||
ID: chat.ID,
|
||||
Status: database.ChatStatusPending,
|
||||
WorkerID: uuid.NullUUID{},
|
||||
StartedAt: sql.NullTime{},
|
||||
HeartbeatAt: sql.NullTime{},
|
||||
LastError: sql.NullString{},
|
||||
})
|
||||
if updateErr != nil {
|
||||
p.logger.Error(ctx, "failed to release chat acquired during shutdown",
|
||||
slog.F("chat_id", chat.ID), slog.Error(updateErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Process the chat (don't block the main loop).
|
||||
p.inflight.Add(1)
|
||||
go func() {
|
||||
|
||||
@@ -1571,6 +1571,12 @@ func TestCloseDuringShutdownContextCanceledShouldRetryOnNewReplica(t *testing.T)
|
||||
var requestCount atomic.Int32
|
||||
streamStarted := make(chan struct{})
|
||||
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
|
||||
// Ignore non-streaming requests (e.g. title generation) so
|
||||
// they don't interfere with the request counter used to
|
||||
// coordinate the streaming chat flow.
|
||||
if !req.Stream {
|
||||
return chattest.OpenAINonStreamingResponse("shutdown-retry")
|
||||
}
|
||||
if requestCount.Add(1) == 1 {
|
||||
chunks := make(chan chattest.OpenAIChunk, 1)
|
||||
go func() {
|
||||
|
||||
Reference in New Issue
Block a user