fix: make boundary usage telemetry collection atomic (#21907)

Previously, UpsertBoundaryUsageStats (INSERT...ON CONFLICT DO UPDATE) and
GetAndResetBoundaryUsageSummary (DELETE...RETURNING) could race during
telemetry period cutover. Without serialization, an upsert concurrent with the
delete could lose data (deleted right after being written) or commit after the
delete (miscounted in the next period). Both operations now acquire
LockIDBoundaryUsageStats within a transaction to ensure a clean cutover.
This commit is contained in:
Zach
2026-02-06 09:52:17 -07:00
committed by GitHub
parent e5c3d151bb
commit a31e476623
12 changed files with 162 additions and 277 deletions
+12 -1
View File
@@ -95,7 +95,16 @@ func (t *Tracker) FlushToDB(ctx context.Context, db database.Store, replicaID uu
t.mu.Unlock()
//nolint:gocritic // This is the actual package doing boundary usage tracking.
_, err := db.UpsertBoundaryUsageStats(dbauthz.AsBoundaryUsageTracker(ctx), database.UpsertBoundaryUsageStatsParams{
authCtx := dbauthz.AsBoundaryUsageTracker(ctx)
err := db.InTx(func(tx database.Store) error {
// The advisory lock ensures a clean period cutover by preventing
// this upsert from racing with the aggregate+delete in
// GetAndResetBoundaryUsageSummary. Without it, upserted data
// could be lost or miscounted across periods.
if err := tx.AcquireLock(authCtx, database.LockIDBoundaryUsageStats); err != nil {
return err
}
_, err := tx.UpsertBoundaryUsageStats(authCtx, database.UpsertBoundaryUsageStatsParams{
ReplicaID: replicaID,
UniqueWorkspacesCount: workspaceCount, // cumulative, for UPDATE
UniqueUsersCount: userCount, // cumulative, for UPDATE
@@ -104,6 +113,8 @@ func (t *Tracker) FlushToDB(ctx context.Context, db database.Store, replicaID uu
AllowedRequests: allowed,
DeniedRequests: denied,
})
return err
}, nil)
// Always reset cumulative counts to prevent unbounded memory growth (e.g.
// if the DB is unreachable). Copy delta maps to preserve any Track() calls
+42 -87
View File
@@ -45,7 +45,7 @@ func TestTracker_Track_Single(t *testing.T) {
// Verify the data was written correctly.
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces)
require.Equal(t, int64(1), summary.UniqueUsers)
@@ -73,7 +73,7 @@ func TestTracker_Track_DuplicateWorkspaceUser(t *testing.T) {
require.NoError(t, err)
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces, "should be 1 unique workspace")
require.Equal(t, int64(1), summary.UniqueUsers, "should be 1 unique user")
@@ -102,7 +102,7 @@ func TestTracker_Track_MultipleWorkspacesUsers(t *testing.T) {
require.NoError(t, err)
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(3), summary.UniqueWorkspaces)
require.Equal(t, int64(2), summary.UniqueUsers)
@@ -140,7 +140,7 @@ func TestTracker_Track_Concurrent(t *testing.T) {
require.NoError(t, err)
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(numGoroutines), summary.UniqueWorkspaces)
require.Equal(t, int64(numGoroutines), summary.UniqueUsers)
@@ -175,7 +175,7 @@ func TestTracker_FlushToDB_Accumulates(t *testing.T) {
require.NoError(t, err)
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces)
require.Equal(t, int64(1), summary.UniqueUsers)
@@ -202,7 +202,7 @@ func TestTracker_FlushToDB_NewPeriod(t *testing.T) {
require.NoError(t, err)
// Simulate telemetry reset (new period).
err = db.ResetBoundaryUsageStats(boundaryCtx)
_, err = db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
// Track new data.
@@ -215,7 +215,7 @@ func TestTracker_FlushToDB_NewPeriod(t *testing.T) {
require.NoError(t, err)
// The summary should only contain the new data after reset.
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces, "should only count new workspace")
require.Equal(t, int64(1), summary.UniqueUsers, "should only count new user")
@@ -237,7 +237,7 @@ func TestTracker_FlushToDB_NoActivity(t *testing.T) {
// Verify nothing was written to DB.
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(0), summary.UniqueWorkspaces)
require.Equal(t, int64(0), summary.AllowedRequests)
@@ -265,7 +265,7 @@ func TestUpsertBoundaryUsageStats_Insert(t *testing.T) {
require.True(t, newPeriod, "should return true for insert")
// Verify INSERT used the delta values, not cumulative.
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Equal(t, int64(5), summary.UniqueWorkspaces)
require.Equal(t, int64(3), summary.UniqueUsers)
@@ -301,7 +301,7 @@ func TestUpsertBoundaryUsageStats_Update(t *testing.T) {
require.False(t, newPeriod, "should return false for update")
// Verify UPDATE used cumulative values.
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Equal(t, int64(8), summary.UniqueWorkspaces)
require.Equal(t, int64(5), summary.UniqueUsers)
@@ -309,7 +309,7 @@ func TestUpsertBoundaryUsageStats_Update(t *testing.T) {
require.Equal(t, int64(10+20), summary.DeniedRequests)
}
func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
func TestGetAndResetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
@@ -347,7 +347,7 @@ func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
})
require.NoError(t, err)
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
// Verify aggregation (SUM of all replicas).
@@ -357,13 +357,13 @@ func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
require.Equal(t, int64(45), summary.DeniedRequests) // 10 + 15 + 20
}
func TestGetBoundaryUsageSummary_Empty(t *testing.T) {
func TestGetAndResetBoundaryUsageSummary_Empty(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := dbauthz.AsBoundaryUsageTracker(context.Background())
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
// COALESCE should return 0 for all columns.
@@ -373,7 +373,7 @@ func TestGetBoundaryUsageSummary_Empty(t *testing.T) {
require.Equal(t, int64(0), summary.DeniedRequests)
}
func TestResetBoundaryUsageStats(t *testing.T) {
func TestGetAndResetBoundaryUsageSummary_DeletesData(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
@@ -391,61 +391,19 @@ func TestResetBoundaryUsageStats(t *testing.T) {
require.NoError(t, err)
}
// Verify data exists.
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Greater(t, summary.AllowedRequests, int64(0))
// Reset.
err = db.ResetBoundaryUsageStats(ctx)
// Should return the summary AND delete all data.
summary, err := db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1+2+3+4+5), summary.UniqueWorkspaces)
require.Equal(t, int64(10+20+30+40+50), summary.AllowedRequests)
// Verify all data is gone.
summary, err = db.GetBoundaryUsageSummary(ctx, 60000)
summary, err = db.GetAndResetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Equal(t, int64(0), summary.UniqueWorkspaces)
require.Equal(t, int64(0), summary.AllowedRequests)
}
func TestDeleteBoundaryUsageStatsByReplicaID(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := dbauthz.AsBoundaryUsageTracker(context.Background())
replica1 := uuid.New()
replica2 := uuid.New()
// Insert stats for 2 replicas. Delta fields are used for INSERT.
_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
ReplicaID: replica1,
UniqueWorkspacesDelta: 10,
UniqueUsersDelta: 5,
AllowedRequests: 100,
DeniedRequests: 10,
})
require.NoError(t, err)
_, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
ReplicaID: replica2,
UniqueWorkspacesDelta: 20,
UniqueUsersDelta: 10,
AllowedRequests: 200,
DeniedRequests: 20,
})
require.NoError(t, err)
// Delete replica1's stats.
err = db.DeleteBoundaryUsageStatsByReplicaID(ctx, replica1)
require.NoError(t, err)
// Verify only replica2's stats remain.
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
require.NoError(t, err)
require.Equal(t, int64(20), summary.UniqueWorkspaces)
require.Equal(t, int64(200), summary.AllowedRequests)
}
func TestTracker_TelemetryCycle(t *testing.T) {
t.Parallel()
@@ -477,8 +435,8 @@ func TestTracker_TelemetryCycle(t *testing.T) {
require.NoError(t, tracker2.FlushToDB(ctx, db, replica2))
require.NoError(t, tracker3.FlushToDB(ctx, db, replica3))
// Telemetry aggregates.
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
// Telemetry aggregates and resets (simulating telemetry report sent).
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
// Verify aggregation.
@@ -487,15 +445,12 @@ func TestTracker_TelemetryCycle(t *testing.T) {
require.Equal(t, int64(105), summary.AllowedRequests) // 25 + 75 + 5
require.Equal(t, int64(15), summary.DeniedRequests) // 3 + 12 + 0
// Telemetry resets stats (simulating telemetry report sent).
require.NoError(t, db.ResetBoundaryUsageStats(boundaryCtx))
// Next flush from trackers should detect new period.
tracker1.Track(uuid.New(), uuid.New(), 1, 0)
require.NoError(t, tracker1.FlushToDB(ctx, db, replica1))
// Verify trackers reset their in-memory state.
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err = db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces)
require.Equal(t, int64(1), summary.AllowedRequests)
@@ -513,30 +468,24 @@ func TestTracker_FlushToDB_NoStaleDataAfterReset(t *testing.T) {
workspaceID := uuid.New()
ownerID := uuid.New()
// Track some data, flush, and verify.
// Track some data and flush.
tracker.Track(workspaceID, ownerID, 10, 5)
err := tracker.FlushToDB(ctx, db, replicaID)
require.NoError(t, err)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
// Simulate telemetry reset (new period) - this also verifies the data.
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(1), summary.UniqueWorkspaces)
require.Equal(t, int64(10), summary.AllowedRequests)
// Simulate telemetry reset (new period).
err = db.ResetBoundaryUsageStats(boundaryCtx)
require.NoError(t, err)
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(0), summary.AllowedRequests)
// Flush again without any new Track() calls. This should not write stale
// data back to the DB.
err = tracker.FlushToDB(ctx, db, replicaID)
require.NoError(t, err)
// Summary should be empty (no stale data written).
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err = db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(0), summary.UniqueWorkspaces)
require.Equal(t, int64(0), summary.UniqueUsers)
@@ -582,7 +531,7 @@ func TestTracker_ConcurrentFlushAndTrack(t *testing.T) {
// Verify stats are non-negative.
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.GreaterOrEqual(t, summary.AllowedRequests, int64(0))
require.GreaterOrEqual(t, summary.DeniedRequests, int64(0))
@@ -597,6 +546,17 @@ type trackDuringUpsertDB struct {
userID uuid.UUID
}
func (s *trackDuringUpsertDB) InTx(fn func(database.Store) error, opts *database.TxOptions) error {
return s.Store.InTx(func(tx database.Store) error {
return fn(&trackDuringUpsertDB{
Store: tx,
tracker: s.tracker,
workspaceID: s.workspaceID,
userID: s.userID,
})
}, opts)
}
func (s *trackDuringUpsertDB) UpsertBoundaryUsageStats(ctx context.Context, arg database.UpsertBoundaryUsageStatsParams) (bool, error) {
s.tracker.Track(s.workspaceID, s.userID, 20, 10)
return s.Store.UpsertBoundaryUsageStats(ctx, arg)
@@ -626,17 +586,12 @@ func TestTracker_TrackDuringFlush(t *testing.T) {
err := tracker.FlushToDB(ctx, trackingDB, replicaID)
require.NoError(t, err)
// Verify first flush only wrote the initial data.
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(10), summary.AllowedRequests)
// The second flush should include the Track() call that happened during the
// first flush's DB operation.
// Second flush captures the Track() that happened during the first flush.
err = tracker.FlushToDB(ctx, db, replicaID)
require.NoError(t, err)
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
// Verify both flushes are in the summary.
summary, err := db.GetAndResetBoundaryUsageSummary(boundaryCtx, 60000)
require.NoError(t, err)
require.Equal(t, int64(10+20), summary.AllowedRequests)
require.Equal(t, int64(5+10), summary.DeniedRequests)
+7 -21
View File
@@ -1703,13 +1703,6 @@ func (q *querier) DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, u
return q.db.DeleteApplicationConnectAPIKeysByUserID(ctx, userID)
}
func (q *querier) DeleteBoundaryUsageStatsByReplicaID(ctx context.Context, replicaID uuid.UUID) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceBoundaryUsage); err != nil {
return err
}
return q.db.DeleteBoundaryUsageStatsByReplicaID(ctx, replicaID)
}
func (q *querier) DeleteCryptoKey(ctx context.Context, arg database.DeleteCryptoKeyParams) (database.CryptoKey, error) {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceCryptoKey); err != nil {
return database.CryptoKey{}, err
@@ -2223,6 +2216,13 @@ func (q *querier) GetAllTailnetTunnels(ctx context.Context) ([]database.TailnetT
return q.db.GetAllTailnetTunnels(ctx)
}
func (q *querier) GetAndResetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetAndResetBoundaryUsageSummaryRow, error) {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceBoundaryUsage); err != nil {
return database.GetAndResetBoundaryUsageSummaryRow{}, err
}
return q.db.GetAndResetBoundaryUsageSummary(ctx, maxStalenessMs)
}
func (q *querier) GetAnnouncementBanners(ctx context.Context) (string, error) {
// No authz checks
return q.db.GetAnnouncementBanners(ctx)
@@ -2271,13 +2271,6 @@ func (q *querier) GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUI
return q.db.GetAuthorizationUserRoles(ctx, userID)
}
func (q *querier) GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetBoundaryUsageSummaryRow, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceBoundaryUsage); err != nil {
return database.GetBoundaryUsageSummaryRow{}, err
}
return q.db.GetBoundaryUsageSummary(ctx, maxStalenessMs)
}
func (q *querier) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
// Just like with the audit logs query, shortcut if the user is an owner.
err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceConnectionLog)
@@ -4891,13 +4884,6 @@ func (q *querier) RemoveUserFromGroups(ctx context.Context, arg database.RemoveU
return q.db.RemoveUserFromGroups(ctx, arg)
}
func (q *querier) ResetBoundaryUsageStats(ctx context.Context) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceBoundaryUsage); err != nil {
return err
}
return q.db.ResetBoundaryUsageStats(ctx)
}
func (q *querier) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
return err
+3 -12
View File
@@ -277,11 +277,6 @@ func (s *MethodTestSuite) TestAPIKey() {
dbm.EXPECT().DeleteApplicationConnectAPIKeysByUserID(gomock.Any(), a.UserID).Return(nil).AnyTimes()
check.Args(a.UserID).Asserts(rbac.ResourceApiKey.WithOwner(a.UserID.String()), policy.ActionDelete).Returns()
}))
s.Run("DeleteBoundaryUsageStatsByReplicaID", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
replicaID := uuid.New()
dbm.EXPECT().DeleteBoundaryUsageStatsByReplicaID(gomock.Any(), replicaID).Return(nil).AnyTimes()
check.Args(replicaID).Asserts(rbac.ResourceBoundaryUsage, policy.ActionDelete)
}))
s.Run("DeleteExternalAuthLink", s.Mocked(func(dbm *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
a := testutil.Fake(s.T(), faker, database.ExternalAuthLink{})
dbm.EXPECT().GetExternalAuthLink(gomock.Any(), database.GetExternalAuthLinkParams{ProviderID: a.ProviderID, UserID: a.UserID}).Return(a, nil).AnyTimes()
@@ -532,9 +527,9 @@ func (s *MethodTestSuite) TestGroup() {
dbm.EXPECT().RemoveUserFromGroups(gomock.Any(), arg).Return(slice.New(g1.ID, g2.ID), nil).AnyTimes()
check.Args(arg).Asserts(rbac.ResourceSystem, policy.ActionUpdate).Returns(slice.New(g1.ID, g2.ID))
}))
s.Run("ResetBoundaryUsageStats", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
dbm.EXPECT().ResetBoundaryUsageStats(gomock.Any()).Return(nil).AnyTimes()
check.Args().Asserts(rbac.ResourceBoundaryUsage, policy.ActionDelete)
s.Run("GetAndResetBoundaryUsageSummary", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
dbm.EXPECT().GetAndResetBoundaryUsageSummary(gomock.Any(), int64(1000)).Return(database.GetAndResetBoundaryUsageSummaryRow{}, nil).AnyTimes()
check.Args(int64(1000)).Asserts(rbac.ResourceBoundaryUsage, policy.ActionDelete)
}))
s.Run("UpdateGroupByID", s.Mocked(func(dbm *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
@@ -2991,10 +2986,6 @@ func (s *MethodTestSuite) TestSystemFunctions() {
dbm.EXPECT().GetAuthorizationUserRoles(gomock.Any(), u.ID).Return(database.GetAuthorizationUserRolesRow{}, nil).AnyTimes()
check.Args(u.ID).Asserts(rbac.ResourceSystem, policy.ActionRead)
}))
s.Run("GetBoundaryUsageSummary", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
dbm.EXPECT().GetBoundaryUsageSummary(gomock.Any(), int64(1000)).Return(database.GetBoundaryUsageSummaryRow{}, nil).AnyTimes()
check.Args(int64(1000)).Asserts(rbac.ResourceBoundaryUsage, policy.ActionRead)
}))
s.Run("GetDERPMeshKey", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
dbm.EXPECT().GetDERPMeshKey(gomock.Any()).Return("testing", nil).AnyTimes()
check.Args().Asserts(rbac.ResourceSystem, policy.ActionRead)
+8 -24
View File
@@ -335,14 +335,6 @@ func (m queryMetricsStore) DeleteApplicationConnectAPIKeysByUserID(ctx context.C
return r0
}
func (m queryMetricsStore) DeleteBoundaryUsageStatsByReplicaID(ctx context.Context, replicaID uuid.UUID) error {
start := time.Now()
r0 := m.s.DeleteBoundaryUsageStatsByReplicaID(ctx, replicaID)
m.queryLatencies.WithLabelValues("DeleteBoundaryUsageStatsByReplicaID").Observe(time.Since(start).Seconds())
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "DeleteBoundaryUsageStatsByReplicaID").Inc()
return r0
}
func (m queryMetricsStore) DeleteCryptoKey(ctx context.Context, arg database.DeleteCryptoKeyParams) (database.CryptoKey, error) {
start := time.Now()
r0, r1 := m.s.DeleteCryptoKey(ctx, arg)
@@ -854,6 +846,14 @@ func (m queryMetricsStore) GetAllTailnetTunnels(ctx context.Context) ([]database
return r0, r1
}
func (m queryMetricsStore) GetAndResetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetAndResetBoundaryUsageSummaryRow, error) {
start := time.Now()
r0, r1 := m.s.GetAndResetBoundaryUsageSummary(ctx, maxStalenessMs)
m.queryLatencies.WithLabelValues("GetAndResetBoundaryUsageSummary").Observe(time.Since(start).Seconds())
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetAndResetBoundaryUsageSummary").Inc()
return r0, r1
}
func (m queryMetricsStore) GetAnnouncementBanners(ctx context.Context) (string, error) {
start := time.Now()
r0, r1 := m.s.GetAnnouncementBanners(ctx)
@@ -902,14 +902,6 @@ func (m queryMetricsStore) GetAuthorizationUserRoles(ctx context.Context, userID
return r0, r1
}
func (m queryMetricsStore) GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetBoundaryUsageSummaryRow, error) {
start := time.Now()
r0, r1 := m.s.GetBoundaryUsageSummary(ctx, maxStalenessMs)
m.queryLatencies.WithLabelValues("GetBoundaryUsageSummary").Observe(time.Since(start).Seconds())
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetBoundaryUsageSummary").Inc()
return r0, r1
}
func (m queryMetricsStore) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
start := time.Now()
r0, r1 := m.s.GetConnectionLogsOffset(ctx, arg)
@@ -3334,14 +3326,6 @@ func (m queryMetricsStore) RemoveUserFromGroups(ctx context.Context, arg databas
return r0, r1
}
func (m queryMetricsStore) ResetBoundaryUsageStats(ctx context.Context) error {
start := time.Now()
r0 := m.s.ResetBoundaryUsageStats(ctx)
m.queryLatencies.WithLabelValues("ResetBoundaryUsageStats").Observe(time.Since(start).Seconds())
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "ResetBoundaryUsageStats").Inc()
return r0
}
func (m queryMetricsStore) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error {
start := time.Now()
r0 := m.s.RevokeDBCryptKey(ctx, activeKeyDigest)
+15 -43
View File
@@ -511,20 +511,6 @@ func (mr *MockStoreMockRecorder) DeleteApplicationConnectAPIKeysByUserID(ctx, us
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteApplicationConnectAPIKeysByUserID", reflect.TypeOf((*MockStore)(nil).DeleteApplicationConnectAPIKeysByUserID), ctx, userID)
}
// DeleteBoundaryUsageStatsByReplicaID mocks base method.
func (m *MockStore) DeleteBoundaryUsageStatsByReplicaID(ctx context.Context, replicaID uuid.UUID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteBoundaryUsageStatsByReplicaID", ctx, replicaID)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteBoundaryUsageStatsByReplicaID indicates an expected call of DeleteBoundaryUsageStatsByReplicaID.
func (mr *MockStoreMockRecorder) DeleteBoundaryUsageStatsByReplicaID(ctx, replicaID any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteBoundaryUsageStatsByReplicaID", reflect.TypeOf((*MockStore)(nil).DeleteBoundaryUsageStatsByReplicaID), ctx, replicaID)
}
// DeleteCryptoKey mocks base method.
func (m *MockStore) DeleteCryptoKey(ctx context.Context, arg database.DeleteCryptoKeyParams) (database.CryptoKey, error) {
m.ctrl.T.Helper()
@@ -1453,6 +1439,21 @@ func (mr *MockStoreMockRecorder) GetAllTailnetTunnels(ctx any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllTailnetTunnels", reflect.TypeOf((*MockStore)(nil).GetAllTailnetTunnels), ctx)
}
// GetAndResetBoundaryUsageSummary mocks base method.
func (m *MockStore) GetAndResetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetAndResetBoundaryUsageSummaryRow, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAndResetBoundaryUsageSummary", ctx, maxStalenessMs)
ret0, _ := ret[0].(database.GetAndResetBoundaryUsageSummaryRow)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAndResetBoundaryUsageSummary indicates an expected call of GetAndResetBoundaryUsageSummary.
func (mr *MockStoreMockRecorder) GetAndResetBoundaryUsageSummary(ctx, maxStalenessMs any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAndResetBoundaryUsageSummary", reflect.TypeOf((*MockStore)(nil).GetAndResetBoundaryUsageSummary), ctx, maxStalenessMs)
}
// GetAnnouncementBanners mocks base method.
func (m *MockStore) GetAnnouncementBanners(ctx context.Context) (string, error) {
m.ctrl.T.Helper()
@@ -1648,21 +1649,6 @@ func (mr *MockStoreMockRecorder) GetAuthorizedWorkspacesAndAgentsByOwnerID(ctx,
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAuthorizedWorkspacesAndAgentsByOwnerID", reflect.TypeOf((*MockStore)(nil).GetAuthorizedWorkspacesAndAgentsByOwnerID), ctx, ownerID, prepared)
}
// GetBoundaryUsageSummary mocks base method.
func (m *MockStore) GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (database.GetBoundaryUsageSummaryRow, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetBoundaryUsageSummary", ctx, maxStalenessMs)
ret0, _ := ret[0].(database.GetBoundaryUsageSummaryRow)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetBoundaryUsageSummary indicates an expected call of GetBoundaryUsageSummary.
func (mr *MockStoreMockRecorder) GetBoundaryUsageSummary(ctx, maxStalenessMs any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBoundaryUsageSummary", reflect.TypeOf((*MockStore)(nil).GetBoundaryUsageSummary), ctx, maxStalenessMs)
}
// GetConnectionLogsOffset mocks base method.
func (m *MockStore) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
m.ctrl.T.Helper()
@@ -6278,20 +6264,6 @@ func (mr *MockStoreMockRecorder) RemoveUserFromGroups(ctx, arg any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveUserFromGroups", reflect.TypeOf((*MockStore)(nil).RemoveUserFromGroups), ctx, arg)
}
// ResetBoundaryUsageStats mocks base method.
func (m *MockStore) ResetBoundaryUsageStats(ctx context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ResetBoundaryUsageStats", ctx)
ret0, _ := ret[0].(error)
return ret0
}
// ResetBoundaryUsageStats indicates an expected call of ResetBoundaryUsageStats.
func (mr *MockStoreMockRecorder) ResetBoundaryUsageStats(ctx any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetBoundaryUsageStats", reflect.TypeOf((*MockStore)(nil).ResetBoundaryUsageStats), ctx)
}
// RevokeDBCryptKey mocks base method.
func (m *MockStore) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error {
m.ctrl.T.Helper()
+1
View File
@@ -14,6 +14,7 @@ const (
LockIDCryptoKeyRotation
LockIDReconcilePrebuilds
LockIDReconcileSystemRoles
LockIDBoundaryUsageStats
)
// GenLockID generates a unique and consistent lock ID from a given string.
+5 -9
View File
@@ -88,8 +88,6 @@ type sqlcQuerier interface {
// be recreated.
DeleteAllWebpushSubscriptions(ctx context.Context) error
DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, userID uuid.UUID) error
// Deletes boundary usage statistics for a specific replica.
DeleteBoundaryUsageStatsByReplicaID(ctx context.Context, replicaID uuid.UUID) error
DeleteCryptoKey(ctx context.Context, arg DeleteCryptoKeyParams) (CryptoKey, error)
DeleteCustomRole(ctx context.Context, arg DeleteCustomRoleParams) error
DeleteExpiredAPIKeys(ctx context.Context, arg DeleteExpiredAPIKeysParams) (int64, error)
@@ -181,6 +179,11 @@ type sqlcQuerier interface {
GetAllTailnetCoordinators(ctx context.Context) ([]TailnetCoordinator, error)
GetAllTailnetPeers(ctx context.Context) ([]TailnetPeer, error)
GetAllTailnetTunnels(ctx context.Context) ([]TailnetTunnel, error)
// Atomic read+delete prevents replicas that flush between a separate read and
// reset from having their data deleted before the next snapshot. Uses a common
// table expression with DELETE...RETURNING so the rows we sum are exactly the
// rows we delete. Stale rows are excluded from the sum but still deleted.
GetAndResetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (GetAndResetBoundaryUsageSummaryRow, error)
GetAnnouncementBanners(ctx context.Context) (string, error)
GetAppSecurityKey(ctx context.Context) (string, error)
GetApplicationName(ctx context.Context) (string, error)
@@ -196,10 +199,6 @@ type sqlcQuerier interface {
// This function returns roles for authorization purposes. Implied member roles
// are included.
GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUID) (GetAuthorizationUserRolesRow, error)
// Aggregates boundary usage statistics across all replicas. Filters to only
// include data where window_start is within the given interval to exclude
// stale data.
GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (GetBoundaryUsageSummaryRow, error)
GetConnectionLogsOffset(ctx context.Context, arg GetConnectionLogsOffsetParams) ([]GetConnectionLogsOffsetRow, error)
GetCoordinatorResumeTokenSigningKey(ctx context.Context) (string, error)
GetCryptoKeyByFeatureAndSequence(ctx context.Context, arg GetCryptoKeyByFeatureAndSequenceParams) (CryptoKey, error)
@@ -652,9 +651,6 @@ type sqlcQuerier interface {
RegisterWorkspaceProxy(ctx context.Context, arg RegisterWorkspaceProxyParams) (WorkspaceProxy, error)
RemoveUserFromAllGroups(ctx context.Context, userID uuid.UUID) error
RemoveUserFromGroups(ctx context.Context, arg RemoveUserFromGroupsParams) ([]uuid.UUID, error)
// Deletes all boundary usage statistics. Called after telemetry reports the
// aggregated stats. Each replica will insert a fresh row on its next flush.
ResetBoundaryUsageStats(ctx context.Context) error
RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error
// Note that this selects from the CTE, not the original table. The CTE is named
// the same as the original table to trick sqlc into reusing the existing struct
+26 -35
View File
@@ -1980,39 +1980,41 @@ func (q *sqlQuerier) InsertAuditLog(ctx context.Context, arg InsertAuditLogParam
return i, err
}
const deleteBoundaryUsageStatsByReplicaID = `-- name: DeleteBoundaryUsageStatsByReplicaID :exec
DELETE FROM boundary_usage_stats WHERE replica_id = $1
`
// Deletes boundary usage statistics for a specific replica.
func (q *sqlQuerier) DeleteBoundaryUsageStatsByReplicaID(ctx context.Context, replicaID uuid.UUID) error {
_, err := q.db.ExecContext(ctx, deleteBoundaryUsageStatsByReplicaID, replicaID)
return err
}
const getBoundaryUsageSummary = `-- name: GetBoundaryUsageSummary :one
const getAndResetBoundaryUsageSummary = `-- name: GetAndResetBoundaryUsageSummary :one
WITH deleted AS (
DELETE FROM boundary_usage_stats
RETURNING replica_id, unique_workspaces_count, unique_users_count, allowed_requests, denied_requests, window_start, updated_at
)
SELECT
COALESCE(SUM(unique_workspaces_count), 0)::bigint AS unique_workspaces,
COALESCE(SUM(unique_users_count), 0)::bigint AS unique_users,
COALESCE(SUM(allowed_requests), 0)::bigint AS allowed_requests,
COALESCE(SUM(denied_requests), 0)::bigint AS denied_requests
FROM boundary_usage_stats
WHERE window_start >= NOW() - ($1::bigint || ' ms')::interval
COALESCE(SUM(unique_workspaces_count) FILTER (
WHERE window_start >= NOW() - ($1::bigint || ' ms')::interval
), 0)::bigint AS unique_workspaces,
COALESCE(SUM(unique_users_count) FILTER (
WHERE window_start >= NOW() - ($1::bigint || ' ms')::interval
), 0)::bigint AS unique_users,
COALESCE(SUM(allowed_requests) FILTER (
WHERE window_start >= NOW() - ($1::bigint || ' ms')::interval
), 0)::bigint AS allowed_requests,
COALESCE(SUM(denied_requests) FILTER (
WHERE window_start >= NOW() - ($1::bigint || ' ms')::interval
), 0)::bigint AS denied_requests
FROM deleted
`
type GetBoundaryUsageSummaryRow struct {
type GetAndResetBoundaryUsageSummaryRow struct {
UniqueWorkspaces int64 `db:"unique_workspaces" json:"unique_workspaces"`
UniqueUsers int64 `db:"unique_users" json:"unique_users"`
AllowedRequests int64 `db:"allowed_requests" json:"allowed_requests"`
DeniedRequests int64 `db:"denied_requests" json:"denied_requests"`
}
// Aggregates boundary usage statistics across all replicas. Filters to only
// include data where window_start is within the given interval to exclude
// stale data.
func (q *sqlQuerier) GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (GetBoundaryUsageSummaryRow, error) {
row := q.db.QueryRowContext(ctx, getBoundaryUsageSummary, maxStalenessMs)
var i GetBoundaryUsageSummaryRow
// Atomic read+delete prevents replicas that flush between a separate read and
// reset from having their data deleted before the next snapshot. Uses a common
// table expression with DELETE...RETURNING so the rows we sum are exactly the
// rows we delete. Stale rows are excluded from the sum but still deleted.
func (q *sqlQuerier) GetAndResetBoundaryUsageSummary(ctx context.Context, maxStalenessMs int64) (GetAndResetBoundaryUsageSummaryRow, error) {
row := q.db.QueryRowContext(ctx, getAndResetBoundaryUsageSummary, maxStalenessMs)
var i GetAndResetBoundaryUsageSummaryRow
err := row.Scan(
&i.UniqueWorkspaces,
&i.UniqueUsers,
@@ -2022,17 +2024,6 @@ func (q *sqlQuerier) GetBoundaryUsageSummary(ctx context.Context, maxStalenessMs
return i, err
}
const resetBoundaryUsageStats = `-- name: ResetBoundaryUsageStats :exec
DELETE FROM boundary_usage_stats
`
// Deletes all boundary usage statistics. Called after telemetry reports the
// aggregated stats. Each replica will insert a fresh row on its next flush.
func (q *sqlQuerier) ResetBoundaryUsageStats(ctx context.Context) error {
_, err := q.db.ExecContext(ctx, resetBoundaryUsageStats)
return err
}
const upsertBoundaryUsageStats = `-- name: UpsertBoundaryUsageStats :one
INSERT INTO boundary_usage_stats (
replica_id,
+22 -19
View File
@@ -27,23 +27,26 @@ INSERT INTO boundary_usage_stats (
updated_at = NOW()
RETURNING (xmax = 0) AS new_period;
-- name: GetBoundaryUsageSummary :one
-- Aggregates boundary usage statistics across all replicas. Filters to only
-- include data where window_start is within the given interval to exclude
-- stale data.
-- name: GetAndResetBoundaryUsageSummary :one
-- Atomic read+delete prevents replicas that flush between a separate read and
-- reset from having their data deleted before the next snapshot. Uses a common
-- table expression with DELETE...RETURNING so the rows we sum are exactly the
-- rows we delete. Stale rows are excluded from the sum but still deleted.
WITH deleted AS (
DELETE FROM boundary_usage_stats
RETURNING *
)
SELECT
COALESCE(SUM(unique_workspaces_count), 0)::bigint AS unique_workspaces,
COALESCE(SUM(unique_users_count), 0)::bigint AS unique_users,
COALESCE(SUM(allowed_requests), 0)::bigint AS allowed_requests,
COALESCE(SUM(denied_requests), 0)::bigint AS denied_requests
FROM boundary_usage_stats
WHERE window_start >= NOW() - (@max_staleness_ms::bigint || ' ms')::interval;
-- name: ResetBoundaryUsageStats :exec
-- Deletes all boundary usage statistics. Called after telemetry reports the
-- aggregated stats. Each replica will insert a fresh row on its next flush.
DELETE FROM boundary_usage_stats;
-- name: DeleteBoundaryUsageStatsByReplicaID :exec
-- Deletes boundary usage statistics for a specific replica.
DELETE FROM boundary_usage_stats WHERE replica_id = @replica_id;
COALESCE(SUM(unique_workspaces_count) FILTER (
WHERE window_start >= NOW() - (@max_staleness_ms::bigint || ' ms')::interval
), 0)::bigint AS unique_workspaces,
COALESCE(SUM(unique_users_count) FILTER (
WHERE window_start >= NOW() - (@max_staleness_ms::bigint || ' ms')::interval
), 0)::bigint AS unique_users,
COALESCE(SUM(allowed_requests) FILTER (
WHERE window_start >= NOW() - (@max_staleness_ms::bigint || ' ms')::interval
), 0)::bigint AS allowed_requests,
COALESCE(SUM(denied_requests) FILTER (
WHERE window_start >= NOW() - (@max_staleness_ms::bigint || ' ms')::interval
), 0)::bigint AS denied_requests
FROM deleted;
+13 -10
View File
@@ -876,17 +876,20 @@ func (r *remoteReporter) collectBoundaryUsageSummary(ctx context.Context) (*Boun
return nil, xerrors.Errorf("insert boundary usage telemetry lock (period_ending_at=%q): %w", periodEndingAt, err)
}
summary, err := r.options.Database.GetBoundaryUsageSummary(boundaryCtx, maxStaleness.Milliseconds())
if err != nil {
return nil, xerrors.Errorf("get boundary usage summary: %w", err)
var summary database.GetAndResetBoundaryUsageSummaryRow
err = r.options.Database.InTx(func(tx database.Store) error {
// The advisory lock use here ensures a clean transition to the next snapshot by
// preventing replicas from upserting row(s) at the same time as we aggregate and
// delete all rows here.
var txErr error
if txErr = tx.AcquireLock(boundaryCtx, database.LockIDBoundaryUsageStats); txErr != nil {
return txErr
}
// Reset stats after capturing the summary. This deletes all rows so each
// replica will detect a new period on their next flush. Note: there is a
// known race condition here that may result in a small telemetry inaccuracy
// with multiple replicas (https://github.com/coder/coder/issues/21770).
if err := r.options.Database.ResetBoundaryUsageStats(boundaryCtx); err != nil {
return nil, xerrors.Errorf("reset boundary usage stats: %w", err)
summary, txErr = tx.GetAndResetBoundaryUsageSummary(boundaryCtx, maxStaleness.Milliseconds())
return txErr
}, nil)
if err != nil {
return nil, xerrors.Errorf("get and reset boundary usage summary: %w", err)
}
return &BoundaryUsageSummary{
-8
View File
@@ -21,7 +21,6 @@ import (
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/coderd/boundaryusage"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -942,13 +941,6 @@ func TestTelemetry_BoundaryUsageSummary(t *testing.T) {
err = tracker2.FlushToDB(ctx, db, replica2ID)
require.NoError(t, err)
// Verify both replicas' data is in the database.
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
const maxStalenessMs = 60000
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, maxStalenessMs)
require.NoError(t, err)
require.Equal(t, int64(10+20), summary.AllowedRequests)
clock := quartz.NewMock(t)
clock.Set(dbtime.Now())