Files
coder/coderd/x/chatd/chatloop/chatloop.go
T
Thomas Kosiewski a84534315c feat(coderd/x/chatd/chatloop): add exclusive tool execution policy (#24619)
## Summary

Introduce an **exclusive-tool execution policy** in `chatloop.Run()` so that a designated "planning-only" tool can never execute in the same batch as any other locally executed tool. This is the generic foundation for the advisor tool landed in the stacked PRs above.

## Motivation

The advisor tool (stacked on top) is intentionally a pre-action planning step. It must be consulted *alone*, not interleaved with side-effecting tools, so the model has to reason strategically before committing to actions. Rather than building advisor-specific plumbing into `chatloop`, this PR adds a small, general-purpose policy hook that any future tool can opt into.

## Changes

- `RunOptions.ExclusiveToolNames map[string]bool` declares which tool names must run exclusively within a single tool-execution batch.
- `executeTools()` detects mixed batches: if any exclusive tool name appears next to any other locally executed tool, no tool runs. Instead, structured `ToolResultOutputContentError` entries are synthesized for every tool in the batch so the model can cleanly retry.
- Deterministic, model-facing error copy:
  - Exclusive tool gets: "must be called by itself before action tools".
  - Sibling tools get: "skipped because `<exclusive>` must run alone".
- New tests in `chatloop_test.go` cover: single exclusive batch runs normally, mixed batch returns policy errors and executes nothing, non-exclusive batches are untouched.

## Stack context

This is **PR 1 of 6** in the advisor feature stack.

```
main
 └─ feat/advisor-01-chatloop-exclusive-policy   ← this PR
     └─ feat/advisor-02-chatadvisor-pkg
         └─ feat/advisor-03-config-api
             ├─ feat/advisor-04-chatd-runtime
             │   └─ feat/advisor-05-chat-tool-renderer
             └─ feat/advisor-06-admin-settings-ui
```

## Scope / non-goals

- No advisor-specific code lives in this PR.
- Zero behavior change when `ExclusiveToolNames` is empty (current default for every existing caller).

## Validation

- `go test ./coderd/x/chatd/chatloop/... -run TestExclusive`
- `make lint`

---

<details>
<summary>📋 Implementation Plan (shared across the advisor stack)</summary>

# Plan: Add a Mux-style advisor tool to coder agents/chatd

## Outcome

Add a first-class `advisor` tool to agent chats in `coderd/x/chatd` that feels native to Coder:

- it is a built-in server-side tool, not an MCP/dynamic-tool workaround;
- it performs a nested **tool-less** model call for strategic advice;
- it is exposed only when eligible, and the prompt mentions it only when it is actually available;
- it is treated as a **planning-only** tool so it does not run alongside action tools in the same batch;
- it tracks usage/cost separately enough for operators to reason about it;
- it has a minimally polished UI in the Agents page;
- and it ships with explicit dogfooding evidence, including screenshots and repro videos.

## Design decisions to lock before coding

1. **Primary architecture:** native built-in tool in `chattool/`, backed by a small `chatadvisor` package.
2. **Nested model execution:** reuse chatd's existing model/provider stack for a one-step, tool-less advisor call rather than inventing a new provider pathway.
3. **Execution policy:** treat `advisor` as an exclusive/planning-only tool; mixed batches must return structured policy errors and force the model to retry cleanly.
4. **Availability:** initial rollout is for root agent chats only; disable for child/sub-agent chats until recursion/cost policy is proven.
5. **Prompt sync:** use one eligibility boolean to drive both tool registration and advisor guidance injection.
6. **Persistence/cost split:** MVP should keep advisor usage visible in result metadata and server metrics; only add DB schema if product/billing explicitly needs queryable advisor-specific cost.
7. **UI scope:** generic tool rendering is an acceptable temporary milestone during backend bring-up, but the release candidate should include a dedicated lightweight advisor renderer.

## Delivery model

The work should be executed as coordinated workstreams with one integration owner and parallel contributors for low-conflict areas. The integration owner should own `coderd/x/chatd/chatd.go` because prompt assembly, tool registration, and model resolution all converge there.

## Detailed workstreams

### Repo evidence used for this plan

<details>
<summary>Mux reference and current chatd seams</summary>

**Mux reference implementation**

- `src/node/services/tools/advisor.ts` — native advisor tool implementation.
- `src/common/constants/advisor.ts` — advisor prompt/constants and truncation policy.
- `src/common/utils/tools/tools.ts` — conditional tool registration.
- `src/node/services/streamContextBuilder.ts` — injects advisor guidance only when the tool is available.

**Current chatd seams**

- `coderd/x/chatd/chatd.go`
  - `processChat()` — tool assembly, prompt assembly, and chatloop invocation.
  - `resolveChatModel()` — current model/provider/key resolution seam.
  - `type Config struct` — server-level chatd configuration surface.
- `coderd/x/chatd/chatloop/chatloop.go`
  - `Run()` — main streaming/model loop.
  - `executeTools()` — built-in tool execution/batching seam.
- `coderd/x/chatd/chattool/` — built-in tool implementations.
- `site/src/pages/AgentsPage/components/ChatElements/tools/Tool.tsx` — tool renderer dispatch.
- `site/src/pages/AgentsPage/components/ChatConversation/messageParsing.ts` and `ConversationTimeline.tsx` — tool/result merge and rendering flow.

</details>

### Workstream map and ownership

| Workstream | Primary owner | Main files | Can run in parallel? | Done when |
|---|---|---|---|---|
| 0. Integration + gating | Integration lead | `coderd/x/chatd/chatd.go` | No; central merge lane | Tool registration, prompt sync, and model selection are wired together |
| 1. Advisor runtime + tool | Backend agent | new `coderd/x/chatd/chatadvisor/`, new `coderd/x/chatd/chattool/advisor.go` | Yes | Tool can perform a tool-less advisor call in memory and return structured results |
| 2. Planning-only execution policy | Chatloop agent | `coderd/x/chatd/chatloop/chatloop.go`, related tests | Yes | Mixed `advisor` + action-tool batches are rejected cleanly and deterministically |
| 3. Metrics/usage/config | Backend/telemetry agent | `chatd.go`, `chatloop/metrics.go`, optional config plumbing | Partially; coordinate with integration lead | Advisor usage is separately visible in metadata/metrics and limits are enforced |
| 4. Frontend rendering | Frontend agent | `site/.../tools/Tool.tsx`, new `AdvisorTool.tsx`, stories | Yes after result schema stabilizes | Advisor renders as a readable card and story tests pass |
| 5. Dogfood + QA evidence | QA agent | dev server, Storybook, dogfood output | After backend + UI are usable | Repro videos, screenshots, and a concise QA report exist |

### Parallelization rules

- **Do not split `coderd/x/chatd/chatd.go` across multiple execution agents without an integration lead.** That file owns prompt building, tool registration, model resolution, and cost persistence.
- Workstreams 1 and 2 can be developed in parallel and then stacked onto the integration branch.
- Workstream 4 should begin once the backend result schema is agreed on, even if the backend is still behind a feature flag.
- Any agent that needs to re-check Mux behavior should clone `coder/mux` into a temporary directory (for example, `$(mktemp -d)/mux`) and inspect it read-only; do not vendor or copy code from Mux directly.

## Phase 0 — Preflight and guardrails

### Goals

- Align the team on the smallest shippable architecture.
- Prevent scope creep into MCP/dynamic-tool/sub-agent variants.
- Decide upfront what is MVP vs. follow-up.

### Tasks

1. **Confirm the MVP boundary.**
   - Ship a built-in advisor tool first.
   - Do **not** make MCP, dynamic tools, or sub-agents the primary implementation.
   - Do **not** add transient streaming phases in the first backend PR unless they fall out almost for free.

2. **Confirm local workflow hygiene before coding.**
   - Ensure the repo is using the project git hooks from `scripts/githooks`.
   - Do not bypass hooks with `--no-verify`.
   - Use `./scripts/develop.sh` for the full dev server rather than manual build/run commands.

3. **Lock the model-selection policy.**
   - **Recommended MVP:** advisor uses the same resolved provider/model/cost config as the current chat, with advisor-specific max-output and usage caps.
   - **Follow-up only if required:** add a separate `AdvisorModelConfigID`-style override that resolves through the existing `configCache`/model-config path. Do not invent a new free-form `provider:model` parser if chatd already stores provider/model separately.

4. **Lock the persistence policy.**
   - **Recommended MVP:** no DB migration. Persist advisor-visible metadata in the tool result and record separate metrics in memory/Prometheus.
   - **Only if product/billing explicitly asks for queryable advisor cost:** add a later DB migration or usage table, following the normal `queries/*.sql` + `make gen` workflow.

5. **Create an execution ADR note in the work item or tracking doc.**
   - Capture: built-in tool, tool-less nested call, root-chat-only rollout, exclusive execution policy, MVP no-DB-migration default.

### Quality gate

- Everyone on the team can state the same answers to these questions:
  - Is advisor a built-in tool? **Yes.**
  - Can advisor run with action tools in the same batch? **No.**
  - Does advisor get tools of its own? **No.**
  - Is a DB migration required for MVP? **No, unless billing insists.**

## Phase 1 — Build the advisor runtime and tool wrapper

### Goals

Create the core advisor implementation in a way that is easy to test and keeps `chattool/` thin.

### Files to add

- `coderd/x/chatd/chatadvisor/types.go`
- `coderd/x/chatd/chatadvisor/guidance.go`
- `coderd/x/chatd/chatadvisor/handoff.go`
- `coderd/x/chatd/chatadvisor/runtime.go`
- `coderd/x/chatd/chatadvisor/runner.go`
- `coderd/x/chatd/chattool/advisor.go`

### Responsibilities by file

1. **`types.go`**
   - Define the input/result schema used by the tool and UI.
   - Keep the result shape close to Mux so the UI and model both have predictable cases.
   - Recommended result variants:
     - `advice`
     - `limit_reached`
     - `error`

   Recommended shape:

   ```go
   type AdvisorArgs struct {
       Question string `json:"question"`
   }

   type AdvisorResult struct {
       Type          string              `json:"type"`
       Advice        string              `json:"advice,omitempty"`
       Error         string              `json:"error,omitempty"`
       AdvisorModel  string              `json:"advisor_model,omitempty"`
       RemainingUses int                 `json:"remaining_uses,omitempty"`
       Usage         *AdvisorUsageResult `json:"usage,omitempty"`
   }
   ```

2. **`guidance.go`**
   - Hold two strings:
     - the nested advisor system prompt;
     - the parent-agent guidance block to inject into the outer system prompt.
   - The nested advisor prompt must say, in plain language:
     - you are advising the parent agent;
     - you do not address the end user directly;
     - you do not claim actions happened;
     - you return concise strategic guidance and tradeoffs.

3. **`runtime.go`**
   - Define the per-run runtime state.
   - Recommended fields:
     - resolved model + model config;
     - provider keys/options reused from the outer chat;
     - `MaxUsesPerRun`;
     - `MaxOutputTokens`;
     - atomic/current call counter;
     - callback(s) to obtain the current prompt snapshot and current-step snapshot;
     - optional metrics/usage hook.
   - Add fail-fast validation for impossible config: nil model, non-positive limits, empty prompt builders, etc.

4. **`handoff.go`**
   - Build the advisor handoff message from:
     - the explicit question;
     - the exact prompt/messages the parent model just used;
     - the current step's text/reasoning snapshot, if available;
     - the most recent relevant tool outputs, if they are already in the prompt snapshot.
   - **Important:** use the already-prepared outer prompt tail, not a fresh DB reload. That keeps the advisor aligned with compaction and the exact context the outer model saw.
   - Apply hard truncation budgets with recent-context bias.

5. **`runner.go`**
   - Execute the nested advisor call.
   - **Recommended implementation:** call `chatloop.Run()` in an in-memory, one-step mode:
     - `Tools: nil`
     - `ProviderTools: nil`
     - `MaxSteps: 1`
     - `PersistStep`: capture the assistant output in memory instead of writing DB rows
   - Reuse the existing provider/model/cost path instead of building a second provider runner.
   - Assert that no tool definitions are passed to the nested call.

6. **`chattool/advisor.go`**
   - Keep this file thin and consistent with other built-ins.
   - Responsibilities:
     - decode `AdvisorArgs`;
     - validate `Question` is non-empty and bounded;
     - call the `chatadvisor` runner;
     - return a structured tool response.

### Defensive programming requirements

- Assert `Question` is non-empty after trimming.
- Assert runtime limits are positive.
- Assert the nested advisor call runs with zero tools/provider tools.
- Assert `AdvisorResult.Type` is one of the known variants before returning.
- Assert remaining uses never goes negative.

### Acceptance criteria

- A unit test can call the advisor tool with a fake model and receive a stable `advice` result.
- The nested advisor call is impossible to run with tools accidentally attached.
- The core logic lives in `chatadvisor/`, not embedded inside `chatd.go`.

## Phase 2 — Wire advisor into chatd and keep prompt/tool availability in sync

### Goals

Register the tool in the right place, expose it only when eligible, and inject system guidance only when the tool is present.

### Files to modify

- `coderd/x/chatd/chatd.go`
- optionally a small helper file if `chatd.go` becomes too crowded

### Tasks

1. **Compute one eligibility boolean in `processChat()`.**
   Recommended inputs:
   - server-level advisor enabled flag;
   - root chat only (`chat.ParentChatID == uuid.Nil` or equivalent existing root/child check);
   - a usable resolved model/provider exists;
   - optional experiment/workspace/org gate if product wants staged rollout.

2. **Create the runtime once per outer chat run.**
   - Use the model/config/keys resolved by `resolveChatModel()`.
   - Reuse provider options from the current chat's `ChatModelCallConfig`.
   - Set `MaxUsesPerRun` and `MaxOutputTokens` from advisor config defaults.

3. **Register the tool in the built-in tool block.**
   - Insert after the skill tools and before MCP tools in `processChat()`.
   - Record `builtinToolNames["advisor"] = true` so metrics stay bounded.

4. **Inject advisor guidance into the outer system prompt using the same boolean.**
   - Use `chatprompt.InsertSystem()` in the same prompt assembly path that already injects user/system instructions.
   - Place the block near the existing instruction insertion, before plan-path/skill context blocks.
   - Wrap the guidance in an explicit tag like `<advisor-guidance>` so it is easy to spot in tests and future refactors.

5. **Keep advisor out of child chats for the first release.**
   - That avoids recursion/cost blowups with `spawn_agent` / `wait_agent` flows.
   - Document this explicitly in the rollout notes and tests.

### Acceptance criteria

- If advisor is disabled, neither the tool nor the prompt guidance appears.
- If advisor is enabled, both the tool and the prompt guidance appear.
- Root chats can use advisor; child chats cannot.
- Built-in tool names include `advisor` so metrics do not collapse it into the generic `mcp` label.

## Phase 3 — Enforce planning-only execution policy in `chatloop`

### Goals

Prevent the model from calling `advisor` and action tools in the same execution batch.

### Files to modify

- `coderd/x/chatd/chatloop/chatloop.go`
- related chatloop tests

### Recommended implementation

Keep the MVP small; do **not** build a general policy engine yet.

1. Add a minimal field to `chatloop.RunOptions`, for example:

   ```go
   ExclusiveToolName *string
   ```

2. In `Run()` / `executeTools()`, detect the case where the exclusive tool appears in the same local-tool batch as any other locally executed tool.

3. When that happens, synthesize structured tool-result errors for the affected calls instead of executing anything in the batch.
   - `advisor` should receive a clear error like: _advisor must be called by itself before action tools_.
   - The sibling action tools should receive a paired policy error like: _this tool was skipped because advisor must run alone_.

4. Let the outer model see those tool errors and retry cleanly.
   - This is simpler and safer than partial execution or hidden deferral.
   - It preserves deterministic transcript history for debugging.

5. Pass the just-finished step snapshot into the tool execution context.
   - The advisor runtime should be able to see the current step's text/reasoning content, because that is often the best hint about what the outer model is trying to decide.

### Why this is the right fit

- It matches the intended semantics: advisor is consulted **before** taking action.
- It avoids subtle race conditions caused by concurrent built-in tool execution.
- It keeps the behavior easy to test with fake models.

### Acceptance criteria

- A model-emitted batch containing only `advisor` succeeds.
- A model-emitted batch containing `advisor` plus any other locally executed tool returns deterministic policy errors and executes nothing.
- Non-advisor tool execution stays unchanged for normal chats.

## Phase 4 — Usage limits, metrics, and configuration

### Goals

Make advisor safe to operate without over-designing billing/storage in the first release.

### Files to modify

- `coderd/x/chatd/chatd.go`
- `coderd/x/chatd/chatloop/metrics.go` as needed
- `coderd/x/chatd/chatd.go` `Config` struct and constructor path
- optional follow-up config/db files only if a separate advisor model or persistent billing is required

### Tasks

1. **Add explicit server config knobs for MVP.**
   Recommended fields on `chatd.Config` or a nested advisor config struct:
   - `AdvisorEnabled bool`
   - `AdvisorMaxUsesPerRun int`
   - `AdvisorMaxOutputTokens int64`

2. **Track usage per outer run.**
   - Reset the counter for each `processChat()` invocation.
   - Return `remaining_uses` in the tool result.
   - Return `limit_reached` when the cap is exhausted.

3. **Expose advisor usage metadata in the tool result.**
   - Include model name and token/cost summary if available.
   - Use the same `callConfig.Cost` calculation path as the outer chat for MVP if advisor reuses the same model.

4. **Record server-side metrics.**
   - Count advisor invocations, failures, and latency.
   - Ensure they show up under the built-in tool label `advisor`.

5. **Optional decision gate: separate advisor model.**
   - If product insists on a stronger/different advisor model, add a follow-up config hook that resolves another existing chat model config through the same `configCache` path.
   - Keep that out of the first landing PR unless it is required for acceptance.

6. **Optional decision gate: queryable advisor cost.**
   - If this becomes required, spin a follow-up DB task:
     - update `coderd/database/queries/*.sql`;
     - add migration files;
     - run `make gen`;
     - update audit mappings if a new auditable type/field is introduced.

### Acceptance criteria

- Advisor calls are capped per outer run.
- Limit exhaustion is user-visible in the tool result.
- Metrics distinguish advisor calls from other built-in tools.
- MVP does not require a schema migration unless explicitly approved.

## Phase 5 — Frontend rendering and Storybook coverage

### Goals

Make advisor feel intentional in the Agents UI without blocking the backend on fancy streaming UI.

### Files to modify

- `site/src/pages/AgentsPage/components/ChatElements/tools/Tool.tsx`
- new `site/src/pages/AgentsPage/components/ChatElements/tools/AdvisorTool.tsx`
- Storybook story file(s) in the same tools directory

### Delivery strategy

1. **Intermediate milestone during backend bring-up:** rely on the existing generic tool renderer if needed.
   - This is acceptable only as a short-lived integration checkpoint.

2. **Release milestone:** add a dedicated lightweight `AdvisorTool` renderer.
   - Reuse existing primitives:
     - `ToolCollapsible`
     - `ToolIcon`
     - `Response` for markdown/prose rendering
     - `ScrollArea` if the advice can be long
   - Keep styling light and consistent with the Agents page.
   - Do not add unnecessary React memoization in `site/src/pages/AgentsPage/`; that area is already React-Compiler aware.

3. **Render the structured result states cleanly.**
   - `advice` — readable prose/markdown with optional metadata footer.
   - `limit_reached` — warning-style message.
   - `error` — error state with visible fallback text.
   - `running` — existing tool loading state/spinner is enough for MVP.

4. **Add Storybook coverage instead of ad-hoc component tests.**
   Recommended stories:
   - successful advice;
   - running/loading;
   - limit reached;
   - error.

5. **Keep the UI contract narrow.**
   - Prefer one text field like `advice` plus small metadata rather than a deeply nested schema.
   - That keeps the UI resilient to prompt iteration.

### Acceptance criteria

- The advisor tool card renders readable content rather than raw quoted JSON in the final release branch.
- Running, limit, and error states are visibly distinct.
- Storybook stories and play assertions cover the new states.
- Existing tool rendering flows remain unchanged.

## Phase 6 — Automated tests and validation gates

### Backend tests to add

1. **Advisor runtime/tool tests**
   - question validation;
   - tool-less nested execution assertion;
   - success result shaping;
   - limit-reached result shaping;
   - error result shaping.

2. **Prompt/gating tests in chatd**
   - advisor disabled ⇒ no tool, no guidance;
   - advisor enabled/root chat ⇒ tool + guidance;
   - child chat ⇒ advisor absent.

3. **Chatloop policy tests**
   - advisor alone runs;
   - advisor + action tool mixed batch returns deterministic policy errors;
   - non-advisor tools still execute normally.

4. **Usage/metrics tests**
   - per-run cap resets correctly;
   - builtin tool labeling includes `advisor`;
   - returned metadata includes model/usage summary when available.

### Frontend tests to add

- Storybook `play()` assertions for the advisor renderer states.
- Verify expand/collapse behavior and visible fallback text.
- Verify the message timeline still renders adjacent tools correctly.

### Recommended command sequence

Run these as the implementation matures, not only at the end:

1. Backend-focused gate after phases 1–4:
   - `make test RUN=TestAdvisor`
   - `make test RUN=TestChatloopAdvisor`
   - `make lint`

2. Frontend-focused gate after phase 5:
   - `pnpm test:storybook src/pages/AgentsPage/components/ChatElements/tools/AdvisorTool.stories.tsx`
   - `pnpm lint`
   - `pnpm format`

3. Final repo gate before handoff:
   - `make pre-commit`
   - run any additional targeted `make test RUN=...` selections covering touched chatd paths

> Use the exact new test names the implementing agents create; the names above are recommended anchors, not existing tests.

## Dogfooding plan

### Principle

Dogfood the change as a real agent feature, not just a unit-tested backend. Per the dogfood and `agent-browser` skills, the reviewer should get **watchable repro videos** plus screenshots that make the behavior obvious without reading logs.

### Required setup

1. Start the full dev environment with:
   - `./scripts/develop.sh`
2. If the frontend renderer changes, also start Storybook from `site/` with:
   - `pnpm storybook --no-open`
3. Use `agent-browser` directly — **never `npx agent-browser`**.
4. Use named browser sessions and an output folder such as:
   - `./dogfood-output/advisor/`
   - with subfolders `screenshots/` and `videos/`

### Evidence protocol

For every interactive scenario below:

1. Start video recording **before** the action.
2. Capture step-by-step screenshots at human pace.
3. Capture one annotated screenshot of the final state.
4. Stop the recording.
5. Note the exact pass/fail observation in the QA report.

For static UI states (for example Storybook error/limit cards), an annotated screenshot is sufficient; video is optional but still encouraged by this project’s review preference.

### Dogfood scenarios

#### Scenario A — Happy path in the real Agents UI

**Goal:** prove that a root agent chat can invoke advisor and produce a readable recommendation before taking further action.

Steps:

1. Open the Agents page with an advisor-enabled root chat.
2. Start a repro video.
3. Send a prompt that should reasonably trigger strategic planning, such as an architecture or multi-tradeoff question.
4. Capture screenshots of:
   - the prompt before send;
   - the running advisor state;
   - the completed advisor card and the assistant’s follow-up response.
5. Stop recording.

Pass criteria:

- advisor appears in the timeline;
- the rendered result is readable;
- the assistant can continue after consuming the advisor output.

#### Scenario B — Advisor unavailable path

**Goal:** prove the feature is truly gated.

Suggested variants (at least one is required, both are better):

- feature flag/config off;
- child/sub-agent chat.

Evidence:

- annotated screenshot of the chat/tool state showing advisor is absent;
- short video if toggling the gate live is part of the repro.

Pass criteria:

- no advisor tool is available;
- no advisor-specific prompt behavior leaks through.

#### Scenario C — UI states in Storybook

**Goal:** prove the renderer handles non-happy states cleanly.

Required story states:

- success/advice;
- running;
- limit reached;
- error.

Evidence:

- one screenshot per state;
- at least one short video showing collapse/expand behavior.

Pass criteria:

- success renders readable advice;
- limit/error have visible fallback text;
- the component behaves like the other tool cards.

#### Scenario D — Regression sweep of nearby tools

**Goal:** ensure advisor does not break the surrounding chat timeline.

Check at minimum:

- another existing built-in tool still renders correctly near advisor;
- sub-agent/tool cards still expand/collapse normally;
- no obvious console errors appear in the Agents page during the advisor flow.

Evidence:

- screenshots of adjacent tool cards;
- console/error capture if anything suspicious appears.

### `agent-browser` usage notes for the QA agent

- Prefer `agent-browser batch` for 2+ sequential commands when no intermediate parsing is needed.
- Use `snapshot -i` to discover interactive refs.
- Re-snapshot after navigation or major DOM changes.
- Avoid `wait --load networkidle` unless the page is known to go idle; prefer explicit element/text waits or short fixed waits.
- Record videos at human pace and include pauses that a reviewer can follow.

## Rollout plan

### Initial rollout

- Gate behind a server-side advisor-enabled flag.
- Enable only for selected internal/root agent chats first.
- Watch metrics for:
  - invocation count;
  - failure rate;
  - latency;
  - obvious retry loops.

### Expansion conditions

Expand beyond the initial rollout only after the following are true:

- mixed-batch policy behavior is stable;
- cost impact is understood;
- frontend UX is readable in production-like dogfood;
- no recursion surprises have appeared with sub-agent flows.

### Explicit non-goals for the first release

- advisor inside child/sub-agent chats;
- provider-agnostic streaming phase UI;
- MCP-based external advisor implementation;
- mandatory DB-backed advisor cost reporting.

## Final acceptance checklist

- [ ] `advisor` is a built-in chatd tool, not an MCP/dynamic-tool substitute.
- [ ] The nested advisor call is tool-less and bounded to one in-memory step.
- [ ] One eligibility boolean controls both tool registration and prompt guidance injection.
- [ ] Root chats can use advisor; child chats cannot in the initial rollout.
- [ ] Mixed advisor/action batches produce deterministic policy errors instead of partial execution.
- [ ] Per-run usage caps and limit-reached behavior work.
- [ ] Advisor usage is visible in metadata/metrics without forcing a DB migration for MVP.
- [ ] The Agents UI has a readable advisor card and Storybook coverage.
- [ ] Dogfooding produced screenshots and repro videos for the required scenarios.
- [ ] Validation commands (`make lint`, targeted `make test`, Storybook tests, `make pre-commit`) passed before handoff.

## Suggested PR split

1. **PR 1 — Backend foundation**
   - `chatadvisor/` package
   - `chattool/advisor.go`
   - `chatloop` exclusive policy
   - chatd gating/prompt sync
   - backend tests

2. **PR 2 — Frontend + QA**
   - advisor renderer
   - stories/play assertions
   - dogfood artifacts and QA notes

3. **PR 3 — Optional follow-ups only if demanded by stakeholders**
   - separate advisor model override
   - persistent advisor billing/queryability
   - transient phase-stream UX


</details>

---
_Generated with [`mux`](https://github.com/coder/mux) • Model: `anthropic:claude-opus-4-7` • Thinking: `max`_
2026-04-30 14:08:59 +02:00

2025 lines
62 KiB
Go

package chatloop
import (
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"errors"
"maps"
"slices"
"strconv"
"strings"
"sync"
"time"
"unicode"
"charm.land/fantasy"
fantasyanthropic "charm.land/fantasy/providers/anthropic"
fantasyopenai "charm.land/fantasy/providers/openai"
"charm.land/fantasy/schema"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/x/chatd/chatdebug"
"github.com/coder/coder/v2/coderd/x/chatd/chaterror"
"github.com/coder/coder/v2/coderd/x/chatd/chatprompt"
"github.com/coder/coder/v2/coderd/x/chatd/chatretry"
"github.com/coder/coder/v2/coderd/x/chatd/chatsanitize"
"github.com/coder/coder/v2/coderd/x/chatd/chattool"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/quartz"
)
const (
interruptedToolResultErrorMessage = "tool call was interrupted before it produced a result"
// maxCompactionRetries limits how many times the post-run
// compaction safety net can re-enter the step loop. This
// prevents infinite compaction loops when the model keeps
// hitting the context limit after summarization.
maxCompactionRetries = 3
// defaultStartupTimeout bounds how long an individual
// model attempt may spend starting to respond before
// the attempt is canceled and retried.
defaultStartupTimeout = 60 * time.Second
)
var (
ErrInterrupted = xerrors.New("chat interrupted")
ErrDynamicToolCall = xerrors.New("dynamic tool call")
// ErrStopAfterTool is returned when a tool listed in
// StopAfterTools produces a successful result, indicating
// the run should terminate cleanly after persistence.
ErrStopAfterTool = xerrors.New("stop after tool")
errStartupTimeout = xerrors.New(
"chat response did not start before the startup timeout",
)
)
// PendingToolCall describes a tool call that targets a dynamic
// tool. These calls are not executed by the chatloop; instead
// they are persisted so the caller can fulfill them externally.
type PendingToolCall struct {
ToolCallID string
ToolName string
Args string
}
// PersistedStep contains the full content of a completed or
// interrupted agent step. Content includes both assistant blocks
// (text, reasoning, tool calls) and tool result blocks. The
// persistence layer is responsible for splitting these into
// separate database messages by role.
type PersistedStep struct {
Content []fantasy.Content
Usage fantasy.Usage
ContextLimit sql.NullInt64
ProviderResponseID string
// Runtime is the wall-clock duration of this step,
// covering LLM streaming, tool execution, and retries.
// Zero indicates the duration was not measured (e.g.
// interrupted steps).
Runtime time.Duration
// PendingDynamicToolCalls lists tool calls that target
// dynamic tools. When non-empty the chatloop exits with
// ErrDynamicToolCall so the caller can execute them
// externally and resume the loop.
PendingDynamicToolCalls []PendingToolCall
// ToolCallCreatedAt maps tool-call IDs to the time
// the model emitted each tool call. Applied by the
// persistence layer to set CreatedAt on persisted
// tool-call ChatMessageParts.
ToolCallCreatedAt map[string]time.Time
// ToolResultCreatedAt maps tool-call IDs to the time
// each tool result was produced (or interrupted).
// Applied by the persistence layer to set CreatedAt
// on persisted tool-result ChatMessageParts.
ToolResultCreatedAt map[string]time.Time
}
// RunOptions configures a single streaming chat loop run.
type RunOptions struct {
Model fantasy.LanguageModel
Messages []fantasy.Message
Tools []fantasy.AgentTool
MaxSteps int
// StartupTimeout bounds how long each model attempt may
// spend opening the provider stream and waiting for its
// first stream part before the attempt is canceled and
// retried. Zero uses the production default.
StartupTimeout time.Duration
// Clock creates startup guard timers. In production use a
// real clock; tests can inject quartz.NewMock(t) to make
// startup timeout behavior deterministic.
Clock quartz.Clock
ActiveTools []string
ContextLimitFallback int64
// DynamicToolNames lists tool names that are handled
// externally. When the model invokes one of these tools
// the chatloop persists partial results and exits with
// ErrDynamicToolCall instead of executing the tool.
DynamicToolNames map[string]bool
// StopAfterTools lists tool names that, when they produce a
// successful result, cause the run to stop after persisting
// the current step. This is used for plan turns where
// propose_plan should terminate the run on success.
StopAfterTools map[string]struct{}
// ExclusiveToolNames lists tool names that must be called
// alone in a batch. When any exclusive tool appears
// alongside other locally-executed tools, every tool in the
// batch receives a policy error and nothing executes.
ExclusiveToolNames map[string]bool
// ModelConfig holds per-call LLM parameters (temperature,
// max tokens, etc.) read from the chat model configuration.
ModelConfig codersdk.ChatModelCallConfig
// ProviderOptions are provider-specific call options
// converted from ModelConfig.ProviderOptions. This is a
// separate field because the conversion requires knowledge
// of the provider, which lives in chatd, not chatloop.
ProviderOptions fantasy.ProviderOptions
// ProviderTools are provider-native tools (like web search
// and computer use) whose definitions are passed directly
// to the provider API. When a ProviderTool has a non-nil
// Runner, tool calls are executed locally; otherwise the
// provider handles execution (e.g. web search).
ProviderTools []ProviderTool
PersistStep func(context.Context, PersistedStep) error
PublishMessagePart func(
role codersdk.ChatMessageRole,
part codersdk.ChatMessagePart,
)
// Callers should attach correlation fields (chat_id, owner_id, etc.)
// using Logger.With before passing the logger in.
Logger slog.Logger
Compaction *CompactionOptions
ReloadMessages func(context.Context) ([]fantasy.Message, error)
DisableChainMode func()
// PrepareMessages is called before each LLM step with the
// current message history. If it returns non-nil, the returned
// slice replaces messages for this and all subsequent steps.
// Used to inject system context that becomes available mid-loop
// (e.g. AGENTS.md after create_workspace).
PrepareMessages func([]fantasy.Message) []fantasy.Message
// OnRetry is called before each retry attempt when the LLM
// stream fails with a retryable error. It provides the attempt
// number, raw error, normalized classification, and backoff
// delay so callers can publish status events to connected
// clients. Callers should also clear any buffered stream state
// from the failed attempt in this callback to avoid sending
// duplicated content.
OnRetry chatretry.OnRetryFn
OnInterruptedPersistError func(error)
// Metrics records Prometheus metrics for the chatd subsystem.
// When nil, no metrics are recorded.
Metrics *Metrics
// BuiltinToolNames lists tool names that are built into chatd.
BuiltinToolNames map[string]bool
}
// ProviderTool pairs a provider-native tool definition with an
// optional local executor. When Runner is nil the tool is fully
// provider-executed (e.g. web search). When Runner is non-nil
// the definition is sent to the API but execution is handled
// locally (e.g. computer use).
type ProviderTool struct {
Definition fantasy.Tool
Runner fantasy.AgentTool
}
// stepResult holds the accumulated output of a single streaming
// step. Since we own the stream consumer, all content is tracked
// directly here, no shadow draft state needed.
type stepResult struct {
content []fantasy.Content
usage fantasy.Usage
providerMetadata fantasy.ProviderMetadata
finishReason fantasy.FinishReason
toolCalls []fantasy.ToolCallContent
shouldContinue bool
toolCallCreatedAt map[string]time.Time
toolResultCreatedAt map[string]time.Time
}
// toResponseMessages converts step content into messages suitable
// for appending to the conversation. Mirrors fantasy's
// toResponseMessages logic.
func (r stepResult) toResponseMessages() []fantasy.Message {
var assistantParts []fantasy.MessagePart
var toolParts []fantasy.MessagePart
for _, c := range r.content {
switch c.GetType() {
case fantasy.ContentTypeText:
text, ok := fantasy.AsContentType[fantasy.TextContent](c)
if !ok || strings.TrimSpace(text.Text) == "" {
continue
}
assistantParts = append(assistantParts, fantasy.TextPart{
Text: text.Text,
ProviderOptions: fantasy.ProviderOptions(text.ProviderMetadata),
})
case fantasy.ContentTypeReasoning:
reasoning, ok := fantasy.AsContentType[fantasy.ReasoningContent](c)
if !ok || strings.TrimSpace(reasoning.Text) == "" {
continue
}
assistantParts = append(assistantParts, fantasy.ReasoningPart{
Text: reasoning.Text,
ProviderOptions: fantasy.ProviderOptions(reasoning.ProviderMetadata),
})
case fantasy.ContentTypeToolCall:
toolCall, ok := fantasy.AsContentType[fantasy.ToolCallContent](c)
if !ok {
continue
}
assistantParts = append(assistantParts, fantasy.ToolCallPart{
ToolCallID: toolCall.ToolCallID,
ToolName: toolCall.ToolName,
Input: toolCall.Input,
ProviderExecuted: toolCall.ProviderExecuted,
ProviderOptions: fantasy.ProviderOptions(toolCall.ProviderMetadata),
})
case fantasy.ContentTypeFile:
file, ok := fantasy.AsContentType[fantasy.FileContent](c)
if !ok {
continue
}
assistantParts = append(assistantParts, fantasy.FilePart{
Data: file.Data,
MediaType: file.MediaType,
ProviderOptions: fantasy.ProviderOptions(file.ProviderMetadata),
})
case fantasy.ContentTypeSource:
// Sources are metadata about references; they don't
// need to be included in conversation messages.
continue
case fantasy.ContentTypeToolResult:
result, ok := fantasy.AsContentType[fantasy.ToolResultContent](c)
if !ok {
continue
}
part := fantasy.ToolResultPart{
ToolCallID: result.ToolCallID,
Output: result.Result,
ProviderExecuted: result.ProviderExecuted,
ProviderOptions: fantasy.ProviderOptions(result.ProviderMetadata),
}
// Provider-executed tool results (e.g. web_search)
// must stay in the assistant message so the result
// block appears inline after the corresponding
// server_tool_use block. This matches the persistence
// layer in chatd.go which keeps them in
// assistantBlocks.
if result.ProviderExecuted {
assistantParts = append(assistantParts, part)
} else {
toolParts = append(toolParts, part)
}
default:
continue
}
}
var messages []fantasy.Message
if len(assistantParts) > 0 {
messages = append(messages, fantasy.Message{
Role: fantasy.MessageRoleAssistant,
Content: assistantParts,
})
}
if len(toolParts) > 0 {
messages = append(messages, fantasy.Message{
Role: fantasy.MessageRoleTool,
Content: toolParts,
})
}
return messages
}
// reasoningState accumulates reasoning content and provider
// metadata while the stream is in flight.
type reasoningState struct {
text string
options fantasy.ProviderMetadata
}
// Run executes the chat step-stream loop and delegates
// persistence/publishing to callbacks.
func Run(ctx context.Context, opts RunOptions) error {
if opts.Model == nil {
return xerrors.New("chat model is required")
}
if opts.PersistStep == nil {
return xerrors.New("persist step callback is required")
}
if opts.MaxSteps <= 0 {
opts.MaxSteps = 1
}
if opts.StartupTimeout <= 0 {
opts.StartupTimeout = defaultStartupTimeout
}
if opts.Clock == nil {
opts.Clock = quartz.NewReal()
}
if opts.Metrics == nil {
opts.Metrics = NopMetrics()
}
publishMessagePart := func(role codersdk.ChatMessageRole, part codersdk.ChatMessagePart) {
if opts.PublishMessagePart == nil {
return
}
opts.PublishMessagePart(role, part)
}
tools := buildToolDefinitions(opts.Tools, opts.ActiveTools, opts.ProviderTools)
applyAnthropicCaching := shouldApplyAnthropicPromptCaching(opts.Model)
messages := opts.Messages
var lastUsage fantasy.Usage
var lastProviderMetadata fantasy.ProviderMetadata
needsFullHistoryReload := false
reloadFullHistory := func(stage string) error {
if opts.ReloadMessages == nil {
return nil
}
reloaded, err := opts.ReloadMessages(ctx)
if err != nil {
return xerrors.Errorf("reload messages %s: %w", stage, err)
}
messages = reloaded
return nil
}
totalSteps := 0
// When totalSteps reaches MaxSteps the inner loop exits immediately
// (its condition is false), stoppedByModel stays false, and the
// post-loop guard breaks the outer compaction loop.
for compactionAttempt := 0; ; compactionAttempt++ {
alreadyCompacted := false
// stoppedByModel is true when the inner step loop
// exited because the model produced no tool calls
// (shouldContinue was false). This distinguishes a
// natural stop from hitting MaxSteps.
stoppedByModel := false
// compactedOnFinalStep tracks whether compaction
// occurred on the very step where the model stopped.
// Only in that case should we re-enter, because the
// agent never had a chance to use the compacted context.
compactedOnFinalStep := false
for step := 0; totalSteps < opts.MaxSteps; step++ {
totalSteps++
provider := opts.Model.Provider()
modelName := opts.Model.Model()
opts.Metrics.StepsTotal.WithLabelValues(provider, modelName).Inc()
stepStart := time.Now()
// Copy messages so that provider-specific caching
// mutations don't leak back to the caller's slice.
// copy copies Message structs by value, so field
// reassignments in addAnthropicPromptCaching only
// affect the prepared slice.
if opts.PrepareMessages != nil {
if updated := opts.PrepareMessages(messages); updated != nil {
messages = updated
}
}
prepared := make([]fantasy.Message, len(messages))
copy(prepared, messages)
prepared, sanitizeStats := chatsanitize.SanitizeAnthropicProviderToolHistory(provider, prepared)
chatsanitize.LogAnthropicProviderToolSanitization(
ctx, opts.Logger, "pre_request", provider, modelName, sanitizeStats,
slog.F("step_index", step),
slog.F("total_steps", totalSteps),
)
prepared = chatsanitize.ApplyAnthropicProviderToolGuard(
ctx, opts.Logger, provider, modelName, prepared,
)
if applyAnthropicCaching {
addAnthropicPromptCaching(prepared)
}
opts.Metrics.MessageCount.WithLabelValues(provider, modelName).Observe(float64(len(prepared)))
opts.Metrics.PromptSizeBytes.WithLabelValues(provider, modelName).Observe(float64(EstimatePromptSize(prepared)))
call := fantasy.Call{
Prompt: prepared,
Tools: tools,
MaxOutputTokens: opts.ModelConfig.MaxOutputTokens,
Temperature: opts.ModelConfig.Temperature,
TopP: opts.ModelConfig.TopP,
TopK: opts.ModelConfig.TopK,
PresencePenalty: opts.ModelConfig.PresencePenalty,
FrequencyPenalty: opts.ModelConfig.FrequencyPenalty,
ProviderOptions: opts.ProviderOptions,
}
var result stepResult
stepCtx := chatdebug.ReuseStep(ctx)
err := chatretry.Retry(stepCtx, func(retryCtx context.Context) error {
attempt, streamErr := guardedStream(
retryCtx,
provider,
modelName,
opts.Clock,
opts.StartupTimeout,
func(attemptCtx context.Context) (fantasy.StreamResponse, error) {
return opts.Model.Stream(attemptCtx, call)
},
opts.Metrics,
)
if streamErr != nil {
return streamErr
}
defer attempt.release()
var processErr error
result, processErr = processStepStream(
attempt.ctx,
attempt.stream,
publishMessagePart,
)
return attempt.finish(processErr)
}, func(
attempt int,
retryErr error,
classified chatretry.ClassifiedError,
delay time.Duration,
) {
// Reset result from the failed attempt so the next
// attempt starts clean.
result = stepResult{}
// Record before OnRetry so a panicking callback can't
// drop the sample. The metric's provider label comes
// from the outer local; WithProvider only affects the
// classified payload handed to OnRetry.
classified = classified.WithProvider(provider)
opts.Metrics.RecordStreamRetry(provider, modelName, classified)
if opts.OnRetry != nil {
opts.OnRetry(attempt, retryErr, classified, delay)
}
})
if err != nil {
if errors.Is(err, ErrInterrupted) {
persistInterruptedStep(ctx, opts, &result)
return ErrInterrupted
}
return xerrors.Errorf("stream response: %w", err)
}
// Execute tools before persisting so that tool results
// are included in the persisted step content. The
// persistence layer splits assistant and tool-result
// blocks into separate database messages by role.
var toolResults []fantasy.ToolResultContent
if result.shouldContinue {
var err error
toolResults, err = executeToolsForStep(ctx, opts, &result, provider, modelName, step, stepStart, publishMessagePart)
if err != nil {
return err
}
}
// Extract context limit from provider metadata.
contextLimit := extractContextLimitWithFallback(
result.providerMetadata,
opts.ContextLimitFallback,
)
result.content = chatsanitize.SanitizeAnthropicProviderToolStepContent(
ctx, opts.Logger, provider, modelName,
"normal_persist", step, result.finishReason, result.content,
)
if len(result.content) == 0 {
lastUsage = result.usage
lastProviderMetadata = result.providerMetadata
stoppedByModel = true
break
}
// Persist the step. If persistence fails because
// the chat was interrupted between the previous
// check and here, fall back to the interrupt-safe
// path so partial content is not lost.
if err := opts.PersistStep(ctx, PersistedStep{
Content: result.content,
Usage: result.usage,
ContextLimit: contextLimit,
ProviderResponseID: extractOpenAIResponseIDIfStored(opts.ProviderOptions, result.providerMetadata),
Runtime: time.Since(stepStart),
ToolCallCreatedAt: result.toolCallCreatedAt,
ToolResultCreatedAt: result.toolResultCreatedAt,
}); err != nil {
if errors.Is(err, ErrInterrupted) {
persistInterruptedStep(ctx, opts, &result)
return ErrInterrupted
}
return xerrors.Errorf("persist step: %w", err)
}
lastUsage = result.usage
lastProviderMetadata = result.providerMetadata
// Check if any executed tool triggers an early stop.
if shouldStopAfterTools(opts.StopAfterTools, toolResults) {
tryCompactOnExit(ctx, opts, result.usage, result.providerMetadata)
return ErrStopAfterTool
}
// When chain mode is active (PreviousResponseID set), exit
// it after persisting the first chained step. Continuation
// steps include tool-result messages, which fantasy rejects
// when previous_response_id is set, so we must leave chain
// mode and reload the full history before the next call.
stepMessages := result.toResponseMessages()
if hasPreviousResponseID(opts.ProviderOptions) {
clearPreviousResponseID(opts.ProviderOptions)
if opts.DisableChainMode != nil {
opts.DisableChainMode()
}
switch {
case opts.ReloadMessages != nil:
if err := reloadFullHistory("after chain mode exit"); err != nil {
return err
}
needsFullHistoryReload = false
default:
messages = append(messages, stepMessages...)
needsFullHistoryReload = false
}
} else {
messages = append(messages, stepMessages...)
}
if needsFullHistoryReload && !result.shouldContinue &&
opts.ReloadMessages != nil {
if err := reloadFullHistory("before final compaction after chain mode exit"); err != nil {
return err
}
needsFullHistoryReload = false
}
// Inline compaction.
if !needsFullHistoryReload && opts.Compaction != nil && opts.ReloadMessages != nil {
did, compactErr := tryCompact(
ctx,
opts.Model,
opts.Compaction,
opts.ContextLimitFallback,
result.usage,
result.providerMetadata,
messages,
)
opts.Metrics.RecordCompaction(provider, modelName, did, compactErr)
if compactErr != nil && opts.Compaction.OnError != nil {
opts.Compaction.OnError(compactErr)
}
if did {
alreadyCompacted = true
compactedOnFinalStep = true
if err := reloadFullHistory("after compaction"); err != nil {
return err
}
}
}
if !result.shouldContinue {
stoppedByModel = true
break
}
// The agent is continuing with tool calls, so any
// prior compaction has already been consumed.
compactedOnFinalStep = false
}
if needsFullHistoryReload && stoppedByModel && opts.ReloadMessages != nil {
if err := reloadFullHistory("before post-run compaction after chain mode exit"); err != nil {
return err
}
needsFullHistoryReload = false
}
// Post-run compaction safety net: if we never compacted
// during the loop, try once at the end.
if !needsFullHistoryReload && !alreadyCompacted && opts.Compaction != nil && opts.ReloadMessages != nil {
did, err := tryCompact(
ctx,
opts.Model,
opts.Compaction,
opts.ContextLimitFallback,
lastUsage,
lastProviderMetadata,
messages,
)
opts.Metrics.RecordCompaction(opts.Model.Provider(), opts.Model.Model(), did, err)
if err != nil {
if opts.Compaction.OnError != nil {
opts.Compaction.OnError(err)
}
}
if did {
compactedOnFinalStep = true
}
}
// Re-enter the step loop when compaction fired on the
// model's final step. This lets the agent continue
// working with fresh summarized context instead of
// stopping. When the inner loop continued after inline
// compaction (tool-call steps kept going), the agent
// already used the compacted context, so no re-entry
// is needed. Limit retries to prevent infinite loops.
if compactedOnFinalStep && stoppedByModel &&
opts.ReloadMessages != nil &&
compactionAttempt < maxCompactionRetries {
reloaded, reloadErr := opts.ReloadMessages(ctx)
if reloadErr != nil {
return xerrors.Errorf("reload messages after compaction: %w", reloadErr)
}
messages = reloaded
continue
}
break
}
return nil
}
// guardedAttempt owns an attempt-scoped context and startup guard
// around a provider stream. release is idempotent and frees the
// attempt-scoped timer/context. finish canonicalizes startup timeout
// errors before the retry loop classifies them.
type guardedAttempt struct {
ctx context.Context
stream fantasy.StreamResponse
release func()
finish func(error) error
}
// startupGuard arbitrates whether an attempt times out during
// stream startup. Exactly one outcome wins: the timer cancels
// the attempt, or the first-part path disarms the timer.
type startupGuard struct {
timer *quartz.Timer
cancel context.CancelCauseFunc
once sync.Once
}
func newStartupGuard(
clock quartz.Clock,
timeout time.Duration,
cancel context.CancelCauseFunc,
) *startupGuard {
guard := &startupGuard{cancel: cancel}
guard.timer = clock.AfterFunc(timeout, guard.onTimeout, "startupGuard")
return guard
}
func (g *startupGuard) onTimeout() {
g.once.Do(func() {
g.cancel(errStartupTimeout)
})
}
func (g *startupGuard) Disarm() {
g.once.Do(func() {
g.timer.Stop()
})
}
func classifyStartupTimeout(
attemptCtx context.Context,
provider string,
err error,
) error {
if !errors.Is(context.Cause(attemptCtx), errStartupTimeout) {
return err
}
if err == nil {
err = errStartupTimeout
}
return chaterror.WithClassification(err, chaterror.ClassifiedError{
Kind: chaterror.KindStartupTimeout,
Provider: provider,
Retryable: true,
})
}
func guardedStream(
parent context.Context,
provider, model string,
clock quartz.Clock,
timeout time.Duration,
openStream func(context.Context) (fantasy.StreamResponse, error),
metrics *Metrics,
) (guardedAttempt, error) {
attemptCtx, cancelAttempt := context.WithCancelCause(parent)
guard := newStartupGuard(clock, timeout, cancelAttempt)
var releaseOnce sync.Once
release := func() {
releaseOnce.Do(func() {
guard.Disarm()
cancelAttempt(nil)
})
}
streamStart := clock.Now()
stream, err := openStream(attemptCtx)
if err != nil {
err = classifyStartupTimeout(attemptCtx, provider, err)
release()
return guardedAttempt{}, err
}
recordTTFT := sync.OnceFunc(func() {
metrics.TTFTSeconds.WithLabelValues(provider, model).Observe(
clock.Since(streamStart).Seconds(),
)
})
return guardedAttempt{
ctx: attemptCtx,
stream: fantasy.StreamResponse(func(yield func(fantasy.StreamPart) bool) {
for part := range stream {
guard.Disarm()
recordTTFT()
if !yield(part) {
return
}
}
}),
release: release,
finish: func(err error) error {
return classifyStartupTimeout(attemptCtx, provider, err)
},
}, nil
}
// processStepStream consumes a fantasy StreamResponse and
// accumulates all content into a stepResult. Callbacks fire
// inline and their errors propagate directly.
func processStepStream(
ctx context.Context,
stream fantasy.StreamResponse,
publishMessagePart func(codersdk.ChatMessageRole, codersdk.ChatMessagePart),
) (stepResult, error) {
var result stepResult
activeToolCalls := make(map[string]*fantasy.ToolCallContent)
activeTextContent := make(map[string]string)
activeReasoningContent := make(map[string]reasoningState)
// Track tool names by ID for input delta publishing.
toolNames := make(map[string]string)
for part := range stream {
switch part.Type {
case fantasy.StreamPartTypeTextStart:
activeTextContent[part.ID] = ""
case fantasy.StreamPartTypeTextDelta:
if _, exists := activeTextContent[part.ID]; exists {
activeTextContent[part.ID] += part.Delta
}
publishMessagePart(codersdk.ChatMessageRoleAssistant, codersdk.ChatMessageText(part.Delta))
case fantasy.StreamPartTypeTextEnd:
if text, exists := activeTextContent[part.ID]; exists {
result.content = append(result.content, fantasy.TextContent{
Text: text,
ProviderMetadata: part.ProviderMetadata,
})
delete(activeTextContent, part.ID)
}
case fantasy.StreamPartTypeReasoningStart:
activeReasoningContent[part.ID] = reasoningState{
text: part.Delta,
options: part.ProviderMetadata,
}
case fantasy.StreamPartTypeReasoningDelta:
if active, exists := activeReasoningContent[part.ID]; exists {
active.text += part.Delta
active.options = part.ProviderMetadata
activeReasoningContent[part.ID] = active
}
publishMessagePart(codersdk.ChatMessageRoleAssistant, codersdk.ChatMessageReasoning(part.Delta))
case fantasy.StreamPartTypeReasoningEnd:
if active, exists := activeReasoningContent[part.ID]; exists {
if part.ProviderMetadata != nil {
active.options = part.ProviderMetadata
}
content := fantasy.ReasoningContent{
Text: active.text,
ProviderMetadata: active.options,
}
result.content = append(result.content, content)
delete(activeReasoningContent, part.ID)
}
case fantasy.StreamPartTypeToolInputStart:
activeToolCalls[part.ID] = &fantasy.ToolCallContent{
ToolCallID: part.ID,
ToolName: part.ToolCallName,
Input: "",
ProviderExecuted: part.ProviderExecuted,
}
if strings.TrimSpace(part.ToolCallName) != "" {
toolNames[part.ID] = part.ToolCallName
}
case fantasy.StreamPartTypeToolInputDelta:
var providerExecuted bool
if toolCall, exists := activeToolCalls[part.ID]; exists {
toolCall.Input += part.Delta
providerExecuted = toolCall.ProviderExecuted
}
toolName := toolNames[part.ID]
publishMessagePart(codersdk.ChatMessageRoleAssistant, codersdk.ChatMessagePart{
Type: codersdk.ChatMessagePartTypeToolCall,
ToolCallID: part.ID,
ToolName: toolName,
ArgsDelta: part.Delta,
ProviderExecuted: providerExecuted,
})
case fantasy.StreamPartTypeToolInputEnd:
// No callback needed; the full tool call arrives in
// StreamPartTypeToolCall.
case fantasy.StreamPartTypeToolCall:
tc := fantasy.ToolCallContent{
ToolCallID: part.ID,
ToolName: part.ToolCallName,
Input: part.ToolCallInput,
ProviderExecuted: part.ProviderExecuted,
ProviderMetadata: part.ProviderMetadata,
}
result.toolCalls = append(result.toolCalls, tc)
result.content = append(result.content, tc)
if strings.TrimSpace(part.ToolCallName) != "" {
toolNames[part.ID] = part.ToolCallName
}
// Clean up active tool call tracking.
delete(activeToolCalls, part.ID)
// Record when the model emitted this tool call
// so the persisted part carries an accurate
// timestamp for duration computation.
now := dbtime.Now()
if result.toolCallCreatedAt == nil {
result.toolCallCreatedAt = make(map[string]time.Time)
}
result.toolCallCreatedAt[part.ID] = now
ssePart := chatprompt.PartFromContent(tc)
ssePart.CreatedAt = &now
publishMessagePart(
codersdk.ChatMessageRoleAssistant,
ssePart,
)
case fantasy.StreamPartTypeSource:
sourceContent := fantasy.SourceContent{
SourceType: part.SourceType,
ID: part.ID,
URL: part.URL,
Title: part.Title,
ProviderMetadata: part.ProviderMetadata,
}
result.content = append(result.content, sourceContent)
publishMessagePart(
codersdk.ChatMessageRoleAssistant,
chatprompt.PartFromContent(sourceContent),
)
case fantasy.StreamPartTypeToolResult:
// Provider-executed tool results (e.g. web search)
// are emitted by the provider and added directly
// to the step content for multi-turn round-tripping.
// This mirrors fantasy's agent.go accumulation logic.
if part.ProviderExecuted {
tr := fantasy.ToolResultContent{
ToolCallID: part.ID,
ToolName: part.ToolCallName,
ProviderExecuted: part.ProviderExecuted,
ProviderMetadata: part.ProviderMetadata,
}
result.content = append(result.content, tr)
now := dbtime.Now()
if result.toolResultCreatedAt == nil {
result.toolResultCreatedAt = make(map[string]time.Time)
}
result.toolResultCreatedAt[part.ID] = now
ssePart := chatprompt.PartFromContent(tr)
ssePart.CreatedAt = &now
publishMessagePart(
codersdk.ChatMessageRoleTool,
ssePart,
)
}
case fantasy.StreamPartTypeFinish:
result.usage = part.Usage
result.finishReason = part.FinishReason
result.providerMetadata = part.ProviderMetadata
case fantasy.StreamPartTypeError:
// Detect interruption: the stream may surface the
// cancel as context.Canceled or propagate the
// ErrInterrupted cause directly, depending on
// the provider implementation.
if errors.Is(context.Cause(ctx), ErrInterrupted) &&
(errors.Is(part.Error, context.Canceled) || errors.Is(part.Error, ErrInterrupted)) {
// Flush in-progress content so that
// persistInterruptedStep has access to partial
// text, reasoning, and tool calls that were
// still streaming when the interrupt arrived.
flushActiveState(
&result,
activeTextContent,
activeReasoningContent,
activeToolCalls,
toolNames,
)
return result, ErrInterrupted
}
return result, part.Error
}
}
// The stream iterator may stop yielding parts without
// producing a StreamPartTypeError when the context is
// canceled (e.g. some providers close the response body
// silently). Detect this case and flush partial content
// so that persistInterruptedStep can save it.
if ctx.Err() != nil &&
errors.Is(context.Cause(ctx), ErrInterrupted) {
flushActiveState(
&result,
activeTextContent,
activeReasoningContent,
activeToolCalls,
toolNames,
)
return result, ErrInterrupted
}
hasLocalToolCalls := false
for _, tc := range result.toolCalls {
if !tc.ProviderExecuted {
hasLocalToolCalls = true
break
}
}
result.shouldContinue = hasLocalToolCalls &&
result.finishReason == fantasy.FinishReasonToolCalls
return result, nil
}
// executeTools runs all tool calls concurrently after the stream
// completes. Results are published via onResult in the original
// tool-call order after all tools finish, preserving deterministic
// event ordering for SSE subscribers.
func executeTools(
ctx context.Context,
allTools []fantasy.AgentTool,
activeTools []string,
providerTools []ProviderTool,
toolCalls []fantasy.ToolCallContent,
metrics *Metrics,
logger slog.Logger,
provider, model string,
builtinToolNames map[string]bool,
onResult func(fantasy.ToolResultContent, time.Time),
) []fantasy.ToolResultContent {
if len(toolCalls) == 0 {
return nil
}
// Filter out provider-executed tool calls. These were
// handled server-side by the LLM provider (e.g., web
// search) and their results are already in the stream
// content.
localToolCalls := make([]fantasy.ToolCallContent, 0, len(toolCalls))
for _, tc := range toolCalls {
if !tc.ProviderExecuted {
localToolCalls = append(localToolCalls, tc)
}
}
if len(localToolCalls) == 0 {
return nil
}
toolMap := make(map[string]fantasy.AgentTool, len(allTools))
for _, t := range allTools {
toolMap[t.Info().Name] = t
}
providerRunnerNames := make(map[string]struct{}, len(providerTools))
// Include runners from provider tools so locally-executed
// provider tools (e.g. computer use) can be dispatched.
for _, pt := range providerTools {
if pt.Runner != nil {
name := pt.Runner.Info().Name
toolMap[name] = pt.Runner
providerRunnerNames[name] = struct{}{}
}
}
results := make([]fantasy.ToolResultContent, len(localToolCalls))
completedAt := make([]time.Time, len(localToolCalls))
var wg sync.WaitGroup
wg.Add(len(localToolCalls))
for i, tc := range localToolCalls {
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
results[i] = fantasy.ToolResultContent{
ToolCallID: tc.ToolCallID,
ToolName: tc.ToolName,
Result: fantasy.ToolResultOutputContentError{
Error: xerrors.Errorf("tool panicked: %v", r),
},
}
}
// Record when this tool completed (or panicked).
// Captured per-goroutine so parallel tools get
// accurate individual completion times.
completedAt[i] = dbtime.Now()
}()
results[i] = executeSingleTool(ctx, toolMap, tc, metrics, logger, provider, model, builtinToolNames, activeTools, providerRunnerNames)
}()
}
wg.Wait()
// Publish results in the original tool-call order so SSE
// subscribers see a deterministic event sequence.
if onResult != nil {
for i, tr := range results {
onResult(tr, completedAt[i])
}
}
return results
}
// executeToolsForStep runs the tool-execution phase of a single
// chatloop step. It enforces the exclusive-tool policy, partitions
// built-in versus dynamic tool calls, dispatches built-in tools, and
// when dynamic tool calls are present persists the step and returns
// ErrDynamicToolCall so the caller can execute them externally.
// Returns the tool results to append to the step, or an error that the
// caller must propagate (ErrInterrupted, ErrDynamicToolCall, ctx.Err(),
// or a persistence failure).
func executeToolsForStep(
ctx context.Context,
opts RunOptions,
result *stepResult,
provider, modelName string,
step int,
stepStart time.Time,
publishMessagePart func(codersdk.ChatMessageRole, codersdk.ChatMessagePart),
) ([]fantasy.ToolResultContent, error) {
// Check for context cancellation before starting tool
// execution. If the chat was interrupted between stream
// completion and here, persist what we have and bail out.
if ctx.Err() != nil {
if errors.Is(context.Cause(ctx), ErrInterrupted) {
persistInterruptedStep(ctx, opts, result)
return nil, ErrInterrupted
}
return nil, ctx.Err()
}
// Enforce exclusivity across ALL locally-executable tool
// calls (both built-in and dynamic) before partitioning.
// Checking only the built-in partition would let the model
// bypass the policy by mixing an exclusive tool with a
// dynamic tool: the exclusive tool would still run and the
// dynamic call would still be handed to the caller for
// external execution, breaking the planning-only contract.
localCandidates := make([]fantasy.ToolCallContent, 0, len(result.toolCalls))
for _, tc := range result.toolCalls {
if !tc.ProviderExecuted {
localCandidates = append(localCandidates, tc)
}
}
policyResults, exclusiveViolation := applyExclusiveToolPolicy(
localCandidates,
opts.ExclusiveToolNames,
opts.Metrics,
provider,
modelName,
)
if exclusiveViolation {
now := dbtime.Now()
for _, tr := range policyResults {
recordToolResultTimestamp(result, tr.ToolCallID, now)
publishToolAttachments(ctx, opts.Logger, tr, now, publishMessagePart)
ssePart := chatprompt.PartFromContentWithLogger(ctx, opts.Logger, tr)
ssePart.CreatedAt = &now
publishMessagePart(codersdk.ChatMessageRoleTool, ssePart)
}
for _, tr := range policyResults {
result.content = append(result.content, tr)
}
// Mirror the post-execution interruption check used by the
// non-policy path: if the chat was interrupted while we
// synthesized policy errors, route through
// persistInterruptedStep so the synthesized results are not
// dropped when the regular PersistStep path fails on a
// canceled context.
if ctx.Err() != nil {
if errors.Is(context.Cause(ctx), ErrInterrupted) {
persistInterruptedStep(ctx, opts, result)
return nil, ErrInterrupted
}
return nil, ctx.Err()
}
// Fall through to the normal persistence path so the loop
// continues with error results that the model can observe
// and retry. Skip partitioning, execution, and
// pending-dynamic persistence.
return policyResults, nil
}
// Partition tool calls into built-in and dynamic.
var builtinCalls, dynamicCalls []fantasy.ToolCallContent
if len(opts.DynamicToolNames) > 0 {
for _, tc := range result.toolCalls {
if opts.DynamicToolNames[tc.ToolName] {
dynamicCalls = append(dynamicCalls, tc)
} else {
builtinCalls = append(builtinCalls, tc)
}
}
} else {
builtinCalls = result.toolCalls
}
// Execute only built-in tools.
toolResults := executeTools(ctx, opts.Tools, opts.ActiveTools, opts.ProviderTools, builtinCalls, opts.Metrics, opts.Logger, provider, modelName, opts.BuiltinToolNames, func(tr fantasy.ToolResultContent, completedAt time.Time) {
recordToolResultTimestamp(result, tr.ToolCallID, completedAt)
publishToolAttachments(ctx, opts.Logger, tr, completedAt, publishMessagePart)
ssePart := chatprompt.PartFromContentWithLogger(ctx, opts.Logger, tr)
ssePart.CreatedAt = &completedAt
publishMessagePart(codersdk.ChatMessageRoleTool, ssePart)
})
for _, tr := range toolResults {
result.content = append(result.content, tr)
}
// If dynamic tools were called, persist what we have
// (assistant + built-in results) and exit so the caller can
// execute them externally.
if len(dynamicCalls) > 0 {
// Strip Anthropic provider-executed tool calls without
// matching results before persisting so the action-required
// step does not carry a malformed tool-call history into
// downstream provider requests.
result.content = chatsanitize.SanitizeAnthropicProviderToolStepContent(
ctx, opts.Logger, provider, modelName,
"dynamic_tool_persist", step, result.finishReason, result.content,
)
if err := persistPendingDynamicStep(ctx, opts, result, stepStart, dynamicCalls); err != nil {
return nil, err
}
tryCompactOnExit(ctx, opts, result.usage, result.providerMetadata)
return nil, ErrDynamicToolCall
}
// Check for interruption after tool execution. Tools that
// were canceled mid-flight produce error results via ctx
// cancellation. Persist the full step (assistant blocks +
// tool results) through the interrupt-safe path so nothing
// is lost.
if ctx.Err() != nil {
if errors.Is(context.Cause(ctx), ErrInterrupted) {
persistInterruptedStep(ctx, opts, result)
return nil, ErrInterrupted
}
return nil, ctx.Err()
}
return toolResults, nil
}
// persistPendingDynamicStep persists a step that has pending dynamic
// tool calls awaiting external execution. Returns ErrInterrupted when
// persistence fails because the chat was interrupted.
func persistPendingDynamicStep(
ctx context.Context,
opts RunOptions,
result *stepResult,
stepStart time.Time,
dynamicCalls []fantasy.ToolCallContent,
) error {
pending := make([]PendingToolCall, 0, len(dynamicCalls))
for _, dc := range dynamicCalls {
pending = append(pending, PendingToolCall{
ToolCallID: dc.ToolCallID,
ToolName: dc.ToolName,
Args: dc.Input,
})
}
contextLimit := extractContextLimitWithFallback(result.providerMetadata, opts.ContextLimitFallback)
if err := opts.PersistStep(ctx, PersistedStep{
Content: result.content,
Usage: result.usage,
ContextLimit: contextLimit,
ProviderResponseID: extractOpenAIResponseIDIfStored(opts.ProviderOptions, result.providerMetadata),
Runtime: time.Since(stepStart),
PendingDynamicToolCalls: pending,
}); err != nil {
if errors.Is(err, ErrInterrupted) {
persistInterruptedStep(ctx, opts, result)
return ErrInterrupted
}
return xerrors.Errorf("persist step: %w", err)
}
return nil
}
// applyExclusiveToolPolicy checks whether toolCalls violate the
// exclusive-tool policy declared by exclusiveToolNames. When a
// violation is detected it synthesizes deterministic policy-error
// results for every tool call and records size/error metrics so the
// exclusivity failure mode is visible to operators. Returns
// (results, true) on violation; (nil, false) otherwise.
func applyExclusiveToolPolicy(
toolCalls []fantasy.ToolCallContent,
exclusiveToolNames map[string]bool,
metrics *Metrics,
provider, model string,
) ([]fantasy.ToolResultContent, bool) {
blockingToolName, ok := firstExclusiveToolName(toolCalls, exclusiveToolNames)
if !ok {
return nil, false
}
results := exclusiveToolPolicyResults(toolCalls, exclusiveToolNames, blockingToolName)
for _, tr := range results {
recordToolResultMetrics(metrics, provider, model, tr)
}
return results, true
}
// recordToolResultMetrics observes tool result size and increments
// tool_errors_total when the result carries an error output. Mirrors
// the metric-recording defer in executeSingleTool so that synthetic
// results (e.g. exclusive-tool policy errors) contribute to operator
// visibility.
func recordToolResultMetrics(metrics *Metrics, provider, model string, tr fantasy.ToolResultContent) {
if metrics == nil {
return
}
label := tr.ToolName
if label == "" {
label = "unknown"
}
metrics.ToolResultSizeBytes.WithLabelValues(provider, model, label).Observe(
float64(ToolResultSize(tr)),
)
if _, ok := tr.Result.(fantasy.ToolResultOutputContentError); ok {
metrics.RecordToolError(provider, model, label)
}
}
func firstExclusiveToolName(
toolCalls []fantasy.ToolCallContent,
exclusiveToolNames map[string]bool,
) (string, bool) {
if len(toolCalls) <= 1 || len(exclusiveToolNames) == 0 {
return "", false
}
for _, tc := range toolCalls {
if exclusiveToolNames[tc.ToolName] {
return tc.ToolName, true
}
}
return "", false
}
func exclusiveToolPolicyResults(
toolCalls []fantasy.ToolCallContent,
exclusiveToolNames map[string]bool,
blockingToolName string,
) []fantasy.ToolResultContent {
results := make([]fantasy.ToolResultContent, len(toolCalls))
for i, tc := range toolCalls {
message := exclusiveToolSkippedErrorMessage(blockingToolName)
if exclusiveToolNames[tc.ToolName] {
message = exclusiveToolMustRunAloneErrorMessage(tc.ToolName)
}
results[i] = fantasy.ToolResultContent{
ToolCallID: tc.ToolCallID,
ToolName: tc.ToolName,
Result: fantasy.ToolResultOutputContentError{
Error: xerrors.New(message),
},
}
}
return results
}
func exclusiveToolMustRunAloneErrorMessage(toolName string) string {
return toolName + " must be called alone, without other tools in the same batch. Retry with only the " + toolName + " call."
}
func exclusiveToolSkippedErrorMessage(toolName string) string {
return "this tool was skipped because " + toolName + " must run alone in its batch. Retry your tool calls without " + toolName + ", or call " + toolName + " separately first."
}
// executeSingleTool executes one tool call and converts the
// response into a ToolResultContent.
func executeSingleTool(
ctx context.Context,
toolMap map[string]fantasy.AgentTool,
tc fantasy.ToolCallContent,
metrics *Metrics,
logger slog.Logger,
provider, model string,
builtinToolNames map[string]bool,
activeTools []string,
providerRunnerNames map[string]struct{},
) fantasy.ToolResultContent {
result := fantasy.ToolResultContent{
ToolCallID: tc.ToolCallID,
ToolName: tc.ToolName,
ProviderExecuted: false,
}
defer func() {
metricLabel := tc.ToolName
if metricLabel == "" {
metricLabel = "unknown"
}
metrics.ToolResultSizeBytes.WithLabelValues(provider, model, metricLabel).Observe(
float64(ToolResultSize(result)),
)
if _, ok := result.Result.(fantasy.ToolResultOutputContentError); ok {
metrics.RecordToolError(provider, model, metricLabel)
}
}()
_, isProviderRunner := providerRunnerNames[tc.ToolName]
if !isProviderRunner && !isToolActive(tc.ToolName, activeTools) {
result.Result = fantasy.ToolResultOutputContentError{
Error: xerrors.New("Tool not active in this turn: " + tc.ToolName),
}
return result
}
tool, exists := toolMap[tc.ToolName]
if !exists {
result.Result = fantasy.ToolResultOutputContentError{
Error: xerrors.New("Tool not found: " + tc.ToolName),
}
return result
}
logger.Debug(ctx, "tool execution",
slog.F("tool_name", tc.ToolName),
slog.F("tool_call_id", tc.ToolCallID),
slog.F("builtin", builtinToolNames[tc.ToolName]),
slog.F("is_provider_runner", isProviderRunner),
)
resp, err := tool.Run(ctx, fantasy.ToolCall{
ID: tc.ToolCallID,
Name: tc.ToolName,
Input: tc.Input,
})
if err != nil {
result.Result = fantasy.ToolResultOutputContentError{
Error: err,
}
result.ClientMetadata = resp.Metadata
logger.Error(ctx, "tool execution failed",
slog.F("tool_name", tc.ToolName),
slog.F("tool_call_id", tc.ToolCallID),
slog.Error(err),
)
return result
}
result.ClientMetadata = resp.Metadata
switch {
case resp.IsError:
result.Result = fantasy.ToolResultOutputContentError{
Error: xerrors.New(resp.Content),
}
logger.Info(ctx, "tool returned error result",
slog.F("tool_name", tc.ToolName),
slog.F("tool_call_id", tc.ToolCallID),
slog.F("tool_error", resp.Content),
)
case resp.Type == "image" || resp.Type == "media":
result.Result = fantasy.ToolResultOutputContentMedia{
Data: base64.StdEncoding.EncodeToString(resp.Data),
MediaType: resp.MediaType,
Text: strings.ToValidUTF8(resp.Content, "\uFFFD"),
}
default:
result.Result = fantasy.ToolResultOutputContentText{
Text: strings.ToValidUTF8(resp.Content, "\uFFFD"),
}
}
return result
}
// flushActiveState moves any in-progress text, reasoning, and
// tool calls from the active tracking maps into result.content
// and result.toolCalls. This is called on interruption so that
// partial content from an incomplete stream is available for
// persistence.
func flushActiveState(
result *stepResult,
activeText map[string]string,
activeReasoning map[string]reasoningState,
activeToolCalls map[string]*fantasy.ToolCallContent,
toolNames map[string]string,
) {
// Flush partial text content.
for _, text := range activeText {
if text != "" {
result.content = append(result.content, fantasy.TextContent{Text: text})
}
}
// Flush partial reasoning content.
for _, rs := range activeReasoning {
if rs.text != "" {
result.content = append(result.content, fantasy.ReasoningContent{
Text: rs.text,
ProviderMetadata: rs.options,
})
}
}
// Flush in-progress tool calls. These haven't received a
// StreamPartTypeToolCall yet, so they only exist in
// activeToolCalls. We add them to both content and toolCalls
// so persistInterruptedStep can generate synthetic error
// results for them.
for id, tc := range activeToolCalls {
if tc == nil {
continue
}
// Prefer the tool name from the toolNames map since
// ToolInputStart may provide a cleaner name.
toolName := tc.ToolName
if name, ok := toolNames[id]; ok && strings.TrimSpace(name) != "" {
toolName = name
}
flushed := fantasy.ToolCallContent{
ToolCallID: tc.ToolCallID,
ToolName: toolName,
Input: tc.Input,
ProviderExecuted: tc.ProviderExecuted,
}
result.content = append(result.content, flushed)
result.toolCalls = append(result.toolCalls, flushed)
}
}
// persistInterruptedStep saves durable content from a partial stream.
// Provider-executed calls without results are removed because their
// result metadata cannot be synthesized safely.
func persistInterruptedStep(
ctx context.Context,
opts RunOptions,
result *stepResult,
) {
if result == nil || (len(result.content) == 0 && len(result.toolCalls) == 0) {
return
}
provider := ""
modelName := ""
if opts.Model != nil {
provider = opts.Model.Provider()
modelName = opts.Model.Model()
}
var sanitizeStats chatsanitize.AnthropicProviderToolSanitizationStats
result.content, sanitizeStats = chatsanitize.SanitizeAnthropicProviderToolContent(provider, result.content)
chatsanitize.LogAnthropicProviderToolSanitization(
ctx, opts.Logger, "interrupted_persist", provider, modelName, sanitizeStats,
)
// Track which tool calls already have results in the content.
answeredToolCalls := make(map[string]struct{})
for _, c := range result.content {
tr, ok := fantasy.AsContentType[fantasy.ToolResultContent](c)
if ok && tr.ToolCallID != "" {
answeredToolCalls[tr.ToolCallID] = struct{}{}
}
}
// Copy existing timestamps and add result timestamps for
// interrupted tool calls so the frontend can show partial
// duration.
toolCallCreatedAt := maps.Clone(result.toolCallCreatedAt)
if toolCallCreatedAt == nil {
toolCallCreatedAt = make(map[string]time.Time)
}
toolResultCreatedAt := maps.Clone(result.toolResultCreatedAt)
if toolResultCreatedAt == nil {
toolResultCreatedAt = make(map[string]time.Time)
}
// Build combined content: all accumulated content + synthetic
// interrupted results for any unanswered tool calls.
content := make([]fantasy.Content, 0, len(result.content))
content = append(content, result.content...)
interruptedAt := dbtime.Now()
for _, tc := range result.toolCalls {
if tc.ToolCallID == "" {
continue
}
if _, exists := answeredToolCalls[tc.ToolCallID]; exists {
continue
}
if chatsanitize.IsAnthropicProviderExecutedToolCall(provider, tc) {
continue
}
content = append(content, fantasy.ToolResultContent{
ToolCallID: tc.ToolCallID,
ToolName: tc.ToolName,
ProviderExecuted: tc.ProviderExecuted,
Result: fantasy.ToolResultOutputContentError{
Error: xerrors.New(interruptedToolResultErrorMessage),
},
})
// Only stamp synthetic results; don't clobber
// timestamps from tools that completed before
// the interruption arrived.
if _, exists := toolResultCreatedAt[tc.ToolCallID]; !exists {
toolResultCreatedAt[tc.ToolCallID] = interruptedAt
}
answeredToolCalls[tc.ToolCallID] = struct{}{}
}
if len(content) == 0 {
return
}
persistCtx := context.WithoutCancel(ctx)
if err := opts.PersistStep(persistCtx, PersistedStep{
Content: content,
ToolCallCreatedAt: toolCallCreatedAt,
ToolResultCreatedAt: toolResultCreatedAt,
}); err != nil {
if opts.OnInterruptedPersistError != nil {
opts.OnInterruptedPersistError(err)
}
}
}
// tryCompactOnExit runs compaction when the chatloop is about
// to exit early (e.g. via ErrDynamicToolCall). The normal
// inline and post-run compaction paths are unreachable in
// early-exit scenarios, so this ensures the context window
// doesn't grow unbounded.
func tryCompactOnExit(
ctx context.Context,
opts RunOptions,
usage fantasy.Usage,
metadata fantasy.ProviderMetadata,
) {
if opts.Compaction == nil || opts.ReloadMessages == nil {
return
}
reloaded, err := opts.ReloadMessages(ctx)
if err != nil {
return
}
did, compactErr := tryCompact(
ctx,
opts.Model,
opts.Compaction,
opts.ContextLimitFallback,
usage,
metadata,
reloaded,
)
opts.Metrics.RecordCompaction(opts.Model.Provider(), opts.Model.Model(), did, compactErr)
if compactErr != nil && opts.Compaction.OnError != nil {
opts.Compaction.OnError(compactErr)
}
}
func isToolActive(name string, activeTools []string) bool {
return len(activeTools) == 0 || slices.Contains(activeTools, name)
}
// buildToolDefinitions converts AgentTool definitions into the
// fantasy.Tool slice expected by fantasy.Call. When activeTools
// is non-empty, only function tools whose name appears in the
// list are included. Provider tool definitions are always
// appended unconditionally.
func buildToolDefinitions(tools []fantasy.AgentTool, activeTools []string, providerTools []ProviderTool) []fantasy.Tool {
prepared := make([]fantasy.Tool, 0, len(tools)+len(providerTools))
for _, tool := range tools {
info := tool.Info()
if !isToolActive(info.Name, activeTools) {
continue
}
inputSchema := map[string]any{
"type": "object",
"properties": info.Parameters,
}
// Only include "required" when non-empty so that a nil slice
// never serializes to null, which OpenAI rejects.
if len(info.Required) > 0 {
inputSchema["required"] = info.Required
}
schema.Normalize(inputSchema)
prepared = append(prepared, fantasy.FunctionTool{
Name: info.Name,
Description: info.Description,
InputSchema: inputSchema,
ProviderOptions: tool.ProviderOptions(),
})
}
for _, pt := range providerTools {
prepared = append(prepared, pt.Definition)
}
return prepared
}
// shouldStopAfterTools returns true if any tool result in the
// slice matches a name in stopTools and produced a successful
// (non-error) result.
func shouldStopAfterTools(stopTools map[string]struct{}, results []fantasy.ToolResultContent) bool {
if len(stopTools) == 0 {
return false
}
for _, tr := range results {
if _, ok := stopTools[tr.ToolName]; !ok {
continue
}
if _, isErr := tr.Result.(fantasy.ToolResultOutputContentError); !isErr {
return true
}
}
return false
}
func shouldApplyAnthropicPromptCaching(model fantasy.LanguageModel) bool {
if model == nil {
return false
}
return model.Provider() == fantasyanthropic.Name
}
// addAnthropicPromptCaching mutates messages in-place, setting
// ProviderOptions for Anthropic prompt caching on the last system
// message and the final two messages.
func addAnthropicPromptCaching(messages []fantasy.Message) {
for i := range messages {
messages[i].ProviderOptions = nil
}
providerOption := fantasy.ProviderOptions{
fantasyanthropic.Name: &fantasyanthropic.ProviderCacheControlOptions{
CacheControl: fantasyanthropic.CacheControl{Type: "ephemeral"},
},
}
lastSystemRoleIdx := -1
systemMessageUpdated := false
for i, msg := range messages {
if msg.Role == fantasy.MessageRoleSystem {
lastSystemRoleIdx = i
} else if !systemMessageUpdated && lastSystemRoleIdx >= 0 {
messages[lastSystemRoleIdx].ProviderOptions = providerOption
systemMessageUpdated = true
}
if i > len(messages)-3 {
messages[i].ProviderOptions = providerOption
}
}
}
// hasPreviousResponseID checks whether the provider options contain
// an OpenAI Responses entry with a non-empty PreviousResponseID.
func hasPreviousResponseID(providerOptions fantasy.ProviderOptions) bool {
if providerOptions == nil {
return false
}
for _, entry := range providerOptions {
if options, ok := entry.(*fantasyopenai.ResponsesProviderOptions); ok {
return options.PreviousResponseID != nil &&
*options.PreviousResponseID != ""
}
}
return false
}
// clearPreviousResponseID removes PreviousResponseID from the OpenAI
// Responses provider options entry, if present.
func clearPreviousResponseID(providerOptions fantasy.ProviderOptions) {
if providerOptions == nil {
return
}
for _, entry := range providerOptions {
if options, ok := entry.(*fantasyopenai.ResponsesProviderOptions); ok {
options.PreviousResponseID = nil
}
}
}
// extractOpenAIResponseID extracts the OpenAI Responses API response
// ID from provider metadata. Returns an empty string if no OpenAI
// Responses metadata is present.
func extractOpenAIResponseID(metadata fantasy.ProviderMetadata) string {
if len(metadata) == 0 {
return ""
}
for _, entry := range metadata {
if providerMetadata, ok := entry.(*fantasyopenai.ResponsesProviderMetadata); ok && providerMetadata != nil {
return providerMetadata.ResponseID
}
}
return ""
}
// extractOpenAIResponseIDIfStored returns the OpenAI response ID
// only when the provider options indicate store=true. Response IDs
// from store=false turns are not persisted server-side and cannot
// be used for chaining.
func extractOpenAIResponseIDIfStored(
providerOptions fantasy.ProviderOptions,
metadata fantasy.ProviderMetadata,
) string {
if !isResponsesStoreEnabled(providerOptions) {
return ""
}
return extractOpenAIResponseID(metadata)
}
// isResponsesStoreEnabled checks whether the OpenAI Responses
// provider options explicitly enable store=true.
func isResponsesStoreEnabled(providerOptions fantasy.ProviderOptions) bool {
if providerOptions == nil {
return false
}
for _, entry := range providerOptions {
if options, ok := entry.(*fantasyopenai.ResponsesProviderOptions); ok {
return options.Store != nil && *options.Store
}
}
return false
}
// recordToolResultTimestamp lazily initializes the
// toolResultCreatedAt map on the stepResult and records
// the completion timestamp for the given tool-call ID.
func recordToolResultTimestamp(result *stepResult, toolCallID string, ts time.Time) {
if result.toolResultCreatedAt == nil {
result.toolResultCreatedAt = make(map[string]time.Time)
}
result.toolResultCreatedAt[toolCallID] = ts
}
func publishToolAttachments(
ctx context.Context,
logger slog.Logger,
tr fantasy.ToolResultContent,
createdAt time.Time,
publishMessagePart func(codersdk.ChatMessageRole, codersdk.ChatMessagePart),
) {
attachments, err := chattool.AttachmentsFromMetadata(tr.ClientMetadata)
if err != nil {
logger.Warn(ctx, "skipping malformed tool attachment metadata",
slog.F("tool_name", tr.ToolName),
slog.F("tool_call_id", tr.ToolCallID),
slog.Error(err),
)
return
}
for _, attachment := range attachments {
filePart := codersdk.ChatMessageFile(
attachment.FileID,
attachment.MediaType,
attachment.Name,
)
filePart.CreatedAt = &createdAt
publishMessagePart(codersdk.ChatMessageRoleAssistant, filePart)
}
}
func extractContextLimit(metadata fantasy.ProviderMetadata) sql.NullInt64 {
if len(metadata) == 0 {
return sql.NullInt64{}
}
encoded, err := json.Marshal(metadata)
if err != nil || len(encoded) == 0 {
return sql.NullInt64{}
}
var payload any
if err := json.Unmarshal(encoded, &payload); err != nil {
return sql.NullInt64{}
}
limit, ok := findContextLimitValue(payload)
if !ok {
return sql.NullInt64{}
}
return sql.NullInt64{
Int64: limit,
Valid: true,
}
}
func extractContextLimitWithFallback(metadata fantasy.ProviderMetadata, fallback int64) sql.NullInt64 {
contextLimit := extractContextLimit(metadata)
if contextLimit.Valid || fallback <= 0 {
return contextLimit
}
return sql.NullInt64{
Int64: fallback,
Valid: true,
}
}
func findContextLimitValue(value any) (int64, bool) {
var (
limit int64
found bool
)
collectContextLimitValues(value, func(candidate int64) {
if !found || candidate > limit {
limit = candidate
found = true
}
})
return limit, found
}
func collectContextLimitValues(value any, onValue func(int64)) {
switch typed := value.(type) {
case map[string]any:
for key, child := range typed {
if isContextLimitKey(key) {
if numeric, ok := numericContextLimitValue(child); ok {
onValue(numeric)
}
}
collectContextLimitValues(child, onValue)
}
case []any:
for _, child := range typed {
collectContextLimitValues(child, onValue)
}
}
}
func isContextLimitKey(key string) bool {
normalized := normalizeMetadataKey(key)
if normalized == "" {
return false
}
switch normalized {
case
"contextlimit",
"contextwindow",
"contextlength",
"maxcontext",
"maxcontexttokens",
"maxinputtokens",
"maxinputtoken",
"inputtokenlimit":
return true
}
words := metadataKeyWords(key)
if !slices.Contains(words, "context") {
return false
}
if slices.Contains(words, "limit") {
return true
}
if slices.Contains(words, "window") {
return slices.Contains(words, "size") || slices.Contains(words, "max")
}
if slices.Contains(words, "length") {
return slices.Contains(words, "max")
}
return (slices.Contains(words, "token") || slices.Contains(words, "tokens")) &&
(slices.Contains(words, "max") || slices.Contains(words, "limit"))
}
func normalizeMetadataKey(key string) string {
var b strings.Builder
b.Grow(len(key))
for _, r := range key {
switch {
case r >= 'a' && r <= 'z':
_, _ = b.WriteRune(r)
case r >= 'A' && r <= 'Z':
_, _ = b.WriteRune(r + ('a' - 'A'))
case r >= '0' && r <= '9':
_, _ = b.WriteRune(r)
}
}
return b.String()
}
func metadataKeyWords(key string) []string {
words := make([]string, 0, 4)
var current strings.Builder
flush := func() {
if current.Len() == 0 {
return
}
words = append(words, current.String())
current.Reset()
}
var prev rune
var hasPrev bool
for _, r := range key {
if !unicode.IsLetter(r) {
flush()
hasPrev = false
continue
}
if hasPrev && unicode.IsUpper(r) && unicode.IsLower(prev) {
flush()
}
_, _ = current.WriteRune(unicode.ToLower(r))
prev = r
hasPrev = true
}
flush()
return words
}
func numericContextLimitValue(value any) (int64, bool) {
switch typed := value.(type) {
case int64:
return positiveInt64(typed)
case int32:
return positiveInt64(int64(typed))
case int:
return positiveInt64(int64(typed))
case float64:
casted := int64(typed)
if typed > 0 && float64(casted) == typed {
return casted, true
}
case string:
parsed, err := strconv.ParseInt(strings.TrimSpace(typed), 10, 64)
if err == nil {
return positiveInt64(parsed)
}
case json.Number:
parsed, err := typed.Int64()
if err == nil {
return positiveInt64(parsed)
}
}
return 0, false
}
func positiveInt64(value int64) (int64, bool) {
if value <= 0 {
return 0, false
}
return value, true
}