package usage

import (
	"context"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/xerrors"

	"cdr.dev/slog/v3"
	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/dbauthz"
	"github.com/coder/coder/v2/coderd/pproflabel"
	agplusage "github.com/coder/coder/v2/coderd/usage"
	"github.com/coder/coder/v2/coderd/usage/usagetypes"
	"github.com/coder/quartz"
)

// epoch is a fixed reference point for aligning interval boundaries.
// All replicas use this same epoch so their buckets are identical.
var epoch = time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)

const (
	// cronDateFormat is the layout used to render a boundary into the
	// stable ID of a heartbeat event.
	cronDateFormat = "2006-01-02_15:04:05"
)

// HeartbeatFunc generates a heartbeat event. It is called by the cron
// on each tick whose bucket has not been recorded yet; the stable ID of
// the event is derived by the cron from the job's event type and the
// tick boundary, not by this function. Returning an error skips the
// insert for that tick and logs an error.
type HeartbeatFunc func(ctx context.Context) (event usagetypes.HeartbeatEvent, err error)

// CronJob defines a periodic heartbeat job.
type CronJob struct {
	// Name is a human-readable label used in logs.
	Name string
	// Interval is the base duration between ticks. It must be
	// positive; Register rejects other values.
	Interval time.Duration
	// EventType must match the events generated by the Fn.
	EventType usagetypes.UsageEventType
	// Jitter is the maximum random delay added after the boundary.
	// The actual offset is uniformly distributed in [0, Jitter).
	// This staggers replicas so one is likely to complete the work
	// before others attempt it, allowing them to skip via the
	// existence check (heartbeat inserts are idempotent). Values
	// <= 0 disable jitter.
	Jitter time.Duration
	// Fn produces the heartbeat event. It must not be nil; Register
	// rejects nil.
	Fn HeartbeatFunc
}

// Cron runs registered CronJobs on the supplied clock. Canceling the
// context passed to Start cancels all jobs. Daemon restarts naturally
// restart the timers since Start() creates them fresh — there is no
// state to persist or recover.
type Cron struct {
	clock quartz.Clock
	log   slog.Logger
	db    database.Store
	ins   agplusage.Inserter
	jobs  []CronJob

	// cancel cancels the context on all running jobs. If the ctx
	// passed into `Start` is canceled, the jobs will also stop.
	cancel context.CancelFunc
	// wg ensures all job goroutines have exited before Close returns.
	wg sync.WaitGroup
	// startOnce ensures Start is idempotent.
	startOnce sync.Once
	// started is set once Start has run; Register consults it to
	// reject late registrations.
	started atomic.Bool
}

// NewCron creates a Cron that periodically generates and inserts
// heartbeat events. The clock controls all timers so that tests can
// advance time deterministically via quartz.Mock.
func NewCron(clock quartz.Clock, log slog.Logger, db database.Store, ins agplusage.Inserter) *Cron {
	return &Cron{
		clock: clock,
		log:   log,
		db:    db,
		ins:   ins,
	}
}

// Register adds a job. It must be called before Start; calling it
// after Start returns an error. It also returns an error when the
// job's event type is not a heartbeat type, when Interval is not
// positive, or when Fn is nil.
func (c *Cron) Register(job CronJob) error {
	if !job.EventType.IsHeartbeat() {
		return xerrors.New("event type must be a heartbeat type")
	}
	if c.started.Load() {
		return xerrors.New("cannot register a job after Start has been called")
	}
	// A zero interval would divide by zero in nextBoundary and a
	// negative one would yield non-positive delays, so reject both
	// here rather than failing inside the job goroutine.
	if job.Interval <= 0 {
		return xerrors.Errorf("interval must be positive, got %s", job.Interval)
	}
	// A nil Fn would panic on the first tick that is not already
	// recorded.
	if job.Fn == nil {
		return xerrors.New("heartbeat func must not be nil")
	}
	c.jobs = append(c.jobs, job)
	return nil
}

// Start launches a goroutine per job. Subsequent calls are no-ops.
// On daemon restart a new Cron should be created.
func (c *Cron) Start(ctx context.Context) {
	c.startOnce.Do(func() {
		c.started.Store(true)
		ctx, c.cancel = context.WithCancel(ctx)
		for _, job := range c.jobs {
			// Add before launching so Close cannot observe a zero
			// counter while a job is still starting up.
			c.wg.Add(1)
			pproflabel.Go(ctx, pproflabel.Service(pproflabel.ServiceUsageEventCron, "job", job.Name), func(ctx context.Context) {
				c.run(ctx, job)
			})
		}
	})
}

