From 4671ebb330d5507c86fe97dd82556d55b6a312e6 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 10 May 2024 14:31:49 +0200 Subject: [PATCH] feat: measure pubsub latencies and expose metrics (#13126) --- coderd/database/pubsub/latency.go | 74 +++++++++++++ coderd/database/pubsub/pubsub.go | 61 ++++++++++- coderd/database/pubsub/pubsub_linux_test.go | 111 ++++++++++++++++++++ coderd/database/pubsub/pubsub_test.go | 39 +++++-- testutil/prometheus.go | 79 ++++++++++---- 5 files changed, 326 insertions(+), 38 deletions(-) create mode 100644 coderd/database/pubsub/latency.go diff --git a/coderd/database/pubsub/latency.go b/coderd/database/pubsub/latency.go new file mode 100644 index 0000000000..0797e6642b --- /dev/null +++ b/coderd/database/pubsub/latency.go @@ -0,0 +1,74 @@ +package pubsub + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" +) + +// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these +// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full. +type LatencyMeasurer struct { + // Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements. + channel uuid.UUID + logger slog.Logger +} + +// LatencyMessageLength is the length of a UUIDv4 encoded to hex. +const LatencyMessageLength = 36 + +func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer { + return &LatencyMeasurer{ + channel: uuid.New(), + logger: logger, + } +} + +// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency. +func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) { + var ( + start time.Time + res = make(chan time.Duration, 1) + ) + + msg := []byte(uuid.New().String()) + lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg)) + + cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) { + if !bytes.Equal(in, msg) { + lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg)) + return + } + + res <- time.Since(start) + }) + if err != nil { + return -1, -1, xerrors.Errorf("failed to subscribe: %w", err) + } + defer cancel() + + start = time.Now() + err = p.Publish(lm.latencyChannelName(), msg) + if err != nil { + return -1, -1, xerrors.Errorf("failed to publish: %w", err) + } + + send = time.Since(start) + select { + case <-ctx.Done(): + lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg)) + return send, -1, ctx.Err() + case recv = <-res: + return send, recv, nil + } +} + +func (lm *LatencyMeasurer) latencyChannelName() string { + return fmt.Sprintf("latency-measure:%s", lm.channel) +} diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 59e5b23c34..c391a7c3ea 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -7,6 +7,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error) // might have been dropped. var ErrDroppedMessages = xerrors.New("dropped messages") +// LatencyMeasureTimeout defines how often to trigger a new background latency measurement. +const LatencyMeasureTimeout = time.Second * 10 + // Pubsub is a generic interface for broadcasting and receiving messages. // Implementors should assume high-availability with the backing implementation. type Pubsub interface { @@ -205,6 +209,10 @@ type PGPubsub struct { receivedBytesTotal prometheus.Counter disconnectionsTotal prometheus.Counter connected prometheus.Gauge + + latencyMeasurer *LatencyMeasurer + latencyMeasureCounter atomic.Int64 + latencyErrCounter atomic.Int64 } // BufferSize is the maximum number of unhandled messages we will buffer @@ -478,6 +486,30 @@ var ( ) ) +// additional metrics collected out-of-band +var ( + pubsubSendLatencyDesc = prometheus.NewDesc( + "coder_pubsub_send_latency_seconds", + "The time taken to send a message into a pubsub event channel", + nil, nil, + ) + pubsubRecvLatencyDesc = prometheus.NewDesc( + "coder_pubsub_receive_latency_seconds", + "The time taken to receive a message from a pubsub event channel", + nil, nil, + ) + pubsubLatencyMeasureCountDesc = prometheus.NewDesc( + "coder_pubsub_latency_measures_total", + "The number of pubsub latency measurements", + nil, nil, + ) + pubsubLatencyMeasureErrDesc = prometheus.NewDesc( + "coder_pubsub_latency_measure_errs_total", + "The number of pubsub latency measurement failures", + nil, nil, + ) +) + // We'll track messages as size "normal" and "colossal", where the // latter are messages larger than 7600 bytes, or 95% of the postgres // notify limit. If we see a lot of colossal packets that's an indication that @@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) { // implicit metrics descs <- currentSubscribersDesc descs <- currentEventsDesc + + // additional metrics + descs <- pubsubSendLatencyDesc + descs <- pubsubRecvLatencyDesc + descs <- pubsubLatencyMeasureCountDesc + descs <- pubsubLatencyMeasureErrDesc } // Collect implements, along with Describe, the prometheus.Collector interface @@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) { p.qMu.Unlock() metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs)) metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events)) + + // additional metrics + ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout) + defer cancel() + send, recv, err := p.latencyMeasurer.Measure(ctx, p) + + metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1))) + if err != nil { + p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err)) + metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1))) + return + } + metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds()) + metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds()) } // New creates a new Pubsub implementation using a PostgreSQL connection. @@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect // newWithoutListener creates a new PGPubsub without creating the pqListener. func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub { return &PGPubsub{ - logger: logger, - listenDone: make(chan struct{}), - db: database, - queues: make(map[string]map[uuid.UUID]*msgQueue), + logger: logger, + listenDone: make(chan struct{}), + db: database, + queues: make(map[string]map[uuid.UUID]*msgQueue), + latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")), publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "coder", diff --git a/coderd/database/pubsub/pubsub_linux_test.go b/coderd/database/pubsub/pubsub_linux_test.go index 4952539bc4..efde759096 100644 --- a/coderd/database/pubsub/pubsub_linux_test.go +++ b/coderd/database/pubsub/pubsub_linux_test.go @@ -3,6 +3,7 @@ package pubsub_test import ( + "bytes" "context" "database/sql" "fmt" @@ -15,6 +16,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" + "cdr.dev/slog/sloggers/sloghuman" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/coderd/database/dbtestutil" @@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) { } require.True(t, gotDroppedErr) } + +func TestMeasureLatency(t *testing.T) { + t.Parallel() + + newPubsub := func() (pubsub.Pubsub, func()) { + ctx, cancel := context.WithCancel(context.Background()) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + connectionURL, closePg, err := dbtestutil.Open() + require.NoError(t, err) + db, err := sql.Open("postgres", connectionURL) + require.NoError(t, err) + ps, err := pubsub.New(ctx, logger, db, connectionURL) + require.NoError(t, err) + + return ps, func() { + _ = ps.Close() + _ = db.Close() + closePg() + cancel() + } + } + + t.Run("MeasureLatency", func(t *testing.T) { + t.Parallel() + + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + ps, done := newPubsub() + defer done() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps) + require.NoError(t, err) + require.Greater(t, send.Seconds(), 0.0) + require.Greater(t, recv.Seconds(), 0.0) + }) + + t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) { + t.Parallel() + + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + ps, done := newPubsub() + defer done() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) + defer cancel() + + send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps) + require.ErrorContains(t, err, context.DeadlineExceeded.Error()) + require.Greater(t, send.Seconds(), 0.0) + require.EqualValues(t, recv, time.Duration(-1)) + }) + + t.Run("MeasureLatencyNotifyRace", func(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + logger = logger.AppendSinks(sloghuman.Sink(&buf)) + + lm := pubsub.NewLatencyMeasurer(logger) + ps, done := newPubsub() + defer done() + + racy := newRacyPubsub(ps) + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + send, recv, err := lm.Measure(ctx, racy) + assert.NoError(t, err) + assert.Greater(t, send.Seconds(), 0.0) + assert.Greater(t, recv.Seconds(), 0.0) + + logger.Sync() + assert.Contains(t, buf.String(), "received unexpected message") + }) +} + +// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not). +// This is used to verify that a subscriber will only listen for the message it explicitly expects. +type racyPubsub struct { + pubsub.Pubsub +} + +func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub { + return &racyPubsub{ps} +} + +func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) { + return s.Pubsub.Subscribe(event, listener) +} + +func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) { + return s.Pubsub.SubscribeWithErr(event, listener) +} + +func (s *racyPubsub) Publish(event string, message []byte) error { + err := s.Pubsub.Publish(event, []byte("nonsense")) + if err != nil { + return xerrors.Errorf("failed to send simulated race: %w", err) + } + return s.Pubsub.Publish(event, message) +} + +func (s *racyPubsub) Close() error { + return s.Pubsub.Close() +} diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index 97e507412a..d36298bb32 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) { err = registry.Register(uut) require.NoError(t, err) + // each Gather measures pubsub latency by publishing a message & subscribing to it + var gatherCount float64 + metrics, err := registry.Gather() + gatherCount++ require.NoError(t, err) require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events")) require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers")) @@ -59,19 +63,26 @@ func TestPGPubsub_Metrics(t *testing.T) { _ = testutil.RequireRecvCtx(ctx, t, messageChannel) require.Eventually(t, func() bool { + latencyBytes := gatherCount * pubsub.LatencyMessageLength metrics, err = registry.Gather() + gatherCount++ assert.NoError(t, err) return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") && testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") && testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") && - testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") && - testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") && - testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") && - testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") && - testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total") + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") && + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") && + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") && + testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") && + testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") && + testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") && + testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") && + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") && + !testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total") }, testutil.WaitShort, testutil.IntervalFast) - colossalData := make([]byte, 7600) + colossalSize := 7600 + colossalData := make([]byte, colossalSize) for i := range colossalData { colossalData[i] = 'q' } @@ -89,16 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) { _ = testutil.RequireRecvCtx(ctx, t, messageChannel) require.Eventually(t, func() bool { + latencyBytes := gatherCount * pubsub.LatencyMessageLength metrics, err = registry.Gather() + gatherCount++ assert.NoError(t, err) return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") && testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") && testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") && - testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_publishes_total", "true") && - testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_subscribes_total", "true") && - testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") && + testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") && + testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") && + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") && testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") && - testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") && - testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total") + testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") && + testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") && + testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") && + testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") && + testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") && + !testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total") }, testutil.WaitShort, testutil.IntervalFast) } diff --git a/testutil/prometheus.go b/testutil/prometheus.go index 3d4879c14c..94d350abe3 100644 --- a/testutil/prometheus.go +++ b/testutil/prometheus.go @@ -7,29 +7,60 @@ import ( "github.com/stretchr/testify/require" ) -func PromGaugeHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, label ...string) bool { +type kind string + +const ( + counterKind kind = "counter" + gaugeKind kind = "gauge" +) + +func PromGaugeHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, labels ...string) bool { t.Helper() - for _, family := range metrics { - if family.GetName() != name { - continue - } - ms := family.GetMetric() - metricsLoop: - for _, m := range ms { - require.Equal(t, len(label), len(m.GetLabel())) - for i, lv := range label { - if lv != m.GetLabel()[i].GetValue() { - continue metricsLoop - } - } - return value == m.GetGauge().GetValue() - } - } - return false + return value == getValue(t, metrics, gaugeKind, name, labels...) } -func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, label ...string) bool { +func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, labels ...string) bool { t.Helper() + return value == getValue(t, metrics, counterKind, name, labels...) +} + +func PromGaugeAssertion(t testing.TB, metrics []*dto.MetricFamily, assert func(in float64) bool, name string, labels ...string) bool { + t.Helper() + return assert(getValue(t, metrics, gaugeKind, name, labels...)) +} + +func PromCounterAssertion(t testing.TB, metrics []*dto.MetricFamily, assert func(in float64) bool, name string, labels ...string) bool { + t.Helper() + return assert(getValue(t, metrics, counterKind, name, labels...)) +} + +func PromCounterGathered(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) bool { + t.Helper() + return getMetric(t, metrics, name, labels...) != nil +} + +func PromGaugeGathered(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) bool { + t.Helper() + return getMetric(t, metrics, name, labels...) != nil +} + +func getValue(t testing.TB, metrics []*dto.MetricFamily, kind kind, name string, labels ...string) float64 { + m := getMetric(t, metrics, name, labels...) + if m == nil { + return -1 + } + + switch kind { + case counterKind: + return m.GetCounter().GetValue() + case gaugeKind: + return m.GetGauge().GetValue() + default: + return -1 + } +} + +func getMetric(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) *dto.Metric { for _, family := range metrics { if family.GetName() != name { continue @@ -37,14 +68,16 @@ func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float6 ms := family.GetMetric() metricsLoop: for _, m := range ms { - require.Equal(t, len(label), len(m.GetLabel())) - for i, lv := range label { + require.Equal(t, len(labels), len(m.GetLabel())) + for i, lv := range labels { if lv != m.GetLabel()[i].GetValue() { continue metricsLoop } } - return value == m.GetCounter().GetValue() + + return m } } - return false + + return nil }