mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat: implement non-brittle TestDBPurgeAuthorization (#21442)
Closes #21440 The `TestDBPurgeAuthorization` test was overfitting by calling each purge method individually, which reimplemented dbpurge logic in the test and created a maintenance burden. When new purge steps are added, they either need to be reflected in the test or there will be a testing blindspot. This change extracts the `doTick` closure into an exported `PurgeTick` function that returns an error, making the core purge logic testable. The test now calls `PurgeTick` directly to exercise the actual dbpurge behavior rather than reimplementing it. Retention values are configured to ensure all purge operations run, so we test RBAC permissions for all code paths. - Tests actual dbpurge behavior instead of reimplementing it - Automatically covers new purge steps when they're added - Still validates that all operations have proper RBAC permissions The test focuses on authorization (checking for RBAC errors) rather than verifying deletion behavior, which is already covered by other tests like `TestDeleteExpiredAPIKeys` and `TestDeleteOldAuditLogs`.
This commit is contained in:
+152
-131
@@ -64,140 +64,27 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
|
||||
}, []string{"record_type"})
|
||||
reg.MustRegister(recordsPurged)
|
||||
|
||||
inst := &instance{
|
||||
cancel: cancelFunc,
|
||||
closed: closed,
|
||||
logger: logger,
|
||||
vals: vals,
|
||||
clk: clk,
|
||||
iterationDuration: iterationDuration,
|
||||
recordsPurged: recordsPurged,
|
||||
}
|
||||
|
||||
// Start the ticker with the initial delay.
|
||||
ticker := clk.NewTicker(delay)
|
||||
doTick := func(ctx context.Context, start time.Time) {
|
||||
defer ticker.Reset(delay)
|
||||
// Start a transaction to grab advisory lock, we don't want to run
|
||||
// multiple purges at the same time (multiple replicas).
|
||||
if err := db.InTx(func(tx database.Store) error {
|
||||
// Acquire a lock to ensure that only one instance of the
|
||||
// purge is running at a time.
|
||||
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBPurge)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
logger.Debug(ctx, "unable to acquire lock for purging old database entries, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
var purgedWorkspaceAgentLogs int64
|
||||
workspaceAgentLogsRetention := vals.Retention.WorkspaceAgentLogs.Value()
|
||||
if workspaceAgentLogsRetention > 0 {
|
||||
deleteOldWorkspaceAgentLogsBefore := start.Add(-workspaceAgentLogsRetention)
|
||||
purgedWorkspaceAgentLogs, err = tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old workspace agent logs: %w", err)
|
||||
}
|
||||
}
|
||||
if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old workspace agent stats: %w", err)
|
||||
}
|
||||
if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old provisioner daemons: %w", err)
|
||||
}
|
||||
if err := tx.DeleteOldNotificationMessages(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old notification messages: %w", err)
|
||||
}
|
||||
if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil {
|
||||
return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err)
|
||||
}
|
||||
|
||||
var expiredAPIKeys int64
|
||||
apiKeysRetention := vals.Retention.APIKeys.Value()
|
||||
if apiKeysRetention > 0 {
|
||||
// Delete keys that have been expired for at least the retention period.
|
||||
// A higher retention period allows the backend to return a more helpful
|
||||
// error message when a user tries to use an expired key.
|
||||
deleteExpiredKeysBefore := start.Add(-apiKeysRetention)
|
||||
expiredAPIKeys, err = tx.DeleteExpiredAPIKeys(ctx, database.DeleteExpiredAPIKeysParams{
|
||||
Before: dbtime.Time(deleteExpiredKeysBefore),
|
||||
// There could be a lot of expired keys here, so set a limit to prevent
|
||||
// this taking too long. This runs every 10 minutes, so it deletes
|
||||
// ~1.5m keys per day at most.
|
||||
LimitCount: 10000,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete expired api keys: %w", err)
|
||||
}
|
||||
}
|
||||
deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge)
|
||||
if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil {
|
||||
return xerrors.Errorf("failed to delete old telemetry locks: %w", err)
|
||||
}
|
||||
|
||||
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
|
||||
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
|
||||
BeforeTime: deleteOldAuditLogConnectionEventsBefore,
|
||||
LimitCount: auditLogConnectionEventBatchSize,
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("failed to delete old audit log connection events: %w", err)
|
||||
}
|
||||
|
||||
var purgedAIBridgeRecords int64
|
||||
aibridgeRetention := vals.AI.BridgeConfig.Retention.Value()
|
||||
if aibridgeRetention > 0 {
|
||||
deleteAIBridgeRecordsBefore := start.Add(-aibridgeRetention)
|
||||
// nolint:gocritic // Needs to run as aibridge context.
|
||||
purgedAIBridgeRecords, err = tx.DeleteOldAIBridgeRecords(dbauthz.AsAIBridged(ctx), deleteAIBridgeRecordsBefore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old aibridge records: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var purgedConnectionLogs int64
|
||||
connectionLogsRetention := vals.Retention.ConnectionLogs.Value()
|
||||
if connectionLogsRetention > 0 {
|
||||
deleteConnectionLogsBefore := start.Add(-connectionLogsRetention)
|
||||
purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{
|
||||
BeforeTime: deleteConnectionLogsBefore,
|
||||
LimitCount: connectionLogsBatchSize,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old connection logs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var purgedAuditLogs int64
|
||||
auditLogsRetention := vals.Retention.AuditLogs.Value()
|
||||
if auditLogsRetention > 0 {
|
||||
deleteAuditLogsBefore := start.Add(-auditLogsRetention)
|
||||
purgedAuditLogs, err = tx.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{
|
||||
BeforeTime: deleteAuditLogsBefore,
|
||||
LimitCount: auditLogsBatchSize,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old audit logs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "purged old database entries",
|
||||
slog.F("workspace_agent_logs", purgedWorkspaceAgentLogs),
|
||||
slog.F("expired_api_keys", expiredAPIKeys),
|
||||
slog.F("aibridge_records", purgedAIBridgeRecords),
|
||||
slog.F("connection_logs", purgedConnectionLogs),
|
||||
slog.F("audit_logs", purgedAuditLogs),
|
||||
slog.F("duration", clk.Since(start)),
|
||||
)
|
||||
|
||||
duration := clk.Since(start)
|
||||
iterationDuration.WithLabelValues("true").Observe(duration.Seconds())
|
||||
recordsPurged.WithLabelValues("workspace_agent_logs").Add(float64(purgedWorkspaceAgentLogs))
|
||||
recordsPurged.WithLabelValues("expired_api_keys").Add(float64(expiredAPIKeys))
|
||||
recordsPurged.WithLabelValues("aibridge_records").Add(float64(purgedAIBridgeRecords))
|
||||
recordsPurged.WithLabelValues("connection_logs").Add(float64(purgedConnectionLogs))
|
||||
recordsPurged.WithLabelValues("audit_logs").Add(float64(purgedAuditLogs))
|
||||
|
||||
return nil
|
||||
}, database.DefaultTXOptions().WithID("db_purge")); err != nil {
|
||||
err := inst.purgeTick(ctx, db, start)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to purge old database entries", slog.Error(err))
|
||||
|
||||
// Record metrics for failed purge iteration.
|
||||
duration := clk.Since(start)
|
||||
iterationDuration.WithLabelValues("false").Observe(duration.Seconds())
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,15 +103,149 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
|
||||
}
|
||||
}
|
||||
})
|
||||
return &instance{
|
||||
cancel: cancelFunc,
|
||||
closed: closed,
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
// purgeTick performs a single purge iteration. It returns an error if the
|
||||
// purge fails.
|
||||
func (i *instance) purgeTick(ctx context.Context, db database.Store, start time.Time) error {
|
||||
// Start a transaction to grab advisory lock, we don't want to run
|
||||
// multiple purges at the same time (multiple replicas).
|
||||
return db.InTx(func(tx database.Store) error {
|
||||
// Acquire a lock to ensure that only one instance of the
|
||||
// purge is running at a time.
|
||||
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBPurge)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
i.logger.Debug(ctx, "unable to acquire lock for purging old database entries, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
var purgedWorkspaceAgentLogs int64
|
||||
workspaceAgentLogsRetention := i.vals.Retention.WorkspaceAgentLogs.Value()
|
||||
if workspaceAgentLogsRetention > 0 {
|
||||
deleteOldWorkspaceAgentLogsBefore := start.Add(-workspaceAgentLogsRetention)
|
||||
purgedWorkspaceAgentLogs, err = tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old workspace agent logs: %w", err)
|
||||
}
|
||||
}
|
||||
if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old workspace agent stats: %w", err)
|
||||
}
|
||||
if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old provisioner daemons: %w", err)
|
||||
}
|
||||
if err := tx.DeleteOldNotificationMessages(ctx); err != nil {
|
||||
return xerrors.Errorf("failed to delete old notification messages: %w", err)
|
||||
}
|
||||
if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil {
|
||||
return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err)
|
||||
}
|
||||
|
||||
var expiredAPIKeys int64
|
||||
apiKeysRetention := i.vals.Retention.APIKeys.Value()
|
||||
if apiKeysRetention > 0 {
|
||||
// Delete keys that have been expired for at least the retention period.
|
||||
// A higher retention period allows the backend to return a more helpful
|
||||
// error message when a user tries to use an expired key.
|
||||
deleteExpiredKeysBefore := start.Add(-apiKeysRetention)
|
||||
expiredAPIKeys, err = tx.DeleteExpiredAPIKeys(ctx, database.DeleteExpiredAPIKeysParams{
|
||||
Before: dbtime.Time(deleteExpiredKeysBefore),
|
||||
// There could be a lot of expired keys here, so set a limit to prevent
|
||||
// this taking too long. This runs every 10 minutes, so it deletes
|
||||
// ~1.5m keys per day at most.
|
||||
LimitCount: 10000,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete expired api keys: %w", err)
|
||||
}
|
||||
}
|
||||
deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge)
|
||||
if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil {
|
||||
return xerrors.Errorf("failed to delete old telemetry locks: %w", err)
|
||||
}
|
||||
|
||||
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
|
||||
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
|
||||
BeforeTime: deleteOldAuditLogConnectionEventsBefore,
|
||||
LimitCount: auditLogConnectionEventBatchSize,
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("failed to delete old audit log connection events: %w", err)
|
||||
}
|
||||
|
||||
var purgedAIBridgeRecords int64
|
||||
aibridgeRetention := i.vals.AI.BridgeConfig.Retention.Value()
|
||||
if aibridgeRetention > 0 {
|
||||
deleteAIBridgeRecordsBefore := start.Add(-aibridgeRetention)
|
||||
// nolint:gocritic // Needs to run as aibridge context.
|
||||
purgedAIBridgeRecords, err = tx.DeleteOldAIBridgeRecords(dbauthz.AsAIBridged(ctx), deleteAIBridgeRecordsBefore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old aibridge records: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var purgedConnectionLogs int64
|
||||
connectionLogsRetention := i.vals.Retention.ConnectionLogs.Value()
|
||||
if connectionLogsRetention > 0 {
|
||||
deleteConnectionLogsBefore := start.Add(-connectionLogsRetention)
|
||||
purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{
|
||||
BeforeTime: deleteConnectionLogsBefore,
|
||||
LimitCount: connectionLogsBatchSize,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old connection logs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var purgedAuditLogs int64
|
||||
auditLogsRetention := i.vals.Retention.AuditLogs.Value()
|
||||
if auditLogsRetention > 0 {
|
||||
deleteAuditLogsBefore := start.Add(-auditLogsRetention)
|
||||
purgedAuditLogs, err = tx.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{
|
||||
BeforeTime: deleteAuditLogsBefore,
|
||||
LimitCount: auditLogsBatchSize,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to delete old audit logs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
i.logger.Debug(ctx, "purged old database entries",
|
||||
slog.F("workspace_agent_logs", purgedWorkspaceAgentLogs),
|
||||
slog.F("expired_api_keys", expiredAPIKeys),
|
||||
slog.F("aibridge_records", purgedAIBridgeRecords),
|
||||
slog.F("connection_logs", purgedConnectionLogs),
|
||||
slog.F("audit_logs", purgedAuditLogs),
|
||||
slog.F("duration", i.clk.Since(start)),
|
||||
)
|
||||
|
||||
if i.iterationDuration != nil {
|
||||
duration := i.clk.Since(start)
|
||||
i.iterationDuration.WithLabelValues("true").Observe(duration.Seconds())
|
||||
}
|
||||
if i.recordsPurged != nil {
|
||||
i.recordsPurged.WithLabelValues("workspace_agent_logs").Add(float64(purgedWorkspaceAgentLogs))
|
||||
i.recordsPurged.WithLabelValues("expired_api_keys").Add(float64(expiredAPIKeys))
|
||||
i.recordsPurged.WithLabelValues("aibridge_records").Add(float64(purgedAIBridgeRecords))
|
||||
i.recordsPurged.WithLabelValues("connection_logs").Add(float64(purgedConnectionLogs))
|
||||
i.recordsPurged.WithLabelValues("audit_logs").Add(float64(purgedAuditLogs))
|
||||
}
|
||||
|
||||
return nil
|
||||
}, database.DefaultTXOptions().WithID("db_purge"))
|
||||
}
|
||||
|
||||
type instance struct {
|
||||
cancel context.CancelFunc
|
||||
closed chan struct{}
|
||||
cancel context.CancelFunc
|
||||
closed chan struct{}
|
||||
logger slog.Logger
|
||||
vals *codersdk.DeploymentValues
|
||||
clk quartz.Clock
|
||||
iterationDuration *prometheus.HistogramVec
|
||||
recordsPurged *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func (i *instance) Close() error {
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package dbpurge
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/coderdtest"
|
||||
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/quartz"
|
||||
)
|
||||
|
||||
func TestDBPurgeAuthorization(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
rawDB, _ := dbtestutil.NewDB(t)
|
||||
|
||||
authz := rbac.NewAuthorizer(prometheus.NewRegistry())
|
||||
db := dbauthz.New(rawDB, authz, testutil.Logger(t), coderdtest.AccessControlStorePointer())
|
||||
|
||||
ctx = dbauthz.AsDBPurge(ctx)
|
||||
|
||||
clk := quartz.NewMock(t)
|
||||
now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC)
|
||||
clk.Set(now)
|
||||
|
||||
vals := &codersdk.DeploymentValues{ /* same vals as before */ }
|
||||
|
||||
inst := &instance{
|
||||
logger: testutil.Logger(t),
|
||||
vals: vals,
|
||||
clk: clk,
|
||||
// metrics can be nil in this test
|
||||
}
|
||||
|
||||
err := inst.purgeTick(ctx, db, now)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
@@ -21,10 +21,8 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"cdr.dev/slog/v3/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/coderd/coderdtest"
|
||||
"github.com/coder/coder/v2/coderd/coderdtest/promhelp"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
||||
"github.com/coder/coder/v2/coderd/database/dbgen"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmock"
|
||||
"github.com/coder/coder/v2/coderd/database/dbpurge"
|
||||
@@ -32,7 +30,6 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/provisionerdserver"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/provisionerd/proto"
|
||||
"github.com/coder/coder/v2/provisionersdk"
|
||||
@@ -1633,62 +1630,6 @@ func TestDeleteExpiredAPIKeys(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDBPurgeAuthorization(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("DBPurgeActorCanCallPurgeOperations", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
rawDB, _ := dbtestutil.NewDB(t)
|
||||
|
||||
authz := rbac.NewAuthorizer(prometheus.NewRegistry())
|
||||
db := dbauthz.New(rawDB, authz, testutil.Logger(t), coderdtest.AccessControlStorePointer())
|
||||
|
||||
ctx = dbauthz.AsDBPurge(ctx)
|
||||
|
||||
actor, ok := dbauthz.ActorFromContext(ctx)
|
||||
require.True(t, ok, "actor should be present")
|
||||
require.Equal(t, rbac.SubjectTypeDBPurge, actor.Type, "should be DBPurge type")
|
||||
require.Contains(t, actor.Roles.Names(), rbac.RoleIdentifier{Name: "dbpurge"},
|
||||
"should have dbpurge role")
|
||||
|
||||
_, err := db.DeleteOldWorkspaceAgentLogs(ctx, time.Now().Add(-24*time.Hour))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.DeleteOldWorkspaceAgentStats(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.DeleteOldProvisionerDaemons(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.DeleteOldNotificationMessages(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.ExpirePrebuildsAPIKeys(ctx, time.Now().Add(-24*time.Hour))
|
||||
require.NoError(t, err)
|
||||
|
||||
params := database.DeleteExpiredAPIKeysParams{
|
||||
Before: time.Now().Add(-24 * time.Hour),
|
||||
LimitCount: 100,
|
||||
}
|
||||
_, err = db.DeleteExpiredAPIKeys(ctx, params)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
|
||||
BeforeTime: time.Now().Add(-24 * time.Hour),
|
||||
LimitCount: 100,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = db.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{
|
||||
BeforeTime: time.Now().Add(-24 * time.Hour),
|
||||
LimitCount: 100,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// ptr is a helper to create a pointer to a value.
|
||||
func ptr[T any](v T) *T {
|
||||
return &v
|
||||
|
||||
Reference in New Issue
Block a user