diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go
index a4d801512e..2bbd14e267 100644
--- a/coderd/database/dbauthz/dbauthz.go
+++ b/coderd/database/dbauthz/dbauthz.go
@@ -1749,6 +1749,13 @@ func (q *querier) DeleteOldAuditLogConnectionEvents(ctx context.Context, thresho
 	return q.db.DeleteOldAuditLogConnectionEvents(ctx, threshold)
 }
 
+func (q *querier) DeleteOldConnectionLogs(ctx context.Context, arg database.DeleteOldConnectionLogsParams) (int64, error) {
+	if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
+		return 0, err
+	}
+	return q.db.DeleteOldConnectionLogs(ctx, arg)
+}
+
 func (q *querier) DeleteOldNotificationMessages(ctx context.Context) error {
 	if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceNotificationMessage); err != nil {
 		return err
diff --git a/coderd/database/dbauthz/dbauthz_test.go b/coderd/database/dbauthz/dbauthz_test.go
index e70da620e1..4833498f45 100644
--- a/coderd/database/dbauthz/dbauthz_test.go
+++ b/coderd/database/dbauthz/dbauthz_test.go
@@ -355,6 +355,10 @@ func (s *MethodTestSuite) TestConnectionLogs() {
 		dbm.EXPECT().CountConnectionLogs(gomock.Any(), database.CountConnectionLogsParams{}).Return(int64(0), nil).AnyTimes()
 		check.Args(database.CountConnectionLogsParams{}, emptyPreparedAuthorized{}).Asserts(rbac.ResourceConnectionLog, policy.ActionRead)
 	}))