// Close cancels all jobs and waits for goroutines to exit. It always
// returns nil.
func (c *Cron) Close() error {
	if c.cancel != nil {
		c.cancel()
	}
	c.wg.Wait()
	return nil
}

// run is the per-job loop: it sleeps until the next epoch-aligned
// boundary (plus jitter), records a heartbeat for that boundary, and
// repeats until ctx is canceled.
func (c *Cron) run(ctx context.Context, job CronJob) {
	defer c.wg.Done()

	//nolint:gocritic // We are a publisher in this function
	ctx = dbauthz.AsUsagePublisher(ctx)

	for {
		boundary, delay := nextTick(c.clock.Now(), job.Interval, job.Jitter)

		// Use a quartz timer so the wait honors ctx cancellation and
		// tests can advance time deterministically.
		timer := c.clock.NewTimer(delay, job.Name)
		select {
		case <-ctx.Done():
			// The timer is discarded on return, so there is no need
			// to drain its channel; a blocking receive here could
			// only delay shutdown.
			timer.Stop()
			return
		case <-timer.C:
		}

		c.recordHeartbeat(ctx, job, boundary)
	}
}

// recordHeartbeat records the heartbeat event of job for the bucket
// identified by boundary, unless that bucket already exists. Failures
// are logged and otherwise ignored so the caller proceeds to the next
// tick.
func (c *Cron) recordHeartbeat(ctx context.Context, job CronJob, boundary time.Time) {
	// Use the boundary (not wall-clock "now") for the stable ID
	// so all replicas targeting the same boundary produce the
	// same key.
	stableID := string(job.EventType) + ":" + boundary.UTC().Format(cronDateFormat)

	// Skip if this bucket was already recorded — avoids running
	// the potentially expensive heartbeat function for a
	// duplicate.
	exists, err := c.db.UsageEventExistsByID(ctx, stableID)
	if err != nil {
		c.log.Warn(ctx, "cron heartbeat existence check failed",
			slog.F("job", job.Name),
			slog.Error(err),
		)
		return
	}
	if exists {
		c.log.Debug(ctx, "cron heartbeat already recorded, skipping",
			slog.F("job", job.Name),
			slog.F("id", stableID),
		)
		return
	}

	event, err := job.Fn(ctx)
	if err != nil {
		c.log.Error(ctx, "cron heartbeat func failed",
			slog.F("job", job.Name),
			slog.Error(err),
		)
		return
	}
	// Guard against a misconfigured job whose Fn emits a different
	// event type than the one used to build the stable ID.
	if event.EventType() != job.EventType {
		c.log.Error(ctx, "cron heartbeat func returned wrong event type",
			slog.F("job", job.Name),
			slog.F("expected", job.EventType),
			slog.F("actual", event.EventType()),
		)
		return
	}

	if err := c.ins.InsertHeartbeatUsageEvent(ctx, c.db, stableID, event); err != nil {
		c.log.Warn(ctx, "cron heartbeat insert failed",
			slog.F("job", job.Name),
			slog.Error(err),
		)
	}
}

// nextTick computes the delay until the next epoch-aligned boundary
// for the given interval, plus a random jitter in [0, jitter). It
// returns the target boundary and the total delay from now. A jitter
// <= 0 adds no random offset. interval must be positive.
func nextTick(now time.Time, interval, jitter time.Duration) (boundary time.Time, delay time.Duration) {
	boundary = nextBoundary(now, interval)
	delay = boundary.Sub(now)
	if jitter > 0 {
		//nolint:gosec // Jitter does not need cryptographic randomness.
		delay += time.Duration(rand.Int63n(int64(jitter)))
	}
	return boundary, delay
}

// nextBoundary returns the first multiple of interval (relative to
// epoch) that is strictly after t. interval must be positive; a zero
// interval panics with a division by zero.
func nextBoundary(t time.Time, interval time.Duration) time.Time {
	since := t.Sub(epoch)
	n := since / interval
	// Integer division truncates toward zero. For instants before the
	// epoch with a non-zero remainder that rounds n up instead of
	// down, so step back one to get the floor and keep the result the
	// first boundary strictly after t.
	if since%interval < 0 {
		n--
	}
	return epoch.Add((n + 1) * interval)
}