mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(coderd/notifications): serialize pending updates gauge writes (#25495)
Fixes a race where concurrent notification dispatch goroutines could overwrite `coderd_notifications_pending_updates` with an older buffer-length snapshot. Pending update snapshots now serialize count evaluation with the gauge write, and inhibited dispatch results refresh the metric when buffered.
This commit is contained in:
committed by
GitHub
parent
5cdc9e28a9
commit
0401ed3af5
@@ -237,9 +237,7 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
|
||||
// syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
|
||||
func (m *Manager) syncUpdates(ctx context.Context) {
|
||||
// Ensure we update the metrics to reflect the current state after each invocation.
|
||||
defer func() {
|
||||
m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure)))
|
||||
}()
|
||||
defer m.metrics.pendingUpdatesGauge.set(func() int { return len(m.success) + len(m.failure) })
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -250,7 +248,7 @@ func (m *Manager) syncUpdates(ctx context.Context) {
|
||||
nSuccess := len(m.success)
|
||||
nFailure := len(m.failure)
|
||||
|
||||
m.metrics.PendingUpdates.Set(float64(nSuccess + nFailure))
|
||||
m.metrics.pendingUpdatesGauge.set(func() int { return len(m.success) + len(m.failure) })
|
||||
|
||||
// Nothing to do.
|
||||
if nSuccess+nFailure == 0 {
|
||||
|
||||
@@ -3,6 +3,7 @@ package notifications
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
@@ -17,8 +18,28 @@ type Metrics struct {
|
||||
InflightDispatches *prometheus.GaugeVec
|
||||
DispatcherSendSeconds *prometheus.HistogramVec
|
||||
|
||||
PendingUpdates prometheus.Gauge
|
||||
PendingUpdates prometheus.Collector
|
||||
SyncedUpdates prometheus.Counter
|
||||
|
||||
pendingUpdatesGauge *pendingUpdatesGauge
|
||||
}
|
||||
|
||||
// pendingUpdatesGauge serializes count evaluation with the gauge write,
|
||||
// preventing stale snapshots when concurrent goroutines race to update
|
||||
// the metric.
|
||||
type pendingUpdatesGauge struct {
|
||||
gauge prometheus.Gauge
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// set evaluates count under the lock and writes the result to the gauge.
|
||||
// count is a function, not a value, so the channel length is read atomically
|
||||
// with the write; passing a pre-evaluated int would reintroduce the race.
|
||||
func (g *pendingUpdatesGauge) set(count func() int) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
|
||||
g.gauge.Set(float64(count()))
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -35,6 +56,11 @@ const (
|
||||
)
|
||||
|
||||
func NewMetrics(reg prometheus.Registerer) *Metrics {
|
||||
pendingUpdates := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
|
||||
Name: "pending_updates", Namespace: ns, Subsystem: subsystem,
|
||||
Help: "The number of dispatch attempt results waiting to be flushed to the store.",
|
||||
})
|
||||
|
||||
return &Metrics{
|
||||
DispatchAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "dispatch_attempts_total", Namespace: ns, Subsystem: subsystem,
|
||||
@@ -68,10 +94,10 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
|
||||
}, []string{LabelMethod}),
|
||||
|
||||
// Currently no requirement to discriminate between success and failure updates which are pending.
|
||||
PendingUpdates: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
|
||||
Name: "pending_updates", Namespace: ns, Subsystem: subsystem,
|
||||
Help: "The number of dispatch attempt results waiting to be flushed to the store.",
|
||||
}),
|
||||
PendingUpdates: pendingUpdates,
|
||||
pendingUpdatesGauge: &pendingUpdatesGauge{
|
||||
gauge: pendingUpdates,
|
||||
},
|
||||
SyncedUpdates: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
||||
Name: "synced_updates_total", Namespace: ns, Subsystem: subsystem,
|
||||
Help: "The number of dispatch attempt results flushed to the store.",
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
package notifications
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
promtest "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestMetricsSetPendingUpdatesSerializesGaugeWrites(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
realGauge := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "test_pending_updates",
|
||||
Help: "test pending updates gauge",
|
||||
})
|
||||
blockingGauge := &pendingUpdatesBlockingGauge{
|
||||
Gauge: realGauge,
|
||||
blockValue: 3,
|
||||
entered: make(chan struct{}),
|
||||
release: make(chan struct{}),
|
||||
}
|
||||
metrics := &Metrics{
|
||||
PendingUpdates: blockingGauge,
|
||||
pendingUpdatesGauge: &pendingUpdatesGauge{gauge: blockingGauge},
|
||||
}
|
||||
|
||||
success := make(chan dispatchResult, 4)
|
||||
failure := make(chan dispatchResult, 4)
|
||||
success <- dispatchResult{}
|
||||
success <- dispatchResult{}
|
||||
|
||||
firstDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(firstDone)
|
||||
failure <- dispatchResult{}
|
||||
// The first writer observes total=3 and blocks inside Set(3)
|
||||
// while still holding the pendingUpdatesGauge mutex.
|
||||
metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) })
|
||||
}()
|
||||
|
||||
testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, blockingGauge.entered)
|
||||
|
||||
// The main goroutine raises the real total to 4 before a second
|
||||
// writer queues behind the locked gauge.
|
||||
success <- dispatchResult{}
|
||||
|
||||
secondDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(secondDone)
|
||||
// This count must be evaluated after release, while holding the
|
||||
// mutex, so the final gauge value cannot regress to 3.
|
||||
metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) })
|
||||
}()
|
||||
|
||||
close(blockingGauge.release)
|
||||
testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, firstDone)
|
||||
testutil.TryReceive(testutil.Context(t, testutil.WaitShort), t, secondDone)
|
||||
|
||||
require.Equal(t, 4, len(success)+len(failure))
|
||||
require.EqualValues(t, 4, promtest.ToFloat64(metrics.PendingUpdates))
|
||||
}
|
||||
|
||||
type pendingUpdatesBlockingGauge struct {
|
||||
prometheus.Gauge
|
||||
|
||||
blockValue float64
|
||||
entered chan struct{}
|
||||
release chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (g *pendingUpdatesBlockingGauge) Set(value float64) {
|
||||
if value == g.blockValue {
|
||||
g.once.Do(func() {
|
||||
close(g.entered)
|
||||
<-g.release
|
||||
})
|
||||
}
|
||||
g.Gauge.Set(value)
|
||||
}
|
||||
@@ -276,17 +276,24 @@ func TestPendingUpdatesMetric(t *testing.T) {
|
||||
mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx)
|
||||
|
||||
// THEN:
|
||||
// handler has dispatched the given notifications.
|
||||
func() {
|
||||
// Both handlers have dispatched the given notifications, and their
|
||||
// results are pending in the metrics.
|
||||
require.EventuallyWithT(t, func(ct *assert.CollectT) {
|
||||
handler.mu.RLock()
|
||||
inboxHandler.mu.RLock()
|
||||
defer handler.mu.RUnlock()
|
||||
defer inboxHandler.mu.RUnlock()
|
||||
|
||||
require.Len(t, handler.succeeded, 1)
|
||||
require.Len(t, handler.failed, 1)
|
||||
}()
|
||||
assert.Len(ct, handler.succeeded, 1)
|
||||
assert.Len(ct, handler.failed, 1)
|
||||
assert.Len(ct, inboxHandler.succeeded, 1)
|
||||
assert.Len(ct, inboxHandler.failed, 1)
|
||||
|
||||
// Both handler calls should be pending in the metrics.
|
||||
require.EqualValues(t, 4, promtest.ToFloat64(metrics.PendingUpdates))
|
||||
success, failure := mgr.BufferedUpdatesCount()
|
||||
assert.Equal(ct, 2, success)
|
||||
assert.Equal(ct, 2, failure)
|
||||
assert.EqualValues(ct, 4, promtest.ToFloat64(metrics.PendingUpdates))
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
// THEN:
|
||||
// Trigger syncing updates
|
||||
|
||||
@@ -172,6 +172,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
|
||||
// If a notification template has been disabled by the user after a notification was enqueued, mark it as inhibited
|
||||
if msg.Disabled {
|
||||
failure <- n.newInhibitedDispatch(msg)
|
||||
n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) })
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -184,7 +185,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
|
||||
n.log.Error(ctx, "dispatcher construction failed", slog.F("msg_id", msg.ID), slog.Error(err))
|
||||
}
|
||||
failure <- n.newFailedDispatch(msg, err, xerrors.Is(err, decorateHelpersError{}))
|
||||
n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
|
||||
n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) })
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -316,7 +317,7 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
|
||||
logger.Debug(ctx, "message dispatch succeeded")
|
||||
}
|
||||
}
|
||||
n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
|
||||
n.metrics.pendingUpdatesGauge.set(func() int { return len(success) + len(failure) })
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user