diff --git a/coderd/x/chatd/chaterror/classify.go b/coderd/x/chatd/chaterror/classify.go index e426a55fb4..926b058a3b 100644 --- a/coderd/x/chatd/chaterror/classify.go +++ b/coderd/x/chatd/chaterror/classify.go @@ -22,6 +22,15 @@ type ClassifiedError struct { // RetryAfter is a normalized minimum retry delay derived from // provider response metadata when available. RetryAfter time.Duration + + // ChainBroken is true when the provider reported that the + // previous_response_id (or analogous chain anchor) is no longer + // retrievable. The chatloop retry path uses this signal to exit + // chain mode and replay full history before the next attempt. + // This is an internal signal; it is not surfaced as a separate + // codersdk.ChatErrorKind so the user-visible kind set stays + // stable. + ChainBroken bool } const responsesAPIDiagnosticMessage = "The chat continuation failed due to an " + @@ -165,6 +174,20 @@ func Classify(err error) ClassifiedError { return classified } + // Chain-broken detection runs before the generic rule table so a + // 404 carrying a chain anchor failure is not classified as a + // generic non-retryable error. The chatloop retry callback uses + // the ChainBroken flag to exit chain mode and replay full + // history. + if classified, ok := chainBrokenClassification( + lower, + provider, + statusCode, + structured, + ); ok { + return classified + } + deadline := errors.Is(err, context.DeadlineExceeded) || strings.Contains(lower, "context deadline exceeded") overloadedMatch := statusCode == 529 || containsAny(lower, overloadedPatterns...) authStrong := statusCode == 401 || containsAny(lower, authStrongPatterns...) @@ -276,6 +299,35 @@ func streamIncompleteMessage(provider string) string { return providerSubject(provider) + " stream closed unexpectedly before the response completed." } +// chainBrokenClassification recognizes the OpenAI error +// "Previous response with id ... not found" returned when a +// chained turn references a previous_response_id the provider no +// longer recognizes. +func chainBrokenClassification( + lowerMessage string, + provider string, + statusCode int, + structured providerErrorDetails, +) (ClassifiedError, bool) { + if !(strings.Contains(lowerMessage, "previous response with id") && + strings.Contains(lowerMessage, "not found")) { + return ClassifiedError{}, false + } + // This class of error has so far only been observed with OpenAI. + if provider == "" { + provider = "openai" + } + return normalizeClassification(ClassifiedError{ + Detail: structured.detail, + Kind: codersdk.ChatErrorKindGeneric, + Provider: provider, + Retryable: true, + StatusCode: statusCode, + RetryAfter: structured.retryAfter, + ChainBroken: true, + }), true +} + func responsesAPIDiagnostic(lowerMessage, detail string) (string, bool) { lowerDetail := strings.ToLower(detail) for _, match := range responsesAPIDiagnosticMatches { diff --git a/coderd/x/chatd/chaterror/classify_test.go b/coderd/x/chatd/chaterror/classify_test.go index c73eac709b..d5027af49a 100644 --- a/coderd/x/chatd/chaterror/classify_test.go +++ b/coderd/x/chatd/chaterror/classify_test.go @@ -746,6 +746,123 @@ func TestClassify_TruncatesProviderDetail(t *testing.T) { require.True(t, strings.HasSuffix(classified.Detail, "…")) } +func TestClassify_ChainBroken(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + wantChainBroken bool + wantRetryable bool + wantProvider string + wantStatusCode int + }{ + { + name: "OpenAIPreviousResponseNotFoundBareString", + err: xerrors.New( + "Previous response with id 'resp_abc' not found.", + ), + wantChainBroken: true, + wantRetryable: true, + wantProvider: "openai", + wantStatusCode: 0, + }, + { + name: "OpenAIPreviousResponseNotFoundProviderError", + err: testProviderError( + "Previous response with id 'resp_096c70c5bb8d52bc0069fa11e0630c81a3ba210cddfa75bae9' not found.", + 404, + nil, + ), + wantChainBroken: true, + wantRetryable: true, + wantProvider: "openai", + wantStatusCode: 404, + }, + { + name: "OpenAIPreviousResponseCaseInsensitive", + err: testProviderError( + "PREVIOUS RESPONSE WITH ID 'resp_abc' NOT FOUND.", + 404, + nil, + ), + wantChainBroken: true, + wantRetryable: true, + wantProvider: "openai", + wantStatusCode: 404, + }, + { + name: "PreviousResponseWithoutNotFoundIsNotChainBroken", + err: testProviderError( + "Previous response with id 'resp_abc' is invalid.", + 400, + nil, + ), + wantChainBroken: false, + }, + { + name: "UnrelatedNotFoundIsNotChainBroken", + err: testProviderError( + "resource not found", + 404, + nil, + ), + wantChainBroken: false, + }, + { + name: "UnrelatedInvalidRequestIsNotChainBroken", + err: testProviderError( + "", + 400, + nil, + testProviderResponseDump(`{"error":{"type":"invalid_request_error","message":"Image exceeds 5 MB maximum."}}`), + ), + wantChainBroken: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + classified := chaterror.Classify(tt.err) + require.Equal(t, tt.wantChainBroken, classified.ChainBroken, + "chain broken flag mismatch") + if !tt.wantChainBroken { + return + } + require.Equal(t, tt.wantRetryable, classified.Retryable, + "chain-broken errors must be retryable so the loop"+ + " can self-heal") + require.Equal(t, tt.wantProvider, classified.Provider) + require.Equal(t, tt.wantStatusCode, classified.StatusCode) + require.Equal(t, codersdk.ChatErrorKindGeneric, classified.Kind, + "chain-broken keeps the user-visible kind unchanged"+ + " so we don't add a new codersdk surface") + }) + } +} + +func TestClassify_ChainBrokenSurvivesWithClassification(t *testing.T) { + t.Parallel() + + original := chaterror.Classify(testProviderError( + "Previous response with id 'resp_abc' not found.", + 404, + nil, + )) + require.True(t, original.ChainBroken) + + wrapped := chaterror.WithClassification( + xerrors.New("transport blew up"), + original, + ) + round := chaterror.Classify(wrapped) + require.True(t, round.ChainBroken, + "WithClassification round-trips ChainBroken so the retry path"+ + " can detect it after re-classification") +} + func testProviderError( message string, statusCode int, diff --git a/coderd/x/chatd/chatloop/chatloop.go b/coderd/x/chatd/chatloop/chatloop.go index 205f8bfe77..21822c2311 100644 --- a/coderd/x/chatd/chatloop/chatloop.go +++ b/coderd/x/chatd/chatloop/chatloop.go @@ -161,11 +161,14 @@ type RunOptions struct { Compaction *CompactionOptions ReloadMessages func(context.Context) ([]fantasy.Message, error) DisableChainMode func() - // PrepareMessages is called before each LLM step with the - // current message history. If it returns non-nil, the returned - // slice replaces messages for this and all subsequent steps. + // PrepareMessages is called at least once before each LLM step + // with the current message history. If it returns non-nil, the + // returned slice replaces messages for this and all subsequent + // steps. // Used to inject system context that becomes available mid-loop // (e.g. AGENTS.md after create_workspace). + // NOTE: It may be called more than once per step in case of a + // retry, so callbacks should avoid duplicating messages. PrepareMessages func([]fantasy.Message) []fantasy.Message // OnRetry is called before each retry attempt when the LLM @@ -349,7 +352,6 @@ func Run(ctx context.Context, opts RunOptions) error { } tools := buildToolDefinitions(opts.Tools, opts.ActiveTools, opts.ProviderTools) - applyAnthropicCaching := shouldApplyAnthropicPromptCaching(opts.Model) messages := opts.Messages var lastUsage fantasy.Usage @@ -390,30 +392,10 @@ func Run(ctx context.Context, opts RunOptions) error { modelName := opts.Model.Model() opts.Metrics.StepsTotal.WithLabelValues(provider, modelName).Inc() stepStart := time.Now() - // Copy messages so that provider-specific caching - // mutations don't leak back to the caller's slice. - // copy copies Message structs by value, so field - // reassignments in addAnthropicPromptCaching only - // affect the prepared slice. - if opts.PrepareMessages != nil { - if updated := opts.PrepareMessages(messages); updated != nil { - messages = updated - } - } - prepared := make([]fantasy.Message, len(messages)) - copy(prepared, messages) - prepared, sanitizeStats := chatsanitize.SanitizeAnthropicProviderToolHistory(provider, prepared) - chatsanitize.LogAnthropicProviderToolSanitization( - ctx, opts.Logger, "pre_request", provider, modelName, sanitizeStats, - slog.F("step_index", step), - slog.F("total_steps", totalSteps), + var prepared []fantasy.Message + messages, prepared = prepareMessagesForRequest( + ctx, opts, messages, provider, modelName, step, totalSteps, ) - prepared = chatsanitize.ApplyAnthropicProviderToolGuard( - ctx, opts.Logger, provider, modelName, prepared, - ) - if applyAnthropicCaching { - addAnthropicPromptCaching(prepared) - } opts.Metrics.MessageCount.WithLabelValues(provider, modelName).Observe(float64(len(prepared))) opts.Metrics.PromptSizeBytes.WithLabelValues(provider, modelName).Observe(float64(EstimatePromptSize(prepared))) @@ -469,6 +451,33 @@ func Run(ctx context.Context, opts RunOptions) error { // classified payload handed to OnRetry. classified = classified.WithProvider(provider) opts.Metrics.RecordStreamRetry(provider, modelName, classified) + if classified.ChainBroken { + if chatopenai.HasPreviousResponseID(opts.ProviderOptions) { + opts.ProviderOptions = chatopenai.ClearPreviousResponseID(opts.ProviderOptions) + } + if chatopenai.HasPreviousResponseID(call.ProviderOptions) { + call.ProviderOptions = chatopenai.ClearPreviousResponseID(call.ProviderOptions) + } + if opts.DisableChainMode != nil { + opts.DisableChainMode() + } + if opts.ReloadMessages != nil { + reloaded, err := opts.ReloadMessages(ctx) + if err != nil { + opts.Logger.Warn(ctx, + "chain-broken recovery: reload messages failed", + slog.Error(err), + ) + } else { + // Reloaded history replaces the prompt prepared before + // the failed attempt, so run the same preparation + // pipeline used by normal provider requests. + messages, call.Prompt = prepareMessagesForRequest( + ctx, opts, reloaded, provider, modelName, step, totalSteps, + ) + } + } + } if opts.OnRetry != nil { opts.OnRetry(attempt, retryErr, classified, delay) } @@ -656,6 +665,43 @@ func Run(ctx context.Context, opts RunOptions) error { return nil } +// prepareMessagesForRequest applies the prompt preparation pipeline used +// immediately before sending messages to a provider. It returns the +// possibly updated canonical messages and an independent provider-ready +// prompt. +func prepareMessagesForRequest( + ctx context.Context, + opts RunOptions, + messages []fantasy.Message, + provider string, + modelName string, + step int, + totalSteps int, +) (canonical []fantasy.Message, prompt []fantasy.Message) { + canonical = messages + if opts.PrepareMessages != nil { + if updated := opts.PrepareMessages(canonical); updated != nil { + canonical = updated + } + } + // Copy messages so provider-specific caching mutations don't leak + // back to the canonical message slice. + prompt = slices.Clone(canonical) + prompt, sanitizeStats := chatsanitize.SanitizeAnthropicProviderToolHistory(provider, prompt) + chatsanitize.LogAnthropicProviderToolSanitization( + ctx, opts.Logger, "pre_request", provider, modelName, sanitizeStats, + slog.F("step_index", step), + slog.F("total_steps", totalSteps), + ) + prompt = chatsanitize.ApplyAnthropicProviderToolGuard( + ctx, opts.Logger, provider, modelName, prompt, + ) + if shouldApplyAnthropicPromptCaching(opts.Model) { + addAnthropicPromptCaching(prompt) + } + return canonical, prompt +} + // guardedAttempt owns an attempt-scoped context and startup guard // around a provider stream. release is idempotent and frees the // attempt-scoped timer/context. finish canonicalizes startup timeout diff --git a/coderd/x/chatd/chatloop/chatloop_internal_test.go b/coderd/x/chatd/chatloop/chatloop_internal_test.go new file mode 100644 index 0000000000..d45b551b62 --- /dev/null +++ b/coderd/x/chatd/chatloop/chatloop_internal_test.go @@ -0,0 +1,542 @@ +package chatloop + +import ( + "context" + "iter" + "sync" + "testing" + + "charm.land/fantasy" + fantasyanthropic "charm.land/fantasy/providers/anthropic" + fantasyopenai "charm.land/fantasy/providers/openai" + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/x/chatd/chatopenai" + "github.com/coder/coder/v2/coderd/x/chatd/chattest" +) + +func TestRun_ChainBrokenRecovers(t *testing.T) { + t.Parallel() + + // Given: a chain-mode run whose previous provider_response_id is present in + // our database but no longer recognized by the provider for some reason + var ( + streamCalls int + secondCallOpt fantasy.ProviderOptions + secondPrompt []fantasy.Message + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + secondCallOpt = call.ProviderOptions + secondPrompt = call.Prompt + return finishingStream(), nil + } + }, + } + + disableCalls := 0 + reloadCalls := 0 + reloadedHistory := []fantasy.Message{ + {Role: "system", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "sys"}}}, + {Role: "user", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "hello"}}}, + {Role: "assistant", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "hi"}}}, + {Role: "user", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "follow up"}}}, + } + + chainFiltered := []fantasy.Message{ + {Role: "system", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "sys"}}}, + {Role: "user", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "follow up"}}}, + } + + // When: the first attempt fails with the chain-broken error + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: chainFiltered, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() { + disableCalls++ + }, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + reloadCalls++ + return reloadedHistory, nil + }, + }) + + // Then: DisableChainMode and ReloadMessages each run once and the + // retry attempt sends the full reloaded history without + // previous_response_id. + require.NoError(t, err) + require.Equal(t, 2, streamCalls, "exactly two stream attempts (one failure, one success)") + require.Equal(t, 1, disableCalls, "DisableChainMode called once on chain-broken recovery") + require.Equal(t, 1, reloadCalls, "ReloadMessages called once on chain-broken recovery") + + require.False(t, + chatopenai.HasPreviousResponseID(secondCallOpt), + "second attempt must not carry previous_response_id; it was poisoned", + ) + require.Equal(t, reloadedHistory, secondPrompt, + "second attempt must use full reloaded history, not chain-filtered prompt", + ) +} + +func TestRun_ChainBrokenRecoveryPreparesReloadedMessages(t *testing.T) { + t.Parallel() + + var ( + streamCalls int + prepareCalls int + secondCallOpt fantasy.ProviderOptions + secondPrompt []fantasy.Message + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + secondCallOpt = call.ProviderOptions + secondPrompt = call.Prompt + return finishingStream(), nil + } + }, + } + + reloadedHistory := []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "full history"), + } + + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "chain-filtered"), + }, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() {}, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + return reloadedHistory, nil + }, + PrepareMessages: func(msgs []fantasy.Message) []fantasy.Message { + prepareCalls++ + return append(msgs, textMessage(fantasy.MessageRoleSystem, "prepared")) + }, + }) + + require.NoError(t, err) + require.Equal(t, 2, streamCalls) + require.Equal(t, 2, prepareCalls, + "reloaded history must be prepared before the retry") + require.False(t, chatopenai.HasPreviousResponseID(secondCallOpt)) + requireTextPrompt(t, secondPrompt, "full history") + requireTextPrompt(t, secondPrompt, "prepared") +} + +func TestRun_ChainBrokenRecoveryAppliesProviderPromptPrep(t *testing.T) { + t.Parallel() + + var ( + streamCalls int + secondCallOpt fantasy.ProviderOptions + secondPrompt []fantasy.Message + ) + model := &chattest.FakeModel{ + ProviderName: fantasyanthropic.Name, + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + secondCallOpt = call.ProviderOptions + secondPrompt = call.Prompt + return finishingStream(), nil + } + }, + } + + reloadedHistory := []fantasy.Message{ + textMessage(fantasy.MessageRoleSystem, "sys-1"), + textMessage(fantasy.MessageRoleSystem, "sys-2"), + textMessage(fantasy.MessageRoleUser, "hello"), + textMessage(fantasy.MessageRoleAssistant, "hi"), + textMessage(fantasy.MessageRoleUser, "follow up"), + } + + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: []fantasy.Message{ + textMessage(fantasy.MessageRoleSystem, "sys-2"), + textMessage(fantasy.MessageRoleUser, "follow up"), + }, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() {}, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + return reloadedHistory, nil + }, + }) + + require.NoError(t, err) + require.Equal(t, 2, streamCalls) + require.False(t, chatopenai.HasPreviousResponseID(secondCallOpt)) + require.Len(t, secondPrompt, 5) + require.False(t, hasAnthropicEphemeralCacheControl(secondPrompt[0])) + require.True(t, hasAnthropicEphemeralCacheControl(secondPrompt[1])) + require.False(t, hasAnthropicEphemeralCacheControl(secondPrompt[2])) + require.True(t, hasAnthropicEphemeralCacheControl(secondPrompt[3])) + require.True(t, hasAnthropicEphemeralCacheControl(secondPrompt[4])) +} + +func TestRun_ChainBrokenReloadWithoutDisableChainModeIsExplicit(t *testing.T) { + t.Parallel() + + var ( + streamCalls int + prepareCalls int + reloadCalls int + secondCallOpt fantasy.ProviderOptions + secondPrompt []fantasy.Message + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + secondCallOpt = call.ProviderOptions + secondPrompt = call.Prompt + return finishingStream(), nil + } + }, + } + + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "chain-filtered"), + }, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + reloadCalls++ + return []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "full history"), + }, nil + }, + PrepareMessages: func(msgs []fantasy.Message) []fantasy.Message { + prepareCalls++ + return append(msgs, textMessage(fantasy.MessageRoleSystem, "prepared")) + }, + // DisableChainMode is intentionally nil. This covers callers + // whose ReloadMessages does not depend on chain-mode state. + }) + + require.NoError(t, err) + require.Equal(t, 2, streamCalls) + require.Equal(t, 1, reloadCalls) + require.Equal(t, 2, prepareCalls) + require.False(t, chatopenai.HasPreviousResponseID(secondCallOpt)) + requireTextPrompt(t, secondPrompt, "full history") + requireTextPrompt(t, secondPrompt, "prepared") +} + +func TestRun_ChainBrokenComposesWithPostStepChainExit(t *testing.T) { + t.Parallel() + + // Given a chain-mode run whose recovery succeeds and yields a + // tool call so the step loop continues + var ( + mu sync.Mutex + streamCalls int + capturedOpts []fantasy.ProviderOptions + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + mu.Lock() + streamCalls++ + attempt := streamCalls + capturedOpts = append(capturedOpts, call.ProviderOptions) + mu.Unlock() + + switch attempt { + case 1: + // Initial chained attempt: 404 from provider. + return nil, xerrors.New(chainBrokenErrorMessage) + case 2: + // Recovery succeeded; emit a tool call so the + // step loop continues to a second step. + return streamFromParts([]fantasy.StreamPart{ + {Type: fantasy.StreamPartTypeToolInputStart, ID: "tc-1", ToolCallName: "read_file"}, + {Type: fantasy.StreamPartTypeToolInputDelta, ID: "tc-1", Delta: `{"path":"main.go"}`}, + {Type: fantasy.StreamPartTypeToolInputEnd, ID: "tc-1"}, + { + Type: fantasy.StreamPartTypeToolCall, + ID: "tc-1", + ToolCallName: "read_file", + ToolCallInput: `{"path":"main.go"}`, + }, + {Type: fantasy.StreamPartTypeFinish, FinishReason: fantasy.FinishReasonToolCalls}, + }), nil + default: + // Step 1: end the run. + return finishingStream(), nil + } + }, + } + + // When the second step builds its call from opts.ProviderOptions + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 3, + ContextLimitFallback: 4096, + Messages: []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "hi"), + }, + Tools: []fantasy.AgentTool{ + newNoopTool("read_file"), + }, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() {}, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + return []fantasy.Message{ + textMessage(fantasy.MessageRoleUser, "hi"), + }, nil + }, + }) + + // Then it must not re-send the poisoned previous_response_id + // because chain-broken recovery cleared both the current call and + // subsequent step options. + require.NoError(t, err) + require.Equal(t, 3, streamCalls, + "expected three stream calls: chain-broken failure, recovered tool-call step, follow-up step") + for i, providerOpts := range capturedOpts[1:] { + require.False(t, + chatopenai.HasPreviousResponseID(providerOpts), + "every stream call after recovery (index %d) must have cleared previous_response_id", + i+1, + ) + } +} + +func TestRun_ChainBrokenReloadFailureStillClearsChain(t *testing.T) { + t.Parallel() + + // Given: a chain-mode run whose ReloadMessages callback errors + var ( + streamCalls int + prepareCalls int + secondCallOpt fantasy.ProviderOptions + secondPrompt []fantasy.Message + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + secondCallOpt = call.ProviderOptions + secondPrompt = call.Prompt + return finishingStream(), nil + } + }, + } + + disableCalls := 0 + chainFiltered := []fantasy.Message{ + {Role: "system", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "sys"}}}, + {Role: "user", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "follow up"}}}, + } + + // When: the chain-broken error fires + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: chainFiltered, + ProviderOptions: chainModeProviderOptions("resp_poisoned"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() { + disableCalls++ + }, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + return nil, xerrors.New("reload exploded") + }, + PrepareMessages: func(msgs []fantasy.Message) []fantasy.Message { + prepareCalls++ + return append(msgs, textMessage(fantasy.MessageRoleSystem, "prepared")) + }, + }) + + // Then: the poisoned previous_response_id is still cleared and + // DisableChainMode still runs, so the retry has any chance of + // succeeding against the chain-filtered prompt. + require.NoError(t, err) + require.Equal(t, 1, disableCalls) + require.Equal(t, 1, prepareCalls) + require.False(t, + chatopenai.HasPreviousResponseID(secondCallOpt), + "chain options must still be cleared even when reload fails", + ) + requireTextPrompt(t, secondPrompt, "follow up") + requireTextPrompt(t, secondPrompt, "prepared") +} + +func TestRun_ChainBrokenWithoutChainModeIsSafe(t *testing.T) { + t.Parallel() + + // Given: a run with no chain-mode options or callbacks + var streamCalls int + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New(chainBrokenErrorMessage) + default: + return finishingStream(), nil + } + }, + } + + // When: a future provider returns a chain-broken signal, + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + // No ProviderOptions, no DisableChainMode, no ReloadMessages. + }) + + // Then: the recovery branch must no-op (no panic, no missing + // callbacks) and the retry runs normally. + require.NoError(t, err) + require.Equal(t, 2, streamCalls) +} + +func TestRun_NonChainBrokenRetryDoesNotTouchChainState(t *testing.T) { + t.Parallel() + + // Given: a chain-mode run with a still-valid previous_response_id + var ( + streamCalls int + secondCallOpt fantasy.ProviderOptions + ) + model := &chattest.FakeModel{ + ProviderName: "openai", + StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + streamCalls++ + switch streamCalls { + case 1: + return nil, xerrors.New("received status 503 from upstream") + default: + secondCallOpt = call.ProviderOptions + return finishingStream(), nil + } + }, + } + + disableCalls := 0 + reloadCalls := 0 + + // When: a non-chain-broken retryable error fires (503) + err := Run(context.Background(), RunOptions{ + Model: model, + MaxSteps: 1, + ContextLimitFallback: 4096, + Messages: []fantasy.Message{ + {Role: "user", Content: []fantasy.MessagePart{fantasy.TextPart{Text: "hi"}}}, + }, + ProviderOptions: chainModeProviderOptions("resp_still_valid"), + PersistStep: func(_ context.Context, _ PersistedStep) error { + return nil + }, + DisableChainMode: func() { + disableCalls++ + }, + ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) { + reloadCalls++ + return nil, nil + }, + }) + + // Then: chain mode stays engaged, ReloadMessages is not called, + // and the retry preserves previous_response_id. + require.NoError(t, err) + require.Equal(t, 0, disableCalls, + "non-chain-broken retry must not exit chain mode") + require.Equal(t, 0, reloadCalls, + "non-chain-broken retry must not reload history") + require.True(t, + chatopenai.HasPreviousResponseID(secondCallOpt), + "non-chain-broken retry must preserve previous_response_id", + ) +} + +// chainBrokenError is what OpenAI returns when previous_response_id +// points at a response it does not have stored. +const chainBrokenErrorMessage = "Previous response with id 'resp_abc' not found." + +// finishingStream returns a stream that emits a single Finish part. +// The chatloop treats a finishReason of Stop as "stoppedByModel" and +// exits the per-step loop after persisting. +func finishingStream() fantasy.StreamResponse { + return iter.Seq[fantasy.StreamPart](func(yield func(fantasy.StreamPart) bool) { + yield(fantasy.StreamPart{ + Type: fantasy.StreamPartTypeFinish, + FinishReason: fantasy.FinishReasonStop, + }) + }) +} + +// chainModeProviderOptions builds a fantasy.ProviderOptions carrying +// the OpenAI Responses options with previous_response_id set, the same +// shape chatd builds when chain mode is active. +func chainModeProviderOptions(previousResponseID string) fantasy.ProviderOptions { + store := true + return fantasy.ProviderOptions{ + fantasyopenai.Name: &fantasyopenai.ResponsesProviderOptions{ + Store: &store, + PreviousResponseID: &previousResponseID, + }, + } +} diff --git a/coderd/x/chatd/chatloop/metrics.go b/coderd/x/chatd/chatloop/metrics.go index 2db4f1ac13..6f13663017 100644 --- a/coderd/x/chatd/chatloop/metrics.go +++ b/coderd/x/chatd/chatloop/metrics.go @@ -3,6 +3,7 @@ package chatloop import ( "context" "errors" + "strconv" "charm.land/fantasy" "github.com/prometheus/client_golang/prometheus" @@ -101,7 +102,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { Subsystem: metricsSubsystem, Name: "stream_retries_total", Help: "Total LLM stream retries.", - }, []string{"provider", "model", "kind"}), + }, []string{"provider", "model", "kind", "chain_broken"}), StreamBufferDroppedTotal: factory.NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, @@ -140,12 +141,19 @@ func (m *Metrics) RecordCompaction(provider, model string, compacted bool, err e // RecordStreamRetry increments stream_retries_total. The caller // must obtain classified via chaterror.Classify (non-empty Kind). -// No-op when m is nil. +// No-op when m is nil. The chain_broken label is "true" for chain +// anchor failures (e.g. OpenAI previous_response_id 404) recovered +// by the chatloop, and "false" otherwise. func (m *Metrics) RecordStreamRetry(provider, model string, classified chaterror.ClassifiedError) { if m == nil { return } - m.StreamRetriesTotal.WithLabelValues(provider, model, string(classified.Kind)).Inc() + m.StreamRetriesTotal.WithLabelValues( + provider, + model, + string(classified.Kind), + strconv.FormatBool(classified.ChainBroken), + ).Inc() } // RecordToolError increments tool_errors_total for the given diff --git a/coderd/x/chatd/chatloop/metrics_test.go b/coderd/x/chatd/chatloop/metrics_test.go index 14d641b353..7aa3885750 100644 --- a/coderd/x/chatd/chatloop/metrics_test.go +++ b/coderd/x/chatd/chatloop/metrics_test.go @@ -2,6 +2,7 @@ package chatloop_test import ( "context" + "strconv" "testing" "time" @@ -34,7 +35,7 @@ func TestNewMetrics_RegistersAllMetrics(t *testing.T) { m.PromptSizeBytes.WithLabelValues("anthropic", "claude-sonnet-4-5") m.TTFTSeconds.WithLabelValues("anthropic", "claude-sonnet-4-5") m.StepsTotal.WithLabelValues("anthropic", "claude-sonnet-4-5") - m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", string(codersdk.ChatErrorKindTimeout)) + m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", string(codersdk.ChatErrorKindTimeout), "false") // StreamBufferDroppedTotal is a plain Counter, so it's always present // in Gather output once registered; no exerciser call is // needed. @@ -88,7 +89,7 @@ func TestNopMetrics_DoesNotPanic(t *testing.T) { m.CompactionTotal.WithLabelValues("openai", "gpt-5", "error").Inc() m.CompactionTotal.WithLabelValues("google", "gemini-2.5-pro", "timeout").Inc() m.StepsTotal.WithLabelValues("anthropic", "claude-sonnet-4-5").Inc() - m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", string(codersdk.ChatErrorKindTimeout)).Inc() + m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", string(codersdk.ChatErrorKindTimeout), "false").Inc() m.StreamBufferDroppedTotal.Inc() // Nil-receiver guard for RecordStreamRetry and @@ -285,8 +286,9 @@ func TestRecordStreamRetry(t *testing.T) { // guarantees Kind is non-empty, so no empty-string case is // needed. tests := []struct { - name string - kind codersdk.ChatErrorKind + name string + kind codersdk.ChatErrorKind + chainBroken bool }{ {name: "overloaded", kind: codersdk.ChatErrorKindOverloaded}, {name: "rate_limit", kind: codersdk.ChatErrorKindRateLimit}, @@ -295,6 +297,7 @@ func TestRecordStreamRetry(t *testing.T) { {name: "auth", kind: codersdk.ChatErrorKindAuth}, {name: "config", kind: codersdk.ChatErrorKindConfig}, {name: "generic", kind: codersdk.ChatErrorKindGeneric}, + {name: "chain_broken", kind: codersdk.ChatErrorKindGeneric, chainBroken: true}, } for _, tt := range tests { @@ -304,13 +307,15 @@ func TestRecordStreamRetry(t *testing.T) { reg := prometheus.NewRegistry() m := chatloop.NewMetrics(reg) m.RecordStreamRetry("test-provider", "test-model", chaterror.ClassifiedError{ - Kind: tt.kind, + Kind: tt.kind, + ChainBroken: tt.chainBroken, }) requireCounter(t, reg, "coderd_chatd_stream_retries_total", 1, map[string]string{ - "provider": "test-provider", - "model": "test-model", - "kind": string(tt.kind), + "provider": "test-provider", + "model": "test-model", + "kind": string(tt.kind), + "chain_broken": strconv.FormatBool(tt.chainBroken), }) }) } @@ -564,9 +569,10 @@ func TestRun_StreamRetry_RecordsMetric(t *testing.T) { // Metric assertion. requireCounter(t, reg, "coderd_chatd_stream_retries_total", 1, map[string]string{ - "provider": "test-provider", - "model": "test-model", - "kind": string(codersdk.ChatErrorKindRateLimit), + "provider": "test-provider", + "model": "test-model", + "kind": string(codersdk.ChatErrorKindRateLimit), + "chain_broken": "false", }) } diff --git a/docs/admin/integrations/prometheus.md b/docs/admin/integrations/prometheus.md index b78dbfe3b1..2cf8cd8014 100644 --- a/docs/admin/integrations/prometheus.md +++ b/docs/admin/integrations/prometheus.md @@ -207,7 +207,7 @@ deployment. They will always be available from the agent. | `coderd_chatd_stream_buffer_dropped_total` | counter | Number of chat stream buffer events dropped due to the per-chat buffer cap. | | | `coderd_chatd_stream_buffer_events` | gauge | Sum of current buffer lengths across all chat streams. | | | `coderd_chatd_stream_buffer_size_max` | gauge | Maximum current buffer length across all chat streams. | | -| `coderd_chatd_stream_retries_total` | counter | Total LLM stream retries. | `kind` `model` `provider` | +| `coderd_chatd_stream_retries_total` | counter | Total LLM stream retries. | `chain_broken` `kind` `model` `provider` | | `coderd_chatd_stream_subscribers` | gauge | Current number of chat stream subscribers across all chat streams. | | | `coderd_chatd_streams_active` | gauge | Current number of chat stream state entries (in-flight plus retained). | | | `coderd_chatd_tool_errors_total` | counter | Total tool calls that returned an error result. | `model` `provider` `tool_name` | diff --git a/scripts/metricsdocgen/generated_metrics b/scripts/metricsdocgen/generated_metrics index 43dd6cfdeb..d7aa8fd182 100644 --- a/scripts/metricsdocgen/generated_metrics +++ b/scripts/metricsdocgen/generated_metrics @@ -255,7 +255,7 @@ coderd_chatd_stream_buffer_events 0 coderd_chatd_stream_buffer_size_max 0 # HELP coderd_chatd_stream_retries_total Total LLM stream retries. # TYPE coderd_chatd_stream_retries_total counter -coderd_chatd_stream_retries_total{provider="",model="",kind=""} 0 +coderd_chatd_stream_retries_total{provider="",model="",kind="",chain_broken=""} 0 # HELP coderd_chatd_stream_subscribers Current number of chat stream subscribers across all chat streams. # TYPE coderd_chatd_stream_subscribers gauge coderd_chatd_stream_subscribers 0