feat: label chatd metrics by model, add stream-state diagnostics (#24475)

Adds production-observability metrics to coderd/x/chatd/ for
model-level correlation and a chatStreams memory-leak investigation.

- Label per-request chatd metrics (steps_total, message_count,
  prompt_size_bytes, tool_result_size_bytes, ttft_seconds,
  compaction_total) with `model` and enrich the per-turn logger
  with provider/model.
- Add `coderd_chatd_stream_retries_total{provider, model, kind}`
  counter incremented in chatloop before OnRetry.
- Register a prometheus.Collector exposing `streams_active`,
  `stream_buffer_size_max`, `stream_buffer_events`,
  `stream_subscribers` from p.chatStreams.
- Add `coderd_chatd_stream_buffer_dropped_total` counter,
  incremented per publishToStream drop independently of the
  existing log-rate-limited bufferDropCount.
- Snapshot logger/model before the title-generation goroutine to
  avoid a data race with the logger/model rebind below it.

> 🤖
This commit is contained in:
Cian Johnston
2026-04-17 16:16:30 +01:00
committed by GitHub
parent 91f9de27a1
commit 4b585465b8
7 changed files with 690 additions and 73 deletions
+75 -3
View File
@@ -737,6 +737,66 @@ func (s *chatStreamState) resetDropCounters() {
s.subscriberLastWarnAt = time.Time{}
}
// streamStateCollector is a prometheus.Collector that computes its
// gauges at scrape time by walking server.chatStreams. Each scrape
// visits every stream entry once (O(n)) and holds that entry's mutex
// only long enough to read len(buffer) and len(subscribers), which
// is acceptable at typical scrape cadences.
type streamStateCollector struct {
// server owns the chatStreams map that Collect iterates.
server *Server
}
// Descriptors for the gauges emitted by streamStateCollector. Each is
// created with nil variable labels and nil constant labels, so every
// metric below is a single unlabeled series.
var (
// streamsActiveDesc describes the number of entries in chatStreams.
streamsActiveDesc = prometheus.NewDesc(
"coderd_chatd_streams_active",
"Current number of chat stream state entries (in-flight plus retained).",
nil, nil,
)
// streamBufferSizeMaxDesc describes the largest single buffer
// length observed across all entries during a scrape.
streamBufferSizeMaxDesc = prometheus.NewDesc(
"coderd_chatd_stream_buffer_size_max",
"Maximum current buffer length across all chat streams.",
nil, nil,
)
// streamBufferEventsDesc describes the sum of all buffer lengths
// observed during a scrape.
streamBufferEventsDesc = prometheus.NewDesc(
"coderd_chatd_stream_buffer_events",
"Sum of current buffer lengths across all chat streams.",
nil, nil,
)
// streamSubscribersDesc describes the sum of subscriber counts
// observed across all entries during a scrape.
streamSubscribersDesc = prometheus.NewDesc(
"coderd_chatd_stream_subscribers",
"Current number of chat stream subscribers across all chat streams.",
nil, nil,
)
)
// Describe implements prometheus.Collector. It sends the four fixed
// stream-state descriptors on ch, in the same order Collect emits
// the corresponding metrics.
func (*streamStateCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{
		streamsActiveDesc,
		streamBufferSizeMaxDesc,
		streamBufferEventsDesc,
		streamSubscribersDesc,
	} {
		ch <- desc
	}
}
// Collect implements prometheus.Collector. It walks every entry in
// server.chatStreams, skipping values that are not *chatStreamState,
// and aggregates four gauges: the number of entries, the largest
// buffer length, the total buffered events, and the total number of
// subscribers. Each entry's mutex is held only while its two lengths
// are read.
func (c *streamStateCollector) Collect(ch chan<- prometheus.Metric) {
	var (
		streamCount     int
		bufferedEvents  int
		largestBuffer   int
		subscriberCount int
	)

	c.server.chatStreams.Range(func(_, value any) bool {
		st, isState := value.(*chatStreamState)
		if isState {
			streamCount++

			// Read both lengths under the lock, then release it
			// before doing any arithmetic.
			st.mu.Lock()
			buffered, listeners := len(st.buffer), len(st.subscribers)
			st.mu.Unlock()

			bufferedEvents += buffered
			subscriberCount += listeners
			if buffered > largestBuffer {
				largestBuffer = buffered
			}
		}
		// Always continue so a foreign value never stops the walk.
		return true
	})

	emit := func(desc *prometheus.Desc, n int) {
		ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(n))
	}
	emit(streamsActiveDesc, streamCount)
	emit(streamBufferSizeMaxDesc, largestBuffer)
	emit(streamBufferEventsDesc, bufferedEvents)
	emit(streamSubscribersDesc, subscriberCount)
}
// MaxQueueSize caps how many user messages may be queued for a
// single chat at one time.
const MaxQueueSize = 20
@@ -2796,6 +2856,7 @@ func New(cfg Config) *Server {
}
if cfg.PrometheusRegistry != nil {
p.metrics = chatloop.NewMetrics(cfg.PrometheusRegistry)
cfg.PrometheusRegistry.MustRegister(&streamStateCollector{server: p})
} else {
p.metrics = chatloop.NopMetrics()
}
@@ -2952,6 +3013,7 @@ func (p *Server) publishToStream(chatID uuid.UUID, event codersdk.ChatStreamEven
return
}
if len(state.buffer) >= maxStreamBufferSize {
p.metrics.RecordStreamBufferDropped()
state.bufferDropCount++
now := p.clock.Now()
if now.Sub(state.bufferLastWarnAt) >= streamDropWarnInterval {
@@ -4880,9 +4942,10 @@ func (p *Server) runChat(
// Fire title generation asynchronously so it doesn't block the
// chat response. It uses a detached context so it can finish
// even after the chat processing context is canceled.
// Snapshot the original chat model so the goroutine doesn't
// race with the model = cuModel reassignment below.
// Snapshot model and logger before launch; both get
// reassigned below and the goroutine captures by reference.
titleModel := result.PushSummaryModel
titleLogger := logger
p.inflight.Add(1)
go func() {
defer p.inflight.Done()
@@ -4893,7 +4956,7 @@ func (p *Server) runChat(
titleModel,
providerKeys,
generatedTitle,
logger,
titleLogger,
)
}()
@@ -5475,6 +5538,14 @@ func (p *Server) runChat(
model = cuModel
}
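// Enrich the scoped logger with provider/model for this turn.
// Bind the fields exactly once, after the cuModel swap above:
// slog.Logger.With appends fields rather than deduplicating them,
// so binding before the swap and again afterwards would emit
// duplicate provider/model fields.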
logger = logger.With(
slog.F("provider", model.Provider()),
slog.F("model", model.Model()),
)
allowAskUserQuestion := isPlanModeTurn && isRootChat
tools := []fantasy.AgentTool{
chattool.ReadFile(chattool.ReadFileOptions{
@@ -5725,6 +5796,7 @@ func (p *Server) runChat(
logger.Warn(ctx, "retrying LLM stream",
slog.F("attempt", attempt),
slog.F("delay", delay.String()),
slog.F("kind", classified.Kind),
slog.Error(retryErr),
)
payload := chaterror.StreamRetryPayload(attempt, delay, classified)
+21 -14
View File
@@ -371,7 +371,8 @@ func Run(ctx context.Context, opts RunOptions) error {
for step := 0; totalSteps < opts.MaxSteps; step++ {
totalSteps++
provider := opts.Model.Provider()
opts.Metrics.StepsTotal.WithLabelValues(provider).Inc()
modelName := opts.Model.Model()
opts.Metrics.StepsTotal.WithLabelValues(provider, modelName).Inc()
stepStart := time.Now()
// Copy messages so that provider-specific caching
// mutations don't leak back to the caller's slice.
@@ -388,8 +389,8 @@ func Run(ctx context.Context, opts RunOptions) error {
if applyAnthropicCaching {
addAnthropicPromptCaching(prepared)
}
opts.Metrics.MessageCount.WithLabelValues(provider).Observe(float64(len(prepared)))
opts.Metrics.PromptSizeBytes.WithLabelValues(provider).Observe(float64(EstimatePromptSize(prepared)))
opts.Metrics.MessageCount.WithLabelValues(provider, modelName).Observe(float64(len(prepared)))
opts.Metrics.PromptSizeBytes.WithLabelValues(provider, modelName).Observe(float64(EstimatePromptSize(prepared)))
call := fantasy.Call{
Prompt: prepared,
@@ -408,6 +409,7 @@ func Run(ctx context.Context, opts RunOptions) error {
attempt, streamErr := guardedStream(
retryCtx,
provider,
modelName,
opts.Clock,
opts.StartupTimeout,
func(attemptCtx context.Context) (fantasy.StreamResponse, error) {
@@ -435,8 +437,13 @@ func Run(ctx context.Context, opts RunOptions) error {
// Reset result from the failed attempt so the next
// attempt starts clean.
result = stepResult{}
// Record before OnRetry so a panicking callback can't
// drop the sample. The metric's provider label comes
// from the outer local; WithProvider only affects the
// classified payload handed to OnRetry.
classified = classified.WithProvider(provider)
opts.Metrics.RecordStreamRetry(provider, modelName, classified)
if opts.OnRetry != nil {
classified = classified.WithProvider(opts.Model.Provider())
opts.OnRetry(attempt, retryErr, classified, delay)
}
})
@@ -481,7 +488,7 @@ func Run(ctx context.Context, opts RunOptions) error {
}
// Execute only built-in tools.
toolResults = executeTools(ctx, opts.Tools, opts.ActiveTools, opts.ProviderTools, builtinCalls, opts.Metrics, provider, opts.BuiltinToolNames, func(tr fantasy.ToolResultContent, completedAt time.Time) {
toolResults = executeTools(ctx, opts.Tools, opts.ActiveTools, opts.ProviderTools, builtinCalls, opts.Metrics, provider, modelName, opts.BuiltinToolNames, func(tr fantasy.ToolResultContent, completedAt time.Time) {
recordToolResultTimestamp(&result, tr.ToolCallID, completedAt)
ssePart := chatprompt.PartFromContent(tr)
ssePart.CreatedAt = &completedAt
@@ -625,7 +632,7 @@ func Run(ctx context.Context, opts RunOptions) error {
result.providerMetadata,
messages,
)
opts.Metrics.RecordCompaction(opts.Model.Provider(), did, compactErr)
opts.Metrics.RecordCompaction(provider, modelName, did, compactErr)
if compactErr != nil && opts.Compaction.OnError != nil {
opts.Compaction.OnError(compactErr)
}
@@ -667,7 +674,7 @@ func Run(ctx context.Context, opts RunOptions) error {
lastProviderMetadata,
messages,
)
opts.Metrics.RecordCompaction(opts.Model.Provider(), did, err)
opts.Metrics.RecordCompaction(opts.Model.Provider(), opts.Model.Model(), did, err)
if err != nil {
if opts.Compaction.OnError != nil {
opts.Compaction.OnError(err)
@@ -762,7 +769,7 @@ func classifyStartupTimeout(
func guardedStream(
parent context.Context,
provider string,
provider, model string,
clock quartz.Clock,
timeout time.Duration,
openStream func(context.Context) (fantasy.StreamResponse, error),
@@ -787,7 +794,7 @@ func guardedStream(
}
recordTTFT := sync.OnceFunc(func() {
metrics.TTFTSeconds.WithLabelValues(provider).Observe(
metrics.TTFTSeconds.WithLabelValues(provider, model).Observe(
clock.Since(streamStart).Seconds(),
)
})
@@ -1041,7 +1048,7 @@ func executeTools(
providerTools []ProviderTool,
toolCalls []fantasy.ToolCallContent,
metrics *Metrics,
provider string,
provider, model string,
builtinToolNames map[string]bool,
onResult func(fantasy.ToolResultContent, time.Time),
) []fantasy.ToolResultContent {
@@ -1100,7 +1107,7 @@ func executeTools(
// accurate individual completion times.
completedAt[i] = dbtime.Now()
}()
results[i] = executeSingleTool(ctx, toolMap, tc, metrics, provider, builtinToolNames, activeTools, providerRunnerNames)
results[i] = executeSingleTool(ctx, toolMap, tc, metrics, provider, model, builtinToolNames, activeTools, providerRunnerNames)
}()
}
wg.Wait()
@@ -1122,7 +1129,7 @@ func executeSingleTool(
toolMap map[string]fantasy.AgentTool,
tc fantasy.ToolCallContent,
metrics *Metrics,
provider string,
provider, model string,
builtinToolNames map[string]bool,
activeTools []string,
providerRunnerNames map[string]struct{},
@@ -1137,7 +1144,7 @@ func executeSingleTool(
if !builtinToolNames[tc.ToolName] {
toolLabel = "mcp"
}
metrics.ToolResultSizeBytes.WithLabelValues(provider, toolLabel).Observe(
metrics.ToolResultSizeBytes.WithLabelValues(provider, model, toolLabel).Observe(
float64(ToolResultSize(result)),
)
}()
@@ -1347,7 +1354,7 @@ func tryCompactOnExit(
metadata,
reloaded,
)
opts.Metrics.RecordCompaction(opts.Model.Provider(), did, compactErr)
opts.Metrics.RecordCompaction(opts.Model.Provider(), opts.Model.Model(), did, compactErr)
if compactErr != nil && opts.Compaction.OnError != nil {
opts.Compaction.OnError(compactErr)
}
+52 -17
View File
@@ -7,6 +7,8 @@ import (
"charm.land/fantasy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/coder/coder/v2/coderd/x/chatd/chaterror"
)
const (
@@ -25,13 +27,15 @@ const (
// Metrics holds Prometheus metrics for the chatd subsystem.
type Metrics struct {
Chats *prometheus.GaugeVec
MessageCount *prometheus.HistogramVec
PromptSizeBytes *prometheus.HistogramVec
ToolResultSizeBytes *prometheus.HistogramVec
TTFTSeconds *prometheus.HistogramVec
CompactionTotal *prometheus.CounterVec
StepsTotal *prometheus.CounterVec
Chats *prometheus.GaugeVec
MessageCount *prometheus.HistogramVec
PromptSizeBytes *prometheus.HistogramVec
ToolResultSizeBytes *prometheus.HistogramVec
TTFTSeconds *prometheus.HistogramVec
CompactionTotal *prometheus.CounterVec
StepsTotal *prometheus.CounterVec
StreamRetriesTotal *prometheus.CounterVec
StreamBufferDroppedTotal prometheus.Counter
}
// NewMetrics creates a new Metrics instance registered with the
@@ -51,40 +55,52 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Name: "message_count",
Help: "Number of messages in the prompt per LLM request.",
Buckets: prometheus.ExponentialBuckets(1, 2, 11), // 1, 2, 4, ..., 1024
}, []string{"provider"}),
}, []string{"provider", "model"}),
PromptSizeBytes: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "prompt_size_bytes",
Help: "Estimated byte size of the prompt per LLM request.",
Buckets: prometheus.ExponentialBuckets(1024, 4, 10), // 1KB .. 256MB
}, []string{"provider"}),
}, []string{"provider", "model"}),
ToolResultSizeBytes: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tool_result_size_bytes",
Help: "Size in bytes of each tool execution result.",
Buckets: prometheus.ExponentialBuckets(64, 4, 9), // 64B .. 4MB
}, []string{"provider", "tool_name"}),
}, []string{"provider", "model", "tool_name"}),
TTFTSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "ttft_seconds",
Help: "Time-to-first-token: wall time from LLM request to first streamed chunk.",
Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
}, []string{"provider"}),
}, []string{"provider", "model"}),
CompactionTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "compaction_total",
Help: "Total compaction outcomes (only recorded when compaction was triggered or failed).",
}, []string{"provider", "result"}),
}, []string{"provider", "model", "result"}),
StepsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "steps_total",
Help: "Total agentic loop steps across all chats.",
}, []string{"provider"}),
}, []string{"provider", "model"}),
StreamRetriesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "stream_retries_total",
Help: "Total LLM stream retries.",
}, []string{"provider", "model", "kind"}),
StreamBufferDroppedTotal: factory.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "stream_buffer_dropped_total",
Help: "Number of chat stream buffer events dropped due to the per-chat buffer cap.",
}),
}
}
@@ -96,25 +112,44 @@ func NopMetrics() *Metrics {
// RecordCompaction classifies and records a compaction attempt.
// It is a no-op when m is nil.
func (m *Metrics) RecordCompaction(provider string, compacted bool, err error) {
func (m *Metrics) RecordCompaction(provider, model string, compacted bool, err error) {
if m == nil {
return
}
switch {
case err != nil && errors.Is(err, context.DeadlineExceeded):
m.CompactionTotal.WithLabelValues(provider, CompactionResultTimeout).Inc()
m.CompactionTotal.WithLabelValues(provider, model, CompactionResultTimeout).Inc()
case err != nil && errors.Is(err, context.Canceled):
// User interruption, not a compaction failure.
return
case err != nil:
m.CompactionTotal.WithLabelValues(provider, CompactionResultError).Inc()
m.CompactionTotal.WithLabelValues(provider, model, CompactionResultError).Inc()
case compacted:
m.CompactionTotal.WithLabelValues(provider, CompactionResultSuccess).Inc()
m.CompactionTotal.WithLabelValues(provider, model, CompactionResultSuccess).Inc()
// !compacted && err == nil means threshold not reached -- not
// recorded.
}
}
// RecordStreamRetry bumps stream_retries_total for the given
// provider and model, using classified.Kind as the "kind" label.
// Callers are expected to have produced classified via
// chaterror.Classify so that Kind is non-empty. Calling it on a nil
// *Metrics does nothing.
func (m *Metrics) RecordStreamRetry(provider, model string, classified chaterror.ClassifiedError) {
	if m != nil {
		m.StreamRetriesTotal.WithLabelValues(provider, model, classified.Kind).Inc()
	}
}
// RecordStreamBufferDropped adds one to stream_buffer_dropped_total.
// Call it once for each dropped event. Calling it on a nil *Metrics
// does nothing.
func (m *Metrics) RecordStreamBufferDropped() {
	if m != nil {
		m.StreamBufferDroppedTotal.Inc()
	}
}
// EstimatePromptSize returns a cheap byte-size estimate of a
// fantasy prompt by summing the text content lengths of all
// message parts. This avoids JSON marshaling overhead.
+290 -27
View File
@@ -3,14 +3,18 @@ package chatloop_test
import (
"context"
"testing"
"time"
"charm.land/fantasy"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/x/chatd/chaterror"
"github.com/coder/coder/v2/coderd/x/chatd/chatloop"
"github.com/coder/coder/v2/coderd/x/chatd/chatretry"
"github.com/coder/coder/v2/coderd/x/chatd/chattest"
)
@@ -22,24 +26,30 @@ func TestNewMetrics_RegistersAllMetrics(t *testing.T) {
// Initialize vector metrics so they appear in Gather output.
m.Chats.WithLabelValues(chatloop.StateStreaming)
m.CompactionTotal.WithLabelValues("anthropic", chatloop.CompactionResultSuccess)
m.ToolResultSizeBytes.WithLabelValues("anthropic", "test")
m.MessageCount.WithLabelValues("anthropic")
m.PromptSizeBytes.WithLabelValues("anthropic")
m.TTFTSeconds.WithLabelValues("anthropic")
m.StepsTotal.WithLabelValues("anthropic")
m.CompactionTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", chatloop.CompactionResultSuccess)
m.ToolResultSizeBytes.WithLabelValues("anthropic", "claude-sonnet-4-5", "test")
m.MessageCount.WithLabelValues("anthropic", "claude-sonnet-4-5")
m.PromptSizeBytes.WithLabelValues("anthropic", "claude-sonnet-4-5")
m.TTFTSeconds.WithLabelValues("anthropic", "claude-sonnet-4-5")
m.StepsTotal.WithLabelValues("anthropic", "claude-sonnet-4-5")
m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", chaterror.KindTimeout)
// StreamBufferDroppedTotal is a plain Counter, so it's always present
// in Gather output once registered; no exerciser call is
// needed.
families, err := reg.Gather()
require.NoError(t, err)
expected := map[string]dto.MetricType{
"coderd_chatd_chats": dto.MetricType_GAUGE,
"coderd_chatd_message_count": dto.MetricType_HISTOGRAM,
"coderd_chatd_prompt_size_bytes": dto.MetricType_HISTOGRAM,
"coderd_chatd_tool_result_size_bytes": dto.MetricType_HISTOGRAM,
"coderd_chatd_ttft_seconds": dto.MetricType_HISTOGRAM,
"coderd_chatd_compaction_total": dto.MetricType_COUNTER,
"coderd_chatd_steps_total": dto.MetricType_COUNTER,
"coderd_chatd_chats": dto.MetricType_GAUGE,
"coderd_chatd_message_count": dto.MetricType_HISTOGRAM,
"coderd_chatd_prompt_size_bytes": dto.MetricType_HISTOGRAM,
"coderd_chatd_tool_result_size_bytes": dto.MetricType_HISTOGRAM,
"coderd_chatd_ttft_seconds": dto.MetricType_HISTOGRAM,
"coderd_chatd_compaction_total": dto.MetricType_COUNTER,
"coderd_chatd_steps_total": dto.MetricType_COUNTER,
"coderd_chatd_stream_retries_total": dto.MetricType_COUNTER,
"coderd_chatd_stream_buffer_dropped_total": dto.MetricType_COUNTER,
}
found := make(map[string]dto.MetricType)
@@ -66,14 +76,23 @@ func TestNopMetrics_DoesNotPanic(t *testing.T) {
m.Chats.WithLabelValues("streaming").Dec()
m.Chats.WithLabelValues("waiting").Inc()
m.Chats.WithLabelValues("waiting").Dec()
m.MessageCount.WithLabelValues("anthropic").Observe(10)
m.PromptSizeBytes.WithLabelValues("openai").Observe(4096)
m.ToolResultSizeBytes.WithLabelValues("anthropic", "execute").Observe(512)
m.TTFTSeconds.WithLabelValues("anthropic").Observe(0.5)
m.CompactionTotal.WithLabelValues("anthropic", "success").Inc()
m.CompactionTotal.WithLabelValues("openai", "error").Inc()
m.CompactionTotal.WithLabelValues("google", "timeout").Inc()
m.StepsTotal.WithLabelValues("anthropic").Inc()
m.MessageCount.WithLabelValues("anthropic", "claude-sonnet-4-5").Observe(10)
m.PromptSizeBytes.WithLabelValues("openai", "gpt-5").Observe(4096)
m.ToolResultSizeBytes.WithLabelValues("anthropic", "claude-sonnet-4-5", "execute").Observe(512)
m.TTFTSeconds.WithLabelValues("anthropic", "claude-sonnet-4-5").Observe(0.5)
m.CompactionTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", "success").Inc()
m.CompactionTotal.WithLabelValues("openai", "gpt-5", "error").Inc()
m.CompactionTotal.WithLabelValues("google", "gemini-2.5-pro", "timeout").Inc()
m.StepsTotal.WithLabelValues("anthropic", "claude-sonnet-4-5").Inc()
m.StreamRetriesTotal.WithLabelValues("anthropic", "claude-sonnet-4-5", chaterror.KindTimeout).Inc()
m.StreamBufferDroppedTotal.Inc()
// Nil-receiver guard for RecordStreamRetry and
// RecordStreamBufferDropped mirrors the existing RecordCompaction nil
// guard.
var nilMetrics *chatloop.Metrics
nilMetrics.RecordStreamRetry("anthropic", "claude-sonnet-4-5", chaterror.ClassifiedError{Kind: chaterror.KindTimeout})
nilMetrics.RecordStreamBufferDropped()
}
func TestEstimatePromptSize(t *testing.T) {
@@ -178,7 +197,7 @@ func TestRecordCompaction(t *testing.T) {
t.Run("nil metrics does not panic", func(t *testing.T) {
t.Parallel()
var m *chatloop.Metrics
m.RecordCompaction("anthropic", true, nil)
m.RecordCompaction("anthropic", "claude-sonnet-4-5", true, nil)
})
tests := []struct {
@@ -231,7 +250,7 @@ func TestRecordCompaction(t *testing.T) {
reg := prometheus.NewRegistry()
m := chatloop.NewMetrics(reg)
m.RecordCompaction("test", tt.compacted, tt.err)
m.RecordCompaction("test-provider", "test-model", tt.compacted, tt.err)
families, err := reg.Gather()
require.NoError(t, err)
@@ -253,18 +272,112 @@ func TestRecordCompaction(t *testing.T) {
require.Len(t, f.GetMetric(), 1)
metric := f.GetMetric()[0]
assert.Equal(t, float64(tt.wantCount), metric.GetCounter().GetValue())
// Check label.
// Check labels: provider, model, result.
labels := map[string]string{}
for _, lp := range metric.GetLabel() {
if lp.GetName() == "result" {
assert.Equal(t, tt.wantLabel, lp.GetValue())
}
labels[lp.GetName()] = lp.GetValue()
}
assert.Equal(t, "test-provider", labels["provider"])
assert.Equal(t, "test-model", labels["model"])
assert.Equal(t, tt.wantLabel, labels["result"])
}
assert.True(t, found, "compaction_total metric not found")
})
}
}
// TestRecordStreamRetry verifies that a single RecordStreamRetry call
// produces exactly one stream_retries_total sample with value 1 and
// the provider, model and kind labels that were passed in.
func TestRecordStreamRetry(t *testing.T) {
	t.Parallel()

	// Each row exercises one chaterror.Kind* constant. The
	// RecordStreamRetry contract requires callers to classify via
	// chaterror.Classify (non-empty Kind), so an empty-string row is
	// intentionally omitted.
	type testCase struct {
		name string
		kind string
	}
	cases := []testCase{
		{name: "overloaded", kind: chaterror.KindOverloaded},
		{name: "rate_limit", kind: chaterror.KindRateLimit},
		{name: "timeout", kind: chaterror.KindTimeout},
		{name: "startup_timeout", kind: chaterror.KindStartupTimeout},
		{name: "auth", kind: chaterror.KindAuth},
		{name: "config", kind: chaterror.KindConfig},
		{name: "generic", kind: chaterror.KindGeneric},
	}

	const familyName = "coderd_chatd_stream_retries_total"

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			// A fresh registry per subtest keeps samples isolated.
			registry := prometheus.NewRegistry()
			recorder := chatloop.NewMetrics(registry)
			classified := chaterror.ClassifiedError{
				Kind: tc.kind,
			}
			recorder.RecordStreamRetry("test-provider", "test-model", classified)

			gathered, err := registry.Gather()
			require.NoError(t, err)

			seen := false
			for _, family := range gathered {
				if family.GetName() == familyName {
					seen = true

					samples := family.GetMetric()
					require.Len(t, samples, 1)
					sample := samples[0]
					assert.Equal(t, float64(1), sample.GetCounter().GetValue())

					// Flatten the label pairs for direct lookup.
					got := make(map[string]string)
					for _, pair := range sample.GetLabel() {
						got[pair.GetName()] = pair.GetValue()
					}
					assert.Equal(t, "test-provider", got["provider"])
					assert.Equal(t, "test-model", got["model"])
					assert.Equal(t, tc.kind, got["kind"])
				}
			}
			assert.True(t, seen, "stream_retries_total metric not found")
		})
	}
}
// TestRecordStreamBufferDropped checks the nil-receiver guard and
// that each call adds exactly one to the unlabeled
// stream_buffer_dropped_total counter.
func TestRecordStreamBufferDropped(t *testing.T) {
	t.Parallel()

	t.Run("nil metrics does not panic", func(t *testing.T) {
		t.Parallel()

		var nilRecorder *chatloop.Metrics
		nilRecorder.RecordStreamBufferDropped()
	})

	t.Run("increments monotonically", func(t *testing.T) {
		t.Parallel()

		const (
			drops      = 3
			familyName = "coderd_chatd_stream_buffer_dropped_total"
		)

		registry := prometheus.NewRegistry()
		recorder := chatloop.NewMetrics(registry)
		for i := 0; i < drops; i++ {
			recorder.RecordStreamBufferDropped()
		}

		gathered, err := registry.Gather()
		require.NoError(t, err)

		seen := false
		for _, family := range gathered {
			if family.GetName() == familyName {
				seen = true

				samples := family.GetMetric()
				require.Len(t, samples, 1)
				assert.Equal(t, float64(drops), samples[0].GetCounter().GetValue())
				assert.Empty(t, samples[0].GetLabel(),
					"stream_buffer_dropped_total must be an unlabeled counter")
			}
		}
		assert.True(t, seen, "stream_buffer_dropped_total metric not found")
	})
}
func TestRun_RecordsMetrics(t *testing.T) {
t.Parallel()
@@ -273,6 +386,7 @@ func TestRun_RecordsMetrics(t *testing.T) {
model := &chattest.FakeModel{
ProviderName: "test-provider",
ModelName: "test-model",
StreamFn: func(_ context.Context, call fantasy.Call) (fantasy.StreamResponse, error) {
return func(yield func(fantasy.StreamPart) bool) {
parts := []fantasy.StreamPart{
@@ -311,6 +425,16 @@ func TestRun_RecordsMetrics(t *testing.T) {
families, err := reg.Gather()
require.NoError(t, err)
assertProviderModelLabels := func(t *testing.T, metric *dto.Metric) {
t.Helper()
labels := map[string]string{}
for _, lp := range metric.GetLabel() {
labels[lp.GetName()] = lp.GetValue()
}
assert.Equal(t, "test-provider", labels["provider"])
assert.Equal(t, "test-model", labels["model"])
}
found := make(map[string]bool)
for _, f := range families {
found[f.GetName()] = true
@@ -320,18 +444,22 @@ func TestRun_RecordsMetrics(t *testing.T) {
require.Len(t, f.GetMetric(), 1)
assert.Equal(t, float64(1), f.GetMetric()[0].GetCounter().GetValue(),
"steps_total should be 1 after one step")
assertProviderModelLabels(t, f.GetMetric()[0])
case "coderd_chatd_message_count":
require.Len(t, f.GetMetric(), 1)
assert.Equal(t, uint64(1), f.GetMetric()[0].GetHistogram().GetSampleCount(),
"message_count should have 1 observation")
assertProviderModelLabels(t, f.GetMetric()[0])
case "coderd_chatd_prompt_size_bytes":
require.Len(t, f.GetMetric(), 1)
assert.Equal(t, uint64(1), f.GetMetric()[0].GetHistogram().GetSampleCount(),
"prompt_size_bytes should have 1 observation")
assertProviderModelLabels(t, f.GetMetric()[0])
case "coderd_chatd_ttft_seconds":
require.Len(t, f.GetMetric(), 1)
assert.Equal(t, uint64(1), f.GetMetric()[0].GetHistogram().GetSampleCount(),
"ttft_seconds should have 1 observation")
assertProviderModelLabels(t, f.GetMetric()[0])
}
}
@@ -340,3 +468,138 @@ func TestRun_RecordsMetrics(t *testing.T) {
assert.True(t, found["coderd_chatd_prompt_size_bytes"], "prompt_size_bytes not recorded")
assert.True(t, found["coderd_chatd_ttft_seconds"], "ttft_seconds not recorded")
}
// TestRun_StreamRetry_RecordsMetric exercises the end-to-end retry
// path through chatloop.Run: the fake model's first Stream call
// returns an error and the second succeeds. It asserts that
// stream_retries_total is incremented exactly once with the expected
// provider/model/kind labels, and that the back-compat OnRetry
// callback still fires with the classified error.
//
// Note: chatretry.Retry uses time.NewTimer (not quartz.Clock), so
// this test pays chatretry.InitialDelay (1s) of real wall-clock
// time per retry. Keep it to one retry.
func TestRun_StreamRetry_RecordsMetric(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
metrics := chatloop.NewMetrics(reg)
// retryCall captures the OnRetry arguments this test asserts on.
type retryCall struct {
attempt int
classified chatretry.ClassifiedError
}
var retries []retryCall
// calls counts Stream invocations so only the first one fails.
calls := 0
model := &chattest.FakeModel{
ProviderName: "test-provider",
ModelName: "test-model",
StreamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
calls++
if calls == 1 {
// The assertions below expect this message to be
// classified as chaterror.KindRateLimit.
return nil, xerrors.New("received status 429 from upstream")
}
// Subsequent calls yield a single finish part so the run
// completes cleanly.
return func(yield func(fantasy.StreamPart) bool) {
yield(fantasy.StreamPart{
Type: fantasy.StreamPartTypeFinish,
FinishReason: fantasy.FinishReasonStop,
})
}, nil
},
}
err := chatloop.Run(context.Background(), chatloop.RunOptions{
Model: model,
MaxSteps: 1,
ContextLimitFallback: 4096,
PersistStep: func(_ context.Context, _ chatloop.PersistedStep) error {
return nil
},
Metrics: metrics,
// Record every retry notification for the back-compat checks.
OnRetry: func(
attempt int,
_ error,
classified chatretry.ClassifiedError,
_ time.Duration,
) {
retries = append(retries, retryCall{
attempt: attempt,
classified: classified,
})
},
})
require.NoError(t, err)
// Back-compat: OnRetry still fires with classified error.
require.Len(t, retries, 1)
assert.Equal(t, 1, retries[0].attempt)
assert.Equal(t, chaterror.KindRateLimit, retries[0].classified.Kind)
assert.Equal(t, "test-provider", retries[0].classified.Provider)
// Metric assertion.
families, err := reg.Gather()
require.NoError(t, err)
var found bool
for _, f := range families {
if f.GetName() != "coderd_chatd_stream_retries_total" {
continue
}
found = true
require.Len(t, f.GetMetric(), 1)
metric := f.GetMetric()[0]
assert.Equal(t, float64(1), metric.GetCounter().GetValue())
// Flatten the label pairs into a map for direct lookup.
labels := map[string]string{}
for _, lp := range metric.GetLabel() {
labels[lp.GetName()] = lp.GetValue()
}
assert.Equal(t, "test-provider", labels["provider"])
assert.Equal(t, "test-model", labels["model"])
assert.Equal(t, chaterror.KindRateLimit, labels["kind"])
}
assert.True(t, found, "stream_retries_total metric not found")
}
// TestRun_StreamRetry_CanceledDoesNotIncrement pins the invariant
// that canceled streams never increment stream_retries_total.
// chaterror.Classify routes context.Canceled to
// ClassifiedError{Retryable: false}, so chatretry.Retry returns
// immediately without calling onRetry. This test guards against
// future classification changes that could silently introduce
// misleading retry samples.
func TestRun_StreamRetry_CanceledDoesNotIncrement(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
metrics := chatloop.NewMetrics(reg)
model := &chattest.FakeModel{
ProviderName: "test-provider",
ModelName: "test-model",
// Every Stream call fails with context.Canceled.
StreamFn: func(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
return nil, context.Canceled
},
}
err := chatloop.Run(context.Background(), chatloop.RunOptions{
Model: model,
MaxSteps: 1,
ContextLimitFallback: 4096,
PersistStep: func(_ context.Context, _ chatloop.PersistedStep) error {
return nil
},
Metrics: metrics,
})
// Expect an error (the stream failed); we don't care which error
// kind as long as no retry was recorded.
require.Error(t, err)
families, err := reg.Gather()
require.NoError(t, err)
// The check only fires if the retry family shows up in the gathered
// output; when it does, it must carry no samples.
for _, f := range families {
if f.GetName() == "coderd_chatd_stream_retries_total" {
assert.Empty(t, f.GetMetric(),
"stream_retries_total should have no samples after a canceled stream")
}
}
}
@@ -0,0 +1,216 @@
package chatd
import (
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/x/chatd/chatloop"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
// TestStreamStateCollector exercises the four gauges emitted by
// streamStateCollector against representative chatStreams contents:
// an empty map, a populated map, a map containing a value of the
// wrong type, and a concurrent mutate-while-scraping smoke test.
func TestStreamStateCollector(t *testing.T) {
t.Parallel()
// With no entries stored, every gauge is expected to report zero.
t.Run("EmptyMap", func(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
server := &Server{}
reg.MustRegister(&streamStateCollector{server: server})
assertGauges(t, reg, gaugeExpectations{
active: 0,
bufferMax: 0,
bufferTotal: 0,
subscribers: 0,
})
})
// Three entries with buffer lengths 10, 25 and 0 (nil). The
// subscriber expectation assumes newSubscribers(t, n) returns a map
// with n entries.
t.Run("PopulatedMap", func(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
server := &Server{}
server.chatStreams.Store(uuid.New(), &chatStreamState{
buffer: make([]codersdk.ChatStreamEvent, 10),
subscribers: newSubscribers(t, 2),
})
server.chatStreams.Store(uuid.New(), &chatStreamState{
buffer: make([]codersdk.ChatStreamEvent, 25),
subscribers: map[uuid.UUID]chan codersdk.ChatStreamEvent{},
})
server.chatStreams.Store(uuid.New(), &chatStreamState{
buffer: nil,
subscribers: newSubscribers(t, 1),
})
reg.MustRegister(&streamStateCollector{server: server})
// max(10, 25, 0) = 25; 10 + 25 + 0 = 35; 2 + 0 + 1 = 3.
assertGauges(t, reg, gaugeExpectations{
active: 3,
bufferMax: 25,
bufferTotal: 35,
subscribers: 3,
})
})
t.Run("SkipsWrongType", func(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
server := &Server{}
// A string value fails the *chatStreamState type assertion in
// Collect.
server.chatStreams.Store(uuid.New(), "garbage")
server.chatStreams.Store(uuid.New(), &chatStreamState{
buffer: make([]codersdk.ChatStreamEvent, 5),
subscribers: newSubscribers(t, 1),
})
reg.MustRegister(&streamStateCollector{server: server})
// The non-matching entry is silently skipped. Only the
// valid chatStreamState counts.
assertGauges(t, reg, gaugeExpectations{
active: 1,
bufferMax: 5,
bufferTotal: 5,
subscribers: 1,
})
})
// Runs Collect concurrently with state.mu mutations; catches
// missing lock acquisition under `go test -race`.
t.Run("LockContentionSmoke", func(t *testing.T) {
t.Parallel()
server := &Server{}
state := &chatStreamState{
buffer: make([]codersdk.ChatStreamEvent, 0, 100),
subscribers: newSubscribers(t, 1),
}
server.chatStreams.Store(uuid.New(), state)
collector := &streamStateCollector{server: server}
const iterations = 100
var wg sync.WaitGroup
// Mutator: grows and shrinks the buffer under state.mu.
wg.Go(func() {
for range iterations {
state.mu.Lock()
state.buffer = append(state.buffer, codersdk.ChatStreamEvent{})
// Trim the oldest ten events once the buffer exceeds 50 so
// its length keeps changing in both directions.
if len(state.buffer) > 50 {
state.buffer = state.buffer[10:]
}
state.mu.Unlock()
}
})
// Scraper: repeatedly invokes Collect into a discard
// channel. A panic or race here fails the test.
wg.Go(func() {
ctx := testutil.Context(t, 10*time.Second)
for range iterations {
// Capacity 4 matches the four metrics Collect sends, so
// Collect never blocks on the channel.
ch := make(chan prometheus.Metric, 4)
collector.Collect(ch)
// Drain all metrics the collector wrote.
for range 4 {
testutil.SoftTryReceive(ctx, t, ch)
}
}
})
wg.Wait()
})
}
// gaugeExpectations holds the values assertGauges compares against
// the gathered stream-state gauges:
//
//   - active:      coderd_chatd_streams_active
//   - bufferMax:   coderd_chatd_stream_buffer_size_max
//   - bufferTotal: coderd_chatd_stream_buffer_events
//   - subscribers: coderd_chatd_stream_subscribers
type gaugeExpectations struct {
	active, bufferMax, bufferTotal, subscribers float64
}
// assertGauges gathers reg and checks that the four stream-state
// gauges are present, carry exactly one sample each, and match want.
//
// A metric that was not gathered is reported as a failure rather than
// being read as 0 from the lookup map; otherwise an all-zero
// expectation would pass even when nothing was emitted.
func assertGauges(t *testing.T, reg *prometheus.Registry, want gaugeExpectations) {
	t.Helper()

	families, err := reg.Gather()
	require.NoError(t, err)

	got := make(map[string]float64, len(families))
	for _, f := range families {
		require.Len(t, f.GetMetric(), 1, "metric %q should have exactly one sample", f.GetName())
		got[f.GetName()] = f.GetMetric()[0].GetGauge().GetValue()
	}

	checks := []struct {
		metric string
		want   float64
		label  string
	}{
		{"coderd_chatd_streams_active", want.active, "streams_active"},
		{"coderd_chatd_stream_buffer_size_max", want.bufferMax, "buffer_size_max"},
		{"coderd_chatd_stream_buffer_events", want.bufferTotal, "buffer_events"},
		{"coderd_chatd_stream_subscribers", want.subscribers, "subscribers"},
	}
	for _, c := range checks {
		value, ok := got[c.metric]
		// Distinguish "absent" from "present with value 0".
		if !assert.True(t, ok, "metric %q was not gathered", c.metric) {
			continue
		}
		assert.Equal(t, c.want, value, c.label)
	}
}
// newSubscribers builds a subscriber map with n entries, each keyed
// by a fresh UUID and holding a channel with a buffer of one event.
func newSubscribers(t *testing.T, n int) map[uuid.UUID]chan codersdk.ChatStreamEvent {
	t.Helper()

	result := make(map[uuid.UUID]chan codersdk.ChatStreamEvent, n)
	for i := 0; i < n; i++ {
		id := uuid.New()
		result[id] = make(chan codersdk.ChatStreamEvent, 1)
	}
	return result
}
// TestStreamStateCollector_BufferDroppedIncrementsOnCapacity seeds a
// buffering stream whose buffer already holds maxStreamBufferSize
// events, then publishes two more message parts and asserts that
// stream_buffer_dropped_total reads 1 and then 2.
func TestStreamStateCollector_BufferDroppedIncrementsOnCapacity(t *testing.T) {
	t.Parallel()

	const droppedMetric = "coderd_chatd_stream_buffer_dropped_total"

	reg := prometheus.NewRegistry()
	chatID := uuid.New()
	server := &Server{
		logger:  slog.Make(),
		clock:   quartz.NewMock(t),
		metrics: chatloop.NewMetrics(reg),
	}
	server.chatStreams.Store(chatID, &chatStreamState{
		buffering: true,
		buffer:    make([]codersdk.ChatStreamEvent, maxStreamBufferSize),
	})

	event := codersdk.ChatStreamEvent{
		Type:        codersdk.ChatStreamEventTypeMessagePart,
		MessagePart: &codersdk.ChatStreamMessagePart{},
	}

	// Each publish is expected to raise the counter by exactly one.
	for _, wantDropped := range []float64{1, 2} {
		server.publishToStream(chatID, event)
		assert.Equal(t, wantDropped, counterValue(t, reg, droppedMetric))
	}
}
// counterValue gathers reg and returns the value of the single sample
// of the counter family called name. The test fails immediately if
// gathering errors, if the family has anything other than one sample,
// or if no family with that name was gathered.
func counterValue(t *testing.T, reg *prometheus.Registry, name string) float64 {
	t.Helper()

	gathered, err := reg.Gather()
	require.NoError(t, err)

	for _, family := range gathered {
		if family.GetName() == name {
			samples := family.GetMetric()
			require.Len(t, samples, 1, "counter %q should have exactly one sample", name)
			return samples[0].GetCounter().GetValue()
		}
	}

	t.Fatalf("counter %q not registered", name)
	return 0
}
+12 -6
View File
@@ -199,12 +199,18 @@ deployment. They will always be available from the agent.
| `coderd_authz_prepare_authorize_duration_seconds` | histogram | Duration of the 'PrepareAuthorize' call in seconds. | |
| `coderd_build_info` | gauge | Describes the current build/version of the Coder server. Value is always 1. | `revision` `version` |
| `coderd_chatd_chats` | gauge | Number of chats being processed, by state. | `state` |
| `coderd_chatd_compaction_total` | counter | Total compaction outcomes (only recorded when compaction was triggered or failed). | `provider` `result` |
| `coderd_chatd_message_count` | histogram | Number of messages in the prompt per LLM request. | `provider` |
| `coderd_chatd_prompt_size_bytes` | histogram | Estimated byte size of the prompt per LLM request. | `provider` |
| `coderd_chatd_steps_total` | counter | Total agentic loop steps across all chats. | `provider` |
| `coderd_chatd_tool_result_size_bytes` | histogram | Size in bytes of each tool execution result. | `provider` `tool_name` |
| `coderd_chatd_ttft_seconds` | histogram | Time-to-first-token: wall time from LLM request to first streamed chunk. | `provider` |
| `coderd_chatd_compaction_total` | counter | Total compaction outcomes (only recorded when compaction was triggered or failed). | `model` `provider` `result` |
| `coderd_chatd_message_count` | histogram | Number of messages in the prompt per LLM request. | `model` `provider` |
| `coderd_chatd_prompt_size_bytes` | histogram | Estimated byte size of the prompt per LLM request. | `model` `provider` |
| `coderd_chatd_steps_total` | counter | Total agentic loop steps across all chats. | `model` `provider` |
| `coderd_chatd_stream_buffer_dropped_total` | counter | Number of chat stream buffer events dropped due to the per-chat buffer cap. | |
| `coderd_chatd_stream_buffer_events` | gauge | Sum of current buffer lengths across all chat streams. | |
| `coderd_chatd_stream_buffer_size_max` | gauge | Maximum current buffer length across all chat streams. | |
| `coderd_chatd_stream_retries_total` | counter | Total LLM stream retries. | `kind` `model` `provider` |
| `coderd_chatd_stream_subscribers` | gauge | Current number of chat stream subscribers across all chat streams. | |
| `coderd_chatd_streams_active` | gauge | Current number of chat stream state entries (in-flight plus retained). | |
| `coderd_chatd_tool_result_size_bytes` | histogram | Size in bytes of each tool execution result. | `model` `provider` `tool_name` |
| `coderd_chatd_ttft_seconds` | histogram | Time-to-first-token: wall time from LLM request to first streamed chunk. | `model` `provider` |
| `coderd_db_query_counts_total` | counter | Total number of queries labelled by HTTP route, method, and query name. | `method` `query` `route` |
| `coderd_db_query_latencies_seconds` | histogram | Latency distribution of queries in seconds. | `query` |
| `coderd_db_tx_duration_seconds` | histogram | Duration of transactions in seconds. | `success` `tx_id` |
+24 -6
View File
@@ -231,22 +231,40 @@ coderd_build_info{version="",revision=""} 0
coderd_chatd_chats{state=""} 0
# HELP coderd_chatd_compaction_total Total compaction outcomes (only recorded when compaction was triggered or failed).
# TYPE coderd_chatd_compaction_total counter
coderd_chatd_compaction_total{provider="",result=""} 0
coderd_chatd_compaction_total{provider="",model="",result=""} 0
# HELP coderd_chatd_message_count Number of messages in the prompt per LLM request.
# TYPE coderd_chatd_message_count histogram
coderd_chatd_message_count{provider=""} 0
coderd_chatd_message_count{provider="",model=""} 0
# HELP coderd_chatd_prompt_size_bytes Estimated byte size of the prompt per LLM request.
# TYPE coderd_chatd_prompt_size_bytes histogram
coderd_chatd_prompt_size_bytes{provider=""} 0
coderd_chatd_prompt_size_bytes{provider="",model=""} 0
# HELP coderd_chatd_steps_total Total agentic loop steps across all chats.
# TYPE coderd_chatd_steps_total counter
coderd_chatd_steps_total{provider=""} 0
coderd_chatd_steps_total{provider="",model=""} 0
# HELP coderd_chatd_stream_buffer_dropped_total Number of chat stream buffer events dropped due to the per-chat buffer cap.
# TYPE coderd_chatd_stream_buffer_dropped_total counter
coderd_chatd_stream_buffer_dropped_total 0
# HELP coderd_chatd_stream_buffer_events Sum of current buffer lengths across all chat streams.
# TYPE coderd_chatd_stream_buffer_events gauge
coderd_chatd_stream_buffer_events 0
# HELP coderd_chatd_stream_buffer_size_max Maximum current buffer length across all chat streams.
# TYPE coderd_chatd_stream_buffer_size_max gauge
coderd_chatd_stream_buffer_size_max 0
# HELP coderd_chatd_stream_retries_total Total LLM stream retries.
# TYPE coderd_chatd_stream_retries_total counter
coderd_chatd_stream_retries_total{provider="",model="",kind=""} 0
# HELP coderd_chatd_stream_subscribers Current number of chat stream subscribers across all chat streams.
# TYPE coderd_chatd_stream_subscribers gauge
coderd_chatd_stream_subscribers 0
# HELP coderd_chatd_streams_active Current number of chat stream state entries (in-flight plus retained).
# TYPE coderd_chatd_streams_active gauge
coderd_chatd_streams_active 0
# HELP coderd_chatd_tool_result_size_bytes Size in bytes of each tool execution result.
# TYPE coderd_chatd_tool_result_size_bytes histogram
coderd_chatd_tool_result_size_bytes{provider="",tool_name=""} 0
coderd_chatd_tool_result_size_bytes{provider="",model="",tool_name=""} 0
# HELP coderd_chatd_ttft_seconds Time-to-first-token: wall time from LLM request to first streamed chunk.
# TYPE coderd_chatd_ttft_seconds histogram
coderd_chatd_ttft_seconds{provider=""} 0
coderd_chatd_ttft_seconds{provider="",model=""} 0
# HELP coderd_db_query_counts_total Total number of queries labelled by HTTP route, method, and query name.
# TYPE coderd_db_query_counts_total counter
coderd_db_query_counts_total{route="",method="",query=""} 0