Files
coder/coderd/notifications/manager.go
Spike Curtis bddb808b25 chore: arrange imports in a standard way (#21452)
Fixes all our Go file imports to match the preferred spec that we've _mostly_ been using. For example:

```
import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/xerrors"
	"gopkg.in/natefinch/lumberjack.v2"

	"cdr.dev/slog/v3"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/serpent"
)
```

Three groups: standard library, third-party libs, Coder libs.

This PR makes the change across the codebase. The PR in the stack above modifies our formatting to maintain this state of affairs, and is a separate PR so it's possible to review that one in detail.
2026-01-08 15:24:11 +04:00

410 lines
13 KiB
Go

package notifications
import (
"context"
"sync"
"text/template"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/coderd/pproflabel"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/quartz"
)
// ErrInvalidDispatchTimeout is returned by NewManager when the configured dispatch timeout is not strictly shorter
// than the lease period.
var ErrInvalidDispatchTimeout = xerrors.New("dispatch timeout must be less than lease period")

// Manager manages all notifications being enqueued and dispatched.
//
// Manager maintains a notifier: this consumes the queue of notification messages in the store.
//
// The notifier dequeues messages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatches"
// these messages, meaning they are sent by their respective methods (email, webhook, etc).
//
// To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure)
// of size CODER_NOTIFICATIONS_STORE_SYNC_BUFFER_SIZE in the Manager, and updates are sent to the store about which
// messages succeeded or failed once every CODER_NOTIFICATIONS_STORE_SYNC_INTERVAL.
// These buffers are limited in size, and naturally introduce some backpressure; if there are hundreds of messages to be
// sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will
// slow down the dispatch rate.
//
// NOTE: The above backpressure mechanism only works within the same process, which may not be true forever, such as if
// we split notifiers out into separate targets for greater processing throughput; in this case we will need an
// alternative mechanism for handling backpressure.
type Manager struct {
	cfg      codersdk.NotificationsConfig
	store    Store
	log      slog.Logger
	handlers map[database.NotificationMethod]Handler // Delivery handler for each notification method.
	method   database.NotificationMethod             // cfg.Method, validated by NewManager.
	helpers  template.FuncMap
	metrics  *Metrics

	// success and failure buffer dispatch results until syncUpdates writes them to the store in bulk.
	success, failure chan dispatchResult

	mu       sync.Mutex // Guards closed and notifier; Stop also closes stop while holding it.
	closed   bool
	notifier *notifier

	runOnce sync.Once
	stop    chan any // Closed by Stop to ask the manager loop to flush and exit.
	done    chan any // Closed when the manager loop has exited.

	// clock is for testing only
	clock quartz.Clock
}
// ManagerOption mutates a freshly built Manager. NewManager applies options only after every default has been set, so
// an option can override any of them.
type ManagerOption func(*Manager)

// WithTestClock returns a ManagerOption that replaces the manager's real clock with the given quartz clock.
// It is intended for tests only.
func WithTestClock(clock quartz.Clock) ManagerOption {
	opt := func(mgr *Manager) {
		mgr.clock = clock
	}
	return opt
}
// NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery.
//
// cfg.Method must name a known notification method, and cfg.DispatchTimeout must be strictly shorter than
// cfg.LeasePeriod; otherwise ErrInvalidDispatchTimeout is returned.
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc. opts are applied last, after every default (including the real clock and the default handlers)
// has been set.
func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
	var method database.NotificationMethod
	if scanErr := method.Scan(cfg.Method.String()); scanErr != nil {
		return nil, xerrors.Errorf("notification method %q is invalid", cfg.Method)
	}

	// A dispatch that outlives its lease can be delivered twice. The lease can expire while the notifier is still
	// waiting on the dispatch, which makes the message eligible to be re-acquired.
	if dispatchTimeout, leasePeriod := cfg.DispatchTimeout.Value(), cfg.LeasePeriod.Value(); leasePeriod <= dispatchTimeout {
		return nil, ErrInvalidDispatchTimeout
	}

	// Successful and failed dispatches are buffered in memory, in separate channels, to reduce load on the store.
	// They are kept apart because the bulk updates (BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed)
	// are already a bit janky. Batched updates such as
	// https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany would allow a cleaner approach, but
	// this works fine for now.
	bufferSize := cfg.StoreSyncBufferSize
	mgr := &Manager{
		cfg:     cfg,
		log:     log,
		store:   store,
		method:  method,
		helpers: helpers,
		metrics: metrics,

		success: make(chan dispatchResult, bufferSize),
		failure: make(chan dispatchResult, bufferSize),
		stop:    make(chan any),
		done:    make(chan any),

		handlers: defaultHandlers(cfg, log, store, ps),
		clock:    quartz.NewReal(),
	}

	for i := range opts {
		opts[i](mgr)
	}
	return mgr, nil
}
// defaultHandlers builds the built-in Handler for every supported notification method (SMTP, webhook and inbox), and
// gives each one its own named sub-logger. This function performs no error handling and does not panic by itself:
// each dispatch constructor returns a Handler value directly.
func defaultHandlers(cfg codersdk.NotificationsConfig, log slog.Logger, store Store, ps pubsub.Pubsub) map[database.NotificationMethod]Handler {
	return map[database.NotificationMethod]Handler{
		database.NotificationMethodSmtp:    dispatch.NewSMTPHandler(cfg.SMTP, log.Named("dispatcher.smtp")),
		database.NotificationMethodWebhook: dispatch.NewWebhookHandler(cfg.Webhook, log.Named("dispatcher.webhook")),
		database.NotificationMethodInbox:   dispatch.NewInboxHandler(log.Named("dispatcher.inbox"), store, ps),
	}
}

