chore: add aibridge data to telemetry (#20449)

- Adds a new table to keep track of which payloads have already been
reported since we only report for the last clock hour
- Adds a query to gather and aggregate all the data by
provider/model/client

Relates to https://github.com/coder/coder-telemetry-server/issues/27
This commit is contained in:
Dean Sheather
2025-10-28 03:16:41 +11:00
committed by GitHub
parent cadf1352b4
commit 5a3ceb38f0
20 changed files with 940 additions and 6 deletions
+170 -4
View File
@@ -28,7 +28,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
"cdr.dev/slog"
"github.com/coder/coder/v2/buildinfo"
clitelemetry "github.com/coder/coder/v2/cli/telemetry"
"github.com/coder/coder/v2/coderd/database"
@@ -36,6 +35,7 @@ import (
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
)
const (
@@ -48,6 +48,7 @@ type Options struct {
Disabled bool
Database database.Store
Logger slog.Logger
Clock quartz.Clock
// URL is an endpoint to direct telemetry towards!
URL *url.URL
Experiments codersdk.Experiments
@@ -65,6 +66,9 @@ type Options struct {
// Duplicate data will be sent, it's on the server-side to index by UUID.
// Data is anonymized prior to being sent!
func New(options Options) (Reporter, error) {
if options.Clock == nil {
options.Clock = quartz.NewReal()
}
if options.SnapshotFrequency == 0 {
// Report once every 30mins by default!
options.SnapshotFrequency = 30 * time.Minute
@@ -86,7 +90,7 @@ func New(options Options) (Reporter, error) {
options: options,
deploymentURL: deploymentURL,
snapshotURL: snapshotURL,
startedAt: dbtime.Now(),
startedAt: dbtime.Time(options.Clock.Now()).UTC(),
client: &http.Client{},
}
go reporter.runSnapshotter()
@@ -166,7 +170,7 @@ func (r *remoteReporter) Close() {
return
}
close(r.closed)
now := dbtime.Now()
now := dbtime.Time(r.options.Clock.Now()).UTC()
r.shutdownAt = &now
if r.Enabled() {
// Report a final collection of telemetry prior to close!
@@ -412,7 +416,7 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
ctx = r.ctx
// For resources that grow in size very quickly (like workspace builds),
// we only report events that occurred within the past hour.
createdAfter = dbtime.Now().Add(-1 * time.Hour)
createdAfter = dbtime.Time(r.options.Clock.Now().Add(-1 * time.Hour)).UTC()
eg errgroup.Group
snapshot = &Snapshot{
DeploymentID: r.options.DeploymentID,
@@ -744,6 +748,14 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
}
return nil
})
eg.Go(func() error {
summaries, err := r.generateAIBridgeInterceptionsSummaries(ctx)
if err != nil {
return xerrors.Errorf("generate AIBridge interceptions telemetry summaries: %w", err)
}
snapshot.AIBridgeInterceptionsSummaries = summaries
return nil
})
err := eg.Wait()
if err != nil {
@@ -752,6 +764,76 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
return snapshot, nil
}
func (r *remoteReporter) generateAIBridgeInterceptionsSummaries(ctx context.Context) ([]AIBridgeInterceptionsSummary, error) {
// Get the current timeframe, which is the previous hour.
now := dbtime.Time(r.options.Clock.Now()).UTC()
endedAtBefore := now.Truncate(time.Hour)
endedAtAfter := endedAtBefore.Add(-1 * time.Hour)
// Note: we don't use a transaction for this function since we do tolerate
// some errors, like duplicate lock rows, and we also calculate
// summaries in parallel.
// Claim the heartbeat lock row for this hour.
err := r.options.Database.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{
EventType: "aibridge_interceptions_summary",
PeriodEndingAt: endedAtBefore,
})
if database.IsUniqueViolation(err, database.UniqueTelemetryLocksPkey) {
// Another replica has already claimed the lock row for this hour.
r.options.Logger.Debug(ctx, "aibridge interceptions telemetry lock already claimed for this hour by another replica, skipping", slog.F("period_ending_at", endedAtBefore))
return nil, nil
}
if err != nil {
return nil, xerrors.Errorf("insert AIBridge interceptions telemetry lock (period_ending_at=%q): %w", endedAtBefore, err)
}
// List the summary categories that need to be calculated.
summaryCategories, err := r.options.Database.ListAIBridgeInterceptionsTelemetrySummaries(ctx, database.ListAIBridgeInterceptionsTelemetrySummariesParams{
EndedAtAfter: endedAtAfter, // inclusive
EndedAtBefore: endedAtBefore, // exclusive
})
if err != nil {
return nil, xerrors.Errorf("list AIBridge interceptions telemetry summaries (startedAtAfter=%q, endedAtBefore=%q): %w", endedAtAfter, endedAtBefore, err)
}
// Calculate and convert the summaries for all categories.
var (
eg, egCtx = errgroup.WithContext(ctx)
mu sync.Mutex
summaries = make([]AIBridgeInterceptionsSummary, 0, len(summaryCategories))
)
for _, category := range summaryCategories {
eg.Go(func() error {
summary, err := r.options.Database.CalculateAIBridgeInterceptionsTelemetrySummary(egCtx, database.CalculateAIBridgeInterceptionsTelemetrySummaryParams{
Provider: category.Provider,
Model: category.Model,
Client: category.Client,
EndedAtAfter: endedAtAfter,
EndedAtBefore: endedAtBefore,
})
if err != nil {
return xerrors.Errorf("calculate AIBridge interceptions telemetry summary (provider=%q, model=%q, client=%q, startedAtAfter=%q, endedAtBefore=%q): %w", category.Provider, category.Model, category.Client, endedAtAfter, endedAtBefore, err)
}
// Double check that at least one interception was found in the
// timeframe.
if summary.InterceptionCount == 0 {
return nil
}
converted := ConvertAIBridgeInterceptionsSummary(endedAtBefore, category.Provider, category.Model, category.Client, summary)
mu.Lock()
defer mu.Unlock()
summaries = append(summaries, converted)
return nil
})
}
return summaries, eg.Wait()
}
// ConvertAPIKey anonymizes an API key.
func ConvertAPIKey(apiKey database.APIKey) APIKey {
a := APIKey{
@@ -1223,6 +1305,7 @@ type Snapshot struct {
TelemetryItems []TelemetryItem `json:"telemetry_items"`
UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"`
PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"`
AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"`
}
// Deployment contains information about the host running Coder.
@@ -1859,6 +1942,89 @@ type PrebuiltWorkspace struct {
Count int `json:"count"`
}
type AIBridgeInterceptionsSummaryDurationMillis struct {
P50 int64 `json:"p50"`
P90 int64 `json:"p90"`
P95 int64 `json:"p95"`
P99 int64 `json:"p99"`
}
type AIBridgeInterceptionsSummaryTokenCount struct {
Input int64 `json:"input"`
Output int64 `json:"output"`
CachedRead int64 `json:"cached_read"`
CachedWritten int64 `json:"cached_written"`
}
type AIBridgeInterceptionsSummaryToolCallsCount struct {
Injected int64 `json:"injected"`
NonInjected int64 `json:"non_injected"`
}
// AIBridgeInterceptionsSummary is a summary of aggregated AI Bridge
// interception data over a period of 1 hour. We send a summary each hour for
// each unique provider + model + client combination.
type AIBridgeInterceptionsSummary struct {
ID uuid.UUID `json:"id"`
// The end of the hour for which the summary is taken. This will always be a
// UTC timestamp truncated to the hour.
Timestamp time.Time `json:"timestamp"`
Provider string `json:"provider"`
Model string `json:"model"`
Client string `json:"client"`
InterceptionCount int64 `json:"interception_count"`
InterceptionDurationMillis AIBridgeInterceptionsSummaryDurationMillis `json:"interception_duration_millis"`
// Map of route to number of interceptions.
// e.g. "/v1/chat/completions:blocking", "/v1/chat/completions:streaming"
InterceptionsByRoute map[string]int64 `json:"interceptions_by_route"`
UniqueInitiatorCount int64 `json:"unique_initiator_count"`
UserPromptsCount int64 `json:"user_prompts_count"`
TokenUsagesCount int64 `json:"token_usages_count"`
TokenCount AIBridgeInterceptionsSummaryTokenCount `json:"token_count"`
ToolCallsCount AIBridgeInterceptionsSummaryToolCallsCount `json:"tool_calls_count"`
InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"`
}
func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary {
return AIBridgeInterceptionsSummary{
ID: uuid.New(),
Timestamp: endTime,
Provider: provider,
Model: model,
Client: client,
InterceptionCount: summary.InterceptionCount,
InterceptionDurationMillis: AIBridgeInterceptionsSummaryDurationMillis{
P50: summary.InterceptionDurationP50Millis,
P90: summary.InterceptionDurationP90Millis,
P95: summary.InterceptionDurationP95Millis,
P99: summary.InterceptionDurationP99Millis,
},
// TODO: currently we don't track by route
InterceptionsByRoute: make(map[string]int64),
UniqueInitiatorCount: summary.UniqueInitiatorCount,
UserPromptsCount: summary.UserPromptsCount,
TokenUsagesCount: summary.TokenUsagesCount,
TokenCount: AIBridgeInterceptionsSummaryTokenCount{
Input: summary.TokenCountInput,
Output: summary.TokenCountOutput,
CachedRead: summary.TokenCountCachedRead,
CachedWritten: summary.TokenCountCachedWritten,
},
ToolCallsCount: AIBridgeInterceptionsSummaryToolCallsCount{
Injected: summary.ToolCallsCountInjected,
NonInjected: summary.ToolCallsCountNonInjected,
},
InjectedToolCallErrorCount: summary.InjectedToolCallErrorCount,
}
}
type noopReporter struct{}
func (*noopReporter) Report(_ *Snapshot) {}
+127 -2
View File
@@ -28,6 +28,7 @@ import (
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
func TestMain(m *testing.M) {
@@ -44,6 +45,7 @@ func TestTelemetry(t *testing.T) {
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitMedium)
now := dbtime.Now()
org, err := db.GetDefaultOrganization(ctx)
require.NoError(t, err)
@@ -208,12 +210,88 @@ func TestTelemetry(t *testing.T) {
AgentID: wsagent.ID,
})
_, snapshot := collectSnapshot(ctx, t, db, nil)
previousAIBridgeInterceptionPeriod := now.Truncate(time.Hour)
user2 := dbgen.User(t, db, database.User{})
aiBridgeInterception1 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{
InitiatorID: user.ID,
Provider: "anthropic",
Model: "deanseek",
StartedAt: previousAIBridgeInterceptionPeriod.Add(-30 * time.Minute),
}, nil)
_ = dbgen.AIBridgeTokenUsage(t, db, database.InsertAIBridgeTokenUsageParams{
InterceptionID: aiBridgeInterception1.ID,
InputTokens: 100,
OutputTokens: 200,
Metadata: json.RawMessage(`{"cache_read_input":300,"cache_creation_input":400}`),
})
_ = dbgen.AIBridgeUserPrompt(t, db, database.InsertAIBridgeUserPromptParams{
InterceptionID: aiBridgeInterception1.ID,
})
_ = dbgen.AIBridgeToolUsage(t, db, database.InsertAIBridgeToolUsageParams{
InterceptionID: aiBridgeInterception1.ID,
Injected: true,
InvocationError: sql.NullString{String: "error1", Valid: true},
})
_, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{
ID: aiBridgeInterception1.ID,
EndedAt: aiBridgeInterception1.StartedAt.Add(1 * time.Minute), // 1 minute duration
})
require.NoError(t, err)
aiBridgeInterception2 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{
InitiatorID: user2.ID,
Provider: aiBridgeInterception1.Provider,
Model: aiBridgeInterception1.Model,
StartedAt: aiBridgeInterception1.StartedAt,
}, nil)
_ = dbgen.AIBridgeTokenUsage(t, db, database.InsertAIBridgeTokenUsageParams{
InterceptionID: aiBridgeInterception2.ID,
InputTokens: 100,
OutputTokens: 200,
Metadata: json.RawMessage(`{"cache_read_input":300,"cache_creation_input":400}`),
})
_ = dbgen.AIBridgeUserPrompt(t, db, database.InsertAIBridgeUserPromptParams{
InterceptionID: aiBridgeInterception2.ID,
})
_ = dbgen.AIBridgeToolUsage(t, db, database.InsertAIBridgeToolUsageParams{
InterceptionID: aiBridgeInterception2.ID,
Injected: false,
})
_, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{
ID: aiBridgeInterception2.ID,
EndedAt: aiBridgeInterception2.StartedAt.Add(2 * time.Minute), // 2 minute duration
})
require.NoError(t, err)
aiBridgeInterception3 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{
InitiatorID: user2.ID,
Provider: "openai",
Model: "gpt-5",
StartedAt: aiBridgeInterception1.StartedAt,
}, nil)
_, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{
ID: aiBridgeInterception3.ID,
EndedAt: aiBridgeInterception3.StartedAt.Add(3 * time.Minute), // 3 minute duration
})
require.NoError(t, err)
_ = dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{
InitiatorID: user2.ID,
Provider: "openai",
Model: "gpt-5",
StartedAt: aiBridgeInterception1.StartedAt,
}, nil)
// not ended, so it should not affect summaries
clock := quartz.NewMock(t)
clock.Set(now)
_, snapshot := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
require.Len(t, snapshot.ProvisionerJobs, 2)
require.Len(t, snapshot.Licenses, 1)
require.Len(t, snapshot.Templates, 2)
require.Len(t, snapshot.TemplateVersions, 3)
require.Len(t, snapshot.Users, 1)
require.Len(t, snapshot.Users, 2)
require.Len(t, snapshot.Groups, 2)
// 1 member in the everyone group + 1 member in the custom group
require.Len(t, snapshot.GroupMembers, 2)
@@ -287,6 +365,53 @@ func TestTelemetry(t *testing.T) {
for _, entity := range snapshot.Templates {
require.Equal(t, entity.OrganizationID, org.ID)
}
// 2 unique provider + model + client combinations
require.Len(t, snapshot.AIBridgeInterceptionsSummaries, 2)
snapshot1 := snapshot.AIBridgeInterceptionsSummaries[0]
snapshot2 := snapshot.AIBridgeInterceptionsSummaries[1]
if snapshot1.Provider != aiBridgeInterception1.Provider {
snapshot1, snapshot2 = snapshot2, snapshot1
}
require.Equal(t, snapshot1.Provider, aiBridgeInterception1.Provider)
require.Equal(t, snapshot1.Model, aiBridgeInterception1.Model)
require.Equal(t, snapshot1.Client, "unknown") // no client info yet
require.EqualValues(t, snapshot1.InterceptionCount, 2)
require.EqualValues(t, snapshot1.InterceptionsByRoute, map[string]int64{}) // no route info yet
require.EqualValues(t, snapshot1.InterceptionDurationMillis.P50, 90_000)
require.EqualValues(t, snapshot1.InterceptionDurationMillis.P90, 114_000)
require.EqualValues(t, snapshot1.InterceptionDurationMillis.P95, 117_000)
require.EqualValues(t, snapshot1.InterceptionDurationMillis.P99, 119_400)
require.EqualValues(t, snapshot1.UniqueInitiatorCount, 2)
require.EqualValues(t, snapshot1.UserPromptsCount, 2)
require.EqualValues(t, snapshot1.TokenUsagesCount, 2)
require.EqualValues(t, snapshot1.TokenCount.Input, 200)
require.EqualValues(t, snapshot1.TokenCount.Output, 400)
require.EqualValues(t, snapshot1.TokenCount.CachedRead, 600)
require.EqualValues(t, snapshot1.TokenCount.CachedWritten, 800)
require.EqualValues(t, snapshot1.ToolCallsCount.Injected, 1)
require.EqualValues(t, snapshot1.ToolCallsCount.NonInjected, 1)
require.EqualValues(t, snapshot1.InjectedToolCallErrorCount, 1)
require.Equal(t, snapshot2.Provider, aiBridgeInterception3.Provider)
require.Equal(t, snapshot2.Model, aiBridgeInterception3.Model)
require.Equal(t, snapshot2.Client, "unknown") // no client info yet
require.EqualValues(t, snapshot2.InterceptionCount, 1)
require.EqualValues(t, snapshot2.InterceptionsByRoute, map[string]int64{}) // no route info yet
require.EqualValues(t, snapshot2.InterceptionDurationMillis.P50, 180_000)
require.EqualValues(t, snapshot2.InterceptionDurationMillis.P90, 180_000)
require.EqualValues(t, snapshot2.InterceptionDurationMillis.P95, 180_000)
require.EqualValues(t, snapshot2.InterceptionDurationMillis.P99, 180_000)
require.EqualValues(t, snapshot2.UniqueInitiatorCount, 1)
require.EqualValues(t, snapshot2.UserPromptsCount, 0)
require.EqualValues(t, snapshot2.TokenUsagesCount, 0)
require.EqualValues(t, snapshot2.TokenCount.Input, 0)
require.EqualValues(t, snapshot2.TokenCount.Output, 0)
require.EqualValues(t, snapshot2.TokenCount.CachedRead, 0)
require.EqualValues(t, snapshot2.TokenCount.CachedWritten, 0)
require.EqualValues(t, snapshot2.ToolCallsCount.Injected, 0)
require.EqualValues(t, snapshot2.ToolCallsCount.NonInjected, 0)
})
t.Run("HashedEmail", func(t *testing.T) {
t.Parallel()