mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix: handle boundary usage across snapshots and flush races (#21805)
Previously there were two issues that could cause incorrect boundary usage telemetry data. 1. Bad handling across snapshot intervals: After telemetry snapshot deleted the DB row, the next flush would INSERT the stale cumulative data (which included already-reported usage). This would then be overwritten by subsequent UPDATE flushes, causing the delta between the last snapshot and the reset to be lost (under-reporting usage). Additionally, if there was no new usage after the reset, the tracker would carry over all usage from the previous period into the next period (over-reporting usage). 2. Missed usage from a race condition: Track() calls between the first mutex unlock and second mutex lock in FlushToDB() were lost. The data wasn't included in the current flush (already snapshotted) and was wiped by the subsequent reset. This is likely low impact to overall usage numbers in the real world. Fix by tracking unique workspace/user deltas separately from cumulative values and always tracking delta allowed/denied requests. Deltas are used for INSERT (fresh start after reset), cumulative for UPDATE (accurate unique counts within a period). All counters reset atomically before the DB operation so Track() calls during the operation are preserved for the next flush.
This commit is contained in:
@@ -40,8 +40,10 @@
|
||||
// counters. When boundary logs are reported, Track() adds the IDs to the sets
|
||||
// and increments request counters.
|
||||
//
|
||||
// FlushToDB() writes stats to the database, replacing all values with the current
|
||||
// in-memory state. Stats accumulate in memory throughout the telemetry period.
|
||||
// FlushToDB() writes stats to the database only when there's been new activity
|
||||
// since the last flush. This prevents stale data from being written after a
|
||||
// telemetry reset when no new usage occurred. Stats accumulate in memory
|
||||
// throughout the telemetry period.
|
||||
//
|
||||
// A new period is detected when the upsert results in an INSERT (meaning
|
||||
// telemetry deleted the replica's row). At that point, all in-memory stats are
|
||||
|
||||
@@ -14,21 +14,40 @@ import (
|
||||
|
||||
// Tracker tracks boundary usage for telemetry reporting.
|
||||
//
|
||||
// All stats accumulate in memory throughout a telemetry period and are only
|
||||
// reset when a new period begins.
|
||||
// Unique user/workspace counts are tracked both cumulatively and as deltas since
|
||||
// the last flush. The delta is needed because when a new telemetry period starts
|
||||
// (the DB row is deleted), we must only insert data accumulated since the last
|
||||
// flush. If we used cumulative values, stale data from the previous period would
|
||||
// be written to the new row and then lost when subsequent updates overwrite it.
|
||||
//
|
||||
// Request counts are tracked as deltas and accumulated in the database.
|
||||
type Tracker struct {
|
||||
mu sync.Mutex
|
||||
workspaces map[uuid.UUID]struct{}
|
||||
users map[uuid.UUID]struct{}
|
||||
mu sync.Mutex
|
||||
|
||||
// Cumulative unique counts for the current period (used on UPDATE to
|
||||
// replace the DB value with accurate totals).
|
||||
workspaces map[uuid.UUID]struct{}
|
||||
users map[uuid.UUID]struct{}
|
||||
|
||||
// Delta unique counts since last flush (used on INSERT to avoid writing
|
||||
// stale data from the previous period).
|
||||
workspacesDelta map[uuid.UUID]struct{}
|
||||
usersDelta map[uuid.UUID]struct{}
|
||||
|
||||
// Request deltas (always reset when flushing, accumulated in DB).
|
||||
allowedRequests int64
|
||||
deniedRequests int64
|
||||
|
||||
usageSinceLastFlush bool
|
||||
}
|
||||
|
||||
// NewTracker creates a new boundary usage tracker.
|
||||
func NewTracker() *Tracker {
|
||||
return &Tracker{
|
||||
workspaces: make(map[uuid.UUID]struct{}),
|
||||
users: make(map[uuid.UUID]struct{}),
|
||||
workspaces: make(map[uuid.UUID]struct{}),
|
||||
users: make(map[uuid.UUID]struct{}),
|
||||
workspacesDelta: make(map[uuid.UUID]struct{}),
|
||||
usersDelta: make(map[uuid.UUID]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,50 +58,68 @@ func (t *Tracker) Track(workspaceID, ownerID uuid.UUID, allowed, denied int64) {
|
||||
|
||||
t.workspaces[workspaceID] = struct{}{}
|
||||
t.users[ownerID] = struct{}{}
|
||||
t.workspacesDelta[workspaceID] = struct{}{}
|
||||
t.usersDelta[ownerID] = struct{}{}
|
||||
t.allowedRequests += allowed
|
||||
t.deniedRequests += denied
|
||||
t.usageSinceLastFlush = true
|
||||
}
|
||||
|
||||
// FlushToDB writes the accumulated stats to the database. All values are
|
||||
// replaced in the database (they represent the current in-memory state). If the
|
||||
// database row was deleted (new telemetry period), all in-memory stats are reset.
|
||||
// FlushToDB writes stats to the database. For unique counts, cumulative values
|
||||
// are used on UPDATE (replacing the DB value) while delta values are used on
|
||||
// INSERT (starting fresh). Request counts are always deltas, accumulated in DB.
|
||||
// All deltas are reset immediately after snapshot so Track() calls during the
|
||||
// DB operation are preserved for the next flush.
|
||||
func (t *Tracker) FlushToDB(ctx context.Context, db database.Store, replicaID uuid.UUID) error {
|
||||
t.mu.Lock()
|
||||
workspaceCount := int64(len(t.workspaces))
|
||||
userCount := int64(len(t.users))
|
||||
allowed := t.allowedRequests
|
||||
denied := t.deniedRequests
|
||||
t.mu.Unlock()
|
||||
|
||||
// Don't flush if there's no activity.
|
||||
if workspaceCount == 0 && userCount == 0 && allowed == 0 && denied == 0 {
|
||||
if !t.usageSinceLastFlush {
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Snapshot all values.
|
||||
workspaceCount := int64(len(t.workspaces)) // cumulative, for UPDATE
|
||||
userCount := int64(len(t.users)) // cumulative, for UPDATE
|
||||
workspaceDelta := int64(len(t.workspacesDelta)) // delta, for INSERT
|
||||
userDelta := int64(len(t.usersDelta)) // delta, for INSERT
|
||||
allowed := t.allowedRequests // delta, accumulated in DB
|
||||
denied := t.deniedRequests // delta, accumulated in DB
|
||||
|
||||
// Reset all deltas immediately so Track() calls during the DB operation
|
||||
// below are preserved for the next flush.
|
||||
t.workspacesDelta = make(map[uuid.UUID]struct{})
|
||||
t.usersDelta = make(map[uuid.UUID]struct{})
|
||||
t.allowedRequests = 0
|
||||
t.deniedRequests = 0
|
||||
t.usageSinceLastFlush = false
|
||||
t.mu.Unlock()
|
||||
|
||||
//nolint:gocritic // This is the actual package doing boundary usage tracking.
|
||||
newPeriod, err := db.UpsertBoundaryUsageStats(dbauthz.AsBoundaryUsageTracker(ctx), database.UpsertBoundaryUsageStatsParams{
|
||||
_, err := db.UpsertBoundaryUsageStats(dbauthz.AsBoundaryUsageTracker(ctx), database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replicaID,
|
||||
UniqueWorkspacesCount: workspaceCount,
|
||||
UniqueUsersCount: userCount,
|
||||
UniqueWorkspacesCount: workspaceCount, // cumulative, for UPDATE
|
||||
UniqueUsersCount: userCount, // cumulative, for UPDATE
|
||||
UniqueWorkspacesDelta: workspaceDelta, // delta, for INSERT
|
||||
UniqueUsersDelta: userDelta, // delta, for INSERT
|
||||
AllowedRequests: allowed,
|
||||
DeniedRequests: denied,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If this was an insert (new period), reset all stats. Any Track() calls
|
||||
// that occurred during the DB operation will be counted in the next period.
|
||||
if newPeriod {
|
||||
t.mu.Lock()
|
||||
t.workspaces = make(map[uuid.UUID]struct{})
|
||||
t.users = make(map[uuid.UUID]struct{})
|
||||
t.allowedRequests = 0
|
||||
t.deniedRequests = 0
|
||||
t.mu.Unlock()
|
||||
// Always reset cumulative counts to prevent unbounded memory growth (e.g.
|
||||
// if the DB is unreachable). Copy delta maps to preserve any Track() calls
|
||||
// that occurred during the DB operation above.
|
||||
t.mu.Lock()
|
||||
t.workspaces = make(map[uuid.UUID]struct{})
|
||||
t.users = make(map[uuid.UUID]struct{})
|
||||
for id := range t.workspacesDelta {
|
||||
t.workspaces[id] = struct{}{}
|
||||
}
|
||||
for id := range t.usersDelta {
|
||||
t.users[id] = struct{}{}
|
||||
}
|
||||
t.mu.Unlock()
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// StartFlushLoop begins the periodic flush loop that writes accumulated stats
|
||||
|
||||
@@ -159,23 +159,18 @@ func TestTracker_FlushToDB_Accumulates(t *testing.T) {
|
||||
workspaceID := uuid.New()
|
||||
ownerID := uuid.New()
|
||||
|
||||
// First flush is an insert, resets unique counts (new period).
|
||||
tracker.Track(workspaceID, ownerID, 5, 3)
|
||||
|
||||
// First flush is an insert, which resets in-memory stats.
|
||||
err := tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Track more data after the reset.
|
||||
// Track & flush more data. Same workspace/user, so unique counts stay at 1.
|
||||
tracker.Track(workspaceID, ownerID, 2, 1)
|
||||
|
||||
// Second flush is an update so stats should accumulate.
|
||||
err = tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Track even more data.
|
||||
// Track & flush even more data to continue accumulation.
|
||||
tracker.Track(workspaceID, ownerID, 3, 2)
|
||||
|
||||
// Third flush stats should continue accumulating.
|
||||
err = tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -184,8 +179,8 @@ func TestTracker_FlushToDB_Accumulates(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), summary.UniqueWorkspaces)
|
||||
require.Equal(t, int64(1), summary.UniqueUsers)
|
||||
require.Equal(t, int64(5), summary.AllowedRequests, "should accumulate after first reset: 2+3=5")
|
||||
require.Equal(t, int64(3), summary.DeniedRequests, "should accumulate after first reset: 1+2=3")
|
||||
require.Equal(t, int64(5+2+3), summary.AllowedRequests)
|
||||
require.Equal(t, int64(3+1+2), summary.DeniedRequests)
|
||||
}
|
||||
|
||||
func TestTracker_FlushToDB_NewPeriod(t *testing.T) {
|
||||
@@ -256,15 +251,24 @@ func TestUpsertBoundaryUsageStats_Insert(t *testing.T) {
|
||||
|
||||
replicaID := uuid.New()
|
||||
|
||||
// Set different values for delta vs cumulative to verify INSERT uses delta.
|
||||
newPeriod, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replicaID,
|
||||
UniqueWorkspacesCount: 5,
|
||||
UniqueUsersCount: 3,
|
||||
UniqueWorkspacesDelta: 5,
|
||||
UniqueUsersDelta: 3,
|
||||
UniqueWorkspacesCount: 999, // should be ignored on INSERT
|
||||
UniqueUsersCount: 999, // should be ignored on INSERT
|
||||
AllowedRequests: 100,
|
||||
DeniedRequests: 10,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, newPeriod, "should return true for insert")
|
||||
|
||||
// Verify INSERT used the delta values, not cumulative.
|
||||
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(5), summary.UniqueWorkspaces)
|
||||
require.Equal(t, int64(3), summary.UniqueUsers)
|
||||
}
|
||||
|
||||
func TestUpsertBoundaryUsageStats_Update(t *testing.T) {
|
||||
@@ -275,34 +279,34 @@ func TestUpsertBoundaryUsageStats_Update(t *testing.T) {
|
||||
|
||||
replicaID := uuid.New()
|
||||
|
||||
// First insert.
|
||||
// First insert uses delta fields.
|
||||
_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replicaID,
|
||||
UniqueWorkspacesCount: 5,
|
||||
UniqueUsersCount: 3,
|
||||
UniqueWorkspacesDelta: 5,
|
||||
UniqueUsersDelta: 3,
|
||||
AllowedRequests: 100,
|
||||
DeniedRequests: 10,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Second upsert (update).
|
||||
// Second upsert (update). Set different delta vs cumulative to verify UPDATE uses cumulative.
|
||||
newPeriod, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replicaID,
|
||||
UniqueWorkspacesCount: 8,
|
||||
UniqueUsersCount: 5,
|
||||
UniqueWorkspacesCount: 8, // cumulative, should be used
|
||||
UniqueUsersCount: 5, // cumulative, should be used
|
||||
AllowedRequests: 200,
|
||||
DeniedRequests: 20,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.False(t, newPeriod, "should return false for update")
|
||||
|
||||
// Verify the update took effect.
|
||||
// Verify UPDATE used cumulative values.
|
||||
summary, err := db.GetBoundaryUsageSummary(ctx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(8), summary.UniqueWorkspaces)
|
||||
require.Equal(t, int64(5), summary.UniqueUsers)
|
||||
require.Equal(t, int64(200), summary.AllowedRequests)
|
||||
require.Equal(t, int64(20), summary.DeniedRequests)
|
||||
require.Equal(t, int64(100+200), summary.AllowedRequests)
|
||||
require.Equal(t, int64(10+20), summary.DeniedRequests)
|
||||
}
|
||||
|
||||
func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
|
||||
@@ -315,11 +319,11 @@ func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
|
||||
replica2 := uuid.New()
|
||||
replica3 := uuid.New()
|
||||
|
||||
// Insert stats for 3 replicas.
|
||||
// Insert stats for 3 replicas. Delta fields are used for INSERT.
|
||||
_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replica1,
|
||||
UniqueWorkspacesCount: 10,
|
||||
UniqueUsersCount: 5,
|
||||
UniqueWorkspacesDelta: 10,
|
||||
UniqueUsersDelta: 5,
|
||||
AllowedRequests: 100,
|
||||
DeniedRequests: 10,
|
||||
})
|
||||
@@ -327,8 +331,8 @@ func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
|
||||
|
||||
_, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replica2,
|
||||
UniqueWorkspacesCount: 15,
|
||||
UniqueUsersCount: 8,
|
||||
UniqueWorkspacesDelta: 15,
|
||||
UniqueUsersDelta: 8,
|
||||
AllowedRequests: 150,
|
||||
DeniedRequests: 15,
|
||||
})
|
||||
@@ -336,8 +340,8 @@ func TestGetBoundaryUsageSummary_MultipleReplicas(t *testing.T) {
|
||||
|
||||
_, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replica3,
|
||||
UniqueWorkspacesCount: 20,
|
||||
UniqueUsersCount: 12,
|
||||
UniqueWorkspacesDelta: 20,
|
||||
UniqueUsersDelta: 12,
|
||||
AllowedRequests: 200,
|
||||
DeniedRequests: 20,
|
||||
})
|
||||
@@ -375,12 +379,12 @@ func TestResetBoundaryUsageStats(t *testing.T) {
|
||||
db, _ := dbtestutil.NewDB(t)
|
||||
ctx := dbauthz.AsBoundaryUsageTracker(context.Background())
|
||||
|
||||
// Insert stats for multiple replicas.
|
||||
// Insert stats for multiple replicas. Delta fields are used for INSERT.
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: uuid.New(),
|
||||
UniqueWorkspacesCount: int64(i + 1),
|
||||
UniqueUsersCount: int64(i + 1),
|
||||
UniqueWorkspacesDelta: int64(i + 1),
|
||||
UniqueUsersDelta: int64(i + 1),
|
||||
AllowedRequests: int64((i + 1) * 10),
|
||||
DeniedRequests: int64(i + 1),
|
||||
})
|
||||
@@ -412,11 +416,11 @@ func TestDeleteBoundaryUsageStatsByReplicaID(t *testing.T) {
|
||||
replica1 := uuid.New()
|
||||
replica2 := uuid.New()
|
||||
|
||||
// Insert stats for 2 replicas.
|
||||
// Insert stats for 2 replicas. Delta fields are used for INSERT.
|
||||
_, err := db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replica1,
|
||||
UniqueWorkspacesCount: 10,
|
||||
UniqueUsersCount: 5,
|
||||
UniqueWorkspacesDelta: 10,
|
||||
UniqueUsersDelta: 5,
|
||||
AllowedRequests: 100,
|
||||
DeniedRequests: 10,
|
||||
})
|
||||
@@ -424,8 +428,8 @@ func TestDeleteBoundaryUsageStatsByReplicaID(t *testing.T) {
|
||||
|
||||
_, err = db.UpsertBoundaryUsageStats(ctx, database.UpsertBoundaryUsageStatsParams{
|
||||
ReplicaID: replica2,
|
||||
UniqueWorkspacesCount: 20,
|
||||
UniqueUsersCount: 10,
|
||||
UniqueWorkspacesDelta: 20,
|
||||
UniqueUsersDelta: 10,
|
||||
AllowedRequests: 200,
|
||||
DeniedRequests: 20,
|
||||
})
|
||||
@@ -497,6 +501,49 @@ func TestTracker_TelemetryCycle(t *testing.T) {
|
||||
require.Equal(t, int64(1), summary.AllowedRequests)
|
||||
}
|
||||
|
||||
func TestTracker_FlushToDB_NoStaleDataAfterReset(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, _ := dbtestutil.NewDB(t)
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
|
||||
|
||||
tracker := boundaryusage.NewTracker()
|
||||
replicaID := uuid.New()
|
||||
workspaceID := uuid.New()
|
||||
ownerID := uuid.New()
|
||||
|
||||
// Track some data, flush, and verify.
|
||||
tracker.Track(workspaceID, ownerID, 10, 5)
|
||||
err := tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), summary.UniqueWorkspaces)
|
||||
require.Equal(t, int64(10), summary.AllowedRequests)
|
||||
|
||||
// Simulate telemetry reset (new period).
|
||||
err = db.ResetBoundaryUsageStats(boundaryCtx)
|
||||
require.NoError(t, err)
|
||||
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(0), summary.AllowedRequests)
|
||||
|
||||
// Flush again without any new Track() calls. This should not write stale
|
||||
// data back to the DB.
|
||||
err = tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Summary should be empty (no stale data written).
|
||||
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(0), summary.UniqueWorkspaces)
|
||||
require.Equal(t, int64(0), summary.UniqueUsers)
|
||||
require.Equal(t, int64(0), summary.AllowedRequests)
|
||||
require.Equal(t, int64(0), summary.DeniedRequests)
|
||||
}
|
||||
|
||||
func TestTracker_ConcurrentFlushAndTrack(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -540,3 +587,57 @@ func TestTracker_ConcurrentFlushAndTrack(t *testing.T) {
|
||||
require.GreaterOrEqual(t, summary.AllowedRequests, int64(0))
|
||||
require.GreaterOrEqual(t, summary.DeniedRequests, int64(0))
|
||||
}
|
||||
|
||||
// trackDuringUpsertDB wraps a database.Store to call Track() during the
|
||||
// UpsertBoundaryUsageStats operation, simulating a concurrent Track() call.
|
||||
type trackDuringUpsertDB struct {
|
||||
database.Store
|
||||
tracker *boundaryusage.Tracker
|
||||
workspaceID uuid.UUID
|
||||
userID uuid.UUID
|
||||
}
|
||||
|
||||
func (s *trackDuringUpsertDB) UpsertBoundaryUsageStats(ctx context.Context, arg database.UpsertBoundaryUsageStatsParams) (bool, error) {
|
||||
s.tracker.Track(s.workspaceID, s.userID, 20, 10)
|
||||
return s.Store.UpsertBoundaryUsageStats(ctx, arg)
|
||||
}
|
||||
|
||||
func TestTracker_TrackDuringFlush(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, _ := dbtestutil.NewDB(t)
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
boundaryCtx := dbauthz.AsBoundaryUsageTracker(ctx)
|
||||
|
||||
tracker := boundaryusage.NewTracker()
|
||||
replicaID := uuid.New()
|
||||
|
||||
// Track some initial data.
|
||||
tracker.Track(uuid.New(), uuid.New(), 10, 5)
|
||||
|
||||
trackingDB := &trackDuringUpsertDB{
|
||||
Store: db,
|
||||
tracker: tracker,
|
||||
workspaceID: uuid.New(),
|
||||
userID: uuid.New(),
|
||||
}
|
||||
|
||||
// Flush will call Track() during the DB operation.
|
||||
err := tracker.FlushToDB(ctx, trackingDB, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify first flush only wrote the initial data.
|
||||
summary, err := db.GetBoundaryUsageSummary(boundaryCtx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(10), summary.AllowedRequests)
|
||||
|
||||
// The second flush should include the Track() call that happened during the
|
||||
// first flush's DB operation.
|
||||
err = tracker.FlushToDB(ctx, db, replicaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
summary, err = db.GetBoundaryUsageSummary(boundaryCtx, 60000)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(10+20), summary.AllowedRequests)
|
||||
require.Equal(t, int64(5+10), summary.DeniedRequests)
|
||||
}
|
||||
|
||||
@@ -762,9 +762,10 @@ type sqlcQuerier interface {
|
||||
UpsertAnnouncementBanners(ctx context.Context, value string) error
|
||||
UpsertAppSecurityKey(ctx context.Context, value string) error
|
||||
UpsertApplicationName(ctx context.Context, value string) error
|
||||
// Upserts boundary usage statistics for a replica. All values are replaced with
|
||||
// the current in-memory state. Returns true if this was an insert (new period),
|
||||
// false if update.
|
||||
// Upserts boundary usage statistics for a replica. On INSERT (new period), uses
|
||||
// delta values for unique counts (only data since last flush). On UPDATE, uses
|
||||
// cumulative values for unique counts (accurate period totals). Request counts
|
||||
// are always deltas, accumulated in DB. Returns true if insert, false if update.
|
||||
UpsertBoundaryUsageStats(ctx context.Context, arg UpsertBoundaryUsageStatsParams) (bool, error)
|
||||
UpsertConnectionLog(ctx context.Context, arg UpsertConnectionLogParams) (ConnectionLog, error)
|
||||
UpsertCoordinatorResumeTokenSigningKey(ctx context.Context, value string) error
|
||||
|
||||
@@ -2051,32 +2051,37 @@ INSERT INTO boundary_usage_stats (
|
||||
NOW(),
|
||||
NOW()
|
||||
) ON CONFLICT (replica_id) DO UPDATE SET
|
||||
unique_workspaces_count = EXCLUDED.unique_workspaces_count,
|
||||
unique_users_count = EXCLUDED.unique_users_count,
|
||||
allowed_requests = EXCLUDED.allowed_requests,
|
||||
denied_requests = EXCLUDED.denied_requests,
|
||||
unique_workspaces_count = $6,
|
||||
unique_users_count = $7,
|
||||
allowed_requests = boundary_usage_stats.allowed_requests + EXCLUDED.allowed_requests,
|
||||
denied_requests = boundary_usage_stats.denied_requests + EXCLUDED.denied_requests,
|
||||
updated_at = NOW()
|
||||
RETURNING (xmax = 0) AS new_period
|
||||
`
|
||||
|
||||
type UpsertBoundaryUsageStatsParams struct {
|
||||
ReplicaID uuid.UUID `db:"replica_id" json:"replica_id"`
|
||||
UniqueWorkspacesCount int64 `db:"unique_workspaces_count" json:"unique_workspaces_count"`
|
||||
UniqueUsersCount int64 `db:"unique_users_count" json:"unique_users_count"`
|
||||
UniqueWorkspacesDelta int64 `db:"unique_workspaces_delta" json:"unique_workspaces_delta"`
|
||||
UniqueUsersDelta int64 `db:"unique_users_delta" json:"unique_users_delta"`
|
||||
AllowedRequests int64 `db:"allowed_requests" json:"allowed_requests"`
|
||||
DeniedRequests int64 `db:"denied_requests" json:"denied_requests"`
|
||||
UniqueWorkspacesCount int64 `db:"unique_workspaces_count" json:"unique_workspaces_count"`
|
||||
UniqueUsersCount int64 `db:"unique_users_count" json:"unique_users_count"`
|
||||
}
|
||||
|
||||
// Upserts boundary usage statistics for a replica. All values are replaced with
|
||||
// the current in-memory state. Returns true if this was an insert (new period),
|
||||
// false if update.
|
||||
// Upserts boundary usage statistics for a replica. On INSERT (new period), uses
|
||||
// delta values for unique counts (only data since last flush). On UPDATE, uses
|
||||
// cumulative values for unique counts (accurate period totals). Request counts
|
||||
// are always deltas, accumulated in DB. Returns true if insert, false if update.
|
||||
func (q *sqlQuerier) UpsertBoundaryUsageStats(ctx context.Context, arg UpsertBoundaryUsageStatsParams) (bool, error) {
|
||||
row := q.db.QueryRowContext(ctx, upsertBoundaryUsageStats,
|
||||
arg.ReplicaID,
|
||||
arg.UniqueWorkspacesCount,
|
||||
arg.UniqueUsersCount,
|
||||
arg.UniqueWorkspacesDelta,
|
||||
arg.UniqueUsersDelta,
|
||||
arg.AllowedRequests,
|
||||
arg.DeniedRequests,
|
||||
arg.UniqueWorkspacesCount,
|
||||
arg.UniqueUsersCount,
|
||||
)
|
||||
var new_period bool
|
||||
err := row.Scan(&new_period)
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
-- name: UpsertBoundaryUsageStats :one
|
||||
-- Upserts boundary usage statistics for a replica. All values are replaced with
|
||||
-- the current in-memory state. Returns true if this was an insert (new period),
|
||||
-- false if update.
|
||||
-- Upserts boundary usage statistics for a replica. On INSERT (new period), uses
|
||||
-- delta values for unique counts (only data since last flush). On UPDATE, uses
|
||||
-- cumulative values for unique counts (accurate period totals). Request counts
|
||||
-- are always deltas, accumulated in DB. Returns true if insert, false if update.
|
||||
INSERT INTO boundary_usage_stats (
|
||||
replica_id,
|
||||
unique_workspaces_count,
|
||||
@@ -12,17 +13,17 @@ INSERT INTO boundary_usage_stats (
|
||||
updated_at
|
||||
) VALUES (
|
||||
@replica_id,
|
||||
@unique_workspaces_count,
|
||||
@unique_users_count,
|
||||
@unique_workspaces_delta,
|
||||
@unique_users_delta,
|
||||
@allowed_requests,
|
||||
@denied_requests,
|
||||
NOW(),
|
||||
NOW()
|
||||
) ON CONFLICT (replica_id) DO UPDATE SET
|
||||
unique_workspaces_count = EXCLUDED.unique_workspaces_count,
|
||||
unique_users_count = EXCLUDED.unique_users_count,
|
||||
allowed_requests = EXCLUDED.allowed_requests,
|
||||
denied_requests = EXCLUDED.denied_requests,
|
||||
unique_workspaces_count = @unique_workspaces_count,
|
||||
unique_users_count = @unique_users_count,
|
||||
allowed_requests = boundary_usage_stats.allowed_requests + EXCLUDED.allowed_requests,
|
||||
denied_requests = boundary_usage_stats.denied_requests + EXCLUDED.denied_requests,
|
||||
updated_at = NOW()
|
||||
RETURNING (xmax = 0) AS new_period;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user