mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat: add notification deduplication trigger (#14172)
This commit is contained in:
+1
-1
@@ -1005,7 +1005,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
|
||||
helpers := templateHelpers(options)
|
||||
|
||||
// The enqueuer is responsible for enqueueing notifications to the given store.
|
||||
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, helpers, logger.Named("notifications.enqueuer"))
|
||||
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, helpers, logger.Named("notifications.enqueuer"), quartz.NewReal())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
|
||||
}
|
||||
|
||||
Generated
+26
-1
@@ -223,6 +223,24 @@ CREATE TYPE workspace_transition AS ENUM (
|
||||
'delete'
|
||||
);
|
||||
|
||||
CREATE FUNCTION compute_notification_message_dedupe_hash() RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
NEW.dedupe_hash := MD5(CONCAT_WS(':',
|
||||
NEW.notification_template_id,
|
||||
NEW.user_id,
|
||||
NEW.method,
|
||||
NEW.payload::text,
|
||||
ARRAY_TO_STRING(NEW.targets, ','),
|
||||
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text
|
||||
));
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$;
|
||||
|
||||
COMMENT ON FUNCTION compute_notification_message_dedupe_hash() IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day';
|
||||
|
||||
CREATE FUNCTION delete_deleted_oauth2_provider_app_token_api_key() RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
@@ -678,9 +696,12 @@ CREATE TABLE notification_messages (
|
||||
updated_at timestamp with time zone,
|
||||
leased_until timestamp with time zone,
|
||||
next_retry_after timestamp with time zone,
|
||||
queued_seconds double precision
|
||||
queued_seconds double precision,
|
||||
dedupe_hash text
|
||||
);
|
||||
|
||||
COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day';
|
||||
|
||||
CREATE TABLE notification_preferences (
|
||||
user_id uuid NOT NULL,
|
||||
notification_template_id uuid NOT NULL,
|
||||
@@ -1846,6 +1867,8 @@ CREATE UNIQUE INDEX idx_users_email ON users USING btree (email) WHERE (deleted
|
||||
|
||||
CREATE UNIQUE INDEX idx_users_username ON users USING btree (username) WHERE (deleted = false);
|
||||
|
||||
CREATE UNIQUE INDEX notification_messages_dedupe_hash_idx ON notification_messages USING btree (dedupe_hash);
|
||||
|
||||
CREATE UNIQUE INDEX organizations_single_default_org ON organizations USING btree (is_default) WHERE (is_default = true);
|
||||
|
||||
CREATE INDEX provisioner_job_logs_id_job_id_idx ON provisioner_job_logs USING btree (job_id, id);
|
||||
@@ -1918,6 +1941,8 @@ CREATE TRIGGER trigger_update_users AFTER INSERT OR UPDATE ON users FOR EACH ROW
|
||||
|
||||
CREATE TRIGGER trigger_upsert_user_links BEFORE INSERT OR UPDATE ON user_links FOR EACH ROW EXECUTE FUNCTION insert_user_links_fail_if_user_deleted();
|
||||
|
||||
CREATE TRIGGER update_notification_message_dedupe_hash BEFORE INSERT OR UPDATE ON notification_messages FOR EACH ROW EXECUTE FUNCTION compute_notification_message_dedupe_hash();
|
||||
|
||||
ALTER TABLE ONLY api_keys
|
||||
ADD CONSTRAINT api_keys_user_id_uuid_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages;
|
||||
DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash();
|
||||
ALTER TABLE IF EXISTS notification_messages
|
||||
DROP COLUMN IF EXISTS dedupe_hash;
|
||||
@@ -0,0 +1,33 @@
|
||||
-- Add a column to store the hash.
|
||||
ALTER TABLE IF EXISTS notification_messages
|
||||
ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL;
|
||||
|
||||
COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day';
|
||||
|
||||
-- Ensure that multiple notifications with identical hashes cannot be inserted into the table.
|
||||
CREATE UNIQUE INDEX ON notification_messages (dedupe_hash);
|
||||
|
||||
-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day.
|
||||
-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now.
|
||||
-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone.
|
||||
CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS
|
||||
$$
|
||||
BEGIN
|
||||
NEW.dedupe_hash := MD5(CONCAT_WS(':',
|
||||
NEW.notification_template_id,
|
||||
NEW.user_id,
|
||||
NEW.method,
|
||||
NEW.payload::text,
|
||||
ARRAY_TO_STRING(NEW.targets, ','),
|
||||
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text
|
||||
));
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day';
|
||||
CREATE TRIGGER update_notification_message_dedupe_hash
|
||||
BEFORE INSERT OR UPDATE
|
||||
ON notification_messages
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION compute_notification_message_dedupe_hash();
|
||||
@@ -2116,6 +2116,8 @@ type NotificationMessage struct {
|
||||
LeasedUntil sql.NullTime `db:"leased_until" json:"leased_until"`
|
||||
NextRetryAfter sql.NullTime `db:"next_retry_after" json:"next_retry_after"`
|
||||
QueuedSeconds sql.NullFloat64 `db:"queued_seconds" json:"queued_seconds"`
|
||||
// Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day
|
||||
DedupeHash sql.NullString `db:"dedupe_hash" json:"dedupe_hash"`
|
||||
}
|
||||
|
||||
type NotificationPreference struct {
|
||||
|
||||
@@ -3274,7 +3274,7 @@ WITH acquired AS (
|
||||
FOR UPDATE OF nm
|
||||
SKIP LOCKED
|
||||
LIMIT $4)
|
||||
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds)
|
||||
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds, dedupe_hash)
|
||||
SELECT
|
||||
-- message
|
||||
nm.id,
|
||||
@@ -3449,14 +3449,15 @@ func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error {
|
||||
}
|
||||
|
||||
const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :exec
|
||||
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
|
||||
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at)
|
||||
VALUES ($1,
|
||||
$2,
|
||||
$3,
|
||||
$4::notification_method,
|
||||
$5::jsonb,
|
||||
$6,
|
||||
$7)
|
||||
$7,
|
||||
$8)
|
||||
`
|
||||
|
||||
type EnqueueNotificationMessageParams struct {
|
||||
@@ -3467,6 +3468,7 @@ type EnqueueNotificationMessageParams struct {
|
||||
Payload json.RawMessage `db:"payload" json:"payload"`
|
||||
Targets []uuid.UUID `db:"targets" json:"targets"`
|
||||
CreatedBy string `db:"created_by" json:"created_by"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) error {
|
||||
@@ -3478,6 +3480,7 @@ func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg Enqueue
|
||||
arg.Payload,
|
||||
pq.Array(arg.Targets),
|
||||
arg.CreatedBy,
|
||||
arg.CreatedAt,
|
||||
)
|
||||
return err
|
||||
}
|
||||
@@ -3528,7 +3531,7 @@ func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMe
|
||||
}
|
||||
|
||||
const getNotificationMessagesByStatus = `-- name: GetNotificationMessagesByStatus :many
|
||||
SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds
|
||||
SELECT id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after, queued_seconds, dedupe_hash
|
||||
FROM notification_messages
|
||||
WHERE status = $1
|
||||
LIMIT $2::int
|
||||
@@ -3564,6 +3567,7 @@ func (q *sqlQuerier) GetNotificationMessagesByStatus(ctx context.Context, arg Ge
|
||||
&i.LeasedUntil,
|
||||
&i.NextRetryAfter,
|
||||
&i.QueuedSeconds,
|
||||
&i.DedupeHash,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id
|
||||
AND u.id = @user_id;
|
||||
|
||||
-- name: EnqueueNotificationMessage :exec
|
||||
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
|
||||
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at)
|
||||
VALUES (@id,
|
||||
@notification_template_id,
|
||||
@user_id,
|
||||
@method::notification_method,
|
||||
@payload::jsonb,
|
||||
@targets,
|
||||
@created_by);
|
||||
@created_by,
|
||||
@created_at);
|
||||
|
||||
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
|
||||
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
|
||||
|
||||
@@ -88,6 +88,7 @@ const (
|
||||
UniqueIndexProvisionerDaemonsOrgNameOwnerKey UniqueConstraint = "idx_provisioner_daemons_org_name_owner_key" // CREATE UNIQUE INDEX idx_provisioner_daemons_org_name_owner_key ON provisioner_daemons USING btree (organization_id, name, lower(COALESCE((tags ->> 'owner'::text), ''::text)));
|
||||
UniqueIndexUsersEmail UniqueConstraint = "idx_users_email" // CREATE UNIQUE INDEX idx_users_email ON users USING btree (email) WHERE (deleted = false);
|
||||
UniqueIndexUsersUsername UniqueConstraint = "idx_users_username" // CREATE UNIQUE INDEX idx_users_username ON users USING btree (username) WHERE (deleted = false);
|
||||
UniqueNotificationMessagesDedupeHashIndex UniqueConstraint = "notification_messages_dedupe_hash_idx" // CREATE UNIQUE INDEX notification_messages_dedupe_hash_idx ON notification_messages USING btree (dedupe_hash);
|
||||
UniqueOrganizationsSingleDefaultOrg UniqueConstraint = "organizations_single_default_org" // CREATE UNIQUE INDEX organizations_single_default_org ON organizations USING btree (is_default) WHERE (is_default = true);
|
||||
UniqueProvisionerKeysOrganizationIDNameIndex UniqueConstraint = "provisioner_keys_organization_id_name_idx" // CREATE UNIQUE INDEX provisioner_keys_organization_id_name_idx ON provisioner_keys USING btree (organization_id, lower((name)::text));
|
||||
UniqueTemplateUsageStatsStartTimeTemplateIDUserIDIndex UniqueConstraint = "template_usage_stats_start_time_template_id_user_id_idx" // CREATE UNIQUE INDEX template_usage_stats_start_time_template_id_user_id_idx ON template_usage_stats USING btree (start_time, template_id, user_id);
|
||||
|
||||
@@ -10,14 +10,19 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/quartz"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/notifications/render"
|
||||
"github.com/coder/coder/v2/coderd/notifications/types"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
)
|
||||
|
||||
var ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
|
||||
var (
|
||||
ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
|
||||
ErrDuplicate = xerrors.New("duplicate notification")
|
||||
)
|
||||
|
||||
type StoreEnqueuer struct {
|
||||
store Store
|
||||
@@ -27,10 +32,12 @@ type StoreEnqueuer struct {
|
||||
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
|
||||
// the template funcs will return values which are inappropriately encapsulated in this struct.
|
||||
helpers template.FuncMap
|
||||
// Used to manipulate time in tests.
|
||||
clock quartz.Clock
|
||||
}
|
||||
|
||||
// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
|
||||
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) {
|
||||
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
|
||||
var method database.NotificationMethod
|
||||
if err := method.Scan(cfg.Method.String()); err != nil {
|
||||
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
|
||||
@@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
|
||||
log: log,
|
||||
defaultMethod: method,
|
||||
helpers: helpers,
|
||||
clock: clock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
|
||||
Payload: input,
|
||||
Targets: targets,
|
||||
CreatedBy: createdBy,
|
||||
CreatedAt: dbtime.Time(s.clock.Now().UTC()),
|
||||
})
|
||||
if err != nil {
|
||||
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
|
||||
@@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
|
||||
return nil, ErrCannotEnqueueDisabledNotification
|
||||
}
|
||||
|
||||
// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
|
||||
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
|
||||
// having each notification enqueue handle its own logic.
|
||||
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
|
||||
return nil, ErrDuplicate
|
||||
}
|
||||
|
||||
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
|
||||
return nil, xerrors.Errorf("enqueue notification: %w", err)
|
||||
}
|
||||
|
||||
@@ -12,13 +12,15 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/coder/quartz"
|
||||
"github.com/coder/serpent"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbgen"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
"github.com/coder/coder/v2/coderd/notifications/dispatch"
|
||||
"github.com/coder/coder/v2/coderd/notifications/types"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/serpent"
|
||||
)
|
||||
|
||||
func TestBufferedUpdates(t *testing.T) {
|
||||
@@ -39,7 +41,7 @@ func TestBufferedUpdates(t *testing.T) {
|
||||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
|
||||
database.NotificationMethodSmtp: santa,
|
||||
})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := dbgen.User(t, db, database.User{})
|
||||
@@ -127,7 +129,7 @@ func TestBuildPayload(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
// WHEN: a notification is enqueued
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/quartz"
|
||||
|
||||
"github.com/coder/serpent"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
@@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) {
|
||||
method: handler,
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, store)
|
||||
@@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
|
||||
method: handler,
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, store)
|
||||
@@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
|
||||
method: delayer,
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, store)
|
||||
@@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
|
||||
customMethod: webhookHandler,
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, store)
|
||||
|
||||
@@ -21,6 +21,8 @@ import (
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/coder/quartz"
|
||||
|
||||
"github.com/google/uuid"
|
||||
smtpmock "github.com/mocktools/go-smtp-mock/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -71,7 +73,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
@@ -145,7 +147,7 @@ func TestSMTPDispatch(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
@@ -205,7 +207,7 @@ func TestWebhookDispatch(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
const (
|
||||
@@ -301,7 +303,7 @@ func TestBackpressure(t *testing.T) {
|
||||
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
|
||||
require.NoError(t, err)
|
||||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
@@ -399,7 +401,7 @@ func TestRetries(t *testing.T) {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
@@ -456,7 +458,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
|
||||
mgr, err := notifications.NewManager(cfg, noopInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
|
||||
require.NoError(t, err)
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
@@ -464,7 +466,8 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
// WHEN: a few notifications are enqueued which will all succeed
|
||||
var msgs []string
|
||||
for i := 0; i < msgCount; i++ {
|
||||
id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test")
|
||||
id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
|
||||
map[string]string{"type": "success", "index": fmt.Sprintf("%d", i)}, "test")
|
||||
require.NoError(t, err)
|
||||
msgs = append(msgs, id.String())
|
||||
}
|
||||
@@ -566,7 +569,7 @@ func TestNotifierPaused(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr.Run(ctx)
|
||||
@@ -821,7 +824,7 @@ func TestDisabledBeforeEnqueue(t *testing.T) {
|
||||
|
||||
// GIVEN: an enqueuer & a sample user
|
||||
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
@@ -861,7 +864,7 @@ func TestDisabledAfterEnqueue(t *testing.T) {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
@@ -967,7 +970,7 @@ func TestCustomNotificationMethod(t *testing.T) {
|
||||
_ = mgr.Stop(ctx)
|
||||
})
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger)
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger, quartz.NewReal())
|
||||
require.NoError(t, err)
|
||||
|
||||
// WHEN: a notification of that template is enqueued, it should be delivered with the configured method - not the default.
|
||||
@@ -1033,6 +1036,53 @@ func createOpts(t *testing.T) *coderdtest.Options {
|
||||
}
|
||||
}
|
||||
|
||||
// TestNotificationDuplicates validates that identical notifications cannot be sent on the same day.
|
||||
func TestNotificationDuplicates(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// SETUP
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres; it is testing the dedupe hash trigger in the database")
|
||||
}
|
||||
|
||||
ctx, logger, db := setup(t)
|
||||
|
||||
method := database.NotificationMethodSmtp
|
||||
cfg := defaultNotificationsConfig(method)
|
||||
|
||||
mgr, err := notifications.NewManager(cfg, db, defaultHelpers(), createMetrics(), logger.Named("manager"))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, mgr.Stop(ctx))
|
||||
})
|
||||
|
||||
// Set the time to a known value.
|
||||
mClock := quartz.NewMock(t)
|
||||
mClock.Set(time.Date(2024, 1, 15, 9, 0, 0, 0, time.UTC))
|
||||
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"), mClock)
|
||||
require.NoError(t, err)
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// GIVEN: two notifications are enqueued with identical properties.
|
||||
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
|
||||
map[string]string{"initiator": "danny"}, "test", user.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// WHEN: the second is enqueued, the enqueuer will reject the request.
|
||||
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
|
||||
map[string]string{"initiator": "danny"}, "test", user.ID)
|
||||
require.ErrorIs(t, err, notifications.ErrDuplicate)
|
||||
|
||||
// THEN: when the clock is advanced 24h, the notification will be accepted.
|
||||
// NOTE: the time is used in the dedupe hash, so by advancing 24h we're creating a distinct notification from the one
|
||||
// which was enqueued "yesterday".
|
||||
mClock.Advance(time.Hour * 24)
|
||||
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
|
||||
map[string]string{"initiator": "danny"}, "test", user.ID)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
type fakeHandler struct {
|
||||
mu sync.RWMutex
|
||||
succeeded, failed []string
|
||||
|
||||
Reference in New Issue
Block a user