+	s.Run("DeleteOldConnectionLogs", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
+		dbm.EXPECT().DeleteOldConnectionLogs(gomock.Any(), database.DeleteOldConnectionLogsParams{}).Return(int64(0), nil).AnyTimes()
+		check.Args(database.DeleteOldConnectionLogsParams{}).Asserts(rbac.ResourceSystem, policy.ActionDelete)
+	}))
 }
 
 func (s *MethodTestSuite) TestFile() {
diff --git a/coderd/database/dbmetrics/querymetrics.go b/coderd/database/dbmetrics/querymetrics.go
index cdad3598b3..26342761a7 100644
--- a/coderd/database/dbmetrics/querymetrics.go
+++ b/coderd/database/dbmetrics/querymetrics.go
@@ -410,6 +410,13 @@ func (m queryMetricsStore) DeleteOldAuditLogConnectionEvents(ctx context.Context
 	return r0
 }
 
+func (m queryMetricsStore) DeleteOldConnectionLogs(ctx context.Context, arg database.DeleteOldConnectionLogsParams) (int64, error) {
+	start := time.Now()
+	r0, r1 := m.s.DeleteOldConnectionLogs(ctx, arg)
+	m.queryLatencies.WithLabelValues("DeleteOldConnectionLogs").Observe(time.Since(start).Seconds())
+	return r0, r1
+}
+
 func (m queryMetricsStore) DeleteOldNotificationMessages(ctx context.Context) error {
 	start := time.Now()
 	r0 := m.s.DeleteOldNotificationMessages(ctx)
diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go
index 03de5508e5..7d5714deea 100644
--- a/coderd/database/dbmock/dbmock.go
+++ b/coderd/database/dbmock/dbmock.go
@@ -753,6 +753,21 @@ func (mr *MockStoreMockRecorder) DeleteOldAuditLogConnectionEvents(ctx, arg any)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldAuditLogConnectionEvents", reflect.TypeOf((*MockStore)(nil).DeleteOldAuditLogConnectionEvents), ctx, arg)
 }
 
+// DeleteOldConnectionLogs mocks base method.
+func (m *MockStore) DeleteOldConnectionLogs(ctx context.Context, arg database.DeleteOldConnectionLogsParams) (int64, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DeleteOldConnectionLogs", ctx, arg)
+	ret0, _ := ret[0].(int64)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// DeleteOldConnectionLogs indicates an expected call of DeleteOldConnectionLogs.
+func (mr *MockStoreMockRecorder) DeleteOldConnectionLogs(ctx, arg any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldConnectionLogs", reflect.TypeOf((*MockStore)(nil).DeleteOldConnectionLogs), ctx, arg)
+}
+
 // DeleteOldNotificationMessages mocks base method.
 func (m *MockStore) DeleteOldNotificationMessages(ctx context.Context) error {
 	m.ctrl.T.Helper()
diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go
index 0356636ff1..e9c32a5833 100644
--- a/coderd/database/dbpurge/dbpurge.go
+++ b/coderd/database/dbpurge/dbpurge.go
@@ -21,10 +21,12 @@ const (
 	delay          = 10 * time.Minute
 	maxAgentLogAge = 7 * 24 * time.Hour
 	// Connection events are now inserted into the `connection_logs` table.
-	// We'll slowly remove old connection events from the `audit_logs` table,
-	// but we won't touch the `connection_logs` table.
+	// We'll slowly remove old connection events from the `audit_logs` table.
+	// The `connection_logs` table is purged based on the configured retention.
 	maxAuditLogConnectionEventAge    = 90 * 24 * time.Hour // 90 days
 	auditLogConnectionEventBatchSize = 1000
+	// Maximum number of connection log rows deleted by a single purge query.
+	connectionLogsBatchSize = 10000
 	// Telemetry heartbeats are used to deduplicate events across replicas. We
 	// don't need to persist heartbeat rows for longer than 24 hours, as they
 	// are only used for deduplication across replicas. The time needs to be
@@ -111,9 +113,23 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
 				return xerrors.Errorf("failed to delete old aibridge records: %w", err)
 			}
 
+			var purgedConnectionLogs int64
+			connectionLogsRetention := vals.Retention.ConnectionLogs.Value()
+			if connectionLogsRetention > 0 {
+				deleteConnectionLogsBefore := start.Add(-connectionLogsRetention)
+				purgedConnectionLogs, err = tx.DeleteOldConnectionLogs(ctx, database.DeleteOldConnectionLogsParams{
+					BeforeTime: deleteConnectionLogsBefore,
+					LimitCount: connectionLogsBatchSize,
+				})
+				if err != nil {
+					return xerrors.Errorf("failed to delete old connection logs: %w", err)
+				}
+			}
+
 			logger.Debug(ctx, "purged old database entries",
 				slog.F("expired_api_keys", expiredAPIKeys),
 				slog.F("aibridge_records", purgedAIBridgeRecords),
+				slog.F("connection_logs", purgedConnectionLogs),
 				slog.F("duration", clk.Since(start)),
 			)
 
diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go
index 0a4de8c922..76d981959a 100644
--- a/coderd/database/dbpurge/dbpurge_test.go
+++ b/coderd/database/dbpurge/dbpurge_test.go
@@ -759,6 +759,129 @@ func TestDeleteOldTelemetryHeartbeats(t *testing.T) {
 	}, testutil.WaitShort, testutil.IntervalFast, "it should delete old telemetry heartbeats")
 }
 
+func TestDeleteOldConnectionLogs(t *testing.T) {
+	t.Parallel()
+
+	now := time.Date(2025, 1, 15, 7, 30, 0, 0, time.UTC)
+	retentionPeriod := 30 * 24 * time.Hour
+	afterThreshold := now.Add(-retentionPeriod).Add(-24 * time.Hour) // 31 days ago: older than the retention cutoff, so eligible for purge
+	beforeThreshold := now.Add(-15 * 24 * time.Hour)                 // 15 days ago: newer than the retention cutoff, so it must be kept
+
+	testCases := []struct {
+		name                  string
+		retentionConfig       codersdk.RetentionConfig
+		oldLogTime            time.Time
+		recentLogTime         *time.Time // nil means no recent log created
+		expectOldDeleted      bool
+		expectedLogsRemaining int
+	}{
+		{
+			name: "RetentionEnabled",
+			retentionConfig: codersdk.RetentionConfig{
+				ConnectionLogs: serpent.Duration(retentionPeriod),
+			},
+			oldLogTime:            afterThreshold,
+			recentLogTime:         &beforeThreshold,
+			expectOldDeleted:      true,
+			expectedLogsRemaining: 1, // only recent log remains
+		},
+		{
+			name: "RetentionDisabled",
+			retentionConfig: codersdk.RetentionConfig{
+				ConnectionLogs: serpent.Duration(0),
+			},
+			oldLogTime:            now.Add(-365 * 24 * time.Hour), // 1 year ago
+			recentLogTime:         nil,
+			expectOldDeleted:      false,
+			expectedLogsRemaining: 1, // old log is kept
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			ctx := testutil.Context(t, testutil.WaitShort)
+			clk := quartz.NewMock(t)
+			clk.Set(now).MustWait(ctx)
+
+			db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
+			logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
+
+			// Set up test fixtures.
+			user := dbgen.User(t, db, database.User{})
+			org := dbgen.Organization(t, db, database.Organization{})
+			_ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID})
+			tv := dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, CreatedBy: user.ID})
+			tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID})
+			workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
+				OwnerID:        user.ID,
+				OrganizationID: org.ID,
+				TemplateID:     tmpl.ID,
+			})
+
+			// Create old connection log.
+			oldLog := dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
+				ID:               uuid.New(),
+				Time:             tc.oldLogTime,
+				OrganizationID:   org.ID,
+				WorkspaceOwnerID: user.ID,
+				WorkspaceID:      workspace.ID,
+				WorkspaceName:    workspace.Name,
+				AgentName:        "agent1",
+				Type:             database.ConnectionTypeSsh,
+				ConnectionStatus: database.ConnectionStatusConnected,
+			})
+
+			// Create recent connection log if specified.
+			var recentLog database.ConnectionLog
+			if tc.recentLogTime != nil {
+				recentLog = dbgen.ConnectionLog(t, db, database.UpsertConnectionLogParams{
+					ID:               uuid.New(),
+					Time:             *tc.recentLogTime,
+					OrganizationID:   org.ID,
+					WorkspaceOwnerID: user.ID,
+					WorkspaceID:      workspace.ID,
+					WorkspaceName:    workspace.Name,
+					AgentName:        "agent2",
+					Type:             database.ConnectionTypeSsh,
+					ConnectionStatus: database.ConnectionStatusConnected,
+				})
+			}
+
+			// Run the purge.
+			done := awaitDoTick(ctx, t, clk)
+			closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
+				Retention: tc.retentionConfig,
+			}, clk)
+			defer closer.Close()
+			testutil.TryReceive(ctx, t, done)
+
+			// Verify results.
+			logs, err := db.GetConnectionLogsOffset(ctx, database.GetConnectionLogsOffsetParams{
+				LimitOpt: 100,
+			})
+			require.NoError(t, err)
+			require.Len(t, logs, tc.expectedLogsRemaining, "unexpected number of logs remaining")
+
+			logIDs := make([]uuid.UUID, len(logs))
+			for i, log := range logs {
+				logIDs[i] = log.ConnectionLog.ID
+			}
+
+			if tc.expectOldDeleted {
+				require.NotContains(t, logIDs, oldLog.ID, "old connection log should be deleted")
+			} else {
+				require.Contains(t, logIDs, oldLog.ID, "old connection log should NOT be deleted")
+			}
+
+			if tc.recentLogTime != nil {
+				require.Contains(t, logIDs, recentLog.ID, "recent connection log should be kept")
+			}
+		})
+	}
+}
+
 func TestDeleteOldAIBridgeRecords(t *testing.T) {
 	t.Parallel()
 
diff --git a/coderd/database/querier.go b/coderd/database/querier.go
index 7997d7f085..1ec13cdf4d 100644
--- a/coderd/database/querier.go
+++ b/coderd/database/querier.go
@@ -106,6 +106,7 @@ type sqlcQuerier interface {
 	// Cumulative count.
 	DeleteOldAIBridgeRecords(ctx context.Context, beforeTime time.Time) (int32, error)
 	DeleteOldAuditLogConnectionEvents(ctx context.Context, arg DeleteOldAuditLogConnectionEventsParams) error
+	DeleteOldConnectionLogs(ctx context.Context, arg DeleteOldConnectionLogsParams) (int64, error)
 	// Delete all notification messages which have not been updated for over a week.
 	DeleteOldNotificationMessages(ctx context.Context) error
 	// Delete provisioner daemons that have been created at least a week ago
diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go
index 329a23dd70..2de5ae3a2c 100644
--- a/coderd/database/queries.sql.go
+++ b/coderd/database/queries.sql.go
@@ -2095,6 +2095,32 @@ func (q *sqlQuerier) CountConnectionLogs(ctx context.Context, arg CountConnectio
 	return count, err
 }
 
+const deleteOldConnectionLogs = `-- name: DeleteOldConnectionLogs :execrows
+WITH old_logs AS (
+	SELECT id
+	FROM connection_logs
+	WHERE connect_time < $1::timestamp with time zone
+	ORDER BY connect_time ASC
+	LIMIT $2
+)
+DELETE FROM connection_logs
+USING old_logs
+WHERE connection_logs.id = old_logs.id
+`
+
+type DeleteOldConnectionLogsParams struct {
+	BeforeTime time.Time `db:"before_time" json:"before_time"`
+	LimitCount int32     `db:"limit_count" json:"limit_count"`
+}
+
+func (q *sqlQuerier) DeleteOldConnectionLogs(ctx context.Context, arg DeleteOldConnectionLogsParams) (int64, error) {
+	result, err := q.db.ExecContext(ctx, deleteOldConnectionLogs, arg.BeforeTime, arg.LimitCount)
+	if err != nil {
+		return 0, err
+	}
+	return result.RowsAffected()
+}
+
 const getConnectionLogsOffset = `-- name: GetConnectionLogsOffset :many
 SELECT
 	connection_logs.id, connection_logs.connect_time, connection_logs.organization_id, connection_logs.workspace_owner_id, connection_logs.workspace_id, connection_logs.workspace_name, connection_logs.agent_name, connection_logs.type, connection_logs.ip, connection_logs.code, connection_logs.user_agent, connection_logs.user_id, connection_logs.slug_or_port, connection_logs.connection_id, connection_logs.disconnect_time, connection_logs.disconnect_reason,
diff --git a/coderd/database/queries/connectionlogs.sql b/coderd/database/queries/connectionlogs.sql
index eb2d1b0cb1..fc38d1af1a 100644
--- a/coderd/database/queries/connectionlogs.sql
+++ b/coderd/database/queries/connectionlogs.sql
@@ -239,6 +239,18 @@ WHERE
 	-- @authorize_filter
 ;
 
+-- name: DeleteOldConnectionLogs :execrows
+WITH old_logs AS (
+	SELECT id
+	FROM connection_logs
+	WHERE connect_time < @before_time::timestamp with time zone
+	ORDER BY connect_time ASC
+	LIMIT @limit_count
+)
+DELETE FROM connection_logs
+USING old_logs
+WHERE connection_logs.id = old_logs.id;
+
 -- name: UpsertConnectionLog :one
 INSERT INTO connection_logs (
 	id,