fix(chatd): preserve context.Canceled in persistStep during shutdown (#22890)

## Problem

When a chat worker shuts down gracefully (e.g. Kubernetes pod SIGTERM)
while a tool is executing (like `wait_agent` polling for a subagent),
the chat gets stuck in `waiting` status forever — no other worker will
pick it up.

### Root Cause

`persistStep` in `chatd.go` unconditionally returned
`chatloop.ErrInterrupted` for **any** canceled context:

```go
if persistCtx.Err() != nil {
    return chatloop.ErrInterrupted  // BUG: doesn't check WHY the context was canceled
}
```

During shutdown, the context cause is `context.Canceled` (not
`ErrInterrupted`). But because `persistStep` returned `ErrInterrupted`,
the error handling in `processChat` hit the `ErrInterrupted` check first
(line 2011) and set status to `waiting` — the `isShutdownCancellation`
check (line 2017) was never reached:

```go
// Checked FIRST — matches because persistStep returned ErrInterrupted
if errors.Is(err, chatloop.ErrInterrupted) {
    status = database.ChatStatusWaiting  // Stuck forever
    return
}
// NEVER REACHED during shutdown
if isShutdownCancellation(ctx, chatCtx, err) {
    status = database.ChatStatusPending  // Would have been correct
    return
}
```

### Trigger scenario (from production logs)

1. Chat spawns a subagent via `spawn_agent`, then calls `wait_agent`
2. `wait_agent` blocks in `awaitSubagentCompletion` polling loop
3. Worker pod receives SIGTERM → `Close()` cancels server context
4. Context cancellation propagates to `awaitSubagentCompletion` →
returns `context.Canceled`
5. Tool execution completes, `persistStep` is called with canceled
context
6. `persistStep` returns `ErrInterrupted` (wrong!) → status set to
`waiting` (stuck!)

## Fix

Check `context.Cause()` before deciding which error to return:

```go
if persistCtx.Err() != nil {
    if errors.Is(context.Cause(persistCtx), chatloop.ErrInterrupted) {
        return chatloop.ErrInterrupted  // Intentional interruption
    }
    return persistCtx.Err()  // Shutdown → context.Canceled
}
```

This preserves `context.Canceled` for shutdown, allowing
`isShutdownCancellation` to match and set status to `pending` so another
worker retries the chat.

## Test

Added `TestRun_ShutdownDuringToolExecutionReturnsContextCanceled` which:
1. Streams a tool call to a blocking tool (simulating `wait_agent`)
2. Cancels the server context (simulating shutdown) while the tool
blocks
3. Verifies `Run` returns `context.Canceled`, NOT `ErrInterrupted`
This commit is contained in:
Kyle Carberry
2026-03-10 06:01:45 -07:00
committed by GitHub
parent b898e45ec4
commit f35b99a4fa
2 changed files with 108 additions and 7 deletions
+14 -7
View File
@@ -2216,14 +2216,21 @@ func (p *Server) runChat(
modelConfigContextLimit := modelConfig.ContextLimit
persistStep := func(persistCtx context.Context, step chatloop.PersistedStep) error {
// If the chat context has been canceled (e.g. by an
// EditMessage call), bail out before inserting any
// messages. This closes the race window between
// EditMessage committing its transaction (which deletes
// messages after the edit point) and the cancellation
// propagating to the processing loop.
// If the chat context has been canceled, bail out before
// inserting any messages. We distinguish the cause so that
// the caller can tell an intentional interruption (e.g.
// EditMessage, user stop) from a server shutdown:
// - ErrInterrupted cause → return ErrInterrupted
// (processChat sets status = waiting).
// - Any other cause (e.g. context.Canceled during
// Close()) → return the original context error so
// isShutdownCancellation can match and set status =
// pending, allowing another replica to retry.
if persistCtx.Err() != nil {
return chatloop.ErrInterrupted
if errors.Is(context.Cause(persistCtx), chatloop.ErrInterrupted) {
return chatloop.ErrInterrupted
}
return persistCtx.Err()
}
// Split the step content into assistant blocks and tool
+94
View File
@@ -2,6 +2,7 @@ package chatloop //nolint:testpackage // Uses internal symbols.
import (
"context"
"errors"
"iter"
"strings"
"sync"
@@ -9,6 +10,7 @@ import (
"charm.land/fantasy"
fantasyanthropic "charm.land/fantasy/providers/anthropic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
)
@@ -405,6 +407,98 @@ func TestRun_PersistStepErrorPropagates(t *testing.T) {
require.ErrorContains(t, err, "database write failed")
}
// TestRun_ShutdownDuringToolExecutionReturnsContextCanceled verifies that
// when the parent context is canceled (simulating server shutdown) while
// a tool is blocked, Run returns context.Canceled — not ErrInterrupted.
// This matters because the caller uses the error type to decide whether
// to set chat status to "pending" (retryable on another worker) vs
// "waiting" (stuck forever).
func TestRun_ShutdownDuringToolExecutionReturnsContextCanceled(t *testing.T) {
t.Parallel()
toolStarted := make(chan struct{})
// Model returns a single tool call, then finishes.
model := &loopTestModel{
provider: "fake",
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
return streamFromParts([]fantasy.StreamPart{
{Type: fantasy.StreamPartTypeToolInputStart, ID: "tc-block", ToolCallName: "blocking_tool"},
{Type: fantasy.StreamPartTypeToolInputDelta, ID: "tc-block", Delta: `{}`},
{Type: fantasy.StreamPartTypeToolInputEnd, ID: "tc-block"},
{
Type: fantasy.StreamPartTypeToolCall,
ID: "tc-block",
ToolCallName: "blocking_tool",
ToolCallInput: `{}`,
},
{Type: fantasy.StreamPartTypeFinish, FinishReason: fantasy.FinishReasonToolCalls},
}), nil
},
}
// Tool that blocks until its context is canceled, simulating
// a long-running operation like wait_agent.
blockingTool := fantasy.NewAgentTool(
"blocking_tool",
"blocks until context canceled",
func(ctx context.Context, _ struct{}, _ fantasy.ToolCall) (fantasy.ToolResponse, error) {
close(toolStarted)
<-ctx.Done()
return fantasy.ToolResponse{}, ctx.Err()
},
)
// Simulate the server context (parent) and chat context
// (child). Canceling the parent simulates graceful shutdown.
serverCtx, serverCancel := context.WithCancel(context.Background())
defer serverCancel()
serverCancelDone := make(chan struct{})
go func() {
defer close(serverCancelDone)
<-toolStarted
t.Logf("tool started, canceling server context to simulate shutdown")
serverCancel()
}()
// persistStep mirrors the FIXED chatd.go code: it only returns
// ErrInterrupted when the context was actually canceled due to
// an interruption (cause is ErrInterrupted). For shutdown
// (plain context.Canceled), it returns the original error so
// callers can distinguish the two.
persistStep := func(persistCtx context.Context, _ PersistedStep) error {
if persistCtx.Err() != nil {
if errors.Is(context.Cause(persistCtx), ErrInterrupted) {
return ErrInterrupted
}
return persistCtx.Err()
}
return nil
}
err := Run(serverCtx, RunOptions{
Model: model,
Messages: []fantasy.Message{
textMessage(fantasy.MessageRoleUser, "run the blocking tool"),
},
Tools: []fantasy.AgentTool{blockingTool},
MaxSteps: 3,
PersistStep: persistStep,
})
// Wait for the cancel goroutine to finish to aid flake
// diagnosis if the test ever hangs.
<-serverCancelDone
require.Error(t, err)
// The error must NOT be ErrInterrupted — it should propagate
// as context.Canceled so the caller can distinguish shutdown
// from user interruption. Use assert (not require) so both
// checks are evaluated even if the first fails.
assert.NotErrorIs(t, err, ErrInterrupted, "shutdown cancellation must not be converted to ErrInterrupted")
assert.ErrorIs(t, err, context.Canceled, "shutdown should propagate as context.Canceled")
}
func hasAnthropicEphemeralCacheControl(message fantasy.Message) bool {
if len(message.ProviderOptions) == 0 {
return false