mirror of
https://github.com/coder/coder.git
synced 2026-06-04 05:28:20 +00:00
ddfe630757
## Summary

Replaces fantasy's `Agent` abstraction with a direct step loop calling `LanguageModel.Stream()`. Fantasy is retained as the provider abstraction layer (streaming parsers, types, tool schema) but we no longer use `fantasy.Agent`, `AgentStreamCall`, `AgentResult`, or `StepResult`.

## Problems solved

| Problem | Before | After |
|---|---|---|
| **Sentinel prompt hack** | fantasy.Agent requires non-empty Prompt → UUID sentinel generated and stripped in PrepareStep | Messages passed directly to `model.Stream()` |
| **Discarded PersistStep errors** | `_ = opts.OnStepFinish(result)` silently swallows errors | Errors propagate directly from `PersistStep()` |
| **Shadow draft state** | ~160 LOC tracking content in parallel because fantasy doesn't expose in-progress content on interruption | `stepResult` owns content directly; `flushActiveState()` is trivial |
| **Nested retry layers** | fantasy's 2-attempt retry nested inside chatretry's indefinite retry | Single `chatretry.Retry` layer |
| **Callback-mediated compaction** | Mutex + boolean flag + coordination between OnStepFinish/PrepareStep callbacks | Inline `if` statement between steps |
| **Duplicate compaction paths** | `compactStep()` + `maybeCompact()` sharing ~80% logic | Single `tryCompact()` function |

## Changes

### `coderd/chatd/chatloop/chatloop.go` — Rewritten

- **Removed**: `fantasy.NewAgent()`, `AgentStreamCall`, sentinel prompt, shadow draft state (~160 LOC of closures), `compactedMu`/`compacted` flag, `PrepareStepResult`
- **Added**: `stepResult` struct, `processStepStream()` (stream consumer), `executeTools()` (sequential tool execution), `flushActiveState()` (interrupt handling), `buildToolDefinitions()`, `toResponseMessages()`
- **Changed**: `Run()` return type from `(*fantasy.AgentResult, error)` to `error` (callers already discarded the result)
- **Preserved**: Anthropic prompt caching, reasoning title extraction, `extractContextLimit()`, `ErrInterrupted` semantics

### `coderd/chatd/chatloop/compaction.go` — Simplified

- Merged `compactStep()` + `maybeCompact()` → single `tryCompact()`
- Removed `[]StepResult` parameter from `generateCompactionSummary()` (caller provides complete message list)
- Kept helper functions: `normalizedCompactionConfig`, `contextTokensFromUsage`, `resolveContextLimit`, `shouldCompact`

### `coderd/chatd/chatd.go` — Caller updates

- Removed `AgentStreamCall` construction
- Changed `_, err = chatloop.Run(...)` to `err = chatloop.Run(...)`
- Model parameters moved from `AgentStreamCall` fields to `RunOptions` fields

### Tests — 4 new tests