// WithHandlers allows for tests to inject their own handlers to verify functionality.
//
// It replaces the whole handler map without taking m.mu. The map is handed to the notifier when the manager loop
// starts, so WithHandlers must be called before Run to have any effect.
func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
	m.handlers = reg
}
// ErrManagerAlreadyClosed is returned by the manager loop when Stop was called before the loop could start.
var ErrManagerAlreadyClosed = xerrors.New("manager already closed")

// Run initiates the control loop in the background, which spawns a given number of notifier goroutines.
// Manager requires system-level permissions to interact with the store.
// Run is only intended to be run once; every call after the first is a no-op.
func (m *Manager) Run(ctx context.Context) {
	m.runOnce.Do(func() {
		// Log inside the Once so that "started" is only reported by the call that actually starts the loop.
		m.log.Debug(ctx, "notification manager started")

		// The loop exits when Stop() is called or the context is canceled.
		pproflabel.Go(ctx, pproflabel.Service(pproflabel.ServiceNotifications), func(ctx context.Context) {
			err := m.loop(ctx)
			switch {
			case err == nil:
				// Clean shutdown; nothing to report.
			case xerrors.Is(err, ErrManagerAlreadyClosed):
				// Stop ran before the loop could start. Report this at warn level rather than as a failure.
				m.log.Warn(ctx, "notification manager stopped with error", slog.Error(err))
			default:
				m.log.Error(ctx, "notification manager stopped with error", slog.Error(err))
			}
		})
	})
}
// loop contains the main business logic of the notification manager. It is responsible for subscribing to notification
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
//
// It runs two goroutines:
//   - the notifier;
//   - a sync loop that flushes buffered dispatch results to the store every cfg.StoreSyncInterval.
//
// loop returns ErrManagerAlreadyClosed if Stop was called before it could start. Otherwise it returns the first
// non-nil error from those goroutines (ctx.Err() when ctx is canceled), or nil. m.done is always closed on return so
// that Stop can wait on it. The returned error is not logged here; the caller is responsible for reporting it.
func (m *Manager) loop(ctx context.Context) error {
	defer func() {
		close(m.done)
		m.log.Debug(context.Background(), "notification manager stopped")
	}()

	// Hold m.mu across the closed check and the notifier assignment. A concurrent Stop then either runs first (and we
	// bail out below) or observes the notifier and stops it.
	m.mu.Lock()
	if m.closed {
		m.mu.Unlock()
		return ErrManagerAlreadyClosed
	}

	// A plain Group (not WithContext): an error from one goroutine does not cancel the other, and Wait returns only
	// once both have exited.
	var eg errgroup.Group

	m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
	eg.Go(func() error {
		// run the notifier which will handle dequeueing and dispatching notifications.
		return m.notifier.run(m.success, m.failure)
	})

	m.mu.Unlock()

	// Periodically flush notification state changes to the store.
	eg.Go(func() error {
		// Every interval, collect the messages in the channels and bulk update them in the store.
		tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync")
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				// Nothing we can do in this scenario except bail out; after the message lease expires, the messages will
				// be requeued and users will receive duplicates.
				// This is an explicit trade-off between keeping the database load light (by bulk-updating records) and
				// exactly-once delivery.
				//
				// The current assumption is that duplicate delivery of these messages is, at worst, slightly annoying.
				// If these notifications are triggering external actions (e.g. via webhooks) this could be more
				// consequential, and we may need a more sophisticated mechanism.
				//
				// TODO: mention the above tradeoff in documentation.
				m.log.Warn(ctx, "exiting ungracefully", slog.Error(ctx.Err()))

				if len(m.success)+len(m.failure) > 0 {
					m.log.Warn(ctx, "context canceled with pending updates in buffer, these messages will be sent again after lease expires",
						slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
				}
				return ctx.Err()
			case <-m.stop:
				// Stop has already stopped the notifier, so flushing what is buffered now persists every result.
				if len(m.success)+len(m.failure) > 0 {
					m.log.Warn(ctx, "flushing buffered updates before stop",
						slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
					m.syncUpdates(ctx)
					m.log.Warn(ctx, "flushing updates done")
				}
				return nil
			case <-tick.C:
				m.syncUpdates(ctx)
			}
		}
	})

	return eg.Wait()
}
// BufferedUpdatesCount reports how many dispatch results are sitting in the in-memory buffers, waiting for the next
// flush to the store. The first value counts buffered successes; the second counts buffered failures.
func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
	success = len(m.success)
	failure = len(m.failure)
	return success, failure
}
// syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
//
// It returns immediately if ctx is already done. Otherwise it drains only as many results as are buffered at the time
// of the call, then runs the "sent" and "failed" bulk updates concurrently. Each update is bounded by a 30s timeout
// and the function waits for both before returning. Store errors are logged, not returned, and the drained results
// are not re-buffered. The PendingUpdates metric is refreshed on every return path.
func (m *Manager) syncUpdates(ctx context.Context) {
	// Ensure we update the metrics to reflect the current state after each invocation.
	defer func() {
		m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure)))
	}()

	select {
	case <-ctx.Done():
		return
	default:
	}

	nSuccess := len(m.success)
	nFailure := len(m.failure)

	m.metrics.PendingUpdates.Set(float64(nSuccess + nFailure))

	// Nothing to do.
	if nSuccess+nFailure == 0 {
		return
	}

	// The number of results to drain is known up front, so size the parameter slices once instead of growing them.
	successParams := database.BulkMarkNotificationMessagesSentParams{
		IDs:     make([]uuid.UUID, 0, nSuccess),
		SentAts: make([]time.Time, 0, nSuccess),
	}
	failureParams := database.BulkMarkNotificationMessagesFailedParams{
		IDs:           make([]uuid.UUID, 0, nFailure),
		FailedAts:     make([]time.Time, 0, nFailure),
		Statuses:      make([]database.NotificationMessageStatus, 0, nFailure),
		StatusReasons: make([]string, 0, nFailure),
	}

	// Read all the existing messages due for update from the channel, but don't range over the channels because they
	// block until they are closed.
	//
	// This is vulnerable to TOCTOU, but it's fine.
	// If more items are added to the success or failure channels between measuring their lengths and now, those items
	// will be processed on the next bulk update.

	for i := 0; i < nSuccess; i++ {
		res := <-m.success
		successParams.IDs = append(successParams.IDs, res.msg)
		successParams.SentAts = append(successParams.SentAts, res.ts)
	}

	for i := 0; i < nFailure; i++ {
		res := <-m.failure

		var (
			reason string
			status database.NotificationMessageStatus
		)

		// retryable takes precedence over inhibited; anything else is a permanent failure.
		switch {
		case res.retryable:
			status = database.NotificationMessageStatusTemporaryFailure
		case res.inhibited:
			status = database.NotificationMessageStatusInhibited
			reason = "disabled by user"
		default:
			status = database.NotificationMessageStatusPermanentFailure
		}

		failureParams.IDs = append(failureParams.IDs, res.msg)
		failureParams.FailedAts = append(failureParams.FailedAts, res.ts)
		failureParams.Statuses = append(failureParams.Statuses, status)
		// An explicit dispatch error always takes precedence as the stored reason.
		if res.err != nil {
			reason = res.err.Error()
		}
		failureParams.StatusReasons = append(failureParams.StatusReasons, reason)
	}

	// Execute bulk updates for success/failure concurrently.
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		if len(successParams.IDs) == 0 {
			return
		}

		logger := m.log.With(slog.F("type", "update_sent"))

		// Give up after waiting for the store for 30s.
		uctx, cancel := context.WithTimeout(ctx, time.Second*30)
		defer cancel()

		n, err := m.store.BulkMarkNotificationMessagesSent(uctx, successParams)
		if err != nil {
			logger.Error(ctx, "bulk update failed", slog.Error(err))
			return
		}
		m.metrics.SyncedUpdates.Add(float64(n))

		logger.Debug(ctx, "bulk update completed", slog.F("updated", n))
	}()

	go func() {
		defer wg.Done()
		if len(failureParams.IDs) == 0 {
			return
		}

		logger := m.log.With(slog.F("type", "update_failed"))

		// Give up after waiting for the store for 30s.
		uctx, cancel := context.WithTimeout(ctx, time.Second*30)
		defer cancel()

		// #nosec G115 - Safe conversion for max send attempts which is expected to be within int32 range
		failureParams.MaxAttempts = int32(m.cfg.MaxSendAttempts)
		failureParams.RetryInterval = int32(m.cfg.RetryInterval.Value().Seconds())
		n, err := m.store.BulkMarkNotificationMessagesFailed(uctx, failureParams)
		if err != nil {
			logger.Error(ctx, "bulk update failed", slog.Error(err))
			return
		}
		m.metrics.SyncedUpdates.Add(float64(n))

		logger.Debug(ctx, "bulk update completed", slog.F("updated", n))
	}()

	wg.Wait()
}
// Stop stops the notifier and waits until it has stopped.
//
// Only the first call has any effect; subsequent calls return nil. If the notifier has not been started yet, Stop
// marks the manager closed and returns nil without waiting, so a loop that starts later exits with
// ErrManagerAlreadyClosed. Otherwise Stop waits for the manager loop to exit and returns nil, or returns ctx.Err()
// if ctx is done first.
func (m *Manager) Stop(ctx context.Context) error {
	// Perform the state transition under m.mu, but release the lock before waiting so the wait never blocks the loop
	// or other callers. The deferred unlock also keeps the mutex consistent if notifier.stop panics.
	hasNotifier, alreadyClosed := func() (bool, bool) {
		m.mu.Lock()
		defer m.mu.Unlock()

		if m.closed {
			return false, true
		}
		m.closed = true

		m.log.Debug(context.Background(), "graceful stop requested")

		// If the notifier hasn't been started, we don't need to wait for anything.
		// This is only really during testing when we want to enqueue messages only but not deliver them.
		started := m.notifier != nil
		if started {
			m.notifier.stop()
		}

		// Signal the stop channel to cause loop to exit.
		close(m.stop)
		return started, false
	}()
	if alreadyClosed || !hasNotifier {
		return nil
	}

	// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
	select {
	case <-ctx.Done():
		// Err is guaranteed to be non-nil once Done is closed.
		err := ctx.Err()
		// For some reason, slog.Error returns {} for a context error, so log its message as a plain field.
		m.log.Error(context.Background(), "graceful stop failed", slog.F("err", err.Error()))
		return err
	case <-m.done:
		m.log.Debug(context.Background(), "gracefully stopped")
		return nil
	}
}
// dispatchResult is the outcome of dispatching a single notification message. Results are buffered in the Manager's
// success and failure channels and later written to the store in bulk by syncUpdates.
type dispatchResult struct {
	// notifier presumably identifies the notifier that performed the dispatch; it is not read in this file.
	notifier uuid.UUID

	// msg is the ID of the notification message; it is added to the bulk update's IDs.
	msg uuid.UUID

	// ts is stored as the message's sent-at time (success) or failed-at time (failure).
	ts time.Time

	// err, when non-nil, supplies the stored status reason for a failed message.
	err error

	// retryable marks a failure as temporary (NotificationMessageStatusTemporaryFailure); it takes precedence over
	// inhibited.
	retryable bool

	// inhibited marks the message as inhibited (NotificationMessageStatusInhibited). The stored reason is
	// "disabled by user" unless err is set.
	inhibited bool
}