Files
coder/coderd/database/dbpurge/dbpurge.go
T
Steven Masley 0a3afeddc8 chore: add more pprof labels for various go routines (#19243)
- ReplicaSync
- Notifications
- MetricsAggregator
- DBPurge
2025-08-07 20:05:32 +00:00

120 lines
3.8 KiB
Go

package dbpurge
import (
"context"
"io"
"time"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/pproflabel"
"github.com/coder/quartz"
)
const (
delay = 10 * time.Minute
maxAgentLogAge = 7 * 24 * time.Hour
// Connection events are now inserted into the `connection_logs` table.
// We'll slowly remove old connection events from the `audit_logs` table,
// but we won't touch the `connection_logs` table.
maxAuditLogConnectionEventAge = 90 * 24 * time.Hour // 90 days
auditLogConnectionEventBatchSize = 1000
)
// New creates a new periodically purging database instance.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for cleaning up old, unused resources from the database that take up space.
func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.Clock) io.Closer {
closed := make(chan struct{})
ctx, cancelFunc := context.WithCancel(ctx)
//nolint:gocritic // The system purges old db records without user input.
ctx = dbauthz.AsSystemRestricted(ctx)
// Start the ticker with the initial delay.
ticker := clk.NewTicker(delay)
doTick := func(ctx context.Context, start time.Time) {
defer ticker.Reset(delay)
// Start a transaction to grab advisory lock, we don't want to run
// multiple purges at the same time (multiple replicas).
if err := db.InTx(func(tx database.Store) error {
// Acquire a lock to ensure that only one instance of the
// purge is running at a time.
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBPurge)
if err != nil {
return err
}
if !ok {
logger.Debug(ctx, "unable to acquire lock for purging old database entries, skipping")
return nil
}
deleteOldWorkspaceAgentLogsBefore := start.Add(-maxAgentLogAge)
if err := tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore); err != nil {
return xerrors.Errorf("failed to delete old workspace agent logs: %w", err)
}
if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil {
return xerrors.Errorf("failed to delete old workspace agent stats: %w", err)
}
if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil {
return xerrors.Errorf("failed to delete old provisioner daemons: %w", err)
}
if err := tx.DeleteOldNotificationMessages(ctx); err != nil {
return xerrors.Errorf("failed to delete old notification messages: %w", err)
}
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
BeforeTime: deleteOldAuditLogConnectionEventsBefore,
LimitCount: auditLogConnectionEventBatchSize,
}); err != nil {
return xerrors.Errorf("failed to delete old audit log connection events: %w", err)
}
logger.Debug(ctx, "purged old database entries", slog.F("duration", clk.Since(start)))
return nil
}, database.DefaultTXOptions().WithID("db_purge")); err != nil {
logger.Error(ctx, "failed to purge old database entries", slog.Error(err))
return
}
}
pproflabel.Go(ctx, pproflabel.Service(pproflabel.ServiceDBPurge), func(ctx context.Context) {
defer close(closed)
defer ticker.Stop()
// Force an initial tick.
doTick(ctx, dbtime.Time(clk.Now()).UTC())
for {
select {
case <-ctx.Done():
return
case tick := <-ticker.C:
ticker.Stop()
doTick(ctx, dbtime.Time(tick).UTC())
}
}
})
return &instance{
cancel: cancelFunc,
closed: closed,
}
}
type instance struct {
cancel context.CancelFunc
closed chan struct{}
}
func (i *instance) Close() error {
i.cancel()
<-i.closed
return nil
}