feat: implement boundary usage tracker and telemetry collection (#21716)

Implements boundary usage tracking across all Coder replicas and reports
the aggregated statistics via telemetry.

Changes:
- Implement Tracker with Track(), FlushToDB(), and StartFlushLoop() methods
- Add telemetry integration via collectBoundaryUsageSummary()
- Use telemetry lock to ensure only one replica collects per period

The tracker accumulates unique workspaces, unique users, and request
counts (allowed/denied) in memory, then flushes to the database
periodically. During telemetry collection, stats are aggregated across
all replicas and reset for the next period.
This commit is contained in:
Zach
2026-01-27 19:11:40 -07:00
committed by GitHub
parent d7037280da
commit 2204731ddb
9 changed files with 850 additions and 20 deletions
+8 -4
View File
@@ -20,6 +20,7 @@ import (
"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/boundaryusage"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
@@ -87,6 +88,7 @@ type Options struct {
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
BoundaryUsageTracker *boundaryusage.Tracker
AccessURL *url.URL
AppHostname string
@@ -224,10 +226,12 @@ func New(opts Options, workspace database.Workspace) *API {
}
api.BoundaryLogsAPI = &BoundaryLogsAPI{
Log: opts.Log,
WorkspaceID: opts.WorkspaceID,
TemplateID: workspace.TemplateID,
TemplateVersionID: opts.TemplateVersionID,
Log: opts.Log,
WorkspaceID: opts.WorkspaceID,
OwnerID: opts.OwnerID,
TemplateID: workspace.TemplateID,
TemplateVersionID: opts.TemplateVersionID,
BoundaryUsageTracker: opts.BoundaryUsageTracker,
}
// Start background cache refresh loop to handle workspace changes
+19 -4
View File
@@ -8,16 +8,21 @@ import (
"cdr.dev/slog/v3"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/boundaryusage"
)
type BoundaryLogsAPI struct {
Log slog.Logger
WorkspaceID uuid.UUID
TemplateID uuid.UUID
TemplateVersionID uuid.UUID
Log slog.Logger
WorkspaceID uuid.UUID
OwnerID uuid.UUID
TemplateID uuid.UUID
TemplateVersionID uuid.UUID
BoundaryUsageTracker *boundaryusage.Tracker
}
func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
var allowed, denied int64
for _, l := range req.Logs {
var logTime time.Time
if l.Time != nil {
@@ -32,6 +37,12 @@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot
continue
}
if l.Allowed {
allowed++
} else {
denied++
}
fields := []slog.Field{
slog.F("decision", allowBoolToString(l.Allowed)),
slog.F("workspace_id", a.WorkspaceID.String()),
@@ -52,6 +63,10 @@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot
}
}
if a.BoundaryUsageTracker != nil && (allowed > 0 || denied > 0) {
a.BoundaryUsageTracker.Track(a.WorkspaceID, a.OwnerID, allowed, denied)
}
return &agentproto.ReportBoundaryLogsResponse{}, nil
}
+77 -12
View File
@@ -3,11 +3,13 @@ package boundaryusage
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
)
// Tracker tracks boundary usage for telemetry reporting.
@@ -15,26 +17,89 @@ import (
// All stats accumulate in memory throughout a telemetry period and are only
// reset when a new period begins.
type Tracker struct {
mu sync.Mutex //nolint:unused // Will be used when implemented.
workspaces map[uuid.UUID]struct{} //nolint:unused // Will be used when implemented.
users map[uuid.UUID]struct{} //nolint:unused // Will be used when implemented.
allowedRequests int64 //nolint:unused // Will be used when implemented.
deniedRequests int64 //nolint:unused // Will be used when implemented.
mu sync.Mutex
workspaces map[uuid.UUID]struct{}
users map[uuid.UUID]struct{}
allowedRequests int64
deniedRequests int64
}
// NewTracker creates a new boundary usage tracker.
func NewTracker() (*Tracker, error) {
return nil, xerrors.New("not implemented")
func NewTracker() *Tracker {
	// Both sets must be allocated up front: Track writes into them directly
	// and a write to a nil map panics.
	tracker := new(Tracker)
	tracker.workspaces = map[uuid.UUID]struct{}{}
	tracker.users = map[uuid.UUID]struct{}{}
	return tracker
}
// Track records boundary usage for a workspace.
func (*Tracker) Track(_, _ uuid.UUID, _, _ int64) error {
return xerrors.New("not implemented")
func (t *Tracker) Track(workspaceID, ownerID uuid.UUID, allowed, denied int64) {
	// Hold the lock for the whole update so a concurrent FlushToDB snapshot
	// never observes a half-applied record.
	t.mu.Lock()
	defer t.mu.Unlock()

	// Set membership is idempotent, so repeated reports from the same
	// workspace or owner do not inflate the unique counts.
	var member struct{}
	t.workspaces[workspaceID] = member
	t.users[ownerID] = member

	// Request counters are running totals for the current period.
	t.deniedRequests += denied
	t.allowedRequests += allowed
}
// FlushToDB writes the accumulated stats to the database. All values are
// replaced in the database (they represent the current in-memory state). If the
// database row was deleted (new telemetry period), all in-memory stats are reset.
func (*Tracker) FlushToDB(_ context.Context, _ database.Store, _ uuid.UUID) error {
return xerrors.New("not implemented")
func (t *Tracker) FlushToDB(ctx context.Context, db database.Store, replicaID uuid.UUID) error {
	// Snapshot the current totals under the lock and release it before the
	// database call, so Track() is not blocked while the upsert runs.
	t.mu.Lock()
	uniqueWorkspaces, uniqueUsers := int64(len(t.workspaces)), int64(len(t.users))
	allowedTotal, deniedTotal := t.allowedRequests, t.deniedRequests
	t.mu.Unlock()

	// Don't flush if there's no activity.
	idle := uniqueWorkspaces == 0 && uniqueUsers == 0 && allowedTotal == 0 && deniedTotal == 0
	if idle {
		return nil
	}

	params := database.UpsertBoundaryUsageStatsParams{
		ReplicaID:             replicaID,
		UniqueWorkspacesCount: uniqueWorkspaces,
		UniqueUsersCount:      uniqueUsers,
		AllowedRequests:       allowedTotal,
		DeniedRequests:        deniedTotal,
	}
	//nolint:gocritic // This is the actual package doing boundary usage tracking.
	newPeriod, err := db.UpsertBoundaryUsageStats(dbauthz.AsBoundaryUsageTracker(ctx), params)
	if err != nil {
		return err
	}
	if !newPeriod {
		// The existing row was updated in place; keep accumulating in memory.
		return nil
	}

	// The upsert inserted a fresh row, which signals a new period, so every
	// in-memory stat is cleared.
	//
	// NOTE(review): the reset is unconditional, so Track() calls that landed
	// between the snapshot above and this point are discarded, not carried
	// into the next period. Also, because later flushes replace the row with
	// the in-memory state, the values inserted here are overwritten by the
	// next flush that has activity. Confirm both effects are acceptable.
	t.mu.Lock()
	defer t.mu.Unlock()
	t.workspaces = make(map[uuid.UUID]struct{})
	t.users = make(map[uuid.UUID]struct{})
	t.allowedRequests, t.deniedRequests = 0, 0
	return nil
}
// StartFlushLoop runs the periodic flush of accumulated stats to the database
// and blocks until ctx is canceled. It flushes once per minute, which keeps
// the stored stats reasonably fresh for telemetry collection (every 30 minutes
// by default) without excessive DB writes. A failed flush is logged as a
// warning and retried on the next tick; no flush is attempted once ctx is
// canceled.
func (t *Tracker) StartFlushLoop(ctx context.Context, log slog.Logger, db database.Store, replicaID uuid.UUID) {
	const flushInterval = time.Minute

	tick := time.NewTicker(flushInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			flushErr := t.FlushToDB(ctx, db, replicaID)
			if flushErr == nil {
				continue
			}
			log.Warn(ctx, "failed to flush boundary usage stats", slog.Error(flushErr))
		case <-done:
			return
		}
	}
}
+542
View File
@@ -0,0 +1,542 @@
package boundaryusage_test
import (
"context"
"sync"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/coder/coder/v2/coderd/boundaryusage"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/testutil"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.GoleakOptions...)
}
func TestTracker_New(t *testing.T) {
t.Parallel()
tracker := boundaryusage.NewTracker()
require.NotNil(t, tracker)
}
// TestTracker_Track_Single flushes one tracked report and checks that the
// stored summary mirrors it exactly.
func TestTracker_Track_Single(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	// One workspace/owner pair reporting 5 allowed and 2 denied requests.
	tracker := boundaryusage.NewTracker()
	tracker.Track(uuid.New(), uuid.New(), 5, 2)
	require.NoError(t, tracker.FlushToDB(ctx, db, uuid.New()))

	// Verify the data was written correctly.
	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(ctx), 60000)
	require.NoError(t, err)
	require.Equal(t, int64(1), got.UniqueWorkspaces)
	require.Equal(t, int64(1), got.UniqueUsers)
	require.Equal(t, int64(5), got.AllowedRequests)
	require.Equal(t, int64(2), got.DeniedRequests)
}
// TestTracker_Track_DuplicateWorkspaceUser checks that repeated reports from
// the same workspace/owner pair add to the request counters while the unique
// counts stay at one.
func TestTracker_Track_DuplicateWorkspaceUser(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	tracker := boundaryusage.NewTracker()
	wsID, userID := uuid.New(), uuid.New()

	// Track same workspace/user multiple times: {allowed, denied} per report.
	for _, counts := range [][2]int64{{3, 1}, {4, 2}, {2, 0}} {
		tracker.Track(wsID, userID, counts[0], counts[1])
	}
	require.NoError(t, tracker.FlushToDB(ctx, db, uuid.New()))

	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(ctx), 60000)
	require.NoError(t, err)
	require.Equal(t, int64(1), got.UniqueWorkspaces, "should be 1 unique workspace")
	require.Equal(t, int64(1), got.UniqueUsers, "should be 1 unique user")
	require.Equal(t, int64(9), got.AllowedRequests, "should accumulate: 3+4+2=9")
	require.Equal(t, int64(3), got.DeniedRequests, "should accumulate: 1+2+0=3")
}
// TestTracker_Track_MultipleWorkspacesUsers checks that workspaces and owners
// are de-duplicated independently of each other.
func TestTracker_Track_MultipleWorkspacesUsers(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	tracker := boundaryusage.NewTracker()

	// Track 3 different workspaces with 2 different users; the first user
	// owns two of the workspaces.
	userA, userB := uuid.New(), uuid.New()
	reports := []struct {
		owner           uuid.UUID
		allowed, denied int64
	}{
		{owner: userA, allowed: 1, denied: 0},
		{owner: userA, allowed: 2, denied: 1},
		{owner: userB, allowed: 3, denied: 2},
	}
	for _, rep := range reports {
		tracker.Track(uuid.New(), rep.owner, rep.allowed, rep.denied)
	}
	require.NoError(t, tracker.FlushToDB(ctx, db, uuid.New()))

	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(ctx), 60000)
	require.NoError(t, err)
	require.Equal(t, int64(3), got.UniqueWorkspaces)
	require.Equal(t, int64(2), got.UniqueUsers)
	require.Equal(t, int64(6), got.AllowedRequests)
	require.Equal(t, int64(3), got.DeniedRequests)
}
// TestTracker_Track_Concurrent calls Track from many goroutines at once and
// checks that no update is lost.
func TestTracker_Track_Concurrent(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	const (
		numGoroutines        = 100
		requestsPerGoroutine = 10
	)

	tracker := boundaryusage.NewTracker()

	// Every goroutine owns a distinct workspace/owner pair and reports one
	// allowed plus one denied request per iteration.
	var wg sync.WaitGroup
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			wsID, userID := uuid.New(), uuid.New()
			for j := 0; j < requestsPerGoroutine; j++ {
				tracker.Track(wsID, userID, 1, 1)
			}
		}()
	}
	wg.Wait()

	require.NoError(t, tracker.FlushToDB(ctx, db, uuid.New()))

	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(ctx), 60000)
	require.NoError(t, err)
	require.Equal(t, int64(numGoroutines), got.UniqueWorkspaces)
	require.Equal(t, int64(numGoroutines), got.UniqueUsers)
	require.Equal(t, int64(numGoroutines*requestsPerGoroutine), got.AllowedRequests)
	require.Equal(t, int64(numGoroutines*requestsPerGoroutine), got.DeniedRequests)
}
// TestTracker_FlushToDB_Accumulates flushes three times for the same replica.
// The first flush inserts the row and resets the in-memory stats; the later
// flushes update the row with whatever accumulated after that reset.
func TestTracker_FlushToDB_Accumulates(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	tracker := boundaryusage.NewTracker()
	replicaID := uuid.New()
	wsID, userID := uuid.New(), uuid.New()

	// Each step tracks {allowed, denied} and then flushes. Step one is the
	// insert (followed by a reset), steps two and three are updates.
	for _, step := range [][2]int64{{5, 3}, {2, 1}, {3, 2}} {
		tracker.Track(wsID, userID, step[0], step[1])
		require.NoError(t, tracker.FlushToDB(ctx, db, replicaID))
	}

	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(ctx), 60000)
	require.NoError(t, err)
	require.Equal(t, int64(1), got.UniqueWorkspaces)
	require.Equal(t, int64(1), got.UniqueUsers)
	require.Equal(t, int64(5), got.AllowedRequests, "should accumulate after first reset: 2+3=5")
	require.Equal(t, int64(3), got.DeniedRequests, "should accumulate after first reset: 1+2=3")
}
// TestTracker_FlushToDB_NewPeriod checks that after the stats table is reset,
// the next flush stores only what was tracked since the previous flush.
func TestTracker_FlushToDB_NewPeriod(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)
	boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)

	tracker := boundaryusage.NewTracker()
	replicaID := uuid.New()

	// First period: track and flush.
	tracker.Track(uuid.New(), uuid.New(), 10, 5)
	require.NoError(t, tracker.FlushToDB(ctx, db, replicaID))

	// Simulate telemetry reset (new period).
	require.NoError(t, db.ResetBoundaryUsageStats(boundaryCtx))

	// Second period: a different workspace and owner.
	tracker.Track(uuid.New(), uuid.New(), 3, 1)

	// Flushing again should detect new period and reset in-memory stats.
	require.NoError(t, tracker.FlushToDB(ctx, db, replicaID))

	// The summary should only contain the new data after reset.
	got, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(1), got.UniqueWorkspaces, "should only count new workspace")
	require.Equal(t, int64(1), got.UniqueUsers, "should only count new user")
	require.Equal(t, int64(3), got.AllowedRequests, "should only count new requests")
	require.Equal(t, int64(1), got.DeniedRequests, "should only count new requests")
}
// TestTracker_FlushToDB_NoActivity checks that flushing a tracker that never
// recorded anything succeeds and leaves the stats table empty.
func TestTracker_FlushToDB_NoActivity(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)

	tracker := boundaryusage.NewTracker()
	replicaID := uuid.New()

	err := tracker.FlushToDB(ctx, db, replicaID)
	require.NoError(t, err)

	// Verify nothing was written to DB. All four columns are checked so a
	// stray row carrying only user or denied-request data cannot slip by.
	boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
	summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(0), summary.UniqueWorkspaces)
	require.Equal(t, int64(0), summary.UniqueUsers)
	require.Equal(t, int64(0), summary.AllowedRequests)
	require.Equal(t, int64(0), summary.DeniedRequests)
}
// TestUpsertBoundaryUsageStats_Insert checks that the first upsert for a
// replica reports that a new row was inserted.
func TestUpsertBoundaryUsageStats_Insert(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := dbauthz.AsBoundaryUsageTracker(context.Background())

	params := database.UpsertBoundaryUsageStatsParams{
		ReplicaID:             uuid.New(),
		UniqueWorkspacesCount: 5,
		UniqueUsersCount:      3,
		AllowedRequests:       100,
		DeniedRequests:        10,
	}
	inserted, err := db.UpsertBoundaryUsageStats(ctx, params)
	require.NoError(t, err)
	require.True(t, inserted, "should return true for insert")
}
// TestUpsertBoundaryUsageStats_Update checks that a second upsert for the same
// replica reports an update and replaces the stored values.
func TestUpsertBoundaryUsageStats_Update(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := dbauthz.AsBoundaryUsageTracker(context.Background())

	// First insert.
	params := database.UpsertBoundaryUsageStatsParams{
		ReplicaID:             uuid.New(),
		UniqueWorkspacesCount: 5,
		UniqueUsersCount:      3,
		AllowedRequests:       100,
		DeniedRequests:        10,
	}
	_, err := db.UpsertBoundaryUsageStats(ctx, params)
	require.NoError(t, err)

	// Second upsert (update): same replica, new values.
	params.UniqueWorkspacesCount = 8
	params.UniqueUsersCount = 5
	params.AllowedRequests = 200
	params.DeniedRequests = 20
	inserted, err := db.UpsertBoundaryUsageStats(ctx, params)
	require.NoError(t, err)
	require.False(t, inserted, "should return false for update")

	// Verify the update took effect.
	got, err := db.GetBoundaryUsageSummary(ctx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(8), got.UniqueWorkspaces)
	require.Equal(t, int64(5), got.UniqueUsers)
	require.Equal(t, int64(200), got.AllowedRequests)
	require.Equal(t, int64(20), got.DeniedRequests)
}
// TestGetBoundaryUsageSummary_MultipleReplicas checks that the summary is the
// column-wise sum of every replica's row.
func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := dbauthz.AsBoundaryUsageTracker(context.Background())

	// Insert stats for 3 replicas.
	rows := []database.UpsertBoundaryUsageStatsParams{
		{ReplicaID: uuid.New(), UniqueWorkspacesCount: 10, UniqueUsersCount: 5, AllowedRequests: 100, DeniedRequests: 10},
		{ReplicaID: uuid.New(), UniqueWorkspacesCount: 15, UniqueUsersCount: 8, AllowedRequests: 150, DeniedRequests: 15},
		{ReplicaID: uuid.New(), UniqueWorkspacesCount: 20, UniqueUsersCount: 12, AllowedRequests: 200, DeniedRequests: 20},
	}
	for _, row := range rows {
		_, err := db.UpsertBoundaryUsageStats(ctx, row)
		require.NoError(t, err)
	}

	got, err := db.GetBoundaryUsageSummary(ctx, 60000)
	require.NoError(t, err)

	// Verify aggregation (SUM of all replicas).
	require.Equal(t, int64(45), got.UniqueWorkspaces) // 10 + 15 + 20
	require.Equal(t, int64(25), got.UniqueUsers)      // 5 + 8 + 12
	require.Equal(t, int64(450), got.AllowedRequests) // 100 + 150 + 200
	require.Equal(t, int64(45), got.DeniedRequests)   // 10 + 15 + 20
}
// TestGetBoundaryUsageSummary_Empty checks that querying an empty stats table
// yields zeros instead of an error.
func TestGetBoundaryUsageSummary_Empty(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)

	got, err := db.GetBoundaryUsageSummary(dbauthz.AsBoundaryUsageTracker(context.Background()), 60000)
	require.NoError(t, err)

	// COALESCE should return 0 for all columns.
	const zero = int64(0)
	require.Equal(t, zero, got.UniqueWorkspaces)
	require.Equal(t, zero, got.UniqueUsers)
	require.Equal(t, zero, got.AllowedRequests)
	require.Equal(t, zero, got.DeniedRequests)
}
// TestResetBoundaryUsageStats checks that a reset removes every replica's
// row, leaving the aggregated summary at zero in all columns.
func TestResetBoundaryUsageStats(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := dbauthz.AsBoundaryUsageTracker(context.Background())

	// Insert stats for multiple replicas.
	for i := 0; i < 5; i++ {
		_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
			ReplicaID:             uuid.New(),
			UniqueWorkspacesCount: int64(i + 1),
			UniqueUsersCount:      int64(i + 1),
			AllowedRequests:       int64((i + 1) * 10),
			DeniedRequests:        int64(i + 1),
		})
		require.NoError(t, err)
	}

	// Verify data exists.
	summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
	require.NoError(t, err)
	require.Greater(t, summary.AllowedRequests, int64(0))

	// Reset.
	err = db.ResetBoundaryUsageStats(ctx)
	require.NoError(t, err)

	// Verify all data is gone. Every column is checked, not just a sample.
	summary, err = db.GetBoundaryUsageSummary(ctx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(0), summary.UniqueWorkspaces)
	require.Equal(t, int64(0), summary.UniqueUsers)
	require.Equal(t, int64(0), summary.AllowedRequests)
	require.Equal(t, int64(0), summary.DeniedRequests)
}
// TestDeleteBoundaryUsageStatsByReplicaID checks that deleting one replica's
// row leaves the other replica's row untouched.
func TestDeleteBoundaryUsageStatsByReplicaID(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := dbauthz.AsBoundaryUsageTracker(context.Background())

	replica1 := uuid.New()
	replica2 := uuid.New()

	// Insert stats for 2 replicas.
	_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
		ReplicaID:             replica1,
		UniqueWorkspacesCount: 10,
		UniqueUsersCount:      5,
		AllowedRequests:       100,
		DeniedRequests:        10,
	})
	require.NoError(t, err)

	_, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
		ReplicaID:             replica2,
		UniqueWorkspacesCount: 20,
		UniqueUsersCount:      10,
		AllowedRequests:       200,
		DeniedRequests:        20,
	})
	require.NoError(t, err)

	// Delete replica1's stats.
	err = db.DeleteBoundaryUsageStatsByReplicaID(ctx, replica1)
	require.NoError(t, err)

	// Verify only replica2's stats remain. All four columns must equal
	// replica2's values, otherwise a partial delete could go unnoticed.
	summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(20), summary.UniqueWorkspaces)
	require.Equal(t, int64(10), summary.UniqueUsers)
	require.Equal(t, int64(200), summary.AllowedRequests)
	require.Equal(t, int64(20), summary.DeniedRequests)
}
// TestTracker_TelemetryCycle walks through one full period with three
// simulated replicas: track, flush, aggregate, reset, then flush again.
func TestTracker_TelemetryCycle(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitShort)
	boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)

	// Simulate 3 replicas. Each report is {allowed, denied} and comes from a
	// brand-new workspace/user pair.
	type replica struct {
		id      uuid.UUID
		tracker *boundaryusage.Tracker
		reports [][2]int64
	}
	replicas := []replica{
		{id: uuid.New(), tracker: boundaryusage.NewTracker(), reports: [][2]int64{{10, 1}, {15, 2}}},
		{id: uuid.New(), tracker: boundaryusage.NewTracker(), reports: [][2]int64{{20, 3}, {25, 4}, {30, 5}}},
		{id: uuid.New(), tracker: boundaryusage.NewTracker(), reports: [][2]int64{{5, 0}}},
	}
	for _, rep := range replicas {
		for _, counts := range rep.reports {
			rep.tracker.Track(uuid.New(), uuid.New(), counts[0], counts[1])
		}
	}

	// All replicas flush to database.
	for _, rep := range replicas {
		require.NoError(t, rep.tracker.FlushToDB(ctx, db, rep.id))
	}

	// Telemetry aggregates.
	got, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
	require.NoError(t, err)

	// Verify aggregation.
	require.Equal(t, int64(6), got.UniqueWorkspaces)  // 2 + 3 + 1
	require.Equal(t, int64(6), got.UniqueUsers)       // 2 + 3 + 1
	require.Equal(t, int64(105), got.AllowedRequests) // 25 + 75 + 5
	require.Equal(t, int64(15), got.DeniedRequests)   // 3 + 12 + 0

	// Telemetry resets stats (simulating telemetry report sent).
	require.NoError(t, db.ResetBoundaryUsageStats(boundaryCtx))

	// Next flush from trackers should detect new period.
	first := replicas[0]
	first.tracker.Track(uuid.New(), uuid.New(), 1, 0)
	require.NoError(t, first.tracker.FlushToDB(ctx, db, first.id))

	// Only the post-reset report should be present in the database.
	got, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
	require.NoError(t, err)
	require.Equal(t, int64(1), got.UniqueWorkspaces)
	require.Equal(t, int64(1), got.AllowedRequests)
}
// TestTracker_ConcurrentFlushAndTrack runs Track and FlushToDB against the
// same tracker at the same time. Beyond being race-free, every flush must
// succeed and the stored counters must stay consistent with what Track was
// given.
func TestTracker_ConcurrentFlushAndTrack(t *testing.T) {
	t.Parallel()

	db, _ := dbtestutil.NewDB(t)
	ctx := testutil.Context(t, testutil.WaitMedium)

	tracker := boundaryusage.NewTracker()
	replicaID := uuid.New()

	const numOperations = 50

	var (
		wg sync.WaitGroup
		// flushErr holds the first error returned by a concurrent flush. It
		// is written only by the flusher goroutine and read only after
		// wg.Wait(), so it needs no extra synchronization.
		flushErr error
	)

	// Goroutine 1: Continuously track.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < numOperations; i++ {
			tracker.Track(uuid.New(), uuid.New(), 1, 1)
		}
	}()

	// Goroutine 2: Continuously flush. require must not be called off the
	// test goroutine, so the error is recorded and asserted after Wait.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < numOperations; i++ {
			if err := tracker.FlushToDB(ctx, db, replicaID); err != nil {
				flushErr = err
				return
			}
		}
	}()

	wg.Wait()
	require.NoError(t, flushErr, "concurrent flush failed")

	// Final flush to capture any remaining data.
	require.NoError(t, tracker.FlushToDB(ctx, db, replicaID))

	boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
	summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
	require.NoError(t, err)

	// Some reports may be dropped by a new-period reset, so the exact total
	// is not deterministic. It can never be negative or exceed the number of
	// Track calls, though.
	require.GreaterOrEqual(t, summary.AllowedRequests, int64(0))
	require.GreaterOrEqual(t, summary.DeniedRequests, int64(0))
	require.LessOrEqual(t, summary.AllowedRequests, int64(numOperations))
	require.LessOrEqual(t, summary.DeniedRequests, int64(numOperations))

	// Every Track call adds one allowed and one denied request under a single
	// lock acquisition, and flush snapshots both under the same lock, so the
	// two counters must always match.
	require.Equal(t, summary.AllowedRequests, summary.DeniedRequests)
}
+3
View File
@@ -49,6 +49,7 @@ import (
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/awsidentity"
"github.com/coder/coder/v2/coderd/boundaryusage"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/cryptokeys"
"github.com/coder/coder/v2/coderd/database"
@@ -266,6 +267,8 @@ type Options struct {
DatabaseRolluper *dbrollup.Rolluper
// WorkspaceUsageTracker tracks workspace usage by the CLI.
WorkspaceUsageTracker *workspacestats.UsageTracker
// BoundaryUsageTracker tracks boundary usage for telemetry.
BoundaryUsageTracker *boundaryusage.Tracker
// NotificationsEnqueuer handles enqueueing notifications for delivery by SMTP, webhook, etc.
NotificationsEnqueuer notifications.Enqueuer
+67
View File
@@ -31,6 +31,7 @@ import (
"github.com/coder/coder/v2/buildinfo"
clitelemetry "github.com/coder/coder/v2/cli/telemetry"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
@@ -759,6 +760,17 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
snapshot.AIBridgeInterceptionsSummaries = summaries
return nil
})
eg.Go(func() error {
summary, err := r.collectBoundaryUsageSummary(ctx)
if err != nil {
return xerrors.Errorf("collect boundary usage summary: %w", err)
}
// Only send a summary if there was actual usage.
if summary != nil && summary.UniqueUsers > 0 {
snapshot.BoundaryUsageSummary = summary
}
return nil
})
err := eg.Wait()
if err != nil {
@@ -837,6 +849,51 @@ func (r *remoteReporter) generateAIBridgeInterceptionsSummaries(ctx context.Cont
return summaries, eg.Wait()
}
// collectBoundaryUsageSummary reads the boundary usage totals that replicas
// have flushed to the database and then clears them so the next telemetry
// period starts from zero. A nil summary together with a nil error means
// another replica already claimed the collection for this period.
//
// NOTE(review): the read and the reset are issued as two separate statements;
// presumably a replica flush landing between them would be deleted without
// ever being reported. Confirm whether they should share a transaction.
func (r *remoteReporter) collectBoundaryUsageSummary(ctx context.Context) (*BoundaryUsageSummary, error) {
	frequency := r.options.SnapshotFrequency

	// Use twice the snapshot frequency as the staleness limit to ensure we
	// capture data from replicas that may have slightly different flush times.
	stalenessLimit := frequency * 2

	//nolint:gocritic // This is the actual collection of boundary usage tracking.
	boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)

	// The lock is keyed by the current time truncated to the snapshot
	// frequency, so each snapshot period gets exactly one collection.
	periodEndingAt := dbtime.Time(r.options.Clock.Now()).UTC().Truncate(frequency)
	lockErr := r.options.Database.InsertTelemetryLock(ctx, database.InsertTelemetryLockParams{
		EventType:      "boundary_usage_summary",
		PeriodEndingAt: periodEndingAt,
	})
	switch {
	case database.IsUniqueViolation(lockErr, database.UniqueTelemetryLocksPkey):
		r.options.Logger.Debug(ctx, "boundary usage telemetry lock already claimed by another replica, skipping", slog.F("period_ending_at", periodEndingAt))
		return nil, nil //nolint:nilnil // This is simple to handle when dealing with telemetry.
	case lockErr != nil:
		return nil, xerrors.Errorf("insert boundary usage telemetry lock (period_ending_at=%q): %w", periodEndingAt, lockErr)
	}

	totals, err := r.options.Database.GetBoundaryUsageSummary(boundaryCtx, stalenessLimit.Milliseconds())
	if err != nil {
		return nil, xerrors.Errorf("get boundary usage summary: %w", err)
	}

	// Reset stats after capturing the summary. This deletes all rows so each
	// replica will detect a new period on their next flush.
	err = r.options.Database.ResetBoundaryUsageStats(boundaryCtx)
	if err != nil {
		return nil, xerrors.Errorf("reset boundary usage stats: %w", err)
	}

	result := new(BoundaryUsageSummary)
	result.UniqueWorkspaces = totals.UniqueWorkspaces
	result.UniqueUsers = totals.UniqueUsers
	result.AllowedRequests = totals.AllowedRequests
	result.DeniedRequests = totals.DeniedRequests
	return result, nil
}
// ConvertAPIKey anonymizes an API key.
func ConvertAPIKey(apiKey database.APIKey) APIKey {
a := APIKey{
@@ -1309,6 +1366,7 @@ type Snapshot struct {
UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"`
PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"`
AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"`
BoundaryUsageSummary *BoundaryUsageSummary `json:"boundary_usage_summary"`
}
// Deployment contains information about the host running Coder.
@@ -1995,6 +2053,15 @@ type AIBridgeInterceptionsSummary struct {
InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"`
}
// BoundaryUsageSummary contains aggregated boundary usage statistics across all
// replicas for the telemetry period. It is attached to a Snapshot only when
// the collected summary reports at least one unique user.
type BoundaryUsageSummary struct {
// UniqueWorkspaces is the sum of the unique-workspace counts flushed by each
// replica. NOTE(review): because it is a sum of per-replica counts, a
// workspace reported through several replicas is presumably counted once per
// replica — confirm.
UniqueWorkspaces int64 `json:"unique_workspaces"`
// UniqueUsers is the sum of the unique-user (workspace owner) counts flushed
// by each replica; the same per-replica caveat presumably applies.
UniqueUsers int64 `json:"unique_users"`
// AllowedRequests is the total number of boundary requests logged as allowed.
AllowedRequests int64 `json:"allowed_requests"`
// DeniedRequests is the total number of boundary requests logged as denied.
DeniedRequests int64 `json:"denied_requests"`
}
func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary {
return AIBridgeInterceptionsSummary{
ID: uuid.New(),
+127
View File
@@ -19,7 +19,9 @@ import (
"go.uber.org/goleak"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/coderd/boundaryusage"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -841,3 +843,128 @@ func collectSnapshot(
return testutil.RequireReceive(ctx, t, deployment), testutil.RequireReceive(ctx, t, snapshot)
}
func TestTelemetry_BoundaryUsageSummary(t *testing.T) {
t.Parallel()
t.Run("IncludedInSnapshot", func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitMedium)
tracker := boundaryusage.NewTracker()
workspace1, workspace2 := uuid.New(), uuid.New()
user1, user2 := uuid.New(), uuid.New()
replicaID := uuid.New()
tracker.Track(workspace1, user1, 10, 2)
tracker.Track(workspace2, user1, 5, 1)
tracker.Track(workspace2, user2, 3, 0)
// Flush the tracker to the database.
err := tracker.FlushToDB(ctx, db, replicaID)
require.NoError(t, err)
// Collect a snapshot and verify boundary usage is included.
clock := quartz.NewMock(t)
clock.Set(dbtime.Now())
_, snapshot := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
require.NotNil(t, snapshot.BoundaryUsageSummary)
require.Equal(t, int64(2), snapshot.BoundaryUsageSummary.UniqueWorkspaces)
require.Equal(t, int64(2), snapshot.BoundaryUsageSummary.UniqueUsers)
require.Equal(t, int64(10+5+3), snapshot.BoundaryUsageSummary.AllowedRequests)
require.Equal(t, int64(2+1+0), snapshot.BoundaryUsageSummary.DeniedRequests)
})
t.Run("ResetAfterCollection", func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitMedium)
tracker := boundaryusage.NewTracker()
replicaID := uuid.New()
tracker.Track(uuid.New(), uuid.New(), 5, 1)
err := tracker.FlushToDB(ctx, db, replicaID)
require.NoError(t, err)
clock := quartz.NewMock(t)
clock.Set(dbtime.Now())
// First snapshot should have the data.
_, snapshot1 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
require.NotNil(t, snapshot1.BoundaryUsageSummary)
require.Equal(t, int64(5), snapshot1.BoundaryUsageSummary.AllowedRequests)
// Advance clock to next snapshot period to avoid lock conflict.
clock.Advance(30 * time.Minute)
// Second snapshot should have no data (stats were reset).
_, snapshot2 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
// Summary should be nil or have zero values since stats were reset.
if snapshot2.BoundaryUsageSummary != nil {
require.Equal(t, int64(0), snapshot2.BoundaryUsageSummary.AllowedRequests)
}
})
t.Run("OnlyOneReplicaCollects", func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitMedium)
// Set up boundary usage stats from two replicas.
tracker1 := boundaryusage.NewTracker()
tracker2 := boundaryusage.NewTracker()
replica1ID := uuid.New()
replica2ID := uuid.New()
tracker1.Track(uuid.New(), uuid.New(), 10, 1)
tracker2.Track(uuid.New(), uuid.New(), 20, 2)
err := tracker1.FlushToDB(ctx, db, replica1ID)
require.NoError(t, err)
err = tracker2.FlushToDB(ctx, db, replica2ID)
require.NoError(t, err)
// Verify both replicas' data is in the database.
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
const maxStalenessMs = 60000
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, maxStalenessMs)
require.NoError(t, err)
require.Equal(t, int64(10+20), summary.AllowedRequests)
clock := quartz.NewMock(t)
clock.Set(dbtime.Now())
// First snapshot collects and resets.
_, snapshot1 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
require.NotNil(t, snapshot1.BoundaryUsageSummary)
require.Equal(t, int64(10+20), snapshot1.BoundaryUsageSummary.AllowedRequests)
// Second snapshot in same period should skip (lock already claimed).
_, snapshot2 := collectSnapshot(ctx, t, db, func(opts telemetry.Options) telemetry.Options {
opts.Clock = clock
return opts
})
// The second snapshot should have nil because another "replica" already
// claimed the lock for this period.
require.Nil(t, snapshot2.BoundaryUsageSummary)
})
}
+1
View File
@@ -148,6 +148,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
BoundaryUsageTracker: api.BoundaryUsageTracker,
AccessURL: api.AccessURL,
AppHostname: api.AppHostname,
+6
View File
@@ -24,6 +24,7 @@ import (
"github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/appearance"
agplaudit "github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/boundaryusage"
agplconnectionlog "github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
agpldbauthz "github.com/coder/coder/v2/coderd/database/dbauthz"
@@ -645,6 +646,11 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
}
go api.runEntitlementsLoop(ctx)
api.BoundaryUsageTracker = boundaryusage.NewTracker()
// If there is no boundary usage nothing gets written to the database and
// nothing gets reported in telemetry, so we launch this unconditionally.
go api.BoundaryUsageTracker.StartFlushLoop(ctx, options.Logger.Named("boundary_usage_tracker"), options.Database, api.AGPL.ID)
return api, nil
}