From 5a3ceb38f0636e63d273cb64aef230b252b544a5 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 28 Oct 2025 03:16:41 +1100 Subject: [PATCH] chore: add aibridge data to telemetry (#20449) - Adds a new table to keep track of which payloads have already been reported since we only report for the last clock hour - Adds a query to gather and aggregate all the data by provider/model/client Relates to https://github.com/coder/coder-telemetry-server/issues/27 --- .gitignore | 2 + coderd/database/check_constraint.go | 1 + coderd/database/dbauthz/dbauthz.go | 28 ++ coderd/database/dbauthz/dbauthz_test.go | 22 ++ coderd/database/dbmetrics/querymetrics.go | 28 ++ coderd/database/dbmock/dbmock.go | 58 +++++ coderd/database/dbpurge/dbpurge.go | 10 + coderd/database/dbpurge/dbpurge_test.go | 53 ++++ coderd/database/dump.sql | 17 ++ .../000390_telemetry_locks.down.sql | 1 + .../migrations/000390_telemetry_locks.up.sql | 12 + .../fixtures/000390_telemetry_locks.up.sql | 8 + coderd/database/models.go | 8 + coderd/database/querier.go | 14 + coderd/database/queries.sql.go | 244 ++++++++++++++++++ coderd/database/queries/aibridge.sql | 119 +++++++++ coderd/database/queries/telemetrylocks.sql | 17 ++ coderd/database/unique_constraint.go | 1 + coderd/telemetry/telemetry.go | 174 ++++++++++++- coderd/telemetry/telemetry_test.go | 129 ++++++++- 20 files changed, 940 insertions(+), 6 deletions(-) create mode 100644 coderd/database/migrations/000390_telemetry_locks.down.sql create mode 100644 coderd/database/migrations/000390_telemetry_locks.up.sql create mode 100644 coderd/database/migrations/testdata/fixtures/000390_telemetry_locks.up.sql create mode 100644 coderd/database/queries/telemetrylocks.sql diff --git a/.gitignore b/.gitignore index 86f70cc3af..9b1edcec2d 100644 --- a/.gitignore +++ b/.gitignore @@ -89,3 +89,5 @@ result __debug_bin* **/.claude/settings.local.json + +/.env diff --git a/coderd/database/check_constraint.go b/coderd/database/check_constraint.go index d84802b988..8b1917b769 100644 --- a/coderd/database/check_constraint.go +++ b/coderd/database/check_constraint.go @@ -14,6 +14,7 @@ const ( CheckSubsystemsNotNone CheckConstraint = "subsystems_not_none" // workspace_agents CheckWorkspaceBuildsAiTaskSidebarAppIDRequired CheckConstraint = "workspace_builds_ai_task_sidebar_app_id_required" // workspace_builds CheckWorkspaceBuildsDeadlineBelowMaxDeadline CheckConstraint = "workspace_builds_deadline_below_max_deadline" // workspace_builds + CheckTelemetryLockEventTypeConstraint CheckConstraint = "telemetry_lock_event_type_constraint" // telemetry_locks CheckValidationMonotonicOrder CheckConstraint = "validation_monotonic_order" // template_version_parameters CheckUsageEventTypeCheck CheckConstraint = "usage_event_type_check" // usage_events ) diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go index 4dc0fb93e3..74f24a617c 100644 --- a/coderd/database/dbauthz/dbauthz.go +++ b/coderd/database/dbauthz/dbauthz.go @@ -1424,6 +1424,13 @@ func (q *querier) BulkMarkNotificationMessagesSent(ctx context.Context, arg data return q.db.BulkMarkNotificationMessagesSent(ctx, arg) } +func (q *querier) CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg database.CalculateAIBridgeInterceptionsTelemetrySummaryParams) (database.CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) { + if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceAibridgeInterception); err != nil { + return database.CalculateAIBridgeInterceptionsTelemetrySummaryRow{}, err + } + return q.db.CalculateAIBridgeInterceptionsTelemetrySummary(ctx, arg) +} + func (q *querier) ClaimPrebuiltWorkspace(ctx context.Context, arg database.ClaimPrebuiltWorkspaceParams) (database.ClaimPrebuiltWorkspaceRow, error) { empty := database.ClaimPrebuiltWorkspaceRow{} @@ -1723,6 +1730,13 @@ func (q *querier) DeleteOldProvisionerDaemons(ctx context.Context) error { return q.db.DeleteOldProvisionerDaemons(ctx) } +func (q *querier) DeleteOldTelemetryLocks(ctx context.Context, beforeTime time.Time) error { + if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil { + return err + } + return q.db.DeleteOldTelemetryLocks(ctx, beforeTime) +} + func (q *querier) DeleteOldWorkspaceAgentLogs(ctx context.Context, threshold time.Time) error { if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil { return err @@ -4212,6 +4226,13 @@ func (q *querier) InsertTelemetryItemIfNotExists(ctx context.Context, arg databa return q.db.InsertTelemetryItemIfNotExists(ctx, arg) } +func (q *querier) InsertTelemetryLock(ctx context.Context, arg database.InsertTelemetryLockParams) error { + if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil { + return err + } + return q.db.InsertTelemetryLock(ctx, arg) +} + func (q *querier) InsertTemplate(ctx context.Context, arg database.InsertTemplateParams) error { obj := rbac.ResourceTemplate.InOrg(arg.OrganizationID) if err := q.authorizeContext(ctx, policy.ActionCreate, obj); err != nil { @@ -4523,6 +4544,13 @@ func (q *querier) ListAIBridgeInterceptions(ctx context.Context, arg database.Li return q.db.ListAuthorizedAIBridgeInterceptions(ctx, arg, prep) } +func (q *querier) ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg database.ListAIBridgeInterceptionsTelemetrySummariesParams) ([]database.ListAIBridgeInterceptionsTelemetrySummariesRow, error) { + if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceAibridgeInterception); err != nil { + return nil, err + } + return q.db.ListAIBridgeInterceptionsTelemetrySummaries(ctx, arg) +} + func (q *querier) ListAIBridgeTokenUsagesByInterceptionIDs(ctx context.Context, interceptionIDs []uuid.UUID) ([]database.AIBridgeTokenUsage, error) { // This function is a system function until we implement a join for aibridge interceptions. // Matches the behavior of the workspaces listing endpoint. diff --git a/coderd/database/dbauthz/dbauthz_test.go b/coderd/database/dbauthz/dbauthz_test.go index c7a0b2bb70..8cf622a434 100644 --- a/coderd/database/dbauthz/dbauthz_test.go +++ b/coderd/database/dbauthz/dbauthz_test.go @@ -4627,3 +4627,25 @@ func (s *MethodTestSuite) TestAIBridge() { check.Args(params).Asserts(intc, policy.ActionUpdate).Returns(intc) })) } + +func (s *MethodTestSuite) TestTelemetry() { + s.Run("InsertTelemetryLock", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) { + db.EXPECT().InsertTelemetryLock(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + check.Args(database.InsertTelemetryLockParams{}).Asserts(rbac.ResourceSystem, policy.ActionCreate) + })) + + s.Run("DeleteOldTelemetryLocks", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) { + db.EXPECT().DeleteOldTelemetryLocks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + check.Args(time.Time{}).Asserts(rbac.ResourceSystem, policy.ActionDelete) + })) + + s.Run("ListAIBridgeInterceptionsTelemetrySummaries", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) { + db.EXPECT().ListAIBridgeInterceptionsTelemetrySummaries(gomock.Any(), gomock.Any()).Return([]database.ListAIBridgeInterceptionsTelemetrySummariesRow{}, nil).AnyTimes() + check.Args(database.ListAIBridgeInterceptionsTelemetrySummariesParams{}).Asserts(rbac.ResourceAibridgeInterception, policy.ActionRead) + })) + + s.Run("CalculateAIBridgeInterceptionsTelemetrySummary", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) { + db.EXPECT().CalculateAIBridgeInterceptionsTelemetrySummary(gomock.Any(), gomock.Any()).Return(database.CalculateAIBridgeInterceptionsTelemetrySummaryRow{}, nil).AnyTimes() + check.Args(database.CalculateAIBridgeInterceptionsTelemetrySummaryParams{}).Asserts(rbac.ResourceAibridgeInterception, policy.ActionRead) + })) +} diff --git a/coderd/database/dbmetrics/querymetrics.go b/coderd/database/dbmetrics/querymetrics.go index 57396cd31b..1bd8fda624 100644 --- a/coderd/database/dbmetrics/querymetrics.go +++ b/coderd/database/dbmetrics/querymetrics.go @@ -158,6 +158,13 @@ func (m queryMetricsStore) BulkMarkNotificationMessagesSent(ctx context.Context, return r0, r1 } +func (m queryMetricsStore) CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg database.CalculateAIBridgeInterceptionsTelemetrySummaryParams) (database.CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) { + start := time.Now() + r0, r1 := m.s.CalculateAIBridgeInterceptionsTelemetrySummary(ctx, arg) + m.queryLatencies.WithLabelValues("CalculateAIBridgeInterceptionsTelemetrySummary").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m queryMetricsStore) ClaimPrebuiltWorkspace(ctx context.Context, arg database.ClaimPrebuiltWorkspaceParams) (database.ClaimPrebuiltWorkspaceRow, error) { start := time.Now() r0, r1 := m.s.ClaimPrebuiltWorkspace(ctx, arg) @@ -403,6 +410,13 @@ func (m queryMetricsStore) DeleteOldProvisionerDaemons(ctx context.Context) erro return r0 } +func (m queryMetricsStore) DeleteOldTelemetryLocks(ctx context.Context, periodEndingAtBefore time.Time) error { + start := time.Now() + r0 := m.s.DeleteOldTelemetryLocks(ctx, periodEndingAtBefore) + m.queryLatencies.WithLabelValues("DeleteOldTelemetryLocks").Observe(time.Since(start).Seconds()) + return r0 +} + func (m queryMetricsStore) DeleteOldWorkspaceAgentLogs(ctx context.Context, arg time.Time) error { start := time.Now() r0 := m.s.DeleteOldWorkspaceAgentLogs(ctx, arg) @@ -2517,6 +2531,13 @@ func (m queryMetricsStore) InsertTelemetryItemIfNotExists(ctx context.Context, a return r0 } +func (m queryMetricsStore) InsertTelemetryLock(ctx context.Context, arg database.InsertTelemetryLockParams) error { + start := time.Now() + r0 := m.s.InsertTelemetryLock(ctx, arg) + m.queryLatencies.WithLabelValues("InsertTelemetryLock").Observe(time.Since(start).Seconds()) + return r0 +} + func (m queryMetricsStore) InsertTemplate(ctx context.Context, arg database.InsertTemplateParams) error { start := time.Now() err := m.s.InsertTemplate(ctx, arg) @@ -2734,6 +2755,13 @@ func (m queryMetricsStore) ListAIBridgeInterceptions(ctx context.Context, arg da return r0, r1 } +func (m queryMetricsStore) ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg database.ListAIBridgeInterceptionsTelemetrySummariesParams) ([]database.ListAIBridgeInterceptionsTelemetrySummariesRow, error) { + start := time.Now() + r0, r1 := m.s.ListAIBridgeInterceptionsTelemetrySummaries(ctx, arg) + m.queryLatencies.WithLabelValues("ListAIBridgeInterceptionsTelemetrySummaries").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m queryMetricsStore) ListAIBridgeTokenUsagesByInterceptionIDs(ctx context.Context, interceptionIds []uuid.UUID) ([]database.AIBridgeTokenUsage, error) { start := time.Now() r0, r1 := m.s.ListAIBridgeTokenUsagesByInterceptionIDs(ctx, interceptionIds) diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go index a558c41dac..1983092aa5 100644 --- a/coderd/database/dbmock/dbmock.go +++ b/coderd/database/dbmock/dbmock.go @@ -190,6 +190,21 @@ func (mr *MockStoreMockRecorder) BulkMarkNotificationMessagesSent(ctx, arg any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkMarkNotificationMessagesSent", reflect.TypeOf((*MockStore)(nil).BulkMarkNotificationMessagesSent), ctx, arg) } +// CalculateAIBridgeInterceptionsTelemetrySummary mocks base method. +func (m *MockStore) CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg database.CalculateAIBridgeInterceptionsTelemetrySummaryParams) (database.CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CalculateAIBridgeInterceptionsTelemetrySummary", ctx, arg) + ret0, _ := ret[0].(database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CalculateAIBridgeInterceptionsTelemetrySummary indicates an expected call of CalculateAIBridgeInterceptionsTelemetrySummary. +func (mr *MockStoreMockRecorder) CalculateAIBridgeInterceptionsTelemetrySummary(ctx, arg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CalculateAIBridgeInterceptionsTelemetrySummary", reflect.TypeOf((*MockStore)(nil).CalculateAIBridgeInterceptionsTelemetrySummary), ctx, arg) +} + // ClaimPrebuiltWorkspace mocks base method. func (m *MockStore) ClaimPrebuiltWorkspace(ctx context.Context, arg database.ClaimPrebuiltWorkspaceParams) (database.ClaimPrebuiltWorkspaceRow, error) { m.ctrl.T.Helper() @@ -736,6 +751,20 @@ func (mr *MockStoreMockRecorder) DeleteOldProvisionerDaemons(ctx any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldProvisionerDaemons", reflect.TypeOf((*MockStore)(nil).DeleteOldProvisionerDaemons), ctx) } +// DeleteOldTelemetryLocks mocks base method. +func (m *MockStore) DeleteOldTelemetryLocks(ctx context.Context, periodEndingAtBefore time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteOldTelemetryLocks", ctx, periodEndingAtBefore) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteOldTelemetryLocks indicates an expected call of DeleteOldTelemetryLocks. +func (mr *MockStoreMockRecorder) DeleteOldTelemetryLocks(ctx, periodEndingAtBefore any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldTelemetryLocks", reflect.TypeOf((*MockStore)(nil).DeleteOldTelemetryLocks), ctx, periodEndingAtBefore) +} + // DeleteOldWorkspaceAgentLogs mocks base method. func (m *MockStore) DeleteOldWorkspaceAgentLogs(ctx context.Context, threshold time.Time) error { m.ctrl.T.Helper() @@ -5392,6 +5421,20 @@ func (mr *MockStoreMockRecorder) InsertTelemetryItemIfNotExists(ctx, arg any) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTelemetryItemIfNotExists", reflect.TypeOf((*MockStore)(nil).InsertTelemetryItemIfNotExists), ctx, arg) } +// InsertTelemetryLock mocks base method. +func (m *MockStore) InsertTelemetryLock(ctx context.Context, arg database.InsertTelemetryLockParams) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertTelemetryLock", ctx, arg) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertTelemetryLock indicates an expected call of InsertTelemetryLock. +func (mr *MockStoreMockRecorder) InsertTelemetryLock(ctx, arg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTelemetryLock", reflect.TypeOf((*MockStore)(nil).InsertTelemetryLock), ctx, arg) +} + // InsertTemplate mocks base method. func (m *MockStore) InsertTemplate(ctx context.Context, arg database.InsertTemplateParams) error { m.ctrl.T.Helper() @@ -5847,6 +5890,21 @@ func (mr *MockStoreMockRecorder) ListAIBridgeInterceptions(ctx, arg any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAIBridgeInterceptions", reflect.TypeOf((*MockStore)(nil).ListAIBridgeInterceptions), ctx, arg) } +// ListAIBridgeInterceptionsTelemetrySummaries mocks base method. +func (m *MockStore) ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg database.ListAIBridgeInterceptionsTelemetrySummariesParams) ([]database.ListAIBridgeInterceptionsTelemetrySummariesRow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListAIBridgeInterceptionsTelemetrySummaries", ctx, arg) + ret0, _ := ret[0].([]database.ListAIBridgeInterceptionsTelemetrySummariesRow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListAIBridgeInterceptionsTelemetrySummaries indicates an expected call of ListAIBridgeInterceptionsTelemetrySummaries. +func (mr *MockStoreMockRecorder) ListAIBridgeInterceptionsTelemetrySummaries(ctx, arg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAIBridgeInterceptionsTelemetrySummaries", reflect.TypeOf((*MockStore)(nil).ListAIBridgeInterceptionsTelemetrySummaries), ctx, arg) +} + // ListAIBridgeTokenUsagesByInterceptionIDs mocks base method. func (m *MockStore) ListAIBridgeTokenUsagesByInterceptionIDs(ctx context.Context, interceptionIds []uuid.UUID) ([]database.AIBridgeTokenUsage, error) { m.ctrl.T.Helper() diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index b9e0023f5a..067fe1f049 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -24,6 +24,12 @@ const ( // but we won't touch the `connection_logs` table. maxAuditLogConnectionEventAge = 90 * 24 * time.Hour // 90 days auditLogConnectionEventBatchSize = 1000 + // Telemetry heartbeats are used to deduplicate events across replicas. We + // don't need to persist heartbeat rows for longer than 24 hours, as they + // are only used for deduplication across replicas. The time needs to be + // long enough to cover the maximum interval of a heartbeat event (currently + // 1 hour) plus some buffer. + maxTelemetryHeartbeatAge = 24 * time.Hour ) // New creates a new periodically purging database instance. @@ -71,6 +77,10 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil { return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err) } + deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge) + if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil { + return xerrors.Errorf("failed to delete old telemetry locks: %w", err) + } deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge) if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{ diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 02efd4a81f..74bf36639f 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -704,3 +704,56 @@ func TestExpireOldAPIKeys(t *testing.T) { // Out of an abundance of caution, we do not expire explicitly named prebuilds API keys. assertKeyActive(namedPrebuildsAPIKey.ID) } + +func TestDeleteOldTelemetryHeartbeats(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + + db, _, sqlDB := dbtestutil.NewDBWithSQLDB(t, dbtestutil.WithDumpOnFailure()) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}) + clk := quartz.NewMock(t) + now := clk.Now().UTC() + + // Insert telemetry heartbeats. + err := db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{ + EventType: "aibridge_interceptions_summary", + PeriodEndingAt: now.Add(-25 * time.Hour), // should be purged + }) + require.NoError(t, err) + err = db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{ + EventType: "aibridge_interceptions_summary", + PeriodEndingAt: now.Add(-23 * time.Hour), // should be kept + }) + require.NoError(t, err) + err = db.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{ + EventType: "aibridge_interceptions_summary", + PeriodEndingAt: now, // should be kept + }) + require.NoError(t, err) + + done := awaitDoTick(ctx, t, clk) + closer := dbpurge.New(ctx, logger, db, clk) + defer closer.Close() + <-done // doTick() has now run. + + require.Eventuallyf(t, func() bool { + // We use an SQL queries directly here because we don't expose queries + // for deleting heartbeats in the application code. + var totalCount int + err := sqlDB.QueryRowContext(ctx, ` + SELECT COUNT(*) FROM telemetry_locks; + `).Scan(&totalCount) + assert.NoError(t, err) + + var oldCount int + err = sqlDB.QueryRowContext(ctx, ` + SELECT COUNT(*) FROM telemetry_locks WHERE period_ending_at < $1; + `, now.Add(-24*time.Hour)).Scan(&oldCount) + assert.NoError(t, err) + + // Expect 2 heartbeats remaining and none older than 24 hours. + t.Logf("eventually: total count: %d, old count: %d", totalCount, oldCount) + return totalCount == 2 && oldCount == 0 + }, testutil.WaitShort, testutil.IntervalFast, "it should delete old telemetry heartbeats") +} diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 17ba8442f4..b61b678bba 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -2012,6 +2012,18 @@ CREATE TABLE telemetry_items ( updated_at timestamp with time zone DEFAULT now() NOT NULL ); +CREATE TABLE telemetry_locks ( + event_type text NOT NULL, + period_ending_at timestamp with time zone NOT NULL, + CONSTRAINT telemetry_lock_event_type_constraint CHECK ((event_type = 'aibridge_interceptions_summary'::text)) +); + +COMMENT ON TABLE telemetry_locks IS 'Telemetry lock tracking table for deduplication of heartbeat events across replicas.'; + +COMMENT ON COLUMN telemetry_locks.event_type IS 'The type of event that was sent.'; + +COMMENT ON COLUMN telemetry_locks.period_ending_at IS 'The heartbeat period end timestamp.'; + CREATE TABLE template_usage_stats ( start_time timestamp with time zone NOT NULL, end_time timestamp with time zone NOT NULL, @@ -3090,6 +3102,9 @@ ALTER TABLE ONLY tasks ALTER TABLE ONLY telemetry_items ADD CONSTRAINT telemetry_items_pkey PRIMARY KEY (key); +ALTER TABLE ONLY telemetry_locks + ADD CONSTRAINT telemetry_locks_pkey PRIMARY KEY (event_type, period_ending_at); + ALTER TABLE ONLY template_usage_stats ADD CONSTRAINT template_usage_stats_pkey PRIMARY KEY (start_time, template_id, user_id); @@ -3315,6 +3330,8 @@ CREATE INDEX idx_tailnet_tunnels_dst_id ON tailnet_tunnels USING hash (dst_id); CREATE INDEX idx_tailnet_tunnels_src_id ON tailnet_tunnels USING hash (src_id); +CREATE INDEX idx_telemetry_locks_period_ending_at ON telemetry_locks USING btree (period_ending_at); + CREATE UNIQUE INDEX idx_template_version_presets_default ON template_version_presets USING btree (template_version_id) WHERE (is_default = true); CREATE INDEX idx_template_versions_has_ai_task ON template_versions USING btree (has_ai_task); diff --git a/coderd/database/migrations/000390_telemetry_locks.down.sql b/coderd/database/migrations/000390_telemetry_locks.down.sql new file mode 100644 index 0000000000..b9ba97839f --- /dev/null +++ b/coderd/database/migrations/000390_telemetry_locks.down.sql @@ -0,0 +1 @@ +DROP TABLE telemetry_locks; diff --git a/coderd/database/migrations/000390_telemetry_locks.up.sql b/coderd/database/migrations/000390_telemetry_locks.up.sql new file mode 100644 index 0000000000..f791c83ba7 --- /dev/null +++ b/coderd/database/migrations/000390_telemetry_locks.up.sql @@ -0,0 +1,12 @@ +CREATE TABLE telemetry_locks ( + event_type TEXT NOT NULL CONSTRAINT telemetry_lock_event_type_constraint CHECK (event_type IN ('aibridge_interceptions_summary')), + period_ending_at TIMESTAMP WITH TIME ZONE NOT NULL, + + PRIMARY KEY (event_type, period_ending_at) +); + +COMMENT ON TABLE telemetry_locks IS 'Telemetry lock tracking table for deduplication of heartbeat events across replicas.'; +COMMENT ON COLUMN telemetry_locks.event_type IS 'The type of event that was sent.'; +COMMENT ON COLUMN telemetry_locks.period_ending_at IS 'The heartbeat period end timestamp.'; + +CREATE INDEX idx_telemetry_locks_period_ending_at ON telemetry_locks (period_ending_at); diff --git a/coderd/database/migrations/testdata/fixtures/000390_telemetry_locks.up.sql b/coderd/database/migrations/testdata/fixtures/000390_telemetry_locks.up.sql new file mode 100644 index 0000000000..f41f45a732 --- /dev/null +++ b/coderd/database/migrations/testdata/fixtures/000390_telemetry_locks.up.sql @@ -0,0 +1,8 @@ +INSERT INTO telemetry_locks ( + event_type, + period_ending_at +) +VALUES ( + 'aibridge_interceptions_summary', + '2025-01-01 00:00:00+00'::timestamptz +); diff --git a/coderd/database/models.go b/coderd/database/models.go index d5db3a3819..b3d41b25f9 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -4250,6 +4250,14 @@ type TelemetryItem struct { UpdatedAt time.Time `db:"updated_at" json:"updated_at"` } +// Telemetry lock tracking table for deduplication of heartbeat events across replicas. +type TelemetryLock struct { + // The type of event that was sent. + EventType string `db:"event_type" json:"event_type"` + // The heartbeat period end timestamp. + PeriodEndingAt time.Time `db:"period_ending_at" json:"period_ending_at"` +} + // Joins in the display name information such as username, avatar, and organization name. type Template struct { ID uuid.UUID `db:"id" json:"id"` diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 617959cb5b..2b96823028 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -60,6 +60,9 @@ type sqlcQuerier interface { BatchUpdateWorkspaceNextStartAt(ctx context.Context, arg BatchUpdateWorkspaceNextStartAtParams) error BulkMarkNotificationMessagesFailed(ctx context.Context, arg BulkMarkNotificationMessagesFailedParams) (int64, error) BulkMarkNotificationMessagesSent(ctx context.Context, arg BulkMarkNotificationMessagesSentParams) (int64, error) + // Calculates the telemetry summary for a given provider, model, and client + // combination for telemetry reporting. + CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg CalculateAIBridgeInterceptionsTelemetrySummaryParams) (CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) ClaimPrebuiltWorkspace(ctx context.Context, arg ClaimPrebuiltWorkspaceParams) (ClaimPrebuiltWorkspaceRow, error) CleanTailnetCoordinators(ctx context.Context) error CleanTailnetLostPeers(ctx context.Context) error @@ -107,6 +110,8 @@ type sqlcQuerier interface { // A provisioner daemon with "zeroed" last_seen_at column indicates possible // connectivity issues (no provisioner daemon activity since registration). DeleteOldProvisionerDaemons(ctx context.Context) error + // Deletes old telemetry locks from the telemetry_locks table. + DeleteOldTelemetryLocks(ctx context.Context, periodEndingAtBefore time.Time) error // If an agent hasn't connected in the last 7 days, we purge it's logs. // Exception: if the logs are related to the latest build, we keep those around. // Logs can take up a lot of space, so it's important we clean up frequently. @@ -559,6 +564,12 @@ type sqlcQuerier interface { InsertReplica(ctx context.Context, arg InsertReplicaParams) (Replica, error) InsertTask(ctx context.Context, arg InsertTaskParams) (TaskTable, error) InsertTelemetryItemIfNotExists(ctx context.Context, arg InsertTelemetryItemIfNotExistsParams) error + // Inserts a new lock row into the telemetry_locks table. Replicas should call + // this function prior to attempting to generate or publish a heartbeat event to + // the telemetry service. + // If the query returns a duplicate primary key error, the replica should not + // attempt to generate or publish the event to the telemetry service. + InsertTelemetryLock(ctx context.Context, arg InsertTelemetryLockParams) error InsertTemplate(ctx context.Context, arg InsertTemplateParams) error InsertTemplateVersion(ctx context.Context, arg InsertTemplateVersionParams) error InsertTemplateVersionParameter(ctx context.Context, arg InsertTemplateVersionParameterParams) (TemplateVersionParameter, error) @@ -595,6 +606,9 @@ type sqlcQuerier interface { InsertWorkspaceResource(ctx context.Context, arg InsertWorkspaceResourceParams) (WorkspaceResource, error) InsertWorkspaceResourceMetadata(ctx context.Context, arg InsertWorkspaceResourceMetadataParams) ([]WorkspaceResourceMetadatum, error) ListAIBridgeInterceptions(ctx context.Context, arg ListAIBridgeInterceptionsParams) ([]ListAIBridgeInterceptionsRow, error) + // Finds all unique AIBridge interception telemetry summaries combinations + // (provider, model, client) in the given timeframe for telemetry reporting. + ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg ListAIBridgeInterceptionsTelemetrySummariesParams) ([]ListAIBridgeInterceptionsTelemetrySummariesRow, error) ListAIBridgeTokenUsagesByInterceptionIDs(ctx context.Context, interceptionIds []uuid.UUID) ([]AIBridgeTokenUsage, error) ListAIBridgeToolUsagesByInterceptionIDs(ctx context.Context, interceptionIds []uuid.UUID) ([]AIBridgeToolUsage, error) ListAIBridgeUserPromptsByInterceptionIDs(ctx context.Context, interceptionIds []uuid.UUID) ([]AIBridgeUserPrompt, error) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index d7bb027628..e6dfa9afd0 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -111,6 +111,164 @@ func (q *sqlQuerier) ActivityBumpWorkspace(ctx context.Context, arg ActivityBump return err } +const calculateAIBridgeInterceptionsTelemetrySummary = `-- name: CalculateAIBridgeInterceptionsTelemetrySummary :one +WITH interceptions_in_range AS ( + -- Get all matching interceptions in the given timeframe. + SELECT + id, + initiator_id, + (ended_at - started_at) AS duration + FROM + aibridge_interceptions + WHERE + provider = $1::text + AND model = $2::text + -- TODO: use the client value once we have it (see https://github.com/coder/aibridge/issues/31) + AND 'unknown' = $3::text + AND ended_at IS NOT NULL -- incomplete interceptions are not included in summaries + AND ended_at >= $4::timestamptz + AND ended_at < $5::timestamptz +), +interception_counts AS ( + SELECT + COUNT(id) AS interception_count, + COUNT(DISTINCT initiator_id) AS unique_initiator_count + FROM + interceptions_in_range +), +duration_percentiles AS ( + SELECT + (COALESCE(PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p50_millis, + (COALESCE(PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p90_millis, + (COALESCE(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p95_millis, + (COALESCE(PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p99_millis + FROM + interceptions_in_range +), +token_aggregates AS ( + SELECT + COALESCE(SUM(tu.input_tokens), 0) AS token_count_input, + COALESCE(SUM(tu.output_tokens), 0) AS token_count_output, + -- Cached tokens are stored in metadata JSON, extract if available. + -- Read tokens may be stored in: + -- - cache_read_input (Anthropic) + -- - prompt_cached (OpenAI) + COALESCE(SUM( + COALESCE((tu.metadata->>'cache_read_input')::bigint, 0) + + COALESCE((tu.metadata->>'prompt_cached')::bigint, 0) + ), 0) AS token_count_cached_read, + -- Written tokens may be stored in: + -- - cache_creation_input (Anthropic) + -- Note that cache_ephemeral_5m_input and cache_ephemeral_1h_input on + -- Anthropic are included in the cache_creation_input field. + COALESCE(SUM( + COALESCE((tu.metadata->>'cache_creation_input')::bigint, 0) + ), 0) AS token_count_cached_written, + COUNT(tu.id) AS token_usages_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_token_usages tu ON i.id = tu.interception_id +), +prompt_aggregates AS ( + SELECT + COUNT(up.id) AS user_prompts_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_user_prompts up ON i.id = up.interception_id +), +tool_aggregates AS ( + SELECT + COUNT(tu.id) FILTER (WHERE tu.injected = true) AS tool_calls_count_injected, + COUNT(tu.id) FILTER (WHERE tu.injected = false) AS tool_calls_count_non_injected, + COUNT(tu.id) FILTER (WHERE tu.injected = true AND tu.invocation_error IS NOT NULL) AS injected_tool_call_error_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_tool_usages tu ON i.id = tu.interception_id +) +SELECT + ic.interception_count::bigint AS interception_count, + dp.interception_duration_p50_millis::bigint AS interception_duration_p50_millis, + dp.interception_duration_p90_millis::bigint AS interception_duration_p90_millis, + dp.interception_duration_p95_millis::bigint AS interception_duration_p95_millis, + dp.interception_duration_p99_millis::bigint AS interception_duration_p99_millis, + ic.unique_initiator_count::bigint AS unique_initiator_count, + pa.user_prompts_count::bigint AS user_prompts_count, + tok_agg.token_usages_count::bigint AS token_usages_count, + tok_agg.token_count_input::bigint AS token_count_input, + tok_agg.token_count_output::bigint AS token_count_output, + tok_agg.token_count_cached_read::bigint AS token_count_cached_read, + tok_agg.token_count_cached_written::bigint AS token_count_cached_written, + tool_agg.tool_calls_count_injected::bigint AS tool_calls_count_injected, + tool_agg.tool_calls_count_non_injected::bigint AS tool_calls_count_non_injected, + tool_agg.injected_tool_call_error_count::bigint AS injected_tool_call_error_count +FROM + interception_counts ic, + duration_percentiles dp, + token_aggregates tok_agg, + prompt_aggregates pa, + tool_aggregates tool_agg +` + +type CalculateAIBridgeInterceptionsTelemetrySummaryParams struct { + Provider string `db:"provider" json:"provider"` + Model string `db:"model" json:"model"` + Client string `db:"client" json:"client"` + EndedAtAfter time.Time `db:"ended_at_after" json:"ended_at_after"` + EndedAtBefore time.Time `db:"ended_at_before" json:"ended_at_before"` +} + +type CalculateAIBridgeInterceptionsTelemetrySummaryRow struct { + InterceptionCount int64 `db:"interception_count" json:"interception_count"` + InterceptionDurationP50Millis int64 `db:"interception_duration_p50_millis" json:"interception_duration_p50_millis"` + InterceptionDurationP90Millis int64 `db:"interception_duration_p90_millis" json:"interception_duration_p90_millis"` + InterceptionDurationP95Millis int64 `db:"interception_duration_p95_millis" json:"interception_duration_p95_millis"` + InterceptionDurationP99Millis int64 `db:"interception_duration_p99_millis" json:"interception_duration_p99_millis"` + UniqueInitiatorCount int64 `db:"unique_initiator_count" json:"unique_initiator_count"` + UserPromptsCount int64 `db:"user_prompts_count" json:"user_prompts_count"` + TokenUsagesCount int64 `db:"token_usages_count" json:"token_usages_count"` + TokenCountInput int64 `db:"token_count_input" json:"token_count_input"` + TokenCountOutput int64 `db:"token_count_output" json:"token_count_output"` + TokenCountCachedRead int64 `db:"token_count_cached_read" json:"token_count_cached_read"` + TokenCountCachedWritten int64 `db:"token_count_cached_written" json:"token_count_cached_written"` + ToolCallsCountInjected int64 `db:"tool_calls_count_injected" json:"tool_calls_count_injected"` + ToolCallsCountNonInjected int64 `db:"tool_calls_count_non_injected" json:"tool_calls_count_non_injected"` + InjectedToolCallErrorCount int64 `db:"injected_tool_call_error_count" json:"injected_tool_call_error_count"` +} + +// Calculates the telemetry summary for a given provider, model, and client +// combination for telemetry reporting. +func (q *sqlQuerier) CalculateAIBridgeInterceptionsTelemetrySummary(ctx context.Context, arg CalculateAIBridgeInterceptionsTelemetrySummaryParams) (CalculateAIBridgeInterceptionsTelemetrySummaryRow, error) { + row := q.db.QueryRowContext(ctx, calculateAIBridgeInterceptionsTelemetrySummary, + arg.Provider, + arg.Model, + arg.Client, + arg.EndedAtAfter, + arg.EndedAtBefore, + ) + var i CalculateAIBridgeInterceptionsTelemetrySummaryRow + err := row.Scan( + &i.InterceptionCount, + &i.InterceptionDurationP50Millis, + &i.InterceptionDurationP90Millis, + &i.InterceptionDurationP95Millis, + &i.InterceptionDurationP99Millis, + &i.UniqueInitiatorCount, + &i.UserPromptsCount, + &i.TokenUsagesCount, + &i.TokenCountInput, + &i.TokenCountOutput, + &i.TokenCountCachedRead, + &i.TokenCountCachedWritten, + &i.ToolCallsCountInjected, + &i.ToolCallsCountNonInjected, + &i.InjectedToolCallErrorCount, + ) + return i, err +} + const countAIBridgeInterceptions = `-- name: CountAIBridgeInterceptions :one SELECT COUNT(*) @@ -647,6 +805,57 @@ func (q *sqlQuerier) ListAIBridgeInterceptions(ctx context.Context, arg ListAIBr return items, nil } +const listAIBridgeInterceptionsTelemetrySummaries = `-- name: ListAIBridgeInterceptionsTelemetrySummaries :many +SELECT + DISTINCT ON (provider, model, client) + provider, + model, + -- TODO: use the client value once we have it (see https://github.com/coder/aibridge/issues/31) + 'unknown' AS client +FROM + aibridge_interceptions +WHERE + ended_at IS NOT NULL -- incomplete interceptions are not included in summaries + AND ended_at >= $1::timestamptz + AND ended_at < $2::timestamptz +` + +type ListAIBridgeInterceptionsTelemetrySummariesParams struct { + EndedAtAfter time.Time `db:"ended_at_after" json:"ended_at_after"` + EndedAtBefore time.Time `db:"ended_at_before" json:"ended_at_before"` +} + +type ListAIBridgeInterceptionsTelemetrySummariesRow struct { + Provider string `db:"provider" json:"provider"` + Model string `db:"model" json:"model"` + Client string `db:"client" json:"client"` +} + +// Finds all unique AIBridge interception telemetry summaries combinations +// (provider, model, client) in the given timeframe for telemetry reporting. +func (q *sqlQuerier) ListAIBridgeInterceptionsTelemetrySummaries(ctx context.Context, arg ListAIBridgeInterceptionsTelemetrySummariesParams) ([]ListAIBridgeInterceptionsTelemetrySummariesRow, error) { + rows, err := q.db.QueryContext(ctx, listAIBridgeInterceptionsTelemetrySummaries, arg.EndedAtAfter, arg.EndedAtBefore) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListAIBridgeInterceptionsTelemetrySummariesRow + for rows.Next() { + var i ListAIBridgeInterceptionsTelemetrySummariesRow + if err := rows.Scan(&i.Provider, &i.Model, &i.Client); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listAIBridgeTokenUsagesByInterceptionIDs = `-- name: ListAIBridgeTokenUsagesByInterceptionIDs :many SELECT id, interception_id, provider_response_id, input_tokens, output_tokens, metadata, created_at @@ -13064,6 +13273,41 @@ func (q *sqlQuerier) UpsertTelemetryItem(ctx context.Context, arg UpsertTelemetr return err } +const deleteOldTelemetryLocks = `-- name: DeleteOldTelemetryLocks :exec +DELETE FROM + telemetry_locks +WHERE + period_ending_at < $1::timestamptz +` + +// Deletes old telemetry locks from the telemetry_locks table. +func (q *sqlQuerier) DeleteOldTelemetryLocks(ctx context.Context, periodEndingAtBefore time.Time) error { + _, err := q.db.ExecContext(ctx, deleteOldTelemetryLocks, periodEndingAtBefore) + return err +} + +const insertTelemetryLock = `-- name: InsertTelemetryLock :exec +INSERT INTO + telemetry_locks (event_type, period_ending_at) +VALUES + ($1, $2) +` + +type InsertTelemetryLockParams struct { + EventType string `db:"event_type" json:"event_type"` + PeriodEndingAt time.Time `db:"period_ending_at" json:"period_ending_at"` +} + +// Inserts a new lock row into the telemetry_locks table. Replicas should call +// this function prior to attempting to generate or publish a heartbeat event to +// the telemetry service. +// If the query returns a duplicate primary key error, the replica should not +// attempt to generate or publish the event to the telemetry service. +func (q *sqlQuerier) InsertTelemetryLock(ctx context.Context, arg InsertTelemetryLockParams) error { + _, err := q.db.ExecContext(ctx, insertTelemetryLock, arg.EventType, arg.PeriodEndingAt) + return err +} + const getTemplateAverageBuildTime = `-- name: GetTemplateAverageBuildTime :one WITH build_times AS ( SELECT diff --git a/coderd/database/queries/aibridge.sql b/coderd/database/queries/aibridge.sql index e32eb0ec29..fd5a9868bb 100644 --- a/coderd/database/queries/aibridge.sql +++ b/coderd/database/queries/aibridge.sql @@ -207,3 +207,122 @@ WHERE ORDER BY created_at ASC, id ASC; + +-- name: ListAIBridgeInterceptionsTelemetrySummaries :many +-- Finds all unique AIBridge interception telemetry summaries combinations +-- (provider, model, client) in the given timeframe for telemetry reporting. +SELECT + DISTINCT ON (provider, model, client) + provider, + model, + -- TODO: use the client value once we have it (see https://github.com/coder/aibridge/issues/31) + 'unknown' AS client +FROM + aibridge_interceptions +WHERE + ended_at IS NOT NULL -- incomplete interceptions are not included in summaries + AND ended_at >= @ended_at_after::timestamptz + AND ended_at < @ended_at_before::timestamptz; + +-- name: CalculateAIBridgeInterceptionsTelemetrySummary :one +-- Calculates the telemetry summary for a given provider, model, and client +-- combination for telemetry reporting. +WITH interceptions_in_range AS ( + -- Get all matching interceptions in the given timeframe. + SELECT + id, + initiator_id, + (ended_at - started_at) AS duration + FROM + aibridge_interceptions + WHERE + provider = @provider::text + AND model = @model::text + -- TODO: use the client value once we have it (see https://github.com/coder/aibridge/issues/31) + AND 'unknown' = @client::text + AND ended_at IS NOT NULL -- incomplete interceptions are not included in summaries + AND ended_at >= @ended_at_after::timestamptz + AND ended_at < @ended_at_before::timestamptz +), +interception_counts AS ( + SELECT + COUNT(id) AS interception_count, + COUNT(DISTINCT initiator_id) AS unique_initiator_count + FROM + interceptions_in_range +), +duration_percentiles AS ( + SELECT + (COALESCE(PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p50_millis, + (COALESCE(PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p90_millis, + (COALESCE(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p95_millis, + (COALESCE(PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM duration)), 0) * 1000)::bigint AS interception_duration_p99_millis + FROM + interceptions_in_range +), +token_aggregates AS ( + SELECT + COALESCE(SUM(tu.input_tokens), 0) AS token_count_input, + COALESCE(SUM(tu.output_tokens), 0) AS token_count_output, + -- Cached tokens are stored in metadata JSON, extract if available. + -- Read tokens may be stored in: + -- - cache_read_input (Anthropic) + -- - prompt_cached (OpenAI) + COALESCE(SUM( + COALESCE((tu.metadata->>'cache_read_input')::bigint, 0) + + COALESCE((tu.metadata->>'prompt_cached')::bigint, 0) + ), 0) AS token_count_cached_read, + -- Written tokens may be stored in: + -- - cache_creation_input (Anthropic) + -- Note that cache_ephemeral_5m_input and cache_ephemeral_1h_input on + -- Anthropic are included in the cache_creation_input field. + COALESCE(SUM( + COALESCE((tu.metadata->>'cache_creation_input')::bigint, 0) + ), 0) AS token_count_cached_written, + COUNT(tu.id) AS token_usages_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_token_usages tu ON i.id = tu.interception_id +), +prompt_aggregates AS ( + SELECT + COUNT(up.id) AS user_prompts_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_user_prompts up ON i.id = up.interception_id +), +tool_aggregates AS ( + SELECT + COUNT(tu.id) FILTER (WHERE tu.injected = true) AS tool_calls_count_injected, + COUNT(tu.id) FILTER (WHERE tu.injected = false) AS tool_calls_count_non_injected, + COUNT(tu.id) FILTER (WHERE tu.injected = true AND tu.invocation_error IS NOT NULL) AS injected_tool_call_error_count + FROM + interceptions_in_range i + LEFT JOIN + aibridge_tool_usages tu ON i.id = tu.interception_id +) +SELECT + ic.interception_count::bigint AS interception_count, + dp.interception_duration_p50_millis::bigint AS interception_duration_p50_millis, + dp.interception_duration_p90_millis::bigint AS interception_duration_p90_millis, + dp.interception_duration_p95_millis::bigint AS interception_duration_p95_millis, + dp.interception_duration_p99_millis::bigint AS interception_duration_p99_millis, + ic.unique_initiator_count::bigint AS unique_initiator_count, + pa.user_prompts_count::bigint AS user_prompts_count, + tok_agg.token_usages_count::bigint AS token_usages_count, + tok_agg.token_count_input::bigint AS token_count_input, + tok_agg.token_count_output::bigint AS token_count_output, + tok_agg.token_count_cached_read::bigint AS token_count_cached_read, + tok_agg.token_count_cached_written::bigint AS token_count_cached_written, + tool_agg.tool_calls_count_injected::bigint AS tool_calls_count_injected, + tool_agg.tool_calls_count_non_injected::bigint AS tool_calls_count_non_injected, + tool_agg.injected_tool_call_error_count::bigint AS injected_tool_call_error_count +FROM + interception_counts ic, + duration_percentiles dp, + token_aggregates tok_agg, + prompt_aggregates pa, + tool_aggregates tool_agg +; diff --git a/coderd/database/queries/telemetrylocks.sql b/coderd/database/queries/telemetrylocks.sql new file mode 100644 index 0000000000..14e9730a69 --- /dev/null +++ b/coderd/database/queries/telemetrylocks.sql @@ -0,0 +1,17 @@ +-- name: InsertTelemetryLock :exec +-- Inserts a new lock row into the telemetry_locks table. Replicas should call +-- this function prior to attempting to generate or publish a heartbeat event to +-- the telemetry service. +-- If the query returns a duplicate primary key error, the replica should not +-- attempt to generate or publish the event to the telemetry service. +INSERT INTO + telemetry_locks (event_type, period_ending_at) +VALUES + ($1, $2); + +-- name: DeleteOldTelemetryLocks :exec +-- Deletes old telemetry locks from the telemetry_locks table. +DELETE FROM + telemetry_locks +WHERE + period_ending_at < @period_ending_at_before::timestamptz; diff --git a/coderd/database/unique_constraint.go b/coderd/database/unique_constraint.go index 03e2efa093..b804d9a730 100644 --- a/coderd/database/unique_constraint.go +++ b/coderd/database/unique_constraint.go @@ -62,6 +62,7 @@ const ( UniqueTaskWorkspaceAppsPkey UniqueConstraint = "task_workspace_apps_pkey" // ALTER TABLE ONLY task_workspace_apps ADD CONSTRAINT task_workspace_apps_pkey PRIMARY KEY (task_id, workspace_build_number); UniqueTasksPkey UniqueConstraint = "tasks_pkey" // ALTER TABLE ONLY tasks ADD CONSTRAINT tasks_pkey PRIMARY KEY (id); UniqueTelemetryItemsPkey UniqueConstraint = "telemetry_items_pkey" // ALTER TABLE ONLY telemetry_items ADD CONSTRAINT telemetry_items_pkey PRIMARY KEY (key); + UniqueTelemetryLocksPkey UniqueConstraint = "telemetry_locks_pkey" // ALTER TABLE ONLY telemetry_locks ADD CONSTRAINT telemetry_locks_pkey PRIMARY KEY (event_type, period_ending_at); UniqueTemplateUsageStatsPkey UniqueConstraint = "template_usage_stats_pkey" // ALTER TABLE ONLY template_usage_stats ADD CONSTRAINT template_usage_stats_pkey PRIMARY KEY (start_time, template_id, user_id); UniqueTemplateVersionParametersTemplateVersionIDNameKey UniqueConstraint = "template_version_parameters_template_version_id_name_key" // ALTER TABLE ONLY template_version_parameters ADD CONSTRAINT template_version_parameters_template_version_id_name_key UNIQUE (template_version_id, name); UniqueTemplateVersionPresetParametersPkey UniqueConstraint = "template_version_preset_parameters_pkey" // ALTER TABLE ONLY template_version_preset_parameters ADD CONSTRAINT template_version_preset_parameters_pkey PRIMARY KEY (id); diff --git a/coderd/telemetry/telemetry.go b/coderd/telemetry/telemetry.go index 1526f51f16..19873f99ee 100644 --- a/coderd/telemetry/telemetry.go +++ b/coderd/telemetry/telemetry.go @@ -28,7 +28,6 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "cdr.dev/slog" - "github.com/coder/coder/v2/buildinfo" clitelemetry "github.com/coder/coder/v2/cli/telemetry" "github.com/coder/coder/v2/coderd/database" @@ -36,6 +35,7 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/codersdk" tailnetproto "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/quartz" ) const ( @@ -48,6 +48,7 @@ type Options struct { Disabled bool Database database.Store Logger slog.Logger + Clock quartz.Clock // URL is an endpoint to direct telemetry towards! URL *url.URL Experiments codersdk.Experiments @@ -65,6 +66,9 @@ type Options struct { // Duplicate data will be sent, it's on the server-side to index by UUID. // Data is anonymized prior to being sent! func New(options Options) (Reporter, error) { + if options.Clock == nil { + options.Clock = quartz.NewReal() + } if options.SnapshotFrequency == 0 { // Report once every 30mins by default! options.SnapshotFrequency = 30 * time.Minute @@ -86,7 +90,7 @@ func New(options Options) (Reporter, error) { options: options, deploymentURL: deploymentURL, snapshotURL: snapshotURL, - startedAt: dbtime.Now(), + startedAt: dbtime.Time(options.Clock.Now()).UTC(), client: &http.Client{}, } go reporter.runSnapshotter() @@ -166,7 +170,7 @@ func (r *remoteReporter) Close() { return } close(r.closed) - now := dbtime.Now() + now := dbtime.Time(r.options.Clock.Now()).UTC() r.shutdownAt = &now if r.Enabled() { // Report a final collection of telemetry prior to close! @@ -412,7 +416,7 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) { ctx = r.ctx // For resources that grow in size very quickly (like workspace builds), // we only report events that occurred within the past hour. - createdAfter = dbtime.Now().Add(-1 * time.Hour) + createdAfter = dbtime.Time(r.options.Clock.Now().Add(-1 * time.Hour)).UTC() eg errgroup.Group snapshot = &Snapshot{ DeploymentID: r.options.DeploymentID, @@ -744,6 +748,14 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) { } return nil }) + eg.Go(func() error { + summaries, err := r.generateAIBridgeInterceptionsSummaries(ctx) + if err != nil { + return xerrors.Errorf("generate AIBridge interceptions telemetry summaries: %w", err) + } + snapshot.AIBridgeInterceptionsSummaries = summaries + return nil + }) err := eg.Wait() if err != nil { @@ -752,6 +764,76 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) { return snapshot, nil } +func (r *remoteReporter) generateAIBridgeInterceptionsSummaries(ctx context.Context) ([]AIBridgeInterceptionsSummary, error) { + // Get the current timeframe, which is the previous hour. + now := dbtime.Time(r.options.Clock.Now()).UTC() + endedAtBefore := now.Truncate(time.Hour) + endedAtAfter := endedAtBefore.Add(-1 * time.Hour) + + // Note: we don't use a transaction for this function since we do tolerate + // some errors, like duplicate lock rows, and we also calculate + // summaries in parallel. + + // Claim the heartbeat lock row for this hour. + err := r.options.Database.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{ + EventType: "aibridge_interceptions_summary", + PeriodEndingAt: endedAtBefore, + }) + if database.IsUniqueViolation(err, database.UniqueTelemetryLocksPkey) { + // Another replica has already claimed the lock row for this hour. + r.options.Logger.Debug(ctx, "aibridge interceptions telemetry lock already claimed for this hour by another replica, skipping", slog.F("period_ending_at", endedAtBefore)) + return nil, nil + } + if err != nil { + return nil, xerrors.Errorf("insert AIBridge interceptions telemetry lock (period_ending_at=%q): %w", endedAtBefore, err) + } + + // List the summary categories that need to be calculated. + summaryCategories, err := r.options.Database.ListAIBridgeInterceptionsTelemetrySummaries(ctx, database.ListAIBridgeInterceptionsTelemetrySummariesParams{ + EndedAtAfter: endedAtAfter, // inclusive + EndedAtBefore: endedAtBefore, // exclusive + }) + if err != nil { + return nil, xerrors.Errorf("list AIBridge interceptions telemetry summaries (startedAtAfter=%q, endedAtBefore=%q): %w", endedAtAfter, endedAtBefore, err) + } + + // Calculate and convert the summaries for all categories. + var ( + eg, egCtx = errgroup.WithContext(ctx) + mu sync.Mutex + summaries = make([]AIBridgeInterceptionsSummary, 0, len(summaryCategories)) + ) + for _, category := range summaryCategories { + eg.Go(func() error { + summary, err := r.options.Database.CalculateAIBridgeInterceptionsTelemetrySummary(egCtx, database.CalculateAIBridgeInterceptionsTelemetrySummaryParams{ + Provider: category.Provider, + Model: category.Model, + Client: category.Client, + EndedAtAfter: endedAtAfter, + EndedAtBefore: endedAtBefore, + }) + if err != nil { + return xerrors.Errorf("calculate AIBridge interceptions telemetry summary (provider=%q, model=%q, client=%q, startedAtAfter=%q, endedAtBefore=%q): %w", category.Provider, category.Model, category.Client, endedAtAfter, endedAtBefore, err) + } + + // Double check that at least one interception was found in the + // timeframe. + if summary.InterceptionCount == 0 { + return nil + } + + converted := ConvertAIBridgeInterceptionsSummary(endedAtBefore, category.Provider, category.Model, category.Client, summary) + + mu.Lock() + defer mu.Unlock() + summaries = append(summaries, converted) + return nil + }) + } + + return summaries, eg.Wait() +} + // ConvertAPIKey anonymizes an API key. func ConvertAPIKey(apiKey database.APIKey) APIKey { a := APIKey{ @@ -1223,6 +1305,7 @@ type Snapshot struct { TelemetryItems []TelemetryItem `json:"telemetry_items"` UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"` PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"` + AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"` } // Deployment contains information about the host running Coder. @@ -1859,6 +1942,89 @@ type PrebuiltWorkspace struct { Count int `json:"count"` } +type AIBridgeInterceptionsSummaryDurationMillis struct { + P50 int64 `json:"p50"` + P90 int64 `json:"p90"` + P95 int64 `json:"p95"` + P99 int64 `json:"p99"` +} + +type AIBridgeInterceptionsSummaryTokenCount struct { + Input int64 `json:"input"` + Output int64 `json:"output"` + CachedRead int64 `json:"cached_read"` + CachedWritten int64 `json:"cached_written"` +} + +type AIBridgeInterceptionsSummaryToolCallsCount struct { + Injected int64 `json:"injected"` + NonInjected int64 `json:"non_injected"` +} + +// AIBridgeInterceptionsSummary is a summary of aggregated AI Bridge +// interception data over a period of 1 hour. We send a summary each hour for +// each unique provider + model + client combination. +type AIBridgeInterceptionsSummary struct { + ID uuid.UUID `json:"id"` + + // The end of the hour for which the summary is taken. This will always be a + // UTC timestamp truncated to the hour. + Timestamp time.Time `json:"timestamp"` + Provider string `json:"provider"` + Model string `json:"model"` + Client string `json:"client"` + + InterceptionCount int64 `json:"interception_count"` + InterceptionDurationMillis AIBridgeInterceptionsSummaryDurationMillis `json:"interception_duration_millis"` + + // Map of route to number of interceptions. + // e.g. "/v1/chat/completions:blocking", "/v1/chat/completions:streaming" + InterceptionsByRoute map[string]int64 `json:"interceptions_by_route"` + + UniqueInitiatorCount int64 `json:"unique_initiator_count"` + + UserPromptsCount int64 `json:"user_prompts_count"` + + TokenUsagesCount int64 `json:"token_usages_count"` + TokenCount AIBridgeInterceptionsSummaryTokenCount `json:"token_count"` + + ToolCallsCount AIBridgeInterceptionsSummaryToolCallsCount `json:"tool_calls_count"` + InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"` +} + +func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary { + return AIBridgeInterceptionsSummary{ + ID: uuid.New(), + Timestamp: endTime, + Provider: provider, + Model: model, + Client: client, + InterceptionCount: summary.InterceptionCount, + InterceptionDurationMillis: AIBridgeInterceptionsSummaryDurationMillis{ + P50: summary.InterceptionDurationP50Millis, + P90: summary.InterceptionDurationP90Millis, + P95: summary.InterceptionDurationP95Millis, + P99: summary.InterceptionDurationP99Millis, + }, + // TODO: currently we don't track by route + InterceptionsByRoute: make(map[string]int64), + UniqueInitiatorCount: summary.UniqueInitiatorCount, + UserPromptsCount: summary.UserPromptsCount, + TokenUsagesCount: summary.TokenUsagesCount, + TokenCount: AIBridgeInterceptionsSummaryTokenCount{ + Input: summary.TokenCountInput, + Output: summary.TokenCountOutput, + CachedRead: summary.TokenCountCachedRead, + CachedWritten: summary.TokenCountCachedWritten, + }, + ToolCallsCount: AIBridgeInterceptionsSummaryToolCallsCount{ + Injected: summary.ToolCallsCountInjected, + NonInjected: summary.ToolCallsCountNonInjected, + }, + InjectedToolCallErrorCount: summary.InjectedToolCallErrorCount, + } +} + type noopReporter struct{} func (*noopReporter) Report(_ *Snapshot) {} diff --git a/coderd/telemetry/telemetry_test.go b/coderd/telemetry/telemetry_test.go index a48bdee299..dede229acd 100644 --- a/coderd/telemetry/telemetry_test.go +++ b/coderd/telemetry/telemetry_test.go @@ -28,6 +28,7 @@ import ( "github.com/coder/coder/v2/coderd/telemetry" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" ) func TestMain(m *testing.M) { @@ -44,6 +45,7 @@ func TestTelemetry(t *testing.T) { db, _ := dbtestutil.NewDB(t) ctx := testutil.Context(t, testutil.WaitMedium) + now := dbtime.Now() org, err := db.GetDefaultOrganization(ctx) require.NoError(t, err) @@ -208,12 +210,88 @@ func TestTelemetry(t *testing.T) { AgentID: wsagent.ID, }) - _, snapshot := collectSnapshot(ctx, t, db, nil) + previousAIBridgeInterceptionPeriod := now.Truncate(time.Hour) + user2 := dbgen.User(t, db, database.User{}) + aiBridgeInterception1 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{ + InitiatorID: user.ID, + Provider: "anthropic", + Model: "deanseek", + StartedAt: previousAIBridgeInterceptionPeriod.Add(-30 * time.Minute), + }, nil) + _ = dbgen.AIBridgeTokenUsage(t, db, database.InsertAIBridgeTokenUsageParams{ + InterceptionID: aiBridgeInterception1.ID, + InputTokens: 100, + OutputTokens: 200, + Metadata: json.RawMessage(`{"cache_read_input":300,"cache_creation_input":400}`), + }) + _ = dbgen.AIBridgeUserPrompt(t, db, database.InsertAIBridgeUserPromptParams{ + InterceptionID: aiBridgeInterception1.ID, + }) + _ = dbgen.AIBridgeToolUsage(t, db, database.InsertAIBridgeToolUsageParams{ + InterceptionID: aiBridgeInterception1.ID, + Injected: true, + InvocationError: sql.NullString{String: "error1", Valid: true}, + }) + _, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{ + ID: aiBridgeInterception1.ID, + EndedAt: aiBridgeInterception1.StartedAt.Add(1 * time.Minute), // 1 minute duration + }) + require.NoError(t, err) + aiBridgeInterception2 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{ + InitiatorID: user2.ID, + Provider: aiBridgeInterception1.Provider, + Model: aiBridgeInterception1.Model, + StartedAt: aiBridgeInterception1.StartedAt, + }, nil) + _ = dbgen.AIBridgeTokenUsage(t, db, database.InsertAIBridgeTokenUsageParams{ + InterceptionID: aiBridgeInterception2.ID, + InputTokens: 100, + OutputTokens: 200, + Metadata: json.RawMessage(`{"cache_read_input":300,"cache_creation_input":400}`), + }) + _ = dbgen.AIBridgeUserPrompt(t, db, database.InsertAIBridgeUserPromptParams{ + InterceptionID: aiBridgeInterception2.ID, + }) + _ = dbgen.AIBridgeToolUsage(t, db, database.InsertAIBridgeToolUsageParams{ + InterceptionID: aiBridgeInterception2.ID, + Injected: false, + }) + _, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{ + ID: aiBridgeInterception2.ID, + EndedAt: aiBridgeInterception2.StartedAt.Add(2 * time.Minute), // 2 minute duration + }) + require.NoError(t, err) + aiBridgeInterception3 := dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{ + InitiatorID: user2.ID, + Provider: "openai", + Model: "gpt-5", + StartedAt: aiBridgeInterception1.StartedAt, + }, nil) + _, err = db.UpdateAIBridgeInterceptionEnded(ctx, database.UpdateAIBridgeInterceptionEndedParams{ + ID: aiBridgeInterception3.ID, + EndedAt: aiBridgeInterception3.StartedAt.Add(3 * time.Minute), // 3 minute duration + }) + require.NoError(t, err) + _ = dbgen.AIBridgeInterception(t, db, database.InsertAIBridgeInterceptionParams{ + InitiatorID: user2.ID, + Provider: "openai", + Model: "gpt-5", + StartedAt: aiBridgeInterception1.StartedAt, + }, nil) + // not ended, so it should not affect summaries + + clock := quartz.NewMock(t) + clock.Set(now) + + _, snapshot := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) require.Len(t, snapshot.ProvisionerJobs, 2) require.Len(t, snapshot.Licenses, 1) require.Len(t, snapshot.Templates, 2) require.Len(t, snapshot.TemplateVersions, 3) - require.Len(t, snapshot.Users, 1) + require.Len(t, snapshot.Users, 2) require.Len(t, snapshot.Groups, 2) // 1 member in the everyone group + 1 member in the custom group require.Len(t, snapshot.GroupMembers, 2) @@ -287,6 +365,53 @@ func TestTelemetry(t *testing.T) { for _, entity := range snapshot.Templates { require.Equal(t, entity.OrganizationID, org.ID) } + + // 2 unique provider + model + client combinations + require.Len(t, snapshot.AIBridgeInterceptionsSummaries, 2) + snapshot1 := snapshot.AIBridgeInterceptionsSummaries[0] + snapshot2 := snapshot.AIBridgeInterceptionsSummaries[1] + if snapshot1.Provider != aiBridgeInterception1.Provider { + snapshot1, snapshot2 = snapshot2, snapshot1 + } + + require.Equal(t, snapshot1.Provider, aiBridgeInterception1.Provider) + require.Equal(t, snapshot1.Model, aiBridgeInterception1.Model) + require.Equal(t, snapshot1.Client, "unknown") // no client info yet + require.EqualValues(t, snapshot1.InterceptionCount, 2) + require.EqualValues(t, snapshot1.InterceptionsByRoute, map[string]int64{}) // no route info yet + require.EqualValues(t, snapshot1.InterceptionDurationMillis.P50, 90_000) + require.EqualValues(t, snapshot1.InterceptionDurationMillis.P90, 114_000) + require.EqualValues(t, snapshot1.InterceptionDurationMillis.P95, 117_000) + require.EqualValues(t, snapshot1.InterceptionDurationMillis.P99, 119_400) + require.EqualValues(t, snapshot1.UniqueInitiatorCount, 2) + require.EqualValues(t, snapshot1.UserPromptsCount, 2) + require.EqualValues(t, snapshot1.TokenUsagesCount, 2) + require.EqualValues(t, snapshot1.TokenCount.Input, 200) + require.EqualValues(t, snapshot1.TokenCount.Output, 400) + require.EqualValues(t, snapshot1.TokenCount.CachedRead, 600) + require.EqualValues(t, snapshot1.TokenCount.CachedWritten, 800) + require.EqualValues(t, snapshot1.ToolCallsCount.Injected, 1) + require.EqualValues(t, snapshot1.ToolCallsCount.NonInjected, 1) + require.EqualValues(t, snapshot1.InjectedToolCallErrorCount, 1) + + require.Equal(t, snapshot2.Provider, aiBridgeInterception3.Provider) + require.Equal(t, snapshot2.Model, aiBridgeInterception3.Model) + require.Equal(t, snapshot2.Client, "unknown") // no client info yet + require.EqualValues(t, snapshot2.InterceptionCount, 1) + require.EqualValues(t, snapshot2.InterceptionsByRoute, map[string]int64{}) // no route info yet + require.EqualValues(t, snapshot2.InterceptionDurationMillis.P50, 180_000) + require.EqualValues(t, snapshot2.InterceptionDurationMillis.P90, 180_000) + require.EqualValues(t, snapshot2.InterceptionDurationMillis.P95, 180_000) + require.EqualValues(t, snapshot2.InterceptionDurationMillis.P99, 180_000) + require.EqualValues(t, snapshot2.UniqueInitiatorCount, 1) + require.EqualValues(t, snapshot2.UserPromptsCount, 0) + require.EqualValues(t, snapshot2.TokenUsagesCount, 0) + require.EqualValues(t, snapshot2.TokenCount.Input, 0) + require.EqualValues(t, snapshot2.TokenCount.Output, 0) + require.EqualValues(t, snapshot2.TokenCount.CachedRead, 0) + require.EqualValues(t, snapshot2.TokenCount.CachedWritten, 0) + require.EqualValues(t, snapshot2.ToolCallsCount.Injected, 0) + require.EqualValues(t, snapshot2.ToolCallsCount.NonInjected, 0) }) t.Run("HashedEmail", func(t *testing.T) { t.Parallel()