diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index e650aefbc8..ba3df7236c 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -64,140 +64,27 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder }, []string{"record_type"}) reg.MustRegister(recordsPurged) + inst := &instance{ + cancel: cancelFunc, + closed: closed, + logger: logger, + vals: vals, + clk: clk, + iterationDuration: iterationDuration, + recordsPurged: recordsPurged, + } + // Start the ticker with the initial delay. ticker := clk.NewTicker(delay) doTick := func(ctx context.Context, start time.Time) { defer ticker.Reset(delay) - // Start a transaction to grab advisory lock, we don't want to run - // multiple purges at the same time (multiple replicas). - if err := db.InTx(func(tx database.Store) error { - // Acquire a lock to ensure that only one instance of the - // purge is running at a time. - ok, err := tx.TryAcquireLock(ctx, database.LockIDDBPurge) - if err != nil { - return err - } - if !ok { - logger.Debug(ctx, "unable to acquire lock for purging old database entries, skipping") - return nil - } - - var purgedWorkspaceAgentLogs int64 - workspaceAgentLogsRetention := vals.Retention.WorkspaceAgentLogs.Value() - if workspaceAgentLogsRetention > 0 { - deleteOldWorkspaceAgentLogsBefore := start.Add(-workspaceAgentLogsRetention) - purgedWorkspaceAgentLogs, err = tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore) - if err != nil { - return xerrors.Errorf("failed to delete old workspace agent logs: %w", err) - } - } - if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil { - return xerrors.Errorf("failed to delete old workspace agent stats: %w", err) - } - if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil { - return xerrors.Errorf("failed to delete old provisioner daemons: %w", err) - } - if err := tx.DeleteOldNotificationMessages(ctx); err != nil { - return 
xerrors.Errorf("failed to delete old notification messages: %w", err) - } - if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil { - return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err) - } - - var expiredAPIKeys int64 - apiKeysRetention := vals.Retention.APIKeys.Value() - if apiKeysRetention > 0 { - // Delete keys that have been expired for at least the retention period. - // A higher retention period allows the backend to return a more helpful - // error message when a user tries to use an expired key. - deleteExpiredKeysBefore := start.Add(-apiKeysRetention) - expiredAPIKeys, err = tx.DeleteExpiredAPIKeys(ctx, database.DeleteExpiredAPIKeysParams{ - Before: dbtime.Time(deleteExpiredKeysBefore), - // There could be a lot of expired keys here, so set a limit to prevent - // this taking too long. This runs every 10 minutes, so it deletes - // ~1.5m keys per day at most. - LimitCount: 10000, - }) - if err != nil { - return xerrors.Errorf("failed to delete expired api keys: %w", err) - } - } - deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge) - if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil { - return xerrors.Errorf("failed to delete old telemetry locks: %w", err) - } - - deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge) - if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{ - BeforeTime: deleteOldAuditLogConnectionEventsBefore, - LimitCount: auditLogConnectionEventBatchSize, - }); err != nil { - return xerrors.Errorf("failed to delete old audit log connection events: %w", err) - } - - var purgedAIBridgeRecords int64 - aibridgeRetention := vals.AI.BridgeConfig.Retention.Value() - if aibridgeRetention > 0 { - deleteAIBridgeRecordsBefore := start.Add(-aibridgeRetention) - // nolint:gocritic // Needs to run as aibridge context. 
- purgedAIBridgeRecords, err = tx.DeleteOldAIBridgeRecords(dbauthz.AsAIBridged(ctx), deleteAIBridgeRecordsBefore) - if err != nil { - return xerrors.Errorf("failed to delete old aibridge records: %w", err) - } - } - - var purgedConnectionLogs int64 - connectionLogsRetention := vals.Retention.ConnectionLogs.Value() - if connectionLogsRetention > 0 { - deleteConnectionLogsBefore := start.Add(-connectionLogsRetention) - purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{ - BeforeTime: deleteConnectionLogsBefore, - LimitCount: connectionLogsBatchSize, - }) - if err != nil { - return xerrors.Errorf("failed to delete old connection logs: %w", err) - } - } - - var purgedAuditLogs int64 - auditLogsRetention := vals.Retention.AuditLogs.Value() - if auditLogsRetention > 0 { - deleteAuditLogsBefore := start.Add(-auditLogsRetention) - purgedAuditLogs, err = tx.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{ - BeforeTime: deleteAuditLogsBefore, - LimitCount: auditLogsBatchSize, - }) - if err != nil { - return xerrors.Errorf("failed to delete old audit logs: %w", err) - } - } - - logger.Debug(ctx, "purged old database entries", - slog.F("workspace_agent_logs", purgedWorkspaceAgentLogs), - slog.F("expired_api_keys", expiredAPIKeys), - slog.F("aibridge_records", purgedAIBridgeRecords), - slog.F("connection_logs", purgedConnectionLogs), - slog.F("audit_logs", purgedAuditLogs), - slog.F("duration", clk.Since(start)), - ) - - duration := clk.Since(start) - iterationDuration.WithLabelValues("true").Observe(duration.Seconds()) - recordsPurged.WithLabelValues("workspace_agent_logs").Add(float64(purgedWorkspaceAgentLogs)) - recordsPurged.WithLabelValues("expired_api_keys").Add(float64(expiredAPIKeys)) - recordsPurged.WithLabelValues("aibridge_records").Add(float64(purgedAIBridgeRecords)) - recordsPurged.WithLabelValues("connection_logs").Add(float64(purgedConnectionLogs)) - 
recordsPurged.WithLabelValues("audit_logs").Add(float64(purgedAuditLogs)) - - return nil - }, database.DefaultTXOptions().WithID("db_purge")); err != nil { + err := inst.purgeTick(ctx, db, start) + if err != nil { logger.Error(ctx, "failed to purge old database entries", slog.Error(err)) // Record metrics for failed purge iteration. duration := clk.Since(start) iterationDuration.WithLabelValues("false").Observe(duration.Seconds()) - - return } } @@ -216,15 +103,170 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder } } }) - return &instance{ - cancel: cancelFunc, - closed: closed, - } + return inst +} + +// purgeTick performs a single purge iteration for the given start time. All +// purge queries run in one transaction that first tries to take the +// LockIDDBPurge advisory lock; if the lock is not acquired the iteration is +// skipped and nil is returned. The summary log line and the success metrics +// are emitted only after InTx has returned without error, so an iteration +// whose transaction fails never reports purged records. Nil metric vectors +// are skipped. It returns an error if the purge fails. +func (i *instance) purgeTick(ctx context.Context, db database.Store, start time.Time) error { + var ( + // locked records whether this iteration acquired the advisory lock + // and therefore ran the purge queries. + locked bool + purgedWorkspaceAgentLogs int64 + expiredAPIKeys int64 + purgedAIBridgeRecords int64 + purgedConnectionLogs int64 + purgedAuditLogs int64 + ) + + // Start a transaction to grab advisory lock, we don't want to run + // multiple purges at the same time (multiple replicas). + err := db.InTx(func(tx database.Store) error { + // Acquire a lock to ensure that only one instance of the + // purge is running at a time. 
+ ok, err := tx.TryAcquireLock(ctx, database.LockIDDBPurge) + if err != nil { + return err + } + locked = ok + if !locked { + i.logger.Debug(ctx, "unable to acquire lock for purging old database entries, skipping") + return nil + } + + workspaceAgentLogsRetention := i.vals.Retention.WorkspaceAgentLogs.Value() + if workspaceAgentLogsRetention > 0 { + deleteOldWorkspaceAgentLogsBefore := start.Add(-workspaceAgentLogsRetention) + purgedWorkspaceAgentLogs, err = tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore) + if err != nil { + return xerrors.Errorf("failed to delete old workspace agent logs: %w", err) + } + } + if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil { + return xerrors.Errorf("failed to delete old workspace agent stats: %w", err) + } + if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil { + return xerrors.Errorf("failed to delete old provisioner daemons: %w", err) + } + if err := tx.DeleteOldNotificationMessages(ctx); err != nil { + return xerrors.Errorf("failed to delete old notification messages: %w", err) + } + if err := tx.ExpirePrebuildsAPIKeys(ctx, dbtime.Time(start)); err != nil { + return xerrors.Errorf("failed to expire prebuilds user api keys: %w", err) + } + + apiKeysRetention := i.vals.Retention.APIKeys.Value() + if apiKeysRetention > 0 { + // Delete keys that have been expired for at least the retention period. + // A higher retention period allows the backend to return a more helpful + // error message when a user tries to use an expired key. + deleteExpiredKeysBefore := start.Add(-apiKeysRetention) + expiredAPIKeys, err = tx.DeleteExpiredAPIKeys(ctx, database.DeleteExpiredAPIKeysParams{ + Before: dbtime.Time(deleteExpiredKeysBefore), + // There could be a lot of expired keys here, so set a limit to prevent + // this taking too long. This runs every 10 minutes, so it deletes + // ~1.5m keys per day at most. 
+ LimitCount: 10000, + }) + if err != nil { + return xerrors.Errorf("failed to delete expired api keys: %w", err) + } + } + deleteOldTelemetryLocksBefore := start.Add(-maxTelemetryHeartbeatAge) + if err := tx.DeleteOldTelemetryLocks(ctx, deleteOldTelemetryLocksBefore); err != nil { + return xerrors.Errorf("failed to delete old telemetry locks: %w", err) + } + + deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge) + if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{ + BeforeTime: deleteOldAuditLogConnectionEventsBefore, + LimitCount: auditLogConnectionEventBatchSize, + }); err != nil { + return xerrors.Errorf("failed to delete old audit log connection events: %w", err) + } + + aibridgeRetention := i.vals.AI.BridgeConfig.Retention.Value() + if aibridgeRetention > 0 { + deleteAIBridgeRecordsBefore := start.Add(-aibridgeRetention) + // nolint:gocritic // Needs to run as aibridge context. 
+ purgedAIBridgeRecords, err = tx.DeleteOldAIBridgeRecords(dbauthz.AsAIBridged(ctx), deleteAIBridgeRecordsBefore) + if err != nil { + return xerrors.Errorf("failed to delete old aibridge records: %w", err) + } + } + + connectionLogsRetention := i.vals.Retention.ConnectionLogs.Value() + if connectionLogsRetention > 0 { + deleteConnectionLogsBefore := start.Add(-connectionLogsRetention) + purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{ + BeforeTime: deleteConnectionLogsBefore, + LimitCount: connectionLogsBatchSize, + }) + if err != nil { + return xerrors.Errorf("failed to delete old connection logs: %w", err) + } + } + + auditLogsRetention := i.vals.Retention.AuditLogs.Value() + if auditLogsRetention > 0 { + deleteAuditLogsBefore := start.Add(-auditLogsRetention) + purgedAuditLogs, err = tx.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{ + BeforeTime: deleteAuditLogsBefore, + LimitCount: auditLogsBatchSize, + }) + if err != nil { + return xerrors.Errorf("failed to delete old audit logs: %w", err) + } + } + + return nil + }, database.DefaultTXOptions().WithID("db_purge")) + if err != nil { + return err + } + if !locked { + // Lock not acquired: the iteration was skipped, nothing to report. + return nil + } + + duration := i.clk.Since(start) + i.logger.Debug(ctx, "purged old database entries", + slog.F("workspace_agent_logs", purgedWorkspaceAgentLogs), + slog.F("expired_api_keys", expiredAPIKeys), + slog.F("aibridge_records", purgedAIBridgeRecords), + slog.F("connection_logs", purgedConnectionLogs), + slog.F("audit_logs", purgedAuditLogs), + slog.F("duration", duration), + ) + + if i.iterationDuration != nil { + i.iterationDuration.WithLabelValues("true").Observe(duration.Seconds()) + } + if i.recordsPurged != nil { + i.recordsPurged.WithLabelValues("workspace_agent_logs").Add(float64(purgedWorkspaceAgentLogs)) + i.recordsPurged.WithLabelValues("expired_api_keys").Add(float64(expiredAPIKeys)) + i.recordsPurged.WithLabelValues("aibridge_records").Add(float64(purgedAIBridgeRecords)) + 
i.recordsPurged.WithLabelValues("connection_logs").Add(float64(purgedConnectionLogs)) + i.recordsPurged.WithLabelValues("audit_logs").Add(float64(purgedAuditLogs)) + } + + return nil } type instance struct { - cancel context.CancelFunc - closed chan struct{} + cancel context.CancelFunc + closed chan struct{} + logger slog.Logger + vals *codersdk.DeploymentValues + clk quartz.Clock + iterationDuration *prometheus.HistogramVec + recordsPurged *prometheus.CounterVec } func (i *instance) Close() error { diff --git a/coderd/database/dbpurge/dbpurge_internal_test.go b/coderd/database/dbpurge/dbpurge_internal_test.go new file mode 100644 index 0000000000..f49426e956 --- /dev/null +++ b/coderd/database/dbpurge/dbpurge_internal_test.go @@ -0,0 +1,53 @@ +package dbpurge + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/coderd/coderdtest" + "github.com/coder/coder/v2/coderd/database/dbauthz" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/rbac" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" +) + +// TestDBPurgeAuthorization runs a single purgeTick through a dbauthz-wrapped +// store as the DBPurge actor and requires that it returns no error, i.e. that +// the actor is authorized for every query this tick issues. +func TestDBPurgeAuthorization(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + rawDB, _ := dbtestutil.NewDB(t) + + authz := rbac.NewAuthorizer(prometheus.NewRegistry()) + db := dbauthz.New(rawDB, authz, testutil.Logger(t), coderdtest.AccessControlStorePointer()) + + ctx = dbauthz.AsDBPurge(ctx) + + clk := quartz.NewMock(t) + now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC) + clk.Set(now) + + // NOTE(review): a zero-value DeploymentValues presumably leaves every + // retention setting at 0, in which case purgeTick skips the + // retention-gated deletes (workspace agent logs, expired API keys, + // aibridge records, connection logs, audit logs). Set retention values + // here if those queries should be covered by this test too. + vals := &codersdk.DeploymentValues{} + + inst := &instance{ + logger: testutil.Logger(t), + vals: vals, + clk: clk, + // metrics can be nil in this test + } + + err := inst.purgeTick(ctx, db, now) + require.NoError(t, err) +} diff --git
a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 2fd084988b..5aba49edf7 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -21,10 +21,8 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/coderdtest/promhelp" "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbmock" "github.com/coder/coder/v2/coderd/database/dbpurge" @@ -32,7 +30,6 @@ import ( "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/provisionerdserver" - "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/provisionerd/proto" "github.com/coder/coder/v2/provisionersdk" @@ -1633,62 +1630,6 @@ func TestDeleteExpiredAPIKeys(t *testing.T) { } } -func TestDBPurgeAuthorization(t *testing.T) { - t.Parallel() - - t.Run("DBPurgeActorCanCallPurgeOperations", func(t *testing.T) { - t.Parallel() - - ctx := testutil.Context(t, testutil.WaitShort) - rawDB, _ := dbtestutil.NewDB(t) - - authz := rbac.NewAuthorizer(prometheus.NewRegistry()) - db := dbauthz.New(rawDB, authz, testutil.Logger(t), coderdtest.AccessControlStorePointer()) - - ctx = dbauthz.AsDBPurge(ctx) - - actor, ok := dbauthz.ActorFromContext(ctx) - require.True(t, ok, "actor should be present") - require.Equal(t, rbac.SubjectTypeDBPurge, actor.Type, "should be DBPurge type") - require.Contains(t, actor.Roles.Names(), rbac.RoleIdentifier{Name: "dbpurge"}, - "should have dbpurge role") - - _, err := db.DeleteOldWorkspaceAgentLogs(ctx, time.Now().Add(-24*time.Hour)) - require.NoError(t, err) - - err = db.DeleteOldWorkspaceAgentStats(ctx) - require.NoError(t, err) - - err = 
db.DeleteOldProvisionerDaemons(ctx) - require.NoError(t, err) - - err = db.DeleteOldNotificationMessages(ctx) - require.NoError(t, err) - - err = db.ExpirePrebuildsAPIKeys(ctx, time.Now().Add(-24*time.Hour)) - require.NoError(t, err) - - params := database.DeleteExpiredAPIKeysParams{ - Before: time.Now().Add(-24 * time.Hour), - LimitCount: 100, - } - _, err = db.DeleteExpiredAPIKeys(ctx, params) - require.NoError(t, err) - - err = db.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{ - BeforeTime: time.Now().Add(-24 * time.Hour), - LimitCount: 100, - }) - require.NoError(t, err) - - _, err = db.DeleteOldAuditLogs(ctx, database.DeleteOldAuditLogsParams{ - BeforeTime: time.Now().Add(-24 * time.Hour), - LimitCount: 100, - }) - require.NoError(t, err) - }) -} - // ptr is a helper to create a pointer to a value. func ptr[T any](v T) *T { return &v