feat: implement observability of notifications subsystem (#13799)

This commit is contained in:
Danny Kopping
2024-07-11 10:57:49 +02:00
committed by GitHub
parent a6d66cc7ec
commit b2dab3308d
22 changed files with 769 additions and 186 deletions
+2 -1
View File
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
)
if experiments.Enabled(codersdk.ExperimentNotifications) {
cfg := options.DeploymentValues.Notifications
metrics := notifications.NewMetrics(options.PrometheusRegistry)
// The enqueuer is responsible for enqueueing notifications to the given store.
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
// The notification manager is responsible for:
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
// - keeping the store updated with status updates
notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
notificationsManager, err = notifications.NewManager(cfg, options.Database, metrics, logger.Named("notifications.manager"))
if err != nil {
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
}
+2 -2
View File
@@ -1143,9 +1143,9 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context,
return q.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx, templateID)
}
func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error {
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
return database.NotificationMessage{}, err
return err
}
return q.db.EnqueueNotificationMessage(ctx, arg)
}
+15 -10
View File
@@ -935,12 +935,17 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
q.mutex.Lock()
defer q.mutex.Unlock()
var out []database.AcquireNotificationMessagesRow
for _, nm := range q.notificationMessages {
if len(out) >= int(arg.Count) {
break
}
// Shift the first "Count" notifications off the slice (FIFO).
sz := len(q.notificationMessages)
if sz > int(arg.Count) {
sz = int(arg.Count)
}
list := q.notificationMessages[:sz]
q.notificationMessages = q.notificationMessages[sz:]
var out []database.AcquireNotificationMessagesRow
for _, nm := range list {
acquirableStatuses := []database.NotificationMessageStatus{database.NotificationMessageStatusPending, database.NotificationMessageStatusTemporaryFailure}
if !slices.Contains(acquirableStatuses, nm.Status) {
continue
@@ -956,9 +961,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
ID: nm.ID,
Payload: nm.Payload,
Method: nm.Method,
CreatedBy: nm.CreatedBy,
TitleTemplate: "This is a title with {{.Labels.variable}}",
BodyTemplate: "This is a body with {{.Labels.variable}}",
TemplateID: nm.NotificationTemplateID,
})
}
@@ -1815,10 +1820,10 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context
return nil
}
func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) error {
err := validateDatabaseType(arg)
if err != nil {
return database.NotificationMessage{}, err
return err
}
q.mutex.Lock()
@@ -1827,7 +1832,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
var payload types.MessagePayload
err = json.Unmarshal(arg.Payload, &payload)
if err != nil {
return database.NotificationMessage{}, err
return err
}
nm := database.NotificationMessage{
@@ -1845,7 +1850,7 @@ func (q *FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database
q.notificationMessages = append(q.notificationMessages, nm)
return nm, err
return err
}
func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error {
+3 -3
View File
@@ -382,11 +382,11 @@ func (m metricsStore) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Conte
return r0
}
func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error {
start := time.Now()
r0, r1 := m.s.EnqueueNotificationMessage(ctx, arg)
r0 := m.s.EnqueueNotificationMessage(ctx, arg)
m.queryLatencies.WithLabelValues("EnqueueNotificationMessage").Observe(time.Since(start).Seconds())
return r0, r1
return r0
}
func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) error {
+3 -4
View File
@@ -659,12 +659,11 @@ func (mr *MockStoreMockRecorder) DeleteWorkspaceAgentPortSharesByTemplate(arg0,
}
// EnqueueNotificationMessage mocks base method.
func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EnqueueNotificationMessage", arg0, arg1)
ret0, _ := ret[0].(database.NotificationMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
ret0, _ := ret[0].(error)
return ret0
}
// EnqueueNotificationMessage indicates an expected call of EnqueueNotificationMessage.
+2 -1
View File
@@ -563,7 +563,8 @@ CREATE TABLE notification_messages (
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at timestamp with time zone,
leased_until timestamp with time zone,
next_retry_after timestamp with time zone
next_retry_after timestamp with time zone,
queued_seconds double precision
);
CREATE TABLE notification_templates (
@@ -0,0 +1,2 @@
ALTER TABLE notification_messages
DROP COLUMN IF EXISTS queued_seconds;
@@ -0,0 +1,2 @@
ALTER TABLE notification_messages
ADD COLUMN queued_seconds FLOAT NULL;
+1
View File
@@ -2031,6 +2031,7 @@ type NotificationMessage struct {
UpdatedAt sql.NullTime `db:"updated_at" json:"updated_at"`
LeasedUntil sql.NullTime `db:"leased_until" json:"leased_until"`
NextRetryAfter sql.NullTime `db:"next_retry_after" json:"next_retry_after"`
QueuedSeconds sql.NullFloat64 `db:"queued_seconds" json:"queued_seconds"`
}
// Templates from which to create notification messages.
+1 -1
View File
@@ -100,7 +100,7 @@ type sqlcQuerier interface {
DeleteTailnetTunnel(ctx context.Context, arg DeleteTailnetTunnelParams) (DeleteTailnetTunnelRow, error)
DeleteWorkspaceAgentPortShare(ctx context.Context, arg DeleteWorkspaceAgentPortShareParams) error
DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context, templateID uuid.UUID) error
EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error)
EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error
FavoriteWorkspace(ctx context.Context, id uuid.UUID) error
// This is used to build up the notification_message's JSON payload.
FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error)
+23 -31
View File
@@ -3292,7 +3292,8 @@ const acquireNotificationMessages = `-- name: AcquireNotificationMessages :many
WITH acquired AS (
UPDATE
notification_messages
SET updated_at = NOW(),
SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT,
updated_at = NOW(),
status = 'leased'::notification_message_status,
status_reason = 'Leased by notifier ' || $1::uuid,
leased_until = NOW() + CONCAT($2::int, ' seconds')::interval
@@ -3328,14 +3329,16 @@ WITH acquired AS (
FOR UPDATE OF nm
SKIP LOCKED
LIMIT $4)
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after)
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds)
SELECT
-- message
nm.id,
nm.payload,
nm.method,
nm.created_by,
nm.attempt_count::int AS attempt_count,
nm.queued_seconds::float AS queued_seconds,
-- template
nt.id AS template_id,
nt.title_template,
nt.body_template
FROM acquired nm
@@ -3353,7 +3356,9 @@ type AcquireNotificationMessagesRow struct {
ID uuid.UUID `db:"id" json:"id"`
Payload json.RawMessage `db:"payload" json:"payload"`
Method NotificationMethod `db:"method" json:"method"`
CreatedBy string `db:"created_by" json:"created_by"`
AttemptCount int32 `db:"attempt_count" json:"attempt_count"`
QueuedSeconds float64 `db:"queued_seconds" json:"queued_seconds"`
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
TitleTemplate string `db:"title_template" json:"title_template"`
BodyTemplate string `db:"body_template" json:"body_template"`
}
@@ -3386,7 +3391,9 @@ func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg Acquir
&i.ID,
&i.Payload,
&i.Method,
&i.CreatedBy,
&i.AttemptCount,
&i.QueuedSeconds,
&i.TemplateID,
&i.TitleTemplate,
&i.BodyTemplate,
); err != nil {
@@ -3405,7 +3412,8 @@ func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg Acquir
const bulkMarkNotificationMessagesFailed = `-- name: BulkMarkNotificationMessagesFailed :execrows
UPDATE notification_messages
SET updated_at = subquery.failed_at,
SET queued_seconds = 0,
updated_at = subquery.failed_at,
attempt_count = attempt_count + 1,
status = CASE
WHEN attempt_count + 1 < $1::int THEN subquery.status
@@ -3448,13 +3456,14 @@ func (q *sqlQuerier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg
const bulkMarkNotificationMessagesSent = `-- name: BulkMarkNotificationMessagesSent :execrows
UPDATE notification_messages
SET updated_at = new_values.sent_at,
SET queued_seconds = 0,
updated_at = new_values.sent_at,
attempt_count = attempt_count + 1,
status = 'sent'::notification_message_status,
status_reason = NULL,
leased_until = NULL,
next_retry_after = NULL
FROM (SELECT UNNEST($1::uuid[]) AS id,
FROM (SELECT UNNEST($1::uuid[]) AS id,
UNNEST($2::timestamptz[]) AS sent_at)
AS new_values
WHERE notification_messages.id = new_values.id
@@ -3488,7 +3497,7 @@ func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error {
return err
}
const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :one
const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :exec
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
VALUES ($1,
$2,
@@ -3497,7 +3506,6 @@ VALUES ($1,
$5::jsonb,
$6,
$7)
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after
`
type EnqueueNotificationMessageParams struct {
@@ -3510,8 +3518,8 @@ type EnqueueNotificationMessageParams struct {
CreatedBy string `db:"created_by" json:"created_by"`
}
func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error) {
row := q.db.QueryRowContext(ctx, enqueueNotificationMessage,
func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error {
_, err := q.db.ExecContext(ctx, enqueueNotificationMessage,
arg.ID,
arg.NotificationTemplateID,
arg.UserID,
@@ -3520,24 +3528,7 @@ func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg Enqueue
pq.Array(arg.Targets),
arg.CreatedBy,
)
var i NotificationMessage
err := row.Scan(
&i.ID,
&i.NotificationTemplateID,
&i.UserID,
&i.Method,
&i.Status,
&i.StatusReason,
&i.CreatedBy,
&i.Payload,
&i.AttemptCount,
pq.Array(&i.Targets),
&i.CreatedAt,
&i.UpdatedAt,
&i.LeasedUntil,
&i.NextRetryAfter,
)
return i, err
return err
}
const fetchNewMessageMetadata = `-- name: FetchNewMessageMetadata :one
@@ -3580,7 +3571,7 @@ func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMe
}
const getNotificationMessagesByStatus = `-- name: GetNotificationMessagesByStatus :many
SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after FROM notification_messages WHERE status = $1 LIMIT $2::int
SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds FROM notification_messages WHERE status = $1 LIMIT $2::int
`
type GetNotificationMessagesByStatusParams struct {
@@ -3612,6 +3603,7 @@ func (q *sqlQuerier) GetNotificationMessagesByStatus(ctx context.Context, arg Ge
&i.UpdatedAt,
&i.LeasedUntil,
&i.NextRetryAfter,
&i.QueuedSeconds,
); err != nil {
return nil, err
}
+12 -8
View File
@@ -10,7 +10,7 @@ FROM notification_templates nt,
WHERE nt.id = @notification_template_id
AND u.id = @user_id;
-- name: EnqueueNotificationMessage :one
-- name: EnqueueNotificationMessage :exec
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
VALUES (@id,
@notification_template_id,
@@ -18,8 +18,7 @@ VALUES (@id,
@method::notification_method,
@payload::jsonb,
@targets,
@created_by)
RETURNING *;
@created_by);
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
@@ -36,7 +35,8 @@ RETURNING *;
WITH acquired AS (
UPDATE
notification_messages
SET updated_at = NOW(),
SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT,
updated_at = NOW(),
status = 'leased'::notification_message_status,
status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid,
leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval
@@ -78,8 +78,10 @@ SELECT
nm.id,
nm.payload,
nm.method,
nm.created_by,
nm.attempt_count::int AS attempt_count,
nm.queued_seconds::float AS queued_seconds,
-- template
nt.id AS template_id,
nt.title_template,
nt.body_template
FROM acquired nm
@@ -87,7 +89,8 @@ FROM acquired nm
-- name: BulkMarkNotificationMessagesFailed :execrows
UPDATE notification_messages
SET updated_at = subquery.failed_at,
SET queued_seconds = 0,
updated_at = subquery.failed_at,
attempt_count = attempt_count + 1,
status = CASE
WHEN attempt_count + 1 < @max_attempts::int THEN subquery.status
@@ -105,13 +108,14 @@ WHERE notification_messages.id = subquery.id;
-- name: BulkMarkNotificationMessagesSent :execrows
UPDATE notification_messages
SET updated_at = new_values.sent_at,
SET queued_seconds = 0,
updated_at = new_values.sent_at,
attempt_count = attempt_count + 1,
status = 'sent'::notification_message_status,
status_reason = NULL,
leased_until = NULL,
next_retry_after = NULL
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
UNNEST(@sent_ats::timestamptz[]) AS sent_at)
AS new_values
WHERE notification_messages.id = new_values.id;
+2 -2
View File
@@ -59,7 +59,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
}
id := uuid.New()
msg, err := s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
err = s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
ID: id,
UserID: userID,
NotificationTemplateID: templateID,
@@ -73,7 +73,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
return nil, xerrors.Errorf("enqueue notification: %w", err)
}
s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", msg.ID))
s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", id))
return &id, nil
}
+27 -24
View File
@@ -43,6 +43,9 @@ type Manager struct {
notifier *notifier
handlers map[database.NotificationMethod]Handler
method database.NotificationMethod
metrics *Metrics
success, failure chan dispatchResult
@@ -56,7 +59,13 @@ type Manager struct {
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) (*Manager, error) {
func NewManager(cfg codersdk.NotificationsConfig, store Store, metrics *Metrics, log slog.Logger) (*Manager, error) {
// TODO(dannyk): add the ability to use multiple notification methods.
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
return nil, xerrors.Errorf("notification method %q is invalid", cfg.Method)
}
// If dispatch timeout exceeds lease period, it is possible that messages can be delivered in duplicate because the
// lease can expire before the notifier gives up on the dispatch, which results in the message becoming eligible for
// being re-acquired.
@@ -78,6 +87,9 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
success: make(chan dispatchResult, cfg.StoreSyncBufferSize),
failure: make(chan dispatchResult, cfg.StoreSyncBufferSize),
metrics: metrics,
method: method,
stop: make(chan any),
done: make(chan any),
@@ -137,7 +149,7 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.method, m.metrics)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
})
@@ -171,12 +183,12 @@ func (m *Manager) loop(ctx context.Context) error {
if len(m.success)+len(m.failure) > 0 {
m.log.Warn(ctx, "flushing buffered updates before stop",
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
m.bulkUpdate(ctx)
m.syncUpdates(ctx)
m.log.Warn(ctx, "flushing updates done")
}
return nil
case <-tick.C:
m.bulkUpdate(ctx)
m.syncUpdates(ctx)
}
}
})
@@ -194,8 +206,13 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
return len(m.success), len(m.failure)
}
// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager) bulkUpdate(ctx context.Context) {
// syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager) syncUpdates(ctx context.Context) {
// Ensure we update the metrics to reflect the current state after each invocation.
defer func() {
m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure)))
}()
select {
case <-ctx.Done():
return
@@ -205,6 +222,8 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
nSuccess := len(m.success)
nFailure := len(m.failure)
m.metrics.PendingUpdates.Set(float64(nSuccess + nFailure))
// Nothing to do.
if nSuccess+nFailure == 0 {
return
@@ -266,6 +285,7 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
logger.Error(ctx, "bulk update failed", slog.Error(err))
return
}
m.metrics.SyncedUpdates.Add(float64(n))
logger.Debug(ctx, "bulk update completed", slog.F("updated", n))
}()
@@ -289,6 +309,7 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
logger.Error(ctx, "bulk update failed", slog.Error(err))
return
}
m.metrics.SyncedUpdates.Add(float64(n))
logger.Debug(ctx, "bulk update completed", slog.F("updated", n))
}()
@@ -347,21 +368,3 @@ type dispatchResult struct {
err error
retryable bool
}
func newSuccessfulDispatch(notifier, msg uuid.UUID) dispatchResult {
return dispatchResult{
notifier: notifier,
msg: msg,
ts: time.Now(),
}
}
func newFailedDispatch(notifier, msg uuid.UUID, err error, retryable bool) dispatchResult {
return dispatchResult{
notifier: notifier,
msg: msg,
ts: time.Now(),
err: err,
retryable: retryable,
}
}
+9 -9
View File
@@ -28,14 +28,14 @@ func TestBufferedUpdates(t *testing.T) {
// setup
ctx, logger, db := setupInMemory(t)
interceptor := &bulkUpdateInterceptor{Store: db}
interceptor := &syncInterceptor{Store: db}
santa := &santaHandler{}
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.
// GIVEN: a manager which will pass or fail notifications based on their "nice" labels
mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(cfg, interceptor, createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
database.NotificationMethodSmtp: santa,
@@ -148,7 +148,7 @@ func TestStopBeforeRun(t *testing.T) {
ctx, logger, db := setupInMemory(t)
// GIVEN: a standard manager
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), db, logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), db, createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)
// THEN: validate that the manager can be stopped safely without Run() having been called yet
@@ -158,7 +158,7 @@ func TestStopBeforeRun(t *testing.T) {
}, testutil.WaitShort, testutil.IntervalFast)
}
type bulkUpdateInterceptor struct {
type syncInterceptor struct {
notifications.Store
sent atomic.Int32
@@ -166,7 +166,7 @@ type bulkUpdateInterceptor struct {
err atomic.Value
}
func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
func (b *syncInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
updated, err := b.Store.BulkMarkNotificationMessagesSent(ctx, arg)
b.sent.Add(int32(updated))
if err != nil {
@@ -175,7 +175,7 @@ func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Con
return updated, err
}
func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
func (b *syncInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
updated, err := b.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
b.failed.Add(int32(updated))
if err != nil {
@@ -213,15 +213,15 @@ func newEnqueueInterceptor(db notifications.Store, metadataFn func() database.Fe
return &enqueueInterceptor{Store: db, payload: make(chan types.MessagePayload, 1), metadataFn: metadataFn}
}
func (e *enqueueInterceptor) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
func (e *enqueueInterceptor) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) error {
var payload types.MessagePayload
err := json.Unmarshal(arg.Payload, &payload)
if err != nil {
return database.NotificationMessage{}, err
return err
}
e.payload <- payload
return database.NotificationMessage{}, err
return err
}
func (e *enqueueInterceptor) FetchNewMessageMetadata(_ context.Context, _ database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
+80
View File
@@ -0,0 +1,80 @@
package notifications
import (
"fmt"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type Metrics struct {
DispatchAttempts *prometheus.CounterVec
RetryCount *prometheus.CounterVec
QueuedSeconds *prometheus.HistogramVec
InflightDispatches *prometheus.GaugeVec
DispatcherSendSeconds *prometheus.HistogramVec
PendingUpdates prometheus.Gauge
SyncedUpdates prometheus.Counter
}
const (
ns = "coderd"
subsystem = "notifications"
LabelMethod = "method"
LabelTemplateID = "notification_template_id"
LabelResult = "result"
ResultSuccess = "success"
ResultTempFail = "temp_fail"
ResultPermFail = "perm_fail"
)
func NewMetrics(reg prometheus.Registerer) *Metrics {
return &Metrics{
DispatchAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "dispatch_attempts_total", Namespace: ns, Subsystem: subsystem,
Help: fmt.Sprintf("The number of dispatch attempts, aggregated by the result type (%s)",
strings.Join([]string{ResultSuccess, ResultTempFail, ResultPermFail}, ", ")),
}, []string{LabelMethod, LabelTemplateID, LabelResult}),
RetryCount: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "retry_count", Namespace: ns, Subsystem: subsystem,
Help: "The count of notification dispatch retry attempts.",
}, []string{LabelMethod, LabelTemplateID}),
// Aggregating on LabelTemplateID as well would cause a cardinality explosion.
QueuedSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "queued_seconds", Namespace: ns, Subsystem: subsystem,
Buckets: []float64{1, 2.5, 5, 7.5, 10, 15, 20, 30, 60, 120, 300, 600, 3600},
Help: "The time elapsed between a notification being enqueued in the store and retrieved for dispatching " +
"(measures the latency of the notifications system). This should generally be within CODER_NOTIFICATIONS_FETCH_INTERVAL " +
"seconds; higher values for a sustained period indicates delayed processing and CODER_NOTIFICATIONS_LEASE_COUNT " +
"can be increased to accommodate this.",
}, []string{LabelMethod}),
InflightDispatches: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "inflight_dispatches", Namespace: ns, Subsystem: subsystem,
Help: "The number of dispatch attempts which are currently in progress.",
}, []string{LabelMethod, LabelTemplateID}),
// Aggregating on LabelTemplateID as well would cause a cardinality explosion.
DispatcherSendSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "dispatcher_send_seconds", Namespace: ns, Subsystem: subsystem,
Buckets: []float64{0.001, 0.05, 0.1, 0.5, 1, 2, 5, 10, 15, 30, 60, 120},
Help: "The time taken to dispatch notifications.",
}, []string{LabelMethod}),
// Currently no requirement to discriminate between success and failure updates which are pending.
PendingUpdates: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "pending_updates", Namespace: ns, Subsystem: subsystem,
Help: "The number of dispatch attempt results waiting to be flushed to the store.",
}),
SyncedUpdates: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "synced_updates_total", Namespace: ns, Subsystem: subsystem,
Help: "The number of dispatch attempt results flushed to the store.",
}),
}
}
+444
View File
@@ -0,0 +1,444 @@
package notifications_test
import (
"context"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/serpent"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/testutil"
)
func TestMetrics(t *testing.T) {
t.Parallel()
// SETUP
if !dbtestutil.WillUsePostgres() {
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
}
ctx, logger, store := setup(t)
reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
template := notifications.TemplateWorkspaceDeleted
const (
method = database.NotificationMethodSmtp
maxAttempts = 3
debug = false
)
// GIVEN: a notification manager whose intervals are tuned low (for test speed) and whose dispatches are intercepted
cfg := defaultNotificationsConfig(method)
cfg.MaxSendAttempts = maxAttempts
// Tune the intervals low to increase test speed.
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.RetryInterval = serpent.Duration(time.Millisecond * 50)
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) // Twice as long as fetch interval to ensure we catch pending updates.
mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
handler := &fakeHandler{}
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
require.NoError(t, err)
user := createSampleUser(t, store)
// Build fingerprints for the two different series we expect.
methodTemplateFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String())
methodFP := fingerprintLabels(notifications.LabelMethod, string(method))
expected := map[string]func(metric *dto.Metric, series string) bool{
"coderd_notifications_dispatch_attempts_total": func(metric *dto.Metric, series string) bool {
// This metric has 3 possible dispositions; find if any of them match first before we check the metric's value.
results := map[string]float64{
notifications.ResultSuccess: 1, // Only 1 successful delivery.
notifications.ResultTempFail: maxAttempts - 1, // 2 temp failures, on the 3rd it'll be marked permanent failure.
notifications.ResultPermFail: 1, // 1 permanent failure after retries exhausted.
}
var match string
for result, val := range results {
seriesFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String(), notifications.LabelResult, result)
if !hasMatchingFingerprint(metric, seriesFP) {
continue
}
match = result
if debug {
t.Logf("coderd_notifications_dispatch_attempts_total{result=%q} == %v: %v", result, val, metric.Counter.GetValue())
}
break
}
// Could not find a matching series.
if match == "" {
assert.Failf(t, "found unexpected series %q", series)
return false
}
// nolint:forcetypeassert // Already checked above.
target := results[match]
return metric.Counter.GetValue() == target
},
"coderd_notifications_retry_count": func(metric *dto.Metric, series string) bool {
assert.Truef(t, hasMatchingFingerprint(metric, methodTemplateFP), "found unexpected series %q", series)
if debug {
t.Logf("coderd_notifications_retry_count == %v: %v", maxAttempts-1, metric.Counter.GetValue())
}
// 1 original attempts + 2 retries = maxAttempts
return metric.Counter.GetValue() == maxAttempts-1
},
"coderd_notifications_queued_seconds": func(metric *dto.Metric, series string) bool {
assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series)
if debug {
t.Logf("coderd_notifications_queued_seconds > 0: %v", metric.Histogram.GetSampleSum())
}
// Notifications will queue for a non-zero amount of time.
return metric.Histogram.GetSampleSum() > 0
},
"coderd_notifications_dispatcher_send_seconds": func(metric *dto.Metric, series string) bool {
assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series)
if debug {
t.Logf("coderd_notifications_dispatcher_send_seconds > 0: %v", metric.Histogram.GetSampleSum())
}
// Dispatches should take a non-zero amount of time.
return metric.Histogram.GetSampleSum() > 0
},
"coderd_notifications_inflight_dispatches": func(metric *dto.Metric, series string) bool {
// This is a gauge, so it can be difficult to get the timing right to catch it.
// See TestInflightDispatchesMetric for a more precise test.
return true
},
"coderd_notifications_pending_updates": func(metric *dto.Metric, series string) bool {
// This is a gauge, so it can be difficult to get the timing right to catch it.
// See TestPendingUpdatesMetric for a more precise test.
return true
},
"coderd_notifications_synced_updates_total": func(metric *dto.Metric, series string) bool {
if debug {
t.Logf("coderd_notifications_synced_updates_total = %v: %v", maxAttempts+1, metric.Counter.GetValue())
}
// 1 message will exceed its maxAttempts, 1 will succeed on the first try.
return metric.Counter.GetValue() == maxAttempts+1
},
}
// WHEN: 2 notifications are enqueued, 1 of which will fail until its retries are exhausted, and another which will succeed
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed
require.NoError(t, err)
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail and retry (maxAttempts - 1) times
require.NoError(t, err)
mgr.Run(ctx)
// THEN: expect all the defined metrics to be present and have their expected values
require.EventuallyWithT(t, func(ct *assert.CollectT) {
handler.mu.RLock()
defer handler.mu.RUnlock()
gathered, err := reg.Gather()
assert.NoError(t, err)
succeeded := len(handler.succeeded)
failed := len(handler.failed)
if debug {
t.Logf("SUCCEEDED == 1: %v, FAILED == %v: %v\n", succeeded, maxAttempts, failed)
}
// Ensure that all metrics have a) the expected label combinations (series) and b) the expected values.
for _, family := range gathered {
hasExpectedValue, ok := expected[family.GetName()]
if !assert.Truef(ct, ok, "found unexpected metric family %q", family.GetName()) {
t.Logf("found unexpected metric family %q", family.GetName())
// Bail out fast if precondition is not met.
ct.FailNow()
}
for _, metric := range family.Metric {
assert.True(ct, hasExpectedValue(metric, metric.String()))
}
}
// One message will succeed.
assert.Equal(ct, succeeded, 1)
// One message will fail, and exhaust its maxAttempts.
assert.Equal(ct, failed, maxAttempts)
}, testutil.WaitShort, testutil.IntervalFast)
}
func TestPendingUpdatesMetric(t *testing.T) {
t.Parallel()
// SETUP
ctx, logger, store := setupInMemory(t)
reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
template := notifications.TemplateWorkspaceDeleted
const method = database.NotificationMethodSmtp
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
cfg := defaultNotificationsConfig(method)
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
syncer := &syncInterceptor{Store: store}
interceptor := newUpdateSignallingInterceptor(syncer)
mgr, err := notifications.NewManager(cfg, interceptor, metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
handler := &fakeHandler{}
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
require.NoError(t, err)
user := createSampleUser(t, store)
// WHEN: 2 notifications are enqueued, one of which will fail and one which will succeed
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed
require.NoError(t, err)
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail and retry (maxAttempts - 1) times
require.NoError(t, err)
mgr.Run(ctx)
// THEN:
// Wait until the handler has dispatched the given notifications.
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return len(handler.succeeded) == 1 && len(handler.failed) == 1
}, testutil.WaitShort, testutil.IntervalFast)
// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
// Ensure that the value set in the metric is equivalent to the number of actual pending updates.
pending := promtest.ToFloat64(metrics.PendingUpdates)
require.EqualValues(t, pending, success+failure)
// Unpause the interceptor so the updates can proceed.
interceptor.proceed.Broadcast()
// Validate that the store synced the expected number of updates.
require.Eventually(t, func() bool {
return syncer.sent.Load() == 1 && syncer.failed.Load() == 1
}, testutil.WaitShort, testutil.IntervalFast)
// Wait for the updates to be synced and the metric to reflect that.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.PendingUpdates) == 0
}, testutil.WaitShort, testutil.IntervalFast)
}
func TestInflightDispatchesMetric(t *testing.T) {
t.Parallel()
// SETUP
ctx, logger, store := setupInMemory(t)
reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
template := notifications.TemplateWorkspaceDeleted
const method = database.NotificationMethodSmtp
// GIVEN: a notification manager whose dispatches are intercepted and delayed to measure the number of inflight requests
cfg := defaultNotificationsConfig(method)
cfg.LeaseCount = 10
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
handler := &fakeHandler{}
// Delayer will delay all dispatches by 2x fetch intervals to ensure we catch the requests inflight.
delayer := newDelayingHandler(cfg.FetchInterval.Value()*2, handler)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: delayer,
})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
require.NoError(t, err)
user := createSampleUser(t, store)
// WHEN: notifications are enqueued which will succeed (and be delayed during dispatch)
const msgCount = 2
for i := 0; i < msgCount; i++ {
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test")
require.NoError(t, err)
}
mgr.Run(ctx)
// THEN:
// Ensure we see the dispatches of the messages inflight.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.InflightDispatches.WithLabelValues(string(method), template.String())) == msgCount
}, testutil.WaitShort, testutil.IntervalFast)
// Wait until the handler has dispatched the given notifications.
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return len(handler.succeeded) == msgCount
}, testutil.WaitShort, testutil.IntervalFast)
// Wait for the updates to be synced and the metric to reflect that.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.InflightDispatches) == 0
}, testutil.WaitShort, testutil.IntervalFast)
}
// hasMatchingFingerprint checks if the given metric's series fingerprint matches the reference fingerprint.
func hasMatchingFingerprint(metric *dto.Metric, fp model.Fingerprint) bool {
return fingerprintLabelPairs(metric.Label) == fp
}
// fingerprintLabelPairs produces a fingerprint unique to the given combination of label pairs.
func fingerprintLabelPairs(lbs []*dto.LabelPair) model.Fingerprint {
pairs := make([]string, 0, len(lbs)*2)
for _, lp := range lbs {
pairs = append(pairs, lp.GetName(), lp.GetValue())
}
return fingerprintLabels(pairs...)
}
// fingerprintLabels produces a fingerprint unique to the given pairs of label values.
// MUST contain an even number of arguments (key:value), otherwise it will panic.
func fingerprintLabels(lbs ...string) model.Fingerprint {
if len(lbs)%2 != 0 {
panic("imbalanced set of label pairs given")
}
lbsSet := make(model.LabelSet, len(lbs)/2)
for i := 0; i < len(lbs); i += 2 {
k := lbs[i]
v := lbs[i+1]
lbsSet[model.LabelName(k)] = model.LabelValue(v)
}
return lbsSet.Fingerprint() // FastFingerprint does not sort the labels.
}
// updateSignallingInterceptor intercepts bulk update calls to the store, and waits on the "proceed" condition to be
// signaled by the caller so it can continue.
type updateSignallingInterceptor struct {
notifications.Store
proceed *sync.Cond
updateSuccess chan int
updateFailure chan int
}
func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
return &updateSignallingInterceptor{
Store: interceptor,
proceed: sync.NewCond(&sync.Mutex{}),
updateSuccess: make(chan int, 1),
updateFailure: make(chan int, 1),
}
}
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
u.updateSuccess <- len(arg.IDs)
u.proceed.L.Lock()
defer u.proceed.L.Unlock()
// Wait until signaled so we have a chance to read the number of pending updates.
u.proceed.Wait()
return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
}
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
u.updateFailure <- len(arg.IDs)
u.proceed.L.Lock()
defer u.proceed.L.Unlock()
// Wait until signaled so we have a chance to read the number of pending updates.
u.proceed.Wait()
return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
}
type delayingHandler struct {
h notifications.Handler
delay time.Duration
}
func newDelayingHandler(delay time.Duration, handler notifications.Handler) *delayingHandler {
return &delayingHandler{
delay: delay,
h: handler,
}
}
func (d *delayingHandler) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) {
deliverFn, err := d.h.Dispatcher(payload, title, body)
if err != nil {
return nil, err
}
return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
time.Sleep(d.delay)
return deliverFn(ctx, msgID)
}, nil
}
+24 -70
View File
@@ -7,13 +7,13 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"slices"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"github.com/google/uuid"
@@ -54,10 +54,10 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
// GIVEN: a manager with standard config but a faked dispatch handler
handler := &fakeHandler{}
interceptor := &bulkUpdateInterceptor{Store: db}
interceptor := &syncInterceptor{Store: db}
cfg := defaultNotificationsConfig(method)
cfg.RetryInterval = serpent.Duration(time.Hour) // Ensure retries don't interfere with the test
mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, interceptor, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
t.Cleanup(func() {
@@ -88,7 +88,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
require.Eventually(t, func() bool {
return interceptor.sent.Load() == 1 &&
interceptor.failed.Load() == 1
}, testutil.WaitShort, testutil.IntervalFast)
}, testutil.WaitLong, testutil.IntervalFast)
// THEN: we verify that the store contains notifications in their expected state
success, err := db.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
@@ -131,7 +131,7 @@ func TestSMTPDispatch(t *testing.T) {
Hello: "localhost",
}
handler := newDispatchInterceptor(dispatch.NewSMTPHandler(cfg.SMTP, logger.Named("smtp")))
mgr, err := notifications.NewManager(cfg, db, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
t.Cleanup(func() {
@@ -192,7 +192,7 @@ func TestWebhookDispatch(t *testing.T) {
cfg.Webhook = codersdk.NotificationsWebhookConfig{
Endpoint: *serpent.URLOf(endpoint),
}
mgr, err := notifications.NewManager(cfg, db, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@@ -285,10 +285,10 @@ func TestBackpressure(t *testing.T) {
handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook")))
// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &bulkUpdateInterceptor{Store: db}
storeInterceptor := &syncInterceptor{Store: db}
// GIVEN: a notification manager whose updates will be intercepted
mgr, err := notifications.NewManager(cfg, storeInterceptor, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
@@ -381,9 +381,9 @@ func TestRetries(t *testing.T) {
handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook")))
// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &bulkUpdateInterceptor{Store: db}
storeInterceptor := &syncInterceptor{Store: db}
mgr, err := notifications.NewManager(cfg, storeInterceptor, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@@ -439,12 +439,12 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
cfg.LeasePeriod = serpent.Duration(leasePeriod)
cfg.DispatchTimeout = serpent.Duration(leasePeriod - time.Millisecond)
noopInterceptor := newNoopBulkUpdater(db)
noopInterceptor := newNoopStoreSyncer(db)
mgrCtx, cancelManagerCtx := context.WithCancel(context.Background())
t.Cleanup(cancelManagerCtx)
mgr, err := notifications.NewManager(cfg, noopInterceptor, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, noopInterceptor, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
require.NoError(t, err)
@@ -489,9 +489,9 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// Start a new notification manager.
// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &bulkUpdateInterceptor{Store: db}
storeInterceptor := &syncInterceptor{Store: db}
handler := newDispatchInterceptor(&fakeHandler{})
mgr, err = notifications.NewManager(cfg, storeInterceptor, logger.Named("manager"))
mgr, err = notifications.NewManager(cfg, storeInterceptor, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
@@ -532,7 +532,7 @@ func TestInvalidConfig(t *testing.T) {
cfg.DispatchTimeout = serpent.Duration(leasePeriod)
// WHEN: the manager is created with invalid config
_, err := notifications.NewManager(cfg, db, logger.Named("manager"))
_, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager"))
// THEN: the manager will fail to be created, citing invalid config as error
require.ErrorIs(t, err, notifications.ErrInvalidDispatchTimeout)
@@ -550,9 +550,7 @@ func TestNotifierPaused(t *testing.T) {
user := createSampleUser(t, db)
cfg := defaultNotificationsConfig(method)
fetchInterval := time.Nanosecond // Let
cfg.FetchInterval = *serpent.DurationOf(&fetchInterval)
mgr, err := notifications.NewManager(cfg, db, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, db, createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
t.Cleanup(func() {
@@ -604,10 +602,8 @@ func TestNotifierPaused(t *testing.T) {
}
type fakeHandler struct {
mu sync.RWMutex
succeeded []string
failed []string
mu sync.RWMutex
succeeded, failed []string
}
func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dispatch.DeliveryFunc, error) {
@@ -625,62 +621,20 @@ func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dis
}, nil
}
type dispatchInterceptor struct {
handler notifications.Handler
sent atomic.Int32
retryable atomic.Int32
unretryable atomic.Int32
err atomic.Int32
lastErr atomic.Value
}
func newDispatchInterceptor(h notifications.Handler) *dispatchInterceptor {
return &dispatchInterceptor{
handler: h,
}
}
func (i *dispatchInterceptor) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) {
return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
deliveryFn, err := i.handler.Dispatcher(payload, title, body)
if err != nil {
return false, err
}
retryable, err = deliveryFn(ctx, msgID)
if err != nil {
i.err.Add(1)
i.lastErr.Store(err)
}
switch {
case !retryable && err == nil:
i.sent.Add(1)
case retryable:
i.retryable.Add(1)
case !retryable && err != nil:
i.unretryable.Add(1)
}
return retryable, err
}, nil
}
// noopBulkUpdater pretends to perform bulk updates, but does not; leading to messages being stuck in "leased" state.
type noopBulkUpdater struct {
// noopStoreSyncer pretends to perform store syncs, but does not; leading to messages being stuck in "leased" state.
type noopStoreSyncer struct {
*acquireSignalingInterceptor
}
func newNoopBulkUpdater(db notifications.Store) *noopBulkUpdater {
return &noopBulkUpdater{newAcquireSignalingInterceptor(db)}
func newNoopStoreSyncer(db notifications.Store) *noopStoreSyncer {
return &noopStoreSyncer{newAcquireSignalingInterceptor(db)}
}
func (*noopBulkUpdater) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
func (*noopStoreSyncer) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
return int64(len(arg.IDs)), nil
}
func (*noopBulkUpdater) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
func (*noopStoreSyncer) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
return int64(len(arg.IDs)), nil
}
+58 -12
View File
@@ -33,10 +33,12 @@ type notifier struct {
quit chan any
done chan any
method database.NotificationMethod
handlers map[database.NotificationMethod]Handler
metrics *Metrics
}
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler) *notifier {
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, method database.NotificationMethod, metrics *Metrics) *notifier {
return &notifier{
id: id,
cfg: cfg,
@@ -46,6 +48,8 @@ func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger
tick: time.NewTicker(cfg.FetchInterval.Value()),
store: db,
handlers: hr,
method: method,
metrics: metrics,
}
}
@@ -99,8 +103,6 @@ func (n *notifier) run(ctx context.Context, success chan<- dispatchResult, failu
// ensureRunning checks if notifier is not paused.
func (n *notifier) ensureRunning(ctx context.Context) (bool, error) {
n.log.Debug(ctx, "check if notifier is paused")
settingsJSON, err := n.store.GetNotificationsSettings(ctx)
if err != nil {
return false, xerrors.Errorf("get notifications settings: %w", err)
@@ -129,14 +131,13 @@ func (n *notifier) ensureRunning(ctx context.Context) (bool, error) {
// resulting in a failed attempt for each notification when their contexts are canceled; this is not possible with the
// default configurations but could be brought about by an operator tuning things incorrectly.
func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, failure chan<- dispatchResult) error {
n.log.Debug(ctx, "attempting to dequeue messages")
msgs, err := n.fetch(ctx)
if err != nil {
return xerrors.Errorf("fetch messages: %w", err)
}
n.log.Debug(ctx, "dequeued messages", slog.F("count", len(msgs)))
if len(msgs) == 0 {
return nil
}
@@ -147,7 +148,9 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
deliverFn, err := n.prepare(ctx, msg)
if err != nil {
n.log.Warn(ctx, "dispatcher construction failed", slog.F("msg_id", msg.ID), slog.Error(err))
failure <- newFailedDispatch(n.id, msg.ID, err, false)
failure <- n.newFailedDispatch(msg, err, false)
n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
continue
}
@@ -162,7 +165,7 @@ func (n *notifier) process(ctx context.Context, success chan<- dispatchResult, f
return xerrors.Errorf("dispatch failed: %w", err)
}
n.log.Debug(ctx, "dispatch completed", slog.F("count", len(msgs)))
n.log.Debug(ctx, "batch completed", slog.F("count", len(msgs)))
return nil
}
@@ -228,9 +231,21 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
ctx, cancel := context.WithTimeout(ctx, n.cfg.DispatchTimeout.Value())
defer cancel()
logger := n.log.With(slog.F("msg_id", msg.ID), slog.F("method", msg.Method))
logger := n.log.With(slog.F("msg_id", msg.ID), slog.F("method", msg.Method), slog.F("attempt", msg.AttemptCount+1))
if msg.AttemptCount > 0 {
n.metrics.RetryCount.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
}
n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Inc()
n.metrics.QueuedSeconds.WithLabelValues(string(n.method)).Observe(msg.QueuedSeconds)
start := time.Now()
retryable, err := deliver(ctx, msg.ID)
n.metrics.DispatcherSendSeconds.WithLabelValues(string(n.method)).Observe(time.Since(start).Seconds())
n.metrics.InflightDispatches.WithLabelValues(string(n.method), msg.TemplateID.String()).Dec()
if err != nil {
// Don't try to accumulate message responses if the context has been canceled.
//
@@ -248,24 +263,55 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
case <-ctx.Done():
logger.Warn(context.Background(), "cannot record dispatch failure result", slog.Error(ctx.Err()))
return ctx.Err()
default:
case failure <- n.newFailedDispatch(msg, err, retryable):
logger.Warn(ctx, "message dispatch failed", slog.Error(err))
failure <- newFailedDispatch(n.id, msg.ID, err, retryable)
}
} else {
select {
case <-ctx.Done():
logger.Warn(context.Background(), "cannot record dispatch success result", slog.Error(ctx.Err()))
return ctx.Err()
default:
case success <- n.newSuccessfulDispatch(msg):
logger.Debug(ctx, "message dispatch succeeded")
success <- newSuccessfulDispatch(n.id, msg.ID)
}
}
n.metrics.PendingUpdates.Set(float64(len(success) + len(failure)))
return nil
}
func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessagesRow) dispatchResult {
n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), ResultSuccess).Inc()
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: time.Now(),
}
}
// revive:disable-next-line:flag-parameter // Not used for control flow, rather just choosing which metric to increment.
func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow, err error, retryable bool) dispatchResult {
var result string
// If retryable and not the last attempt, it's a temporary failure.
if retryable && msg.AttemptCount < int32(n.cfg.MaxSendAttempts)-1 {
result = ResultTempFail
} else {
result = ResultPermFail
}
n.metrics.DispatchAttempts.WithLabelValues(string(n.method), msg.TemplateID.String(), result).Inc()
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: time.Now(),
err: err,
retryable: retryable,
}
}
// stop stops the notifier from processing any new notifications.
// This is a graceful stop, so any in-flight notifications will be completed before the notifier stops.
// Once a notifier has stopped, it cannot be restarted.
+1 -1
View File
@@ -18,7 +18,7 @@ type Store interface {
AcquireNotificationMessages(ctx context.Context, params database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error)
BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error)
BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error)
EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error)
EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) error
FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error)
GetNotificationMessagesByStatus(ctx context.Context, arg database.GetNotificationMessagesByStatusParams) ([]database.NotificationMessage, error)
GetNotificationsSettings(ctx context.Context) (string, error)
-1
View File
@@ -8,7 +8,6 @@ type MessagePayload struct {
Version string `json:"_version"`
NotificationName string `json:"notification_name"`
CreatedBy string `json:"created_by"`
UserID string `json:"user_id"`
UserEmail string `json:"user_email"`
+56 -6
View File
@@ -3,9 +3,12 @@ package notifications_test
import (
"context"
"database/sql"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
@@ -17,6 +20,9 @@ import (
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
@@ -57,13 +63,13 @@ func defaultNotificationsConfig(method database.NotificationMethod) codersdk.Not
return codersdk.NotificationsConfig{
Method: serpent.String(method),
MaxSendAttempts: 5,
RetryInterval: serpent.Duration(time.Minute * 5),
StoreSyncInterval: serpent.Duration(time.Second * 2),
StoreSyncBufferSize: 50,
LeasePeriod: serpent.Duration(time.Minute * 2),
FetchInterval: serpent.Duration(time.Millisecond * 100),
StoreSyncInterval: serpent.Duration(time.Millisecond * 200),
LeasePeriod: serpent.Duration(time.Second * 10),
DispatchTimeout: serpent.Duration(time.Second * 5),
RetryInterval: serpent.Duration(time.Millisecond * 50),
LeaseCount: 10,
FetchInterval: serpent.Duration(time.Second * 10),
DispatchTimeout: serpent.Duration(time.Minute),
StoreSyncBufferSize: 50,
SMTP: codersdk.NotificationsEmailConfig{},
Webhook: codersdk.NotificationsWebhookConfig{},
}
@@ -81,3 +87,47 @@ func createSampleUser(t *testing.T, db database.Store) database.User {
Username: "bob",
})
}
func createMetrics() *notifications.Metrics {
return notifications.NewMetrics(prometheus.NewRegistry())
}
type dispatchInterceptor struct {
handler notifications.Handler
sent atomic.Int32
retryable atomic.Int32
unretryable atomic.Int32
err atomic.Int32
lastErr atomic.Value
}
func newDispatchInterceptor(h notifications.Handler) *dispatchInterceptor {
return &dispatchInterceptor{handler: h}
}
func (i *dispatchInterceptor) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) {
return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
deliveryFn, err := i.handler.Dispatcher(payload, title, body)
if err != nil {
return false, err
}
retryable, err = deliveryFn(ctx, msgID)
if err != nil {
i.err.Add(1)
i.lastErr.Store(err)
}
switch {
case !retryable && err == nil:
i.sent.Add(1)
case retryable:
i.retryable.Add(1)
case !retryable && err != nil:
i.unretryable.Add(1)
}
return retryable, err
}, nil
}