From 2204731ddbee82f46d5a73ee7c7c54af586ed50b Mon Sep 17 00:00:00 2001 From: Zach <3724288+zedkipp@users.noreply.github.com> Date: Tue, 27 Jan 2026 19:11:40 -0700 Subject: [PATCH] feat: implement boundary usage tracker and telemetry collection (#21716) Implements boundary usage tracking across all Coder replicas and reports the aggregated stats via telemetry. Changes: - Implement Tracker with Track(), FlushToDB(), and StartFlushLoop() methods - Add telemetry integration via collectBoundaryUsageSummary() - Use telemetry lock to ensure only one replica collects per period The tracker accumulates unique workspaces, unique users, and request counts (allowed/denied) in memory, then flushes to the database periodically. During telemetry collection, stats are aggregated across all replicas and reset for the next period. --- coderd/agentapi/api.go | 12 +- coderd/agentapi/boundary_logs.go | 23 +- coderd/boundaryusage/tracker.go | 89 ++++- coderd/boundaryusage/tracker_test.go | 542 +++++++++++++++++++++++++++ coderd/coderd.go | 3 + coderd/telemetry/telemetry.go | 67 ++++ coderd/telemetry/telemetry_test.go | 127 +++++++ coderd/workspaceagentsrpc.go | 1 + enterprise/coderd/coderd.go | 6 + 9 files changed, 850 insertions(+), 20 deletions(-) create mode 100644 coderd/boundaryusage/tracker_test.go diff --git a/coderd/agentapi/api.go b/coderd/agentapi/api.go index 6907dcad75..9dc3811796 100644 --- a/coderd/agentapi/api.go +++ b/coderd/agentapi/api.go @@ -20,6 +20,7 @@ import ( "github.com/coder/coder/v2/coderd/agentapi/metadatabatcher" "github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor" "github.com/coder/coder/v2/coderd/appearance" + "github.com/coder/coder/v2/coderd/boundaryusage" "github.com/coder/coder/v2/coderd/connectionlog" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/pubsub" @@ -87,6 +88,7 @@ type Options struct { PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent) 
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage) NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent) + BoundaryUsageTracker *boundaryusage.Tracker AccessURL *url.URL AppHostname string @@ -224,10 +226,12 @@ func New(opts Options, workspace database.Workspace) *API { } api.BoundaryLogsAPI = &BoundaryLogsAPI{ - Log: opts.Log, - WorkspaceID: opts.WorkspaceID, - TemplateID: workspace.TemplateID, - TemplateVersionID: opts.TemplateVersionID, + Log: opts.Log, + WorkspaceID: opts.WorkspaceID, + OwnerID: opts.OwnerID, + TemplateID: workspace.TemplateID, + TemplateVersionID: opts.TemplateVersionID, + BoundaryUsageTracker: opts.BoundaryUsageTracker, } // Start background cache refresh loop to handle workspace changes diff --git a/coderd/agentapi/boundary_logs.go b/coderd/agentapi/boundary_logs.go index 91d4f8227f..207d5590ac 100644 --- a/coderd/agentapi/boundary_logs.go +++ b/coderd/agentapi/boundary_logs.go @@ -8,16 +8,21 @@ import ( "cdr.dev/slog/v3" agentproto "github.com/coder/coder/v2/agent/proto" + "github.com/coder/coder/v2/coderd/boundaryusage" ) type BoundaryLogsAPI struct { - Log slog.Logger - WorkspaceID uuid.UUID - TemplateID uuid.UUID - TemplateVersionID uuid.UUID + Log slog.Logger + WorkspaceID uuid.UUID + OwnerID uuid.UUID + TemplateID uuid.UUID + TemplateVersionID uuid.UUID + BoundaryUsageTracker *boundaryusage.Tracker } func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) { + var allowed, denied int64 + for _, l := range req.Logs { var logTime time.Time if l.Time != nil { @@ -32,6 +37,12 @@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot continue } + if l.Allowed { + allowed++ + } else { + denied++ + } + fields := []slog.Field{ slog.F("decision", allowBoolToString(l.Allowed)), slog.F("workspace_id", a.WorkspaceID.String()), @@ -52,6 +63,10 
@@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot } } + if a.BoundaryUsageTracker != nil && (allowed > 0 || denied > 0) { + a.BoundaryUsageTracker.Track(a.WorkspaceID, a.OwnerID, allowed, denied) + } + return &agentproto.ReportBoundaryLogsResponse{}, nil } diff --git a/coderd/boundaryusage/tracker.go b/coderd/boundaryusage/tracker.go index 1aebb0f53c..6c91af76c0 100644 --- a/coderd/boundaryusage/tracker.go +++ b/coderd/boundaryusage/tracker.go @@ -3,11 +3,13 @@ package boundaryusage import ( "context" "sync" + "time" "github.com/google/uuid" - "golang.org/x/xerrors" + "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" ) // Tracker tracks boundary usage for telemetry reporting. @@ -15,26 +17,89 @@ import ( // All stats accumulate in memory throughout a telemetry period and are only // reset when a new period begins. type Tracker struct { - mu sync.Mutex //nolint:unused // Will be used when implemented. - workspaces map[uuid.UUID]struct{} //nolint:unused // Will be used when implemented. - users map[uuid.UUID]struct{} //nolint:unused // Will be used when implemented. - allowedRequests int64 //nolint:unused // Will be used when implemented. - deniedRequests int64 //nolint:unused // Will be used when implemented. + mu sync.Mutex + workspaces map[uuid.UUID]struct{} + users map[uuid.UUID]struct{} + allowedRequests int64 + deniedRequests int64 } // NewTracker creates a new boundary usage tracker. -func NewTracker() (*Tracker, error) { - return nil, xerrors.New("not implemented") +func NewTracker() *Tracker { + return &Tracker{ + workspaces: make(map[uuid.UUID]struct{}), + users: make(map[uuid.UUID]struct{}), + } } // Track records boundary usage for a workspace. 
-func (*Tracker) Track(_, _ uuid.UUID, _, _ int64) error { - return xerrors.New("not implemented") +func (t *Tracker) Track(workspaceID, ownerID uuid.UUID, allowed, denied int64) { + t.mu.Lock() + defer t.mu.Unlock() + + t.workspaces[workspaceID] = struct{}{} + t.users[ownerID] = struct{}{} + t.allowedRequests += allowed + t.deniedRequests += denied } // FlushToDB writes the accumulated stats to the database. All values are // replaced in the database (they represent the current in-memory state). If the // database row was deleted (new telemetry period), all in-memory stats are reset. -func (*Tracker) FlushToDB(_ context.Context, _ database.Store, _ uuid.UUID) error { - return xerrors.New("not implemented") +func (t *Tracker) FlushToDB(ctx context.Context, db database.Store, replicaID uuid.UUID) error { + t.mu.Lock() + workspaceCount := int64(len(t.workspaces)) + userCount := int64(len(t.users)) + allowed := t.allowedRequests + denied := t.deniedRequests + t.mu.Unlock() + + // Don't flush if there's no activity. + if workspaceCount == 0 && userCount == 0 && allowed == 0 && denied == 0 { + return nil + } + + //nolint:gocritic // This is the actual package doing boundary usage tracking. + newPeriod, err := db.UpsertBoundaryUsageStats(dbauthz.AsBoundaryUsageTracker(ctx), database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replicaID, + UniqueWorkspacesCount: workspaceCount, + UniqueUsersCount: userCount, + AllowedRequests: allowed, + DeniedRequests: denied, + }) + if err != nil { + return err + } + + // If this was an insert (new period), reset all stats. Any Track() calls + // that occurred during the DB operation are discarded by this reset. + if newPeriod { + t.mu.Lock() + t.workspaces = make(map[uuid.UUID]struct{}) + t.users = make(map[uuid.UUID]struct{}) + t.allowedRequests = 0 + t.deniedRequests = 0 + t.mu.Unlock() + } + + return nil +} + +// StartFlushLoop begins the periodic flush loop that writes accumulated stats +// to the database. 
It blocks until the context is canceled. Flushes every +// minute to keep stats reasonably fresh for telemetry collection (which runs +// every 30 minutes by default) without excessive DB writes. +func (t *Tracker) StartFlushLoop(ctx context.Context, log slog.Logger, db database.Store, replicaID uuid.UUID) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := t.FlushToDB(ctx, db, replicaID); err != nil { + log.Warn(ctx, "failed to flush boundary usage stats", slog.Error(err)) + } + } + } } diff --git a/coderd/boundaryusage/tracker_test.go b/coderd/boundaryusage/tracker_test.go new file mode 100644 index 0000000000..31f11a91bc --- /dev/null +++ b/coderd/boundaryusage/tracker_test.go @@ -0,0 +1,542 @@ +package boundaryusage_test + +import ( + "context" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/coder/coder/v2/coderd/boundaryusage" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/testutil" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, testutil.GoleakOptions...) +} + +func TestTracker_New(t *testing.T) { + t.Parallel() + + tracker := boundaryusage.NewTracker() + require.NotNil(t, tracker) +} + +func TestTracker_Track_Single(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + workspaceID := uuid.New() + ownerID := uuid.New() + replicaID := uuid.New() + + tracker.Track(workspaceID, ownerID, 5, 2) + + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Verify the data was written correctly. 
+ boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(1), summary.UniqueWorkspaces) + require.Equal(t, int64(1), summary.UniqueUsers) + require.Equal(t, int64(5), summary.AllowedRequests) + require.Equal(t, int64(2), summary.DeniedRequests) +} + +func TestTracker_Track_DuplicateWorkspaceUser(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + workspaceID := uuid.New() + ownerID := uuid.New() + replicaID := uuid.New() + + // Track same workspace/user multiple times. + tracker.Track(workspaceID, ownerID, 3, 1) + tracker.Track(workspaceID, ownerID, 4, 2) + tracker.Track(workspaceID, ownerID, 2, 0) + + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(1), summary.UniqueWorkspaces, "should be 1 unique workspace") + require.Equal(t, int64(1), summary.UniqueUsers, "should be 1 unique user") + require.Equal(t, int64(9), summary.AllowedRequests, "should accumulate: 3+4+2=9") + require.Equal(t, int64(3), summary.DeniedRequests, "should accumulate: 1+2+0=3") +} + +func TestTracker_Track_MultipleWorkspacesUsers(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + + // Track 3 different workspaces with 2 different users. 
+ workspace1, workspace2, workspace3 := uuid.New(), uuid.New(), uuid.New() + user1, user2 := uuid.New(), uuid.New() + + tracker.Track(workspace1, user1, 1, 0) + tracker.Track(workspace2, user1, 2, 1) + tracker.Track(workspace3, user2, 3, 2) + + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(3), summary.UniqueWorkspaces) + require.Equal(t, int64(2), summary.UniqueUsers) + require.Equal(t, int64(6), summary.AllowedRequests) + require.Equal(t, int64(3), summary.DeniedRequests) +} + +func TestTracker_Track_Concurrent(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + + const numGoroutines = 100 + const requestsPerGoroutine = 10 + + var wg sync.WaitGroup + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + workspaceID := uuid.New() + ownerID := uuid.New() + for j := 0; j < requestsPerGoroutine; j++ { + tracker.Track(workspaceID, ownerID, 1, 1) + } + }() + } + wg.Wait() + + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(numGoroutines), summary.UniqueWorkspaces) + require.Equal(t, int64(numGoroutines), summary.UniqueUsers) + require.Equal(t, int64(numGoroutines*requestsPerGoroutine), summary.AllowedRequests) + require.Equal(t, int64(numGoroutines*requestsPerGoroutine), summary.DeniedRequests) +} + +func TestTracker_FlushToDB_Accumulates(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + workspaceID 
:= uuid.New() + ownerID := uuid.New() + + tracker.Track(workspaceID, ownerID, 5, 3) + + // First flush is an insert, which resets in-memory stats. + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Track more data after the reset. + tracker.Track(workspaceID, ownerID, 2, 1) + + // Second flush is an update so stats should accumulate. + err = tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Track even more data. + tracker.Track(workspaceID, ownerID, 3, 2) + + // Third flush stats should continue accumulating. + err = tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(1), summary.UniqueWorkspaces) + require.Equal(t, int64(1), summary.UniqueUsers) + require.Equal(t, int64(5), summary.AllowedRequests, "should accumulate after first reset: 2+3=5") + require.Equal(t, int64(3), summary.DeniedRequests, "should accumulate after first reset: 1+2=3") +} + +func TestTracker_FlushToDB_NewPeriod(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + workspaceID := uuid.New() + ownerID := uuid.New() + + tracker.Track(workspaceID, ownerID, 10, 5) + + // First flush. + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Simulate telemetry reset (new period). + err = db.ResetBoundaryUsageStats(boundaryCtx) + require.NoError(t, err) + + // Track new data. + workspace2 := uuid.New() + owner2 := uuid.New() + tracker.Track(workspace2, owner2, 3, 1) + + // Flushing again should detect new period and reset in-memory stats. 
+ err = tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // The summary should only contain the new data after reset. + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(1), summary.UniqueWorkspaces, "should only count new workspace") + require.Equal(t, int64(1), summary.UniqueUsers, "should only count new user") + require.Equal(t, int64(3), summary.AllowedRequests, "should only count new requests") + require.Equal(t, int64(1), summary.DeniedRequests, "should only count new requests") +} + +func TestTracker_FlushToDB_NoActivity(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Verify nothing was written to DB. + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(0), summary.UniqueWorkspaces) + require.Equal(t, int64(0), summary.AllowedRequests) +} + +func TestUpsertBoundaryUsageStats_Insert(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + replicaID := uuid.New() + + newPeriod, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replicaID, + UniqueWorkspacesCount: 5, + UniqueUsersCount: 3, + AllowedRequests: 100, + DeniedRequests: 10, + }) + require.NoError(t, err) + require.True(t, newPeriod, "should return true for insert") +} + +func TestUpsertBoundaryUsageStats_Update(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + replicaID := uuid.New() + + // First insert. 
+ _, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replicaID, + UniqueWorkspacesCount: 5, + UniqueUsersCount: 3, + AllowedRequests: 100, + DeniedRequests: 10, + }) + require.NoError(t, err) + + // Second upsert (update). + newPeriod, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replicaID, + UniqueWorkspacesCount: 8, + UniqueUsersCount: 5, + AllowedRequests: 200, + DeniedRequests: 20, + }) + require.NoError(t, err) + require.False(t, newPeriod, "should return false for update") + + // Verify the update took effect. + summary, err := db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + require.Equal(t, int64(8), summary.UniqueWorkspaces) + require.Equal(t, int64(5), summary.UniqueUsers) + require.Equal(t, int64(200), summary.AllowedRequests) + require.Equal(t, int64(20), summary.DeniedRequests) +} + +func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + replica1 := uuid.New() + replica2 := uuid.New() + replica3 := uuid.New() + + // Insert stats for 3 replicas. 
+ _, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replica1, + UniqueWorkspacesCount: 10, + UniqueUsersCount: 5, + AllowedRequests: 100, + DeniedRequests: 10, + }) + require.NoError(t, err) + + _, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replica2, + UniqueWorkspacesCount: 15, + UniqueUsersCount: 8, + AllowedRequests: 150, + DeniedRequests: 15, + }) + require.NoError(t, err) + + _, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replica3, + UniqueWorkspacesCount: 20, + UniqueUsersCount: 12, + AllowedRequests: 200, + DeniedRequests: 20, + }) + require.NoError(t, err) + + summary, err := db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + + // Verify aggregation (SUM of all replicas). + require.Equal(t, int64(45), summary.UniqueWorkspaces) // 10 + 15 + 20 + require.Equal(t, int64(25), summary.UniqueUsers) // 5 + 8 + 12 + require.Equal(t, int64(450), summary.AllowedRequests) // 100 + 150 + 200 + require.Equal(t, int64(45), summary.DeniedRequests) // 10 + 15 + 20 +} + +func TestGetBoundaryUsageSummary_Empty(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + summary, err := db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + + // COALESCE should return 0 for all columns. + require.Equal(t, int64(0), summary.UniqueWorkspaces) + require.Equal(t, int64(0), summary.UniqueUsers) + require.Equal(t, int64(0), summary.AllowedRequests) + require.Equal(t, int64(0), summary.DeniedRequests) +} + +func TestResetBoundaryUsageStats(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + // Insert stats for multiple replicas. 
+ for i := 0; i < 5; i++ { + _, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: uuid.New(), + UniqueWorkspacesCount: int64(i + 1), + UniqueUsersCount: int64(i + 1), + AllowedRequests: int64((i + 1) * 10), + DeniedRequests: int64(i + 1), + }) + require.NoError(t, err) + } + + // Verify data exists. + summary, err := db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + require.Greater(t, summary.AllowedRequests, int64(0)) + + // Reset. + err = db.ResetBoundaryUsageStats(ctx) + require.NoError(t, err) + + // Verify all data is gone. + summary, err = db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + require.Equal(t, int64(0), summary.UniqueWorkspaces) + require.Equal(t, int64(0), summary.AllowedRequests) +} + +func TestDeleteBoundaryUsageStatsByReplicaID(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := dbauthz.AsBoundaryUsageTracker(context.Background()) + + replica1 := uuid.New() + replica2 := uuid.New() + + // Insert stats for 2 replicas. + _, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replica1, + UniqueWorkspacesCount: 10, + UniqueUsersCount: 5, + AllowedRequests: 100, + DeniedRequests: 10, + }) + require.NoError(t, err) + + _, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{ + ReplicaID: replica2, + UniqueWorkspacesCount: 20, + UniqueUsersCount: 10, + AllowedRequests: 200, + DeniedRequests: 20, + }) + require.NoError(t, err) + + // Delete replica1's stats. + err = db.DeleteBoundaryUsageStatsByReplicaID(ctx, replica1) + require.NoError(t, err) + + // Verify only replica2's stats remain. 
+ summary, err := db.GetBoundaryUsageSummary(ctx, 60000) + require.NoError(t, err) + require.Equal(t, int64(20), summary.UniqueWorkspaces) + require.Equal(t, int64(200), summary.AllowedRequests) +} + +func TestTracker_TelemetryCycle(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitShort) + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + + // Simulate 3 replicas. + tracker1 := boundaryusage.NewTracker() + tracker2 := boundaryusage.NewTracker() + tracker3 := boundaryusage.NewTracker() + + replica1 := uuid.New() + replica2 := uuid.New() + replica3 := uuid.New() + + // Each tracker records different workspaces/users. + tracker1.Track(uuid.New(), uuid.New(), 10, 1) + tracker1.Track(uuid.New(), uuid.New(), 15, 2) + + tracker2.Track(uuid.New(), uuid.New(), 20, 3) + tracker2.Track(uuid.New(), uuid.New(), 25, 4) + tracker2.Track(uuid.New(), uuid.New(), 30, 5) + + tracker3.Track(uuid.New(), uuid.New(), 5, 0) + + // All replicas flush to database. + require.NoError(t, tracker1.FlushToDB(ctx, db, replica1)) + require.NoError(t, tracker2.FlushToDB(ctx, db, replica2)) + require.NoError(t, tracker3.FlushToDB(ctx, db, replica3)) + + // Telemetry aggregates. + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + + // Verify aggregation. + require.Equal(t, int64(6), summary.UniqueWorkspaces) // 2 + 3 + 1 + require.Equal(t, int64(6), summary.UniqueUsers) // 2 + 3 + 1 + require.Equal(t, int64(105), summary.AllowedRequests) // 25 + 75 + 5 + require.Equal(t, int64(15), summary.DeniedRequests) // 3 + 12 + 0 + + // Telemetry resets stats (simulating telemetry report sent). + require.NoError(t, db.ResetBoundaryUsageStats(boundaryCtx)) + + // Next flush from trackers should detect new period. + tracker1.Track(uuid.New(), uuid.New(), 1, 0) + require.NoError(t, tracker1.FlushToDB(ctx, db, replica1)) + + // Verify trackers reset their in-memory state. 
+ summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.Equal(t, int64(1), summary.UniqueWorkspaces) + require.Equal(t, int64(1), summary.AllowedRequests) +} + +func TestTracker_ConcurrentFlushAndTrack(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitMedium) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + + const numOperations = 50 + + var wg sync.WaitGroup + + // Goroutine 1: Continuously track. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < numOperations; i++ { + tracker.Track(uuid.New(), uuid.New(), 1, 1) + } + }() + + // Goroutine 2: Continuously flush. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < numOperations; i++ { + _ = tracker.FlushToDB(ctx, db, replicaID) + } + }() + + wg.Wait() + + // Final flush to capture any remaining data. + require.NoError(t, tracker.FlushToDB(ctx, db, replicaID)) + + // Verify stats are non-negative. + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000) + require.NoError(t, err) + require.GreaterOrEqual(t, summary.AllowedRequests, int64(0)) + require.GreaterOrEqual(t, summary.DeniedRequests, int64(0)) +} diff --git a/coderd/coderd.go b/coderd/coderd.go index eeda351b52..2efdbd361f 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -49,6 +49,7 @@ import ( "github.com/coder/coder/v2/coderd/appearance" "github.com/coder/coder/v2/coderd/audit" "github.com/coder/coder/v2/coderd/awsidentity" + "github.com/coder/coder/v2/coderd/boundaryusage" "github.com/coder/coder/v2/coderd/connectionlog" "github.com/coder/coder/v2/coderd/cryptokeys" "github.com/coder/coder/v2/coderd/database" @@ -266,6 +267,8 @@ type Options struct { DatabaseRolluper *dbrollup.Rolluper // WorkspaceUsageTracker tracks workspace usage by the CLI. 
WorkspaceUsageTracker *workspacestats.UsageTracker + // BoundaryUsageTracker tracks boundary usage for telemetry. + BoundaryUsageTracker *boundaryusage.Tracker // NotificationsEnqueuer handles enqueueing notifications for delivery by SMTP, webhook, etc. NotificationsEnqueuer notifications.Enqueuer diff --git a/coderd/telemetry/telemetry.go b/coderd/telemetry/telemetry.go index b3df9d1ac0..bb9008b744 100644 --- a/coderd/telemetry/telemetry.go +++ b/coderd/telemetry/telemetry.go @@ -31,6 +31,7 @@ import ( "github.com/coder/coder/v2/buildinfo" clitelemetry "github.com/coder/coder/v2/cli/telemetry" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/codersdk" @@ -759,6 +760,17 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) { snapshot.AIBridgeInterceptionsSummaries = summaries return nil }) + eg.Go(func() error { + summary, err := r.collectBoundaryUsageSummary(ctx) + if err != nil { + return xerrors.Errorf("collect boundary usage summary: %w", err) + } + // Only send a summary if there was actual usage. + if summary != nil && summary.UniqueUsers > 0 { + snapshot.BoundaryUsageSummary = summary + } + return nil + }) err := eg.Wait() if err != nil { @@ -837,6 +849,51 @@ func (r *remoteReporter) generateAIBridgeInterceptionsSummaries(ctx context.Cont return summaries, eg.Wait() } +// collectBoundaryUsageSummary collects boundary usage statistics from all +// replicas and resets the stats for the next telemetry period. Returns nil if +// another replica has already collected for this period. +func (r *remoteReporter) collectBoundaryUsageSummary(ctx context.Context) (*BoundaryUsageSummary, error) { + // Use twice the snapshot frequency as the staleness limit to ensure we + // capture data from replicas that may have slightly different flush times. 
+ maxStaleness := r.options.SnapshotFrequency * 2 + //nolint:gocritic // This is the actual collection of boundary usage tracking. + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + + // Claim the telemetry lock for this period. Use snapshot frequency so each + // telemetry snapshot period gets exactly one collection. + now := dbtime.Time(r.options.Clock.Now()).UTC() + periodEndingAt := now.Truncate(r.options.SnapshotFrequency) + err := r.options.Database.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{ + EventType: "boundary_usage_summary", + PeriodEndingAt: periodEndingAt, + }) + if database.IsUniqueViolation(err, database.UniqueTelemetryLocksPkey) { + r.options.Logger.Debug(ctx, "boundary usage telemetry lock already claimed by another replica, skipping", slog.F("period_ending_at", periodEndingAt)) + return nil, nil //nolint:nilnil // This is simple to handle when dealing with telemetry. + } + if err != nil { + return nil, xerrors.Errorf("insert boundary usage telemetry lock (period_ending_at=%q): %w", periodEndingAt, err) + } + + summary, err := r.options.Database.GetBoundaryUsageSummary(boundaryCtx, maxStaleness.Milliseconds()) + if err != nil { + return nil, xerrors.Errorf("get boundary usage summary: %w", err) + } + + // Reset stats after capturing the summary. This deletes all rows so each + // replica will detect a new period on their next flush. + if err := r.options.Database.ResetBoundaryUsageStats(boundaryCtx); err != nil { + return nil, xerrors.Errorf("reset boundary usage stats: %w", err) + } + + return &BoundaryUsageSummary{ + UniqueWorkspaces: summary.UniqueWorkspaces, + UniqueUsers: summary.UniqueUsers, + AllowedRequests: summary.AllowedRequests, + DeniedRequests: summary.DeniedRequests, + }, nil +} + // ConvertAPIKey anonymizes an API key. 
func ConvertAPIKey(apiKey database.APIKey) APIKey { a := APIKey{ @@ -1309,6 +1366,7 @@ type Snapshot struct { UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"` PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"` AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"` + BoundaryUsageSummary *BoundaryUsageSummary `json:"boundary_usage_summary"` } // Deployment contains information about the host running Coder. @@ -1995,6 +2053,15 @@ type AIBridgeInterceptionsSummary struct { InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"` } +// BoundaryUsageSummary contains aggregated boundary usage statistics across all +// replicas for the telemetry period. +type BoundaryUsageSummary struct { + UniqueWorkspaces int64 `json:"unique_workspaces"` + UniqueUsers int64 `json:"unique_users"` + AllowedRequests int64 `json:"allowed_requests"` + DeniedRequests int64 `json:"denied_requests"` +} + func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary { return AIBridgeInterceptionsSummary{ ID: uuid.New(), diff --git a/coderd/telemetry/telemetry_test.go b/coderd/telemetry/telemetry_test.go index a818b66db2..f28836d89d 100644 --- a/coderd/telemetry/telemetry_test.go +++ b/coderd/telemetry/telemetry_test.go @@ -19,7 +19,9 @@ import ( "go.uber.org/goleak" "github.com/coder/coder/v2/buildinfo" + "github.com/coder/coder/v2/coderd/boundaryusage" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" @@ -841,3 +843,128 @@ func collectSnapshot( return testutil.RequireReceive(ctx, t, deployment), testutil.RequireReceive(ctx, t, snapshot) } + +func 
TestTelemetry_BoundaryUsageSummary(t *testing.T) { + t.Parallel() + + t.Run("IncludedInSnapshot", func(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitMedium) + + tracker := boundaryusage.NewTracker() + workspace1, workspace2 := uuid.New(), uuid.New() + user1, user2 := uuid.New(), uuid.New() + replicaID := uuid.New() + + tracker.Track(workspace1, user1, 10, 2) + tracker.Track(workspace2, user1, 5, 1) + tracker.Track(workspace2, user2, 3, 0) + + // Flush the tracker to the database. + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + // Collect a snapshot and verify boundary usage is included. + clock := quartz.NewMock(t) + clock.Set(dbtime.Now()) + + _, snapshot := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) + + require.NotNil(t, snapshot.BoundaryUsageSummary) + require.Equal(t, int64(2), snapshot.BoundaryUsageSummary.UniqueWorkspaces) + require.Equal(t, int64(2), snapshot.BoundaryUsageSummary.UniqueUsers) + require.Equal(t, int64(10+5+3), snapshot.BoundaryUsageSummary.AllowedRequests) + require.Equal(t, int64(2+1+0), snapshot.BoundaryUsageSummary.DeniedRequests) + }) + + t.Run("ResetAfterCollection", func(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitMedium) + + tracker := boundaryusage.NewTracker() + replicaID := uuid.New() + + tracker.Track(uuid.New(), uuid.New(), 5, 1) + err := tracker.FlushToDB(ctx, db, replicaID) + require.NoError(t, err) + + clock := quartz.NewMock(t) + clock.Set(dbtime.Now()) + + // First snapshot should have the data. 
+ _, snapshot1 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) + require.NotNil(t, snapshot1.BoundaryUsageSummary) + require.Equal(t, int64(5), snapshot1.BoundaryUsageSummary.AllowedRequests) + + // Advance clock to next snapshot period to avoid lock conflict. + clock.Advance(30 * time.Minute) + + // Second snapshot should have no data (stats were reset). + _, snapshot2 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) + // Summary should be nil or have zero values since stats were reset. + if snapshot2.BoundaryUsageSummary != nil { + require.Equal(t, int64(0), snapshot2.BoundaryUsageSummary.AllowedRequests) + } + }) + + t.Run("OnlyOneReplicaCollects", func(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + ctx := testutil.Context(t, testutil.WaitMedium) + + // Set up boundary usage stats from two replicas. + tracker1 := boundaryusage.NewTracker() + tracker2 := boundaryusage.NewTracker() + replica1ID := uuid.New() + replica2ID := uuid.New() + + tracker1.Track(uuid.New(), uuid.New(), 10, 1) + tracker2.Track(uuid.New(), uuid.New(), 20, 2) + + err := tracker1.FlushToDB(ctx, db, replica1ID) + require.NoError(t, err) + err = tracker2.FlushToDB(ctx, db, replica2ID) + require.NoError(t, err) + + // Verify both replicas' data is in the database. + boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx) + const maxStalenessMs = 60000 + summary, err := db.GetBoundaryUsageSummary(boundaryCtx, maxStalenessMs) + require.NoError(t, err) + require.Equal(t, int64(10+20), summary.AllowedRequests) + + clock := quartz.NewMock(t) + clock.Set(dbtime.Now()) + + // First snapshot collects and resets. 
+ _, snapshot1 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) + require.NotNil(t, snapshot1.BoundaryUsageSummary) + require.Equal(t, int64(10+20), snapshot1.BoundaryUsageSummary.AllowedRequests) + + // Second snapshot in same period should skip (lock already claimed). + _, snapshot2 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options { + opts.Clock = clock + return opts + }) + // The second snapshot should have nil because another "replica" already + // claimed the lock for this period. + require.Nil(t, snapshot2.BoundaryUsageSummary) + }) +} diff --git a/coderd/workspaceagentsrpc.go b/coderd/workspaceagentsrpc.go index b4e9cc7650..ae8682cc59 100644 --- a/coderd/workspaceagentsrpc.go +++ b/coderd/workspaceagentsrpc.go @@ -148,6 +148,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) { PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate, PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate, NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler, + BoundaryUsageTracker: api.BoundaryUsageTracker, AccessURL: api.AccessURL, AppHostname: api.AppHostname, diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 9ad5369666..83c2402451 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -24,6 +24,7 @@ import ( "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/appearance" agplaudit "github.com/coder/coder/v2/coderd/audit" + "github.com/coder/coder/v2/coderd/boundaryusage" agplconnectionlog "github.com/coder/coder/v2/coderd/connectionlog" "github.com/coder/coder/v2/coderd/database" agpldbauthz "github.com/coder/coder/v2/coderd/database/dbauthz" @@ -645,6 +646,11 @@ func New(ctx context.Context, options *Options) (_ *API, err error) { } go api.runEntitlementsLoop(ctx) + api.BoundaryUsageTracker = boundaryusage.NewTracker() + // If there is no boundary usage 
nothing gets written to the database and + // nothing gets reported in telemetry, so we launch this unconditionally. + go api.BoundaryUsageTracker.StartFlushLoop(ctx, options.Logger.Named("boundary_usage_tracker"), options.Database, api.AGPL.ID) + return api, nil }