diff --git a/enterprise/coderd/connectionlog/connectionlog.go b/enterprise/coderd/connectionlog/connectionlog.go index 9cd36e6505..6668373f1b 100644 --- a/enterprise/coderd/connectionlog/connectionlog.go +++ b/enterprise/coderd/connectionlog/connectionlog.go @@ -468,9 +468,10 @@ func (b *DBBatcher) retryLoop() { func (b *DBBatcher) retryBatch(params database.BatchUpsertConnectionLogsParams) { count := len(params.ID) for attempt := range maxRetries { - t := time.NewTimer(retryInterval) + t := b.clock.NewTimer(retryInterval, "connectionLogBatcher", "retryBackoff") select { case <-b.ctx.Done(): + t.Stop() b.shutdownBatch(params) return case <-t.C: diff --git a/enterprise/coderd/connectionlog/connectionlog_internal_test.go b/enterprise/coderd/connectionlog/connectionlog_internal_test.go index 5804e5ae3f..2e165451ba 100644 --- a/enterprise/coderd/connectionlog/connectionlog_internal_test.go +++ b/enterprise/coderd/connectionlog/connectionlog_internal_test.go @@ -355,15 +355,20 @@ func Test_batcherFlush(t *testing.T) { store := dbmock.NewMockStore(ctrl) clock := quartz.NewMock(t) - scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush") - defer scheduledTrap.Close() + // Trap the capacity flush (fires when batch reaches maxBatchSize). + capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush") + defer capacityTrap.Close() - b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100)) + // Trap the retry backoff timer created by retryBatch. + retryTrap := clock.Trap().NewTimer("connectionLogBatcher", "retryBackoff") + defer retryTrap.Close() + + // Batch size of 1: consuming the item triggers an immediate + // capacity flush, avoiding the timer/itemCh select race. + b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1)) evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New()) - // First call (synchronous in flush) fails, then the - // retry worker retries after the backoff and succeeds. gomock.InOrder( store.EXPECT(). BatchUpsertConnectionLogs(gomock.Any(), gomock.Any()). @@ -380,14 +385,15 @@ func Test_batcherFlush(t *testing.T) { require.NoError(t, b.Upsert(ctx, evt)) - // Trigger a scheduled flush while the batcher is still - // running. The synchronous write fails and queues to - // retryCh. The retry worker picks it up after a real- - // time 1s delay and succeeds. - clock.Advance(defaultFlushInterval).MustWait(ctx) - scheduledTrap.MustWait(ctx).MustRelease(ctx) + // Item consumed → capacity flush fires → transient error → + // batch queued to retryCh → timer reset trapped. + capacityTrap.MustWait(ctx).MustRelease(ctx) + + // Retry worker creates a timer — trap it, release, advance. + retryCall := retryTrap.MustWait(ctx) + retryCall.MustRelease(ctx) + clock.Advance(retryInterval).MustWait(ctx) - // Wait for the retry to complete (real-time 1s delay). require.NoError(t, b.Close()) }) @@ -400,10 +406,10 @@ func Test_batcherFlush(t *testing.T) { store := dbmock.NewMockStore(ctrl) clock := quartz.NewMock(t) - scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush") - defer scheduledTrap.Close() + capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush") + defer capacityTrap.Close() - b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100)) + b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1)) evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New()) @@ -428,10 +434,9 @@ func Test_batcherFlush(t *testing.T) { }). AnyTimes() - // Send event and trigger flush — fails, queues. + // Send event — capacity flush triggers immediately. require.NoError(t, b.Upsert(ctx, evt)) - clock.Advance(defaultFlushInterval).MustWait(ctx) - scheduledTrap.MustWait(ctx).MustRelease(ctx) + capacityTrap.MustWait(ctx).MustRelease(ctx) // Close triggers shutdown. The retry worker drains // retryCh and writes the batch via writeBatch.