- `MidLoopCompactionReloadsMessages` — compaction fires mid-loop, messages reloaded
- `PostRunCompactionSkippedAfterMidLoop` — no double compaction
- `MultiStepToolExecution` — tools execute between steps, results feed next step
- `PersistStepErrorPropagates` — persistence errors propagate (was silently discarded)
466 lines
13 KiB
Go
466 lines
13 KiB
Go
package chatloop //nolint:testpackage // Uses internal symbols.
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
|
|
"charm.land/fantasy"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/coder/coder/v2/codersdk"
|
|
)
|
|
|
|
func TestRun_Compaction(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("PersistsWhenThresholdReached", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
persistCompactionCalls := 0
|
|
var persistedCompaction CompactionResult
|
|
const summaryText = "summary text for compaction"
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeTextStart, ID: "text-1"},
|
|
{Type: fantasy.StreamPartTypeTextDelta, ID: "text-1", Delta: "done"},
|
|
{Type: fantasy.StreamPartTypeTextEnd, ID: "text-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 80,
|
|
TotalTokens: 85,
|
|
},
|
|
},
|
|
}), nil
|
|
},
|
|
generateFn: func(_ context.Context, call fantasy.Call) (*fantasy.Response, error) {
|
|
require.NotEmpty(t, call.Prompt)
|
|
lastPrompt := call.Prompt[len(call.Prompt)-1]
|
|
require.Equal(t, fantasy.MessageRoleUser, lastPrompt.Role)
|
|
require.Len(t, lastPrompt.Content, 1)
|
|
|
|
instruction, ok := fantasy.AsMessagePart[fantasy.TextPart](lastPrompt.Content[0])
|
|
require.True(t, ok)
|
|
require.Equal(t, "summarize now", instruction.Text)
|
|
|
|
return &fantasy.Response{
|
|
Content: []fantasy.Content{
|
|
fantasy.TextContent{Text: summaryText},
|
|
},
|
|
}, nil
|
|
},
|
|
}
|
|
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
MaxSteps: 1,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
SummaryPrompt: "summarize now",
|
|
Persist: func(_ context.Context, result CompactionResult) error {
|
|
persistCompactionCalls++
|
|
persistedCompaction = result
|
|
return nil
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, persistCompactionCalls)
|
|
require.Contains(t, persistedCompaction.SystemSummary, summaryText)
|
|
require.Equal(t, summaryText, persistedCompaction.SummaryReport)
|
|
require.Equal(t, int64(80), persistedCompaction.ContextTokens)
|
|
require.Equal(t, int64(100), persistedCompaction.ContextLimit)
|
|
require.InDelta(t, 80.0, persistedCompaction.UsagePercent, 0.0001)
|
|
})
|
|
|
|
t.Run("PublishesPartsBeforeAndAfterPersist", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
const summaryText = "compaction summary for ordering test"
|
|
|
|
// Track the order of callbacks to verify the tool-call
|
|
// part publishes before Generate (summary generation)
|
|
// and the tool-result part publishes after Persist.
|
|
var callOrder []string
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeTextStart, ID: "text-1"},
|
|
{Type: fantasy.StreamPartTypeTextDelta, ID: "text-1", Delta: "done"},
|
|
{Type: fantasy.StreamPartTypeTextEnd, ID: "text-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 80,
|
|
TotalTokens: 85,
|
|
},
|
|
},
|
|
}), nil
|
|
},
|
|
generateFn: func(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) {
|
|
callOrder = append(callOrder, "generate")
|
|
return &fantasy.Response{
|
|
Content: []fantasy.Content{
|
|
fantasy.TextContent{Text: summaryText},
|
|
},
|
|
}, nil
|
|
},
|
|
}
|
|
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
MaxSteps: 1,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
SummaryPrompt: "summarize now",
|
|
ToolCallID: "test-tool-call-id",
|
|
ToolName: "chat_summarized",
|
|
PublishMessagePart: func(role fantasy.MessageRole, part codersdk.ChatMessagePart) {
|
|
switch part.Type {
|
|
case codersdk.ChatMessagePartTypeToolCall:
|
|
callOrder = append(callOrder, "publish_tool_call")
|
|
case codersdk.ChatMessagePartTypeToolResult:
|
|
callOrder = append(callOrder, "publish_tool_result")
|
|
}
|
|
},
|
|
Persist: func(_ context.Context, _ CompactionResult) error {
|
|
callOrder = append(callOrder, "persist")
|
|
return nil
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, []string{
|
|
"publish_tool_call",
|
|
"generate",
|
|
"persist",
|
|
"publish_tool_result",
|
|
}, callOrder)
|
|
})
|
|
|
|
t.Run("PublishNotCalledBelowThreshold", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
publishCalled := false
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 10,
|
|
},
|
|
},
|
|
}), nil
|
|
},
|
|
}
|
|
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
MaxSteps: 1,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
ToolCallID: "test-tool-call-id",
|
|
ToolName: "chat_summarized",
|
|
PublishMessagePart: func(_ fantasy.MessageRole, _ codersdk.ChatMessagePart) {
|
|
publishCalled = true
|
|
},
|
|
Persist: func(_ context.Context, _ CompactionResult) error {
|
|
return nil
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.False(t, publishCalled, "PublishMessagePart should not fire when usage is below threshold")
|
|
})
|
|
|
|
t.Run("MidLoopCompactionReloadsMessages", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var mu sync.Mutex
|
|
var streamCallCount int
|
|
persistCompactionCalls := 0
|
|
reloadCalls := 0
|
|
|
|
const summaryText = "compacted summary"
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
mu.Lock()
|
|
step := streamCallCount
|
|
streamCallCount++
|
|
mu.Unlock()
|
|
|
|
switch step {
|
|
case 0:
|
|
// Step 0: tool call with high usage (80/100 = 80% > 70%).
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeToolInputStart, ID: "tc-1", ToolCallName: "read_file"},
|
|
{Type: fantasy.StreamPartTypeToolInputDelta, ID: "tc-1", Delta: `{}`},
|
|
{Type: fantasy.StreamPartTypeToolInputEnd, ID: "tc-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeToolCall,
|
|
ID: "tc-1",
|
|
ToolCallName: "read_file",
|
|
ToolCallInput: `{}`,
|
|
},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonToolCalls,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 80,
|
|
TotalTokens: 85,
|
|
},
|
|
},
|
|
}), nil
|
|
default:
|
|
// Step 1: text with low usage (30/100 = 30% < 70%).
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeTextStart, ID: "text-1"},
|
|
{Type: fantasy.StreamPartTypeTextDelta, ID: "text-1", Delta: "done"},
|
|
{Type: fantasy.StreamPartTypeTextEnd, ID: "text-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 30,
|
|
TotalTokens: 35,
|
|
},
|
|
},
|
|
}), nil
|
|
}
|
|
},
|
|
generateFn: func(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) {
|
|
return &fantasy.Response{
|
|
Content: []fantasy.Content{
|
|
fantasy.TextContent{Text: summaryText},
|
|
},
|
|
}, nil
|
|
},
|
|
}
|
|
|
|
compactedMessages := []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleSystem, "compacted system"),
|
|
textMessage(fantasy.MessageRoleUser, "compacted user"),
|
|
}
|
|
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
Tools: []fantasy.AgentTool{
|
|
newNoopTool("read_file"),
|
|
},
|
|
MaxSteps: 5,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
SummaryPrompt: "summarize now",
|
|
Persist: func(_ context.Context, _ CompactionResult) error {
|
|
persistCompactionCalls++
|
|
return nil
|
|
},
|
|
},
|
|
ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) {
|
|
reloadCalls++
|
|
return compactedMessages, nil
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Compaction fired after step 0 (above threshold).
|
|
require.GreaterOrEqual(t, persistCompactionCalls, 1)
|
|
// ReloadMessages was called after mid-loop compaction.
|
|
require.GreaterOrEqual(t, reloadCalls, 1)
|
|
// Both steps ran (tool-call step + follow-up text step).
|
|
require.Equal(t, 2, streamCallCount)
|
|
})
|
|
|
|
t.Run("PostRunCompactionSkippedAfterMidLoop", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var mu sync.Mutex
|
|
var streamCallCount int
|
|
persistCompactionCalls := 0
|
|
|
|
const summaryText = "compacted summary for skip test"
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
mu.Lock()
|
|
step := streamCallCount
|
|
streamCallCount++
|
|
mu.Unlock()
|
|
|
|
switch step {
|
|
case 0:
|
|
// Step 0: tool call with high usage (80/100 = 80% > 70%).
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeToolInputStart, ID: "tc-1", ToolCallName: "read_file"},
|
|
{Type: fantasy.StreamPartTypeToolInputDelta, ID: "tc-1", Delta: `{}`},
|
|
{Type: fantasy.StreamPartTypeToolInputEnd, ID: "tc-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeToolCall,
|
|
ID: "tc-1",
|
|
ToolCallName: "read_file",
|
|
ToolCallInput: `{}`,
|
|
},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonToolCalls,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 80,
|
|
TotalTokens: 85,
|
|
},
|
|
},
|
|
}), nil
|
|
default:
|
|
// Step 1: text with low usage (20/100 = 20% < 70%).
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{Type: fantasy.StreamPartTypeTextStart, ID: "text-1"},
|
|
{Type: fantasy.StreamPartTypeTextDelta, ID: "text-1", Delta: "done"},
|
|
{Type: fantasy.StreamPartTypeTextEnd, ID: "text-1"},
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 20,
|
|
TotalTokens: 25,
|
|
},
|
|
},
|
|
}), nil
|
|
}
|
|
},
|
|
generateFn: func(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) {
|
|
return &fantasy.Response{
|
|
Content: []fantasy.Content{
|
|
fantasy.TextContent{Text: summaryText},
|
|
},
|
|
}, nil
|
|
},
|
|
}
|
|
|
|
compactedMessages := []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleSystem, "compacted system"),
|
|
textMessage(fantasy.MessageRoleUser, "compacted user"),
|
|
}
|
|
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
Tools: []fantasy.AgentTool{
|
|
newNoopTool("read_file"),
|
|
},
|
|
MaxSteps: 5,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
SummaryPrompt: "summarize now",
|
|
Persist: func(_ context.Context, _ CompactionResult) error {
|
|
persistCompactionCalls++
|
|
return nil
|
|
},
|
|
},
|
|
ReloadMessages: func(_ context.Context) ([]fantasy.Message, error) {
|
|
return compactedMessages, nil
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Only mid-loop compaction fires after step 0. The post-run
|
|
// safety net is skipped because alreadyCompacted is true.
|
|
require.Equal(t, 1, persistCompactionCalls)
|
|
})
|
|
|
|
t.Run("ErrorsAreReported", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
model := &loopTestModel{
|
|
provider: "fake",
|
|
streamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
|
|
return streamFromParts([]fantasy.StreamPart{
|
|
{
|
|
Type: fantasy.StreamPartTypeFinish,
|
|
FinishReason: fantasy.FinishReasonStop,
|
|
Usage: fantasy.Usage{
|
|
InputTokens: 80,
|
|
},
|
|
},
|
|
}), nil
|
|
},
|
|
generateFn: func(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) {
|
|
return nil, xerrors.New("generate failed")
|
|
},
|
|
}
|
|
|
|
compactionErr := xerrors.New("unset")
|
|
err := Run(context.Background(), RunOptions{
|
|
Model: model,
|
|
Messages: []fantasy.Message{
|
|
textMessage(fantasy.MessageRoleUser, "hello"),
|
|
},
|
|
MaxSteps: 1,
|
|
PersistStep: func(_ context.Context, _ PersistedStep) error {
|
|
return nil
|
|
},
|
|
ContextLimitFallback: 100,
|
|
Compaction: &CompactionOptions{
|
|
ThresholdPercent: 70,
|
|
Persist: func(_ context.Context, _ CompactionResult) error {
|
|
return nil
|
|
},
|
|
OnError: func(err error) {
|
|
compactionErr = err
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Error(t, compactionErr)
|
|
require.ErrorContains(t, compactionErr, "generate summary text")
|
|
})
|
|
}
|