From 0401ed3af58e450041ef930911d5b598ac7c18d6 Mon Sep 17 00:00:00 2001 From: dylanhuff-at-coder Date: Fri, 29 May 2026 14:02:13 -0400 Subject: [PATCH] fix(coderd/notifications): serialize pending updates gauge writes (#25495) Fixes a race where concurrent notification dispatch goroutines could overwrite `coderd_notifications_pending_updates` with an older buffer-length snapshot. Pending update snapshots now serialize count evaluation with the gauge write, and inhibited dispatch results refresh the metric when buffered. --- coderd/notifications/manager.go | 6 +- coderd/notifications/metrics.go | 36 ++++++-- coderd/notifications/metrics_internal_test.go | 85 +++++++++++++++++++ coderd/notifications/metrics_test.go | 21 +++-- coderd/notifications/notifier.go | 5 +- 5 files changed, 135 insertions(+), 18 deletions(-) create mode 100644 coderd/notifications/metrics_internal_test.go diff --git a/coderd/notifications/manager.go b/coderd/notifications/manager.go index f65fc3ff7f..4d44563fce 100644 --- a/coderd/notifications/manager.go +++ b/coderd/notifications/manager.go @@ -237,9 +237,7 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) { // syncUpdates updates messages in the store based on the given successful and failed message dispatch results. func (m *Manager) syncUpdates(ctx context.Context) { // Ensure we update the metrics to reflect the current state after each invocation. - defer func() { - m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure))) - }() + defer m.metrics.pendingUpdatesGauge.set(func() int { return len(m.success) + len(m.failure) }) select { case <-ctx.Done(): @@ -250,7 +248,7 @@ func (m *Manager) syncUpdates(ctx context.Context) { nSuccess := len(m.success) nFailure := len(m.failure) - m.metrics.PendingUpdates.Set(float64(nSuccess + nFailure)) + m.metrics.pendingUpdatesGauge.set(func() int { return len(m.success) + len(m.failure) }) // Nothing to do. if nSuccess+nFailure == 0 { diff --git a/coderd/notifications/metrics.go b/coderd/notifications/metrics.go index 204bc260c7..69a262bb47 100644 --- a/coderd/notifications/metrics.go +++ b/coderd/notifications/metrics.go @@ -3,6 +3,7 @@ package notifications import ( "fmt" "strings" + "sync" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,8 +18,28 @@ type Metrics struct { InflightDispatches *prometheus.GaugeVec DispatcherSendSeconds *prometheus.HistogramVec - PendingUpdates prometheus.Gauge + PendingUpdates prometheus.Collector SyncedUpdates prometheus.Counter + + pendingUpdatesGauge *pendingUpdatesGauge +} + +// pendingUpdatesGauge serializes count evaluation with the gauge write, +// preventing stale snapshots when concurrent goroutines race to update +// the metric. +type pendingUpdatesGauge struct { + gauge prometheus.Gauge + mu sync.Mutex +} + +// set evaluates count under the lock and writes the result to the gauge. +// count is a function, not a value, so the channel length is read atomically +// with the write; passing a pre-evaluated int would reintroduce the race. +func (g *pendingUpdatesGauge) set(count func() int) { + g.mu.Lock() + defer g.mu.Unlock() + + g.gauge.Set(float64(count())) } const ( @@ -35,6 +56,11 @@ const ( ) func NewMetrics(reg prometheus.Registerer) *Metrics { + pendingUpdates := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "pending_updates", Namespace: ns, Subsystem: subsystem, + Help: "The number of dispatch attempt results waiting to be flushed to the store.", + }) + return &Metrics{ DispatchAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "dispatch_attempts_total", Namespace: ns, Subsystem: subsystem, @@ -68,10 +94,10 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { }, []string{LabelMethod}), // Currently no requirement to discriminate between success and failure updates which are pending. - PendingUpdates: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "pending_updates", Namespace: ns, Subsystem: subsystem, - Help: "The number of dispatch attempt results waiting to be flushed to the store.", - }), + PendingUpdates: pendingUpdates, + pendingUpdatesGauge: &pendingUpdatesGauge{ + gauge: pendingUpdates, + }, SyncedUpdates: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "synced_updates_total", Namespace: ns, Subsystem: subsystem, Help: "The number of dispatch attempt results flushed to the store.", diff --git a/coderd/notifications/metrics_internal_test.go b/coderd/notifications/metrics_internal_test.go new file mode 100644 index 0000000000..04360dc221 --- /dev/null +++ b/coderd/notifications/metrics_internal_test.go @@ -0,0 +1,85 @@ +package notifications + +import ( + "sync" + "testing" + + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/testutil" +) + +func TestMetricsSetPendingUpdatesSerializesGaugeWrites(t *testing.T) { + t.Parallel() + + realGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "test_pending_updates", + Help: "test pending updates gauge", + }) + blockingGauge := &pendingUpdatesBlockingGauge{ + Gauge: realGauge, + blockValue: 3, + entered: make(chan struct{}), + release: make(chan struct{}), + } + metrics := &Metrics{ + PendingUpdates: blockingGauge, + pendingUpdatesGauge: &pendingUpdatesGauge{gauge: blockingGauge}, + } + + success := make(chan dispatchResult, 4) + failure := make(chan dispatchResult, 4) + success <- dispatchResult{} + success <- dispatchResult{} + + firstDone := make(chan struct{}) + go func() { + defer close(firstDone) + failure <- dispatchResult{} + // The first writer observes total=3 and blocks inside Set(3) + // while still holding the pendingUpdatesGauge mutex. + metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) }) + }() + + testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, blockingGauge.entered) + + // The main goroutine raises the real total to 4 before a second + // writer queues behind the locked gauge. + success <- dispatchResult{} + + secondDone := make(chan struct{}) + go func() { + defer close(secondDone) + // This count must be evaluated after release, while holding the + // mutex, so the final gauge value cannot regress to 3. + metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) }) + }() + + close(blockingGauge.release) + testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, firstDone) + testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, secondDone) + + require.Equal(t, 4, len(success)+len(failure)) + require.EqualValues(t, 4, promtest.ToFloat64(metrics.PendingUpdates)) +} + +type pendingUpdatesBlockingGauge struct { + prometheus.Gauge + + blockValue float64 + entered chan struct{} + release chan struct{} + once sync.Once +} + +func (g *pendingUpdatesBlockingGauge) Set(value float64) { + if value == g.blockValue { + g.once.Do(func() { + close(g.entered) + <-g.release + }) + } + g.Gauge.Set(value) +} diff --git a/coderd/notifications/metrics_test.go b/coderd/notifications/metrics_test.go index 5562ded86e..3a2d7fbc34 100644 --- a/coderd/notifications/metrics_test.go +++ b/coderd/notifications/metrics_test.go @@ -276,17 +276,24 @@ func TestPendingUpdatesMetric(t *testing.T) { mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx) // THEN: - // handler has dispatched the given notifications. - func() { + // Both handlers have dispatched the given notifications, and their + // results are pending in the metrics. + require.EventuallyWithT(t, func(ct *assert.CollectT) { handler.mu.RLock() + inboxHandler.mu.RLock() defer handler.mu.RUnlock() + defer inboxHandler.mu.RUnlock() - require.Len(t, handler.succeeded, 1) - require.Len(t, handler.failed, 1) - }() + assert.Len(ct, handler.succeeded, 1) + assert.Len(ct, handler.failed, 1) + assert.Len(ct, inboxHandler.succeeded, 1) + assert.Len(ct, inboxHandler.failed, 1) - // Both handler calls should be pending in the metrics. - require.EqualValues(t, 4, promtest.ToFloat64(metrics.PendingUpdates)) + success, failure := mgr.BufferedUpdatesCount() + assert.Equal(ct, 2, success) + assert.Equal(ct, 2, failure) + assert.EqualValues(ct, 4, promtest.ToFloat64(metrics.PendingUpdates)) + }, testutil.WaitShort, testutil.IntervalFast) // THEN: // Trigger syncing updates diff --git a/coderd/notifications/notifier.go b/coderd/notifications/notifier.go index 391c7c9bdb..9c7284c019 100644 --- a/coderd/notifications/notifier.go +++ b/coderd/notifications/notifier.go @@ -172,6 +172,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f // If a notification template has been disabled by the user after a notification was enqueued, mark it as inhibited if msg.Disabled { failure <- n.newInhibitedDispatch(msg) + n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) }) continue } @@ -184,7 +185,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f n.log.Error(ctx, "dispatcher construction failed", slog.F("msg_id", msg.ID), slog.Error(err)) } failure <- n.newFailedDispatch(msg, err, xerrors.Is(err, decorateHelpersError{})) - n.metrics.PendingUpdates.Set(float64(len(success) + len(failure))) + n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) }) continue } @@ -316,7 +317,7 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification logger.Debug(ctx, "message dispatch succeeded") } } - n.metrics.PendingUpdates.Set(float64(len(success) + len(failure))) + n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) }) return nil }