chore: add usage tracking package (#19095)

Not used in coderd yet, see stack.

Adds two new packages:
- `coderd/usage`: provides an interface for the "Collector" as well as a stub implementation for AGPL
- `enterprise/coderd/usage`: provides an interface for the "Publisher" as well as a Tallyman implementation

Relates to https://github.com/coder/internal/issues/814
This commit is contained in:
Dean Sheather
2025-08-16 01:31:00 +10:00
committed by GitHub
parent e92af2b050
commit a25d85631b
36 changed files with 2069 additions and 17 deletions
+5 -1
View File
@@ -7,7 +7,6 @@ tailnet/proto/ @spikecurtis @johnstcn
vpn/vpn.proto @spikecurtis @johnstcn
vpn/version.go @spikecurtis @johnstcn
# This caching code is particularly tricky, and one must be very careful when
# altering it.
coderd/files/ @aslilac
@@ -34,3 +33,8 @@ site/CLAUDE.md
# requires elite ball knowledge of most of the scheduling code to make changes
# without inadvertently affecting other parts of the codebase.
coderd/schedule/autostop.go @deansheather @DanielleMaywood
# Usage tracking code requires intimate knowledge of Tallyman and Metronome, as
# well as guidance from revenue.
coderd/usage/ @deansheather @spikecurtis
enterprise/coderd/usage/ @deansheather @spikecurtis
+2
View File
@@ -15701,6 +15701,7 @@ const docTemplate = `{
"system",
"tailnet_coordinator",
"template",
"usage_event",
"user",
"user_secret",
"webpush_subscription",
@@ -15742,6 +15743,7 @@ const docTemplate = `{
"ResourceSystem",
"ResourceTailnetCoordinator",
"ResourceTemplate",
"ResourceUsageEvent",
"ResourceUser",
"ResourceUserSecret",
"ResourceWebpushSubscription",
+2
View File
@@ -14262,6 +14262,7 @@
"system",
"tailnet_coordinator",
"template",
"usage_event",
"user",
"user_secret",
"webpush_subscription",
@@ -14303,6 +14304,7 @@
"ResourceSystem",
"ResourceTailnetCoordinator",
"ResourceTemplate",
"ResourceUsageEvent",
"ResourceUser",
"ResourceUserSecret",
"ResourceWebpushSubscription",
+1
View File
@@ -9,6 +9,7 @@ const (
CheckOneTimePasscodeSet CheckConstraint = "one_time_passcode_set" // users
CheckMaxProvisionerLogsLength CheckConstraint = "max_provisioner_logs_length" // provisioner_jobs
CheckValidationMonotonicOrder CheckConstraint = "validation_monotonic_order" // template_version_parameters
CheckUsageEventTypeCheck CheckConstraint = "usage_event_type_check" // usage_events
CheckMaxLogsLength CheckConstraint = "max_logs_length" // workspace_agents
CheckSubsystemsNotNone CheckConstraint = "subsystems_not_none" // workspace_agents
CheckWorkspaceBuildsAiTaskSidebarAppIDRequired CheckConstraint = "workspace_builds_ai_task_sidebar_app_id_required" // workspace_builds
+49
View File
@@ -509,6 +509,25 @@ var (
}),
Scope: rbac.ScopeAll,
}.WithCachedASTValue()
subjectUsageTracker = rbac.Subject{
Type: rbac.SubjectTypeUsageTracker,
FriendlyName: "Usage Tracker",
ID: uuid.Nil.String(),
Roles: rbac.Roles([]rbac.Role{
{
Identifier: rbac.RoleIdentifier{Name: "usage-tracker"},
DisplayName: "Usage Tracker",
Site: rbac.Permissions(map[string][]policy.Action{
rbac.ResourceLicense.Type: {policy.ActionRead},
rbac.ResourceUsageEvent.Type: {policy.ActionCreate, policy.ActionRead, policy.ActionUpdate},
}),
Org: map[string][]rbac.Permission{},
User: []rbac.Permission{},
},
}),
Scope: rbac.ScopeAll,
}.WithCachedASTValue()
)
// AsProvisionerd returns a context with an actor that has permissions required
@@ -579,10 +598,18 @@ func AsPrebuildsOrchestrator(ctx context.Context) context.Context {
return As(ctx, subjectPrebuildsOrchestrator)
}
// AsFileReader returns a context with an actor that has permissions required
// for reading all files.
func AsFileReader(ctx context.Context) context.Context {
return As(ctx, subjectFileReader)
}
// AsUsageTracker returns a context with an actor that has permissions required
// for creating, reading, and updating usage events.
func AsUsageTracker(ctx context.Context) context.Context {
return As(ctx, subjectUsageTracker)
}
var AsRemoveActor = rbac.Subject{
ID: "remove-actor",
}
@@ -3951,6 +3978,13 @@ func (q *querier) InsertTemplateVersionWorkspaceTag(ctx context.Context, arg dat
return q.db.InsertTemplateVersionWorkspaceTag(ctx, arg)
}
func (q *querier) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error {
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceUsageEvent); err != nil {
return err
}
return q.db.InsertUsageEvent(ctx, arg)
}
func (q *querier) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) {
// Always check if the assigned roles can actually be assigned by this actor.
impliedRoles := append([]rbac.RoleIdentifier{rbac.RoleMember()}, q.convertToDeploymentRoles(arg.RBACRoles)...)
@@ -4306,6 +4340,14 @@ func (q *querier) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string)
return q.db.RevokeDBCryptKey(ctx, activeKeyDigest)
}
func (q *querier) SelectUsageEventsForPublishing(ctx context.Context, arg time.Time) ([]database.UsageEvent, error) {
// ActionUpdate because we're updating the publish_started_at column.
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceUsageEvent); err != nil {
return nil, err
}
return q.db.SelectUsageEventsForPublishing(ctx, arg)
}
func (q *querier) TryAcquireLock(ctx context.Context, id int64) (bool, error) {
return q.db.TryAcquireLock(ctx, id)
}
@@ -4787,6 +4829,13 @@ func (q *querier) UpdateTemplateWorkspacesLastUsedAt(ctx context.Context, arg da
return fetchAndExec(q.log, q.auth, policy.ActionUpdate, fetch, q.db.UpdateTemplateWorkspacesLastUsedAt)(ctx, arg)
}
func (q *querier) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceUsageEvent); err != nil {
return err
}
return q.db.UpdateUsageEventsPostPublish(ctx, arg)
}
func (q *querier) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error {
return deleteQ(q.log, q.auth, q.db.GetUserByID, q.db.UpdateUserDeletedByID)(ctx, id)
}
+31
View File
@@ -5666,3 +5666,34 @@ func (s *MethodTestSuite) TestUserSecrets() {
Asserts(userSecret, policy.ActionRead, userSecret, policy.ActionDelete)
}))
}
func (s *MethodTestSuite) TestUsageEvents() {
s.Run("InsertUsageEvent", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
params := database.InsertUsageEventParams{
ID: "1",
EventType: "dc_managed_agents_v1",
EventData: []byte("{}"),
CreatedAt: dbtime.Now(),
}
db.EXPECT().InsertUsageEvent(gomock.Any(), params).Return(nil)
check.Args(params).Asserts(rbac.ResourceUsageEvent, policy.ActionCreate)
}))
s.Run("SelectUsageEventsForPublishing", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
now := dbtime.Now()
db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), now).Return([]database.UsageEvent{}, nil)
check.Args(now).Asserts(rbac.ResourceUsageEvent, policy.ActionUpdate)
}))
s.Run("UpdateUsageEventsPostPublish", s.Mocked(func(db *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
now := dbtime.Now()
params := database.UpdateUsageEventsPostPublishParams{
Now: now,
IDs: []string{"1", "2"},
FailureMessages: []string{"error", "error"},
SetPublishedAts: []bool{false, false},
}
db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), params).Return(nil)
check.Args(params).Asserts(rbac.ResourceUsageEvent, policy.ActionUpdate)
}))
}
+21
View File
@@ -2392,6 +2392,13 @@ func (m queryMetricsStore) InsertTemplateVersionWorkspaceTag(ctx context.Context
return r0, r1
}
func (m queryMetricsStore) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error {
start := time.Now()
r0 := m.s.InsertUsageEvent(ctx, arg)
m.queryLatencies.WithLabelValues("InsertUsageEvent").Observe(time.Since(start).Seconds())
return r0
}
func (m queryMetricsStore) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) {
start := time.Now()
user, err := m.s.InsertUser(ctx, arg)
@@ -2651,6 +2658,13 @@ func (m queryMetricsStore) RevokeDBCryptKey(ctx context.Context, activeKeyDigest
return r0
}
func (m queryMetricsStore) SelectUsageEventsForPublishing(ctx context.Context, arg time.Time) ([]database.UsageEvent, error) {
start := time.Now()
r0, r1 := m.s.SelectUsageEventsForPublishing(ctx, arg)
m.queryLatencies.WithLabelValues("SelectUsageEventsForPublishing").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m queryMetricsStore) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock int64) (bool, error) {
start := time.Now()
ok, err := m.s.TryAcquireLock(ctx, pgTryAdvisoryXactLock)
@@ -2938,6 +2952,13 @@ func (m queryMetricsStore) UpdateTemplateWorkspacesLastUsedAt(ctx context.Contex
return r0
}
func (m queryMetricsStore) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error {
start := time.Now()
r0 := m.s.UpdateUsageEventsPostPublish(ctx, arg)
m.queryLatencies.WithLabelValues("UpdateUsageEventsPostPublish").Observe(time.Since(start).Seconds())
return r0
}
func (m queryMetricsStore) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error {
start := time.Now()
r0 := m.s.UpdateUserDeletedByID(ctx, id)
+43
View File
@@ -5107,6 +5107,20 @@ func (mr *MockStoreMockRecorder) InsertTemplateVersionWorkspaceTag(ctx, arg any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTemplateVersionWorkspaceTag", reflect.TypeOf((*MockStore)(nil).InsertTemplateVersionWorkspaceTag), ctx, arg)
}
// InsertUsageEvent mocks base method.
func (m *MockStore) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "InsertUsageEvent", ctx, arg)
ret0, _ := ret[0].(error)
return ret0
}
// InsertUsageEvent indicates an expected call of InsertUsageEvent.
func (mr *MockStoreMockRecorder) InsertUsageEvent(ctx, arg any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertUsageEvent", reflect.TypeOf((*MockStore)(nil).InsertUsageEvent), ctx, arg)
}
// InsertUser mocks base method.
func (m *MockStore) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) {
m.ctrl.T.Helper()
@@ -5682,6 +5696,21 @@ func (mr *MockStoreMockRecorder) RevokeDBCryptKey(ctx, activeKeyDigest any) *gom
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RevokeDBCryptKey", reflect.TypeOf((*MockStore)(nil).RevokeDBCryptKey), ctx, activeKeyDigest)
}
// SelectUsageEventsForPublishing mocks base method.
func (m *MockStore) SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]database.UsageEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SelectUsageEventsForPublishing", ctx, now)
ret0, _ := ret[0].([]database.UsageEvent)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SelectUsageEventsForPublishing indicates an expected call of SelectUsageEventsForPublishing.
func (mr *MockStoreMockRecorder) SelectUsageEventsForPublishing(ctx, now any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectUsageEventsForPublishing", reflect.TypeOf((*MockStore)(nil).SelectUsageEventsForPublishing), ctx, now)
}
// TryAcquireLock mocks base method.
func (m *MockStore) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock int64) (bool, error) {
m.ctrl.T.Helper()
@@ -6270,6 +6299,20 @@ func (mr *MockStoreMockRecorder) UpdateTemplateWorkspacesLastUsedAt(ctx, arg any
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTemplateWorkspacesLastUsedAt", reflect.TypeOf((*MockStore)(nil).UpdateTemplateWorkspacesLastUsedAt), ctx, arg)
}
// UpdateUsageEventsPostPublish mocks base method.
func (m *MockStore) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateUsageEventsPostPublish", ctx, arg)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateUsageEventsPostPublish indicates an expected call of UpdateUsageEventsPostPublish.
func (mr *MockStoreMockRecorder) UpdateUsageEventsPostPublish(ctx, arg any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateUsageEventsPostPublish", reflect.TypeOf((*MockStore)(nil).UpdateUsageEventsPostPublish), ctx, arg)
}
// UpdateUserDeletedByID mocks base method.
func (m *MockStore) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error {
m.ctrl.T.Helper()
+30
View File
@@ -1832,6 +1832,31 @@ CREATE VIEW template_with_names AS
COMMENT ON VIEW template_with_names IS 'Joins in the display name information such as username, avatar, and organization name.';
CREATE TABLE usage_events (
id text NOT NULL,
event_type text NOT NULL,
event_data jsonb NOT NULL,
created_at timestamp with time zone NOT NULL,
publish_started_at timestamp with time zone,
published_at timestamp with time zone,
failure_message text,
CONSTRAINT usage_event_type_check CHECK ((event_type = 'dc_managed_agents_v1'::text))
);
COMMENT ON TABLE usage_events IS 'usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service.';
COMMENT ON COLUMN usage_events.id IS 'For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp.';
COMMENT ON COLUMN usage_events.event_type IS 'The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges).';
COMMENT ON COLUMN usage_events.event_data IS 'Event payload. Determined by the matching usage struct for this event type.';
COMMENT ON COLUMN usage_events.publish_started_at IS 'Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired.';
COMMENT ON COLUMN usage_events.published_at IS 'Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again.';
COMMENT ON COLUMN usage_events.failure_message IS 'Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service.';
CREATE TABLE user_configs (
user_id uuid NOT NULL,
key character varying(256) NOT NULL,
@@ -2681,6 +2706,9 @@ ALTER TABLE ONLY template_versions
ALTER TABLE ONLY templates
ADD CONSTRAINT templates_pkey PRIMARY KEY (id);
ALTER TABLE ONLY usage_events
ADD CONSTRAINT usage_events_pkey PRIMARY KEY (id);
ALTER TABLE ONLY user_configs
ADD CONSTRAINT user_configs_pkey PRIMARY KEY (user_id, key);
@@ -2849,6 +2877,8 @@ CREATE INDEX idx_template_versions_has_ai_task ON template_versions USING btree
CREATE UNIQUE INDEX idx_unique_preset_name ON template_version_presets USING btree (name, template_version_id);
CREATE INDEX idx_usage_events_select_for_publishing ON usage_events USING btree (published_at, publish_started_at, created_at);
CREATE INDEX idx_user_deleted_deleted_at ON user_deleted USING btree (deleted_at);
CREATE INDEX idx_user_status_changes_changed_at ON user_status_changes USING btree (changed_at);
@@ -0,0 +1 @@
DROP TABLE usage_events;
@@ -0,0 +1,25 @@
CREATE TABLE usage_events (
id TEXT PRIMARY KEY,
-- We use a TEXT column with a CHECK constraint rather than an enum because of
-- the limitations with adding new values to an enum and using them in the
-- same transaction.
event_type TEXT NOT NULL CONSTRAINT usage_event_type_check CHECK (event_type IN ('dc_managed_agents_v1')),
event_data JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
publish_started_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
published_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
failure_message TEXT DEFAULT NULL
);
COMMENT ON TABLE usage_events IS 'usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service.';
COMMENT ON COLUMN usage_events.id IS 'For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp.';
COMMENT ON COLUMN usage_events.event_type IS 'The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges).';
COMMENT ON COLUMN usage_events.event_data IS 'Event payload. Determined by the matching usage struct for this event type.';
COMMENT ON COLUMN usage_events.publish_started_at IS 'Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired.';
COMMENT ON COLUMN usage_events.published_at IS 'Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again.';
COMMENT ON COLUMN usage_events.failure_message IS 'Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service.';
-- Create an index with all three fields used by the
-- SelectUsageEventsForPublishing query.
CREATE INDEX idx_usage_events_select_for_publishing
ON usage_events (published_at, publish_started_at, created_at);
@@ -0,0 +1,60 @@
INSERT INTO usage_events (
id,
event_type,
event_data,
created_at,
publish_started_at,
published_at,
failure_message
)
VALUES
-- Unpublished dc_managed_agents_v1 event.
(
'event1',
'dc_managed_agents_v1',
'{"count":1}',
'2023-01-01 00:00:00+00',
NULL,
NULL,
NULL
),
-- Successfully published dc_managed_agents_v1 event.
(
'event2',
'dc_managed_agents_v1',
'{"count":2}',
'2023-01-01 00:00:00+00',
NULL,
'2023-01-01 00:00:02+00',
NULL
),
-- Publish in progress dc_managed_agents_v1 event.
(
'event3',
'dc_managed_agents_v1',
'{"count":3}',
'2023-01-01 00:00:00+00',
'2023-01-01 00:00:01+00',
NULL,
NULL
),
-- Temporarily failed to publish dc_managed_agents_v1 event.
(
'event4',
'dc_managed_agents_v1',
'{"count":4}',
'2023-01-01 00:00:00+00',
NULL,
NULL,
'publish failed temporarily'
),
-- Permanently failed to publish dc_managed_agents_v1 event.
(
'event5',
'dc_managed_agents_v1',
'{"count":5}',
'2023-01-01 00:00:00+00',
NULL,
'2023-01-01 00:00:02+00',
'publish failed permanently'
)
+17
View File
@@ -3759,6 +3759,23 @@ type TemplateVersionWorkspaceTag struct {
Value string `db:"value" json:"value"`
}
// usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service.
type UsageEvent struct {
// For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp.
ID string `db:"id" json:"id"`
// The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges).
EventType string `db:"event_type" json:"event_type"`
// Event payload. Determined by the matching usage struct for this event type.
EventData json.RawMessage `db:"event_data" json:"event_data"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
// Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired.
PublishStartedAt sql.NullTime `db:"publish_started_at" json:"publish_started_at"`
// Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again.
PublishedAt sql.NullTime `db:"published_at" json:"published_at"`
// Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service.
FailureMessage sql.NullString `db:"failure_message" json:"failure_message"`
}
type User struct {
ID uuid.UUID `db:"id" json:"id"`
Email string `db:"email" json:"email"`
+9
View File
@@ -522,6 +522,9 @@ type sqlcQuerier interface {
InsertTemplateVersionTerraformValuesByJobID(ctx context.Context, arg InsertTemplateVersionTerraformValuesByJobIDParams) error
InsertTemplateVersionVariable(ctx context.Context, arg InsertTemplateVersionVariableParams) (TemplateVersionVariable, error)
InsertTemplateVersionWorkspaceTag(ctx context.Context, arg InsertTemplateVersionWorkspaceTagParams) (TemplateVersionWorkspaceTag, error)
// Duplicate events are ignored intentionally to allow for multiple replicas to
// publish heartbeat events.
InsertUsageEvent(ctx context.Context, arg InsertUsageEventParams) error
InsertUser(ctx context.Context, arg InsertUserParams) (User, error)
// InsertUserGroupsByID adds a user to all provided groups, if they exist.
// If there is a conflict, the user is already a member
@@ -568,6 +571,11 @@ type sqlcQuerier interface {
RemoveUserFromAllGroups(ctx context.Context, userID uuid.UUID) error
RemoveUserFromGroups(ctx context.Context, arg RemoveUserFromGroupsParams) ([]uuid.UUID, error)
RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error
// Note that this selects from the CTE, not the original table. The CTE is named
// the same as the original table to trick sqlc into reusing the existing struct
// for the table.
// The CTE and the reorder is required because UPDATE doesn't guarantee order.
SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]UsageEvent, error)
// Non blocking lock. Returns true if the lock was acquired, false otherwise.
//
// This must be called from within a transaction. The lock will be automatically
@@ -614,6 +622,7 @@ type sqlcQuerier interface {
UpdateTemplateVersionDescriptionByJobID(ctx context.Context, arg UpdateTemplateVersionDescriptionByJobIDParams) error
UpdateTemplateVersionExternalAuthProvidersByJobID(ctx context.Context, arg UpdateTemplateVersionExternalAuthProvidersByJobIDParams) error
UpdateTemplateWorkspacesLastUsedAt(ctx context.Context, arg UpdateTemplateWorkspacesLastUsedAtParams) error
UpdateUsageEventsPostPublish(ctx context.Context, arg UpdateUsageEventsPostPublishParams) error
UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error
UpdateUserGithubComUserID(ctx context.Context, arg UpdateUserGithubComUserIDParams) error
UpdateUserHashedOneTimePasscode(ctx context.Context, arg UpdateUserHashedOneTimePasscodeParams) error
+155
View File
@@ -13519,6 +13519,161 @@ func (q *sqlQuerier) DisableForeignKeysAndTriggers(ctx context.Context) error {
return err
}
const insertUsageEvent = `-- name: InsertUsageEvent :exec
INSERT INTO
usage_events (
id,
event_type,
event_data,
created_at,
publish_started_at,
published_at,
failure_message
)
VALUES
($1, $2, $3, $4, NULL, NULL, NULL)
ON CONFLICT (id) DO NOTHING
`
type InsertUsageEventParams struct {
ID string `db:"id" json:"id"`
EventType string `db:"event_type" json:"event_type"`
EventData json.RawMessage `db:"event_data" json:"event_data"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
}
// Duplicate events are ignored intentionally to allow for multiple replicas to
// publish heartbeat events.
func (q *sqlQuerier) InsertUsageEvent(ctx context.Context, arg InsertUsageEventParams) error {
_, err := q.db.ExecContext(ctx, insertUsageEvent,
arg.ID,
arg.EventType,
arg.EventData,
arg.CreatedAt,
)
return err
}
const selectUsageEventsForPublishing = `-- name: SelectUsageEventsForPublishing :many
WITH usage_events AS (
UPDATE
usage_events
SET
publish_started_at = $1::timestamptz
WHERE
id IN (
SELECT
potential_event.id
FROM
usage_events potential_event
WHERE
-- Do not publish events that have already been published or
-- have permanently failed to publish.
potential_event.published_at IS NULL
-- Do not publish events that are already being published by
-- another replica.
AND (
potential_event.publish_started_at IS NULL
-- If the event has publish_started_at set, it must be older
-- than an hour ago. This is so we can retry publishing
-- events where the replica exited or couldn't update the
-- row.
-- The parenthesis around @now::timestamptz are necessary to
-- avoid sqlc from generating an extra argument.
OR potential_event.publish_started_at < ($1::timestamptz) - INTERVAL '1 hour'
)
-- Do not publish events older than 30 days. Tallyman will
-- always permanently reject these events anyways. This is to
-- avoid duplicate events being billed to customers, as
-- Metronome will only deduplicate events within 34 days.
-- Also, the same parenthesis thing here as above.
AND potential_event.created_at > ($1::timestamptz) - INTERVAL '30 days'
ORDER BY potential_event.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 100
)
RETURNING id, event_type, event_data, created_at, publish_started_at, published_at, failure_message
)
SELECT id, event_type, event_data, created_at, publish_started_at, published_at, failure_message
FROM usage_events
ORDER BY created_at ASC
`
// Note that this selects from the CTE, not the original table. The CTE is named
// the same as the original table to trick sqlc into reusing the existing struct
// for the table.
// The CTE and the reorder is required because UPDATE doesn't guarantee order.
func (q *sqlQuerier) SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]UsageEvent, error) {
rows, err := q.db.QueryContext(ctx, selectUsageEventsForPublishing, now)
if err != nil {
return nil, err
}
defer rows.Close()
var items []UsageEvent
for rows.Next() {
var i UsageEvent
if err := rows.Scan(
&i.ID,
&i.EventType,
&i.EventData,
&i.CreatedAt,
&i.PublishStartedAt,
&i.PublishedAt,
&i.FailureMessage,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateUsageEventsPostPublish = `-- name: UpdateUsageEventsPostPublish :exec
UPDATE
usage_events
SET
publish_started_at = NULL,
published_at = CASE WHEN input.set_published_at THEN $1::timestamptz ELSE NULL END,
failure_message = NULLIF(input.failure_message, '')
FROM (
SELECT
UNNEST($2::text[]) AS id,
UNNEST($3::text[]) AS failure_message,
UNNEST($4::boolean[]) AS set_published_at
) input
WHERE
input.id = usage_events.id
-- If the number of ids, failure messages, and set published ats are not the
-- same, do not do anything. Unfortunately you can't really throw from a
-- query without writing a function or doing some jank like dividing by
-- zero, so this is the best we can do.
AND cardinality($2::text[]) = cardinality($3::text[])
AND cardinality($2::text[]) = cardinality($4::boolean[])
`
type UpdateUsageEventsPostPublishParams struct {
Now time.Time `db:"now" json:"now"`
IDs []string `db:"ids" json:"ids"`
FailureMessages []string `db:"failure_messages" json:"failure_messages"`
SetPublishedAts []bool `db:"set_published_ats" json:"set_published_ats"`
}
func (q *sqlQuerier) UpdateUsageEventsPostPublish(ctx context.Context, arg UpdateUsageEventsPostPublishParams) error {
_, err := q.db.ExecContext(ctx, updateUsageEventsPostPublish,
arg.Now,
pq.Array(arg.IDs),
pq.Array(arg.FailureMessages),
pq.Array(arg.SetPublishedAts),
)
return err
}
const getUserLinkByLinkedID = `-- name: GetUserLinkByLinkedID :one
SELECT
user_links.user_id, user_links.login_type, user_links.linked_id, user_links.oauth_access_token, user_links.oauth_refresh_token, user_links.oauth_expiry, user_links.oauth_access_token_key_id, user_links.oauth_refresh_token_key_id, user_links.claims
+86
View File
@@ -0,0 +1,86 @@
-- name: InsertUsageEvent :exec
-- Duplicate events are ignored intentionally to allow for multiple replicas to
-- publish heartbeat events.
INSERT INTO
usage_events (
id,
event_type,
event_data,
created_at,
publish_started_at,
published_at,
failure_message
)
VALUES
(@id, @event_type, @event_data, @created_at, NULL, NULL, NULL)
ON CONFLICT (id) DO NOTHING;
-- name: SelectUsageEventsForPublishing :many
WITH usage_events AS (
UPDATE
usage_events
SET
publish_started_at = @now::timestamptz
WHERE
id IN (
SELECT
potential_event.id
FROM
usage_events potential_event
WHERE
-- Do not publish events that have already been published or
-- have permanently failed to publish.
potential_event.published_at IS NULL
-- Do not publish events that are already being published by
-- another replica.
AND (
potential_event.publish_started_at IS NULL
-- If the event has publish_started_at set, it must be older
-- than an hour ago. This is so we can retry publishing
-- events where the replica exited or couldn't update the
-- row.
-- The parenthesis around @now::timestamptz are necessary to
-- avoid sqlc from generating an extra argument.
OR potential_event.publish_started_at < (@now::timestamptz) - INTERVAL '1 hour'
)
-- Do not publish events older than 30 days. Tallyman will
-- always permanently reject these events anyways. This is to
-- avoid duplicate events being billed to customers, as
-- Metronome will only deduplicate events within 34 days.
-- Also, the same parenthesis thing here as above.
AND potential_event.created_at > (@now::timestamptz) - INTERVAL '30 days'
ORDER BY potential_event.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 100
)
RETURNING *
)
SELECT *
-- Note that this selects from the CTE, not the original table. The CTE is named
-- the same as the original table to trick sqlc into reusing the existing struct
-- for the table.
FROM usage_events
-- The CTE and the reorder is required because UPDATE doesn't guarantee order.
ORDER BY created_at ASC;
-- name: UpdateUsageEventsPostPublish :exec
UPDATE
usage_events
SET
publish_started_at = NULL,
published_at = CASE WHEN input.set_published_at THEN @now::timestamptz ELSE NULL END,
failure_message = NULLIF(input.failure_message, '')
FROM (
SELECT
UNNEST(@ids::text[]) AS id,
UNNEST(@failure_messages::text[]) AS failure_message,
UNNEST(@set_published_ats::boolean[]) AS set_published_at
) input
WHERE
input.id = usage_events.id
-- If the number of ids, failure messages, and set published ats are not the
-- same, do not do anything. Unfortunately you can't really throw from a
-- query without writing a function or doing some jank like dividing by
-- zero, so this is the best we can do.
AND cardinality(@ids::text[]) = cardinality(@failure_messages::text[])
AND cardinality(@ids::text[]) = cardinality(@set_published_ats::boolean[]);
+1
View File
@@ -67,6 +67,7 @@ const (
UniqueTemplateVersionsPkey UniqueConstraint = "template_versions_pkey" // ALTER TABLE ONLY template_versions ADD CONSTRAINT template_versions_pkey PRIMARY KEY (id);
UniqueTemplateVersionsTemplateIDNameKey UniqueConstraint = "template_versions_template_id_name_key" // ALTER TABLE ONLY template_versions ADD CONSTRAINT template_versions_template_id_name_key UNIQUE (template_id, name);
UniqueTemplatesPkey UniqueConstraint = "templates_pkey" // ALTER TABLE ONLY templates ADD CONSTRAINT templates_pkey PRIMARY KEY (id);
UniqueUsageEventsPkey UniqueConstraint = "usage_events_pkey" // ALTER TABLE ONLY usage_events ADD CONSTRAINT usage_events_pkey PRIMARY KEY (id);
UniqueUserConfigsPkey UniqueConstraint = "user_configs_pkey" // ALTER TABLE ONLY user_configs ADD CONSTRAINT user_configs_pkey PRIMARY KEY (user_id, key);
UniqueUserDeletedPkey UniqueConstraint = "user_deleted_pkey" // ALTER TABLE ONLY user_deleted ADD CONSTRAINT user_deleted_pkey PRIMARY KEY (id);
UniqueUserLinksPkey UniqueConstraint = "user_links_pkey" // ALTER TABLE ONLY user_links ADD CONSTRAINT user_links_pkey PRIMARY KEY (user_id, login_type);
+2
View File
@@ -32,6 +32,8 @@ const (
// ServiceAgentMetricAggregator merges agent metrics and exports them in a
// prometheus collector format.
ServiceAgentMetricAggregator = "agent-metrics-aggregator"
// ServiceTallymanPublisher publishes usage events to coder/tallyman.
ServiceTallymanPublisher = "tallyman-publisher"
RequestTypeTag = "coder_request_type"
)
+1
View File
@@ -76,6 +76,7 @@ const (
SubjectTypeNotifier SubjectType = "notifier"
SubjectTypeSubAgentAPI SubjectType = "sub_agent_api"
SubjectTypeFileReader SubjectType = "file_reader"
SubjectTypeUsageTracker SubjectType = "usage_tracker"
)
const (
+10
View File
@@ -289,6 +289,15 @@ var (
Type: "template",
}
// ResourceUsageEvent
// Valid Actions
// - "ActionCreate" :: create a usage event
// - "ActionRead" :: read usage events
// - "ActionUpdate" :: update usage events
ResourceUsageEvent = Object{
Type: "usage_event",
}
// ResourceUser
// Valid Actions
// - "ActionCreate" :: create a new user
@@ -412,6 +421,7 @@ func AllResources() []Objecter {
ResourceSystem,
ResourceTailnetCoordinator,
ResourceTemplate,
ResourceUsageEvent,
ResourceUser,
ResourceUserSecret,
ResourceWebpushSubscription,
+7
View File
@@ -351,4 +351,11 @@ var RBACPermissions = map[string]PermissionDefinition{
ActionDelete: "delete a user secret",
},
},
"usage_event": {
Actions: map[Action]ActionDefinition{
ActionCreate: "create a usage event",
ActionRead: "read usage events",
ActionUpdate: "update usage events",
},
},
}
+1 -1
View File
@@ -271,7 +271,7 @@ func ReloadBuiltinRoles(opts *RoleOptions) {
// Workspace dormancy and workspace are omitted.
// Workspace is specifically handled based on the opts.NoOwnerWorkspaceExec.
// Owners cannot access other users' secrets.
allPermsExcept(ResourceWorkspaceDormant, ResourcePrebuiltWorkspace, ResourceWorkspace, ResourceUserSecret),
allPermsExcept(ResourceWorkspaceDormant, ResourcePrebuiltWorkspace, ResourceWorkspace, ResourceUserSecret, ResourceUsageEvent),
// This adds back in the Workspace permissions.
Permissions(map[string][]policy.Action{
ResourceWorkspace.Type: ownerWorkspaceActions,
+16
View File
@@ -872,6 +872,22 @@ func TestRolePermissions(t *testing.T) {
},
},
},
{
Name: "UsageEvents",
Actions: []policy.Action{policy.ActionCreate, policy.ActionRead, policy.ActionUpdate},
Resource: rbac.ResourceUsageEvent,
AuthorizeMap: map[bool][]hasAuthSubjects{
true: {},
false: {
owner,
memberMe, orgMemberMe, otherOrgMember,
orgAdmin, otherOrgAdmin,
orgAuditor, otherOrgAuditor,
templateAdmin, orgTemplateAdmin, otherOrgTemplateAdmin,
userAdmin, orgUserAdmin, otherOrgUserAdmin,
},
},
},
}
// We expect every permission to be tested above.
+82
View File
@@ -0,0 +1,82 @@
package usage
import (
"strings"
"golang.org/x/xerrors"
)
// EventType is an enum of all usage event types. It mirrors the check
// constraint on the `event_type` column in the `usage_events` table.
type EventType string //nolint:revive
const (
UsageEventTypeDCManagedAgentsV1 EventType = "dc_managed_agents_v1"
)
func (e EventType) Valid() bool {
switch e {
case UsageEventTypeDCManagedAgentsV1:
return true
default:
return false
}
}
func (e EventType) IsDiscrete() bool {
return e.Valid() && strings.HasPrefix(string(e), "dc_")
}
func (e EventType) IsHeartbeat() bool {
return e.Valid() && strings.HasPrefix(string(e), "hb_")
}
// Event is a usage event that can be collected by the usage collector.
//
// Note that the following event types should not be updated once they are
// merged into the product. Please consult Dean before making any changes.
//
// Event types cannot be implemented outside of this package, as they are
// imported by the coder/tallyman repository.
type Event interface {
usageEvent() // to prevent external types from implementing this interface
EventType() EventType
Valid() error
Fields() map[string]any // fields to be marshaled and sent to tallyman/Metronome
}
// DiscreteEvent is a usage event that is collected as a discrete event.
type DiscreteEvent interface {
Event
discreteUsageEvent() // marker method, also prevents external types from implementing this interface
}
// DCManagedAgentsV1 is a discrete usage event for the number of managed agents.
// This event is sent in the following situations:
// - Once on first startup after usage tracking is added to the product with
// the count of all existing managed agents (count=N)
// - A new managed agent is created (count=1)
type DCManagedAgentsV1 struct {
Count uint64 `json:"count"`
}
var _ DiscreteEvent = DCManagedAgentsV1{}
func (DCManagedAgentsV1) usageEvent() {}
func (DCManagedAgentsV1) discreteUsageEvent() {}
func (DCManagedAgentsV1) EventType() EventType {
return UsageEventTypeDCManagedAgentsV1
}
func (e DCManagedAgentsV1) Valid() error {
if e.Count == 0 {
return xerrors.New("count must be greater than 0")
}
return nil
}
func (e DCManagedAgentsV1) Fields() map[string]any {
return map[string]any{
"count": e.Count,
}
}
+29
View File
@@ -0,0 +1,29 @@
package usage
import (
"context"
"github.com/coder/coder/v2/coderd/database"
)
// Inserter accepts usage events generated by the product.
type Inserter interface {
// InsertDiscreteUsageEvent writes a discrete usage event to the database
// within the given transaction.
InsertDiscreteUsageEvent(ctx context.Context, tx database.Store, event DiscreteEvent) error
}
// AGPLInserter is a no-op implementation of Inserter.
type AGPLInserter struct{}
var _ Inserter = AGPLInserter{}
func NewAGPLInserter() Inserter {
return AGPLInserter{}
}
// InsertDiscreteUsageEvent is a no-op implementation of
// InsertDiscreteUsageEvent.
func (AGPLInserter) InsertDiscreteUsageEvent(_ context.Context, _ database.Store, _ DiscreteEvent) error {
return nil
}
+2
View File
@@ -35,6 +35,7 @@ const (
ResourceSystem RBACResource = "system"
ResourceTailnetCoordinator RBACResource = "tailnet_coordinator"
ResourceTemplate RBACResource = "template"
ResourceUsageEvent RBACResource = "usage_event"
ResourceUser RBACResource = "user"
ResourceUserSecret RBACResource = "user_secret"
ResourceWebpushSubscription RBACResource = "webpush_subscription"
@@ -100,6 +101,7 @@ var RBACResourceActions = map[RBACResource][]RBACAction{
ResourceSystem: {ActionCreate, ActionDelete, ActionRead, ActionUpdate},
ResourceTailnetCoordinator: {ActionCreate, ActionDelete, ActionRead, ActionUpdate},
ResourceTemplate: {ActionCreate, ActionDelete, ActionRead, ActionUpdate, ActionUse, ActionViewInsights},
ResourceUsageEvent: {ActionCreate, ActionRead, ActionUpdate},
ResourceUser: {ActionCreate, ActionDelete, ActionRead, ActionReadPersonal, ActionUpdate, ActionUpdatePersonal},
ResourceUserSecret: {ActionCreate, ActionDelete, ActionRead, ActionUpdate},
ResourceWebpushSubscription: {ActionCreate, ActionDelete, ActionRead},
+5
View File
@@ -213,6 +213,7 @@ Status Code **200**
| `resource_type` | `system` |
| `resource_type` | `tailnet_coordinator` |
| `resource_type` | `template` |
| `resource_type` | `usage_event` |
| `resource_type` | `user` |
| `resource_type` | `user_secret` |
| `resource_type` | `webpush_subscription` |
@@ -384,6 +385,7 @@ Status Code **200**
| `resource_type` | `system` |
| `resource_type` | `tailnet_coordinator` |
| `resource_type` | `template` |
| `resource_type` | `usage_event` |
| `resource_type` | `user` |
| `resource_type` | `user_secret` |
| `resource_type` | `webpush_subscription` |
@@ -555,6 +557,7 @@ Status Code **200**
| `resource_type` | `system` |
| `resource_type` | `tailnet_coordinator` |
| `resource_type` | `template` |
| `resource_type` | `usage_event` |
| `resource_type` | `user` |
| `resource_type` | `user_secret` |
| `resource_type` | `webpush_subscription` |
@@ -695,6 +698,7 @@ Status Code **200**
| `resource_type` | `system` |
| `resource_type` | `tailnet_coordinator` |
| `resource_type` | `template` |
| `resource_type` | `usage_event` |
| `resource_type` | `user` |
| `resource_type` | `user_secret` |
| `resource_type` | `webpush_subscription` |
@@ -1057,6 +1061,7 @@ Status Code **200**
| `resource_type` | `system` |
| `resource_type` | `tailnet_coordinator` |
| `resource_type` | `template` |
| `resource_type` | `usage_event` |
| `resource_type` | `user` |
| `resource_type` | `user_secret` |
| `resource_type` | `webpush_subscription` |
+1
View File
@@ -6378,6 +6378,7 @@ Only certain features set these fields: - FeatureManagedAgentLimit|
| `system` |
| `tailnet_coordinator` |
| `template` |
| `usage_event` |
| `user` |
| `user_secret` |
| `webpush_subscription` |
@@ -161,12 +161,13 @@ func NewWithAPI(t *testing.T, options *Options) (
// LicenseOptions is used to generate a license for testing.
// It supports the builder pattern for easy customization.
type LicenseOptions struct {
AccountType string
AccountID string
DeploymentIDs []string
Trial bool
FeatureSet codersdk.FeatureSet
AllFeatures bool
AccountType string
AccountID string
DeploymentIDs []string
Trial bool
FeatureSet codersdk.FeatureSet
AllFeatures bool
PublishUsageData bool
// GraceAt is the time at which the license will enter the grace period.
GraceAt time.Time
// ExpiresAt is the time at which the license will hard expire.
@@ -271,6 +272,13 @@ func GenerateLicense(t *testing.T, options LicenseOptions) string {
issuedAt = time.Now().Add(-time.Minute)
}
if options.AccountType == "" {
options.AccountType = license.AccountTypeSalesforce
}
if options.AccountID == "" {
options.AccountID = "test-account-id"
}
c := &license.Claims{
RegisteredClaims: jwt.RegisteredClaims{
ID: uuid.NewString(),
@@ -279,15 +287,16 @@ func GenerateLicense(t *testing.T, options LicenseOptions) string {
NotBefore: jwt.NewNumericDate(options.NotBefore),
IssuedAt: jwt.NewNumericDate(issuedAt),
},
LicenseExpires: jwt.NewNumericDate(options.GraceAt),
AccountType: options.AccountType,
AccountID: options.AccountID,
DeploymentIDs: options.DeploymentIDs,
Trial: options.Trial,
Version: license.CurrentVersion,
AllFeatures: options.AllFeatures,
FeatureSet: options.FeatureSet,
Features: options.Features,
LicenseExpires: jwt.NewNumericDate(options.GraceAt),
AccountType: options.AccountType,
AccountID: options.AccountID,
DeploymentIDs: options.DeploymentIDs,
Trial: options.Trial,
Version: license.CurrentVersion,
AllFeatures: options.AllFeatures,
FeatureSet: options.FeatureSet,
Features: options.Features,
PublishUsageData: options.PublishUsageData,
}
return GenerateLicenseRaw(t, c)
}
+1
View File
@@ -584,6 +584,7 @@ type Claims struct {
Version uint64 `json:"version"`
Features Features `json:"features"`
RequireTelemetry bool `json:"require_telemetry,omitempty"`
PublishUsageData bool `json:"publish_usage_data,omitempty"`
}
var _ jwt.Claims = &Claims{}
+66
View File
@@ -0,0 +1,66 @@
package usage
import (
"context"
"encoding/json"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
agplusage "github.com/coder/coder/v2/coderd/usage"
"github.com/coder/quartz"
)
// Inserter accepts usage events and stores them in the database for publishing.
type Inserter struct {
clock quartz.Clock
}
var _ agplusage.Inserter = &Inserter{}
// NewInserter creates a new database-backed usage event inserter.
func NewInserter(opts ...InserterOptions) *Inserter {
c := &Inserter{
clock: quartz.NewReal(),
}
for _, opt := range opts {
opt(c)
}
return c
}
type InserterOptions func(*Inserter)
// InserterWithClock sets the quartz clock to use for the inserter.
func InserterWithClock(clock quartz.Clock) InserterOptions {
return func(c *Inserter) {
c.clock = clock
}
}
// InsertDiscreteUsageEvent implements agplusage.Inserter.
func (c *Inserter) InsertDiscreteUsageEvent(ctx context.Context, tx database.Store, event agplusage.DiscreteEvent) error {
if !event.EventType().IsDiscrete() {
return xerrors.Errorf("event type %q is not a discrete event", event.EventType())
}
if err := event.Valid(); err != nil {
return xerrors.Errorf("invalid %q event: %w", event.EventType(), err)
}
jsonData, err := json.Marshal(event.Fields())
if err != nil {
return xerrors.Errorf("marshal event as JSON: %w", err)
}
// Duplicate events are ignored by the query, so we don't need to check the
// error.
return tx.InsertUsageEvent(ctx, database.InsertUsageEventParams{
// Always generate a new UUID for discrete events.
ID: uuid.New().String(),
EventType: string(event.EventType()),
EventData: jsonData,
CreatedAt: dbtime.Time(c.clock.Now()),
})
}
+85
View File
@@ -0,0 +1,85 @@
package usage_test
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/dbtime"
agplusage "github.com/coder/coder/v2/coderd/usage"
"github.com/coder/coder/v2/enterprise/coderd/usage"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
func TestInserter(t *testing.T) {
t.Parallel()
t.Run("OK", func(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
inserter := usage.NewInserter(usage.InserterWithClock(clock))
now := dbtime.Now()
events := []struct {
time time.Time
event agplusage.DiscreteEvent
}{
{
time: now,
event: agplusage.DCManagedAgentsV1{
Count: 1,
},
},
{
time: now.Add(1 * time.Minute),
event: agplusage.DCManagedAgentsV1{
Count: 2,
},
},
}
for _, event := range events {
eventJSON := jsoninate(t, event.event)
db.EXPECT().InsertUsageEvent(ctx, gomock.Any()).DoAndReturn(
func(ctx interface{}, params database.InsertUsageEventParams) error {
_, err := uuid.Parse(params.ID)
assert.NoError(t, err)
assert.Equal(t, string(event.event.EventType()), params.EventType)
assert.JSONEq(t, eventJSON, string(params.EventData))
assert.Equal(t, event.time, params.CreatedAt)
return nil
},
).Times(1)
clock.Set(event.time)
err := inserter.InsertDiscreteUsageEvent(ctx, db, event.event)
require.NoError(t, err)
}
})
t.Run("InvalidEvent", func(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
// We should get an error if the event is invalid.
inserter := usage.NewInserter()
err := inserter.InsertDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{
Count: 0, // invalid
})
assert.ErrorContains(t, err, `invalid "dc_managed_agents_v1" event: count must be greater than 0`)
})
}
+463
View File
@@ -0,0 +1,463 @@
package usage
import (
"bytes"
"context"
"crypto/ed25519"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/pproflabel"
agplusage "github.com/coder/coder/v2/coderd/usage"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/enterprise/coderd"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/quartz"
)
const (
CoderLicenseJWTHeader = "Coder-License-JWT"
tallymanURL = "https://tallyman-prod.coder.com"
tallymanIngestURLV1 = tallymanURL + "/api/v1/events/ingest"
tallymanPublishInitialMinimumDelay = 5 * time.Minute
// Chosen to be a prime number and not a multiple of 5 like many other
// recurring tasks.
tallymanPublishInterval = 17 * time.Minute
tallymanPublishTimeout = 30 * time.Second
tallymanPublishBatchSize = 100
)
var errUsagePublishingDisabled = xerrors.New("usage publishing is not enabled by any license")
// Publisher publishes usage events ***somewhere***.
type Publisher interface {
// Close closes the publisher and waits for it to finish.
io.Closer
// Start starts the publisher. It must only be called once.
Start() error
}
type tallymanPublisher struct {
ctx context.Context
ctxCancel context.CancelFunc
log slog.Logger
db database.Store
done chan struct{}
// Configured with options:
ingestURL string
httpClient *http.Client
clock quartz.Clock
licenseKeys map[string]ed25519.PublicKey
initialDelay time.Duration
}
var _ Publisher = &tallymanPublisher{}
// NewTallymanPublisher creates a Publisher that publishes usage events to
// Coder's Tallyman service.
func NewTallymanPublisher(ctx context.Context, log slog.Logger, db database.Store, opts ...TallymanPublisherOption) Publisher {
ctx, cancel := context.WithCancel(ctx)
publisher := &tallymanPublisher{
ctx: ctx,
ctxCancel: cancel,
log: log,
db: db,
done: make(chan struct{}),
ingestURL: tallymanIngestURLV1,
httpClient: http.DefaultClient,
clock: quartz.NewReal(),
licenseKeys: coderd.Keys,
}
for _, opt := range opts {
opt(publisher)
}
return publisher
}
type TallymanPublisherOption func(*tallymanPublisher)
// PublisherWithHTTPClient sets the HTTP client to use for publishing usage events.
func PublisherWithHTTPClient(httpClient *http.Client) TallymanPublisherOption {
return func(p *tallymanPublisher) {
p.httpClient = httpClient
}
}
// PublisherWithClock sets the clock to use for publishing usage events.
func PublisherWithClock(clock quartz.Clock) TallymanPublisherOption {
return func(p *tallymanPublisher) {
p.clock = clock
}
}
// PublisherWithLicenseKeys sets the license public keys to use for license
// validation.
func PublisherWithLicenseKeys(keys map[string]ed25519.PublicKey) TallymanPublisherOption {
return func(p *tallymanPublisher) {
p.licenseKeys = keys
}
}
// PublisherWithIngestURL sets the ingest URL to use for publishing usage
// events.
func PublisherWithIngestURL(ingestURL string) TallymanPublisherOption {
return func(p *tallymanPublisher) {
p.ingestURL = ingestURL
}
}
// PublisherWithInitialDelay sets the initial delay for the publisher.
func PublisherWithInitialDelay(initialDelay time.Duration) TallymanPublisherOption {
return func(p *tallymanPublisher) {
p.initialDelay = initialDelay
}
}
// Start implements Publisher.
func (p *tallymanPublisher) Start() error {
ctx := p.ctx
deploymentID, err := p.db.GetDeploymentID(ctx)
if err != nil {
return xerrors.Errorf("get deployment ID: %w", err)
}
deploymentUUID, err := uuid.Parse(deploymentID)
if err != nil {
return xerrors.Errorf("parse deployment ID %q: %w", deploymentID, err)
}
if p.initialDelay <= 0 {
// Pick a random time between tallymanPublishInitialMinimumDelay and
// tallymanPublishInterval.
maxPlusDelay := int(tallymanPublishInterval - tallymanPublishInitialMinimumDelay)
plusDelay, err := cryptorand.Intn(maxPlusDelay)
if err != nil {
return xerrors.Errorf("could not generate random start delay: %w", err)
}
p.initialDelay = tallymanPublishInitialMinimumDelay + time.Duration(plusDelay)
}
pproflabel.Go(ctx, pproflabel.Service(pproflabel.ServiceTallymanPublisher), func(ctx context.Context) {
p.publishLoop(ctx, deploymentUUID)
})
return nil
}
func (p *tallymanPublisher) publishLoop(ctx context.Context, deploymentID uuid.UUID) {
defer close(p.done)
// Start the ticker with the initial delay. We will reset it to the interval
// after the first tick.
ticker := p.clock.NewTicker(p.initialDelay)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
err := p.publish(ctx, deploymentID)
if err != nil {
p.log.Warn(ctx, "publish usage events to tallyman", slog.Error(err))
}
ticker.Reset(tallymanPublishInterval)
}
}
// publish publishes usage events to Tallyman in a loop until there is an error
// (or any rejection) or there are no more events to publish.
func (p *tallymanPublisher) publish(ctx context.Context, deploymentID uuid.UUID) error {
for {
publishCtx, publishCtxCancel := context.WithTimeout(ctx, tallymanPublishTimeout)
accepted, err := p.publishOnce(publishCtx, deploymentID)
publishCtxCancel()
if err != nil {
return xerrors.Errorf("publish usage events to tallyman: %w", err)
}
if accepted < tallymanPublishBatchSize {
// We published less than the batch size, so we're done.
return nil
}
}
}
// publishOnce publishes up to tallymanPublishBatchSize usage events to
// tallyman. It returns the number of successfully published events.
func (p *tallymanPublisher) publishOnce(ctx context.Context, deploymentID uuid.UUID) (int, error) {
licenseJwt, err := p.getBestLicenseJWT(ctx)
if xerrors.Is(err, errUsagePublishingDisabled) {
return 0, nil
} else if err != nil {
return 0, xerrors.Errorf("find usage publishing license: %w", err)
}
events, err := p.db.SelectUsageEventsForPublishing(ctx, dbtime.Time(p.clock.Now()))
if err != nil {
return 0, xerrors.Errorf("select usage events for publishing: %w", err)
}
if len(events) == 0 {
// No events to publish.
return 0, nil
}
var (
eventIDs = make(map[string]struct{})
tallymanReq = TallymanIngestRequestV1{
DeploymentID: deploymentID,
Events: make([]TallymanIngestEventV1, 0, len(events)),
}
)
for _, event := range events {
eventIDs[event.ID] = struct{}{}
eventType := agplusage.EventType(event.EventType)
if !eventType.Valid() {
// This should never happen due to the check constraint in the
// database.
return 0, xerrors.Errorf("event %q has an invalid event type %q", event.ID, event.EventType)
}
tallymanReq.Events = append(tallymanReq.Events, TallymanIngestEventV1{
ID: event.ID,
EventType: eventType,
EventData: event.EventData,
CreatedAt: event.CreatedAt,
})
}
if len(eventIDs) != len(events) {
// This should never happen due to the unique constraint in the
// database.
return 0, xerrors.Errorf("duplicate event IDs found in events for publishing")
}
resp, err := p.sendPublishRequest(ctx, licenseJwt, tallymanReq)
allFailed := err != nil
if err != nil {
p.log.Warn(ctx, "failed to send publish request to tallyman", slog.F("count", len(events)), slog.Error(err))
// Fake a response with all events temporarily rejected.
resp = TallymanIngestResponseV1{
AcceptedEvents: []TallymanIngestAcceptedEventV1{},
RejectedEvents: make([]TallymanIngestRejectedEventV1, len(events)),
}
for i, event := range events {
resp.RejectedEvents[i] = TallymanIngestRejectedEventV1{
ID: event.ID,
Message: fmt.Sprintf("failed to publish to tallyman: %v", err),
Permanent: false,
}
}
} else {
p.log.Debug(ctx, "published usage events to tallyman", slog.F("accepted", len(resp.AcceptedEvents)), slog.F("rejected", len(resp.RejectedEvents)))
}
if len(resp.AcceptedEvents)+len(resp.RejectedEvents) != len(events) {
p.log.Warn(ctx, "tallyman returned a different number of events than we sent", slog.F("sent", len(events)), slog.F("accepted", len(resp.AcceptedEvents)), slog.F("rejected", len(resp.RejectedEvents)))
}
acceptedEvents := make(map[string]*TallymanIngestAcceptedEventV1)
rejectedEvents := make(map[string]*TallymanIngestRejectedEventV1)
for _, event := range resp.AcceptedEvents {
acceptedEvents[event.ID] = &event
}
for _, event := range resp.RejectedEvents {
rejectedEvents[event.ID] = &event
}
dbUpdate := database.UpdateUsageEventsPostPublishParams{
Now: dbtime.Time(p.clock.Now()),
IDs: make([]string, len(events)),
FailureMessages: make([]string, len(events)),
SetPublishedAts: make([]bool, len(events)),
}
for i, event := range events {
dbUpdate.IDs[i] = event.ID
if _, ok := acceptedEvents[event.ID]; ok {
dbUpdate.FailureMessages[i] = ""
dbUpdate.SetPublishedAts[i] = true
continue
}
if rejectedEvent, ok := rejectedEvents[event.ID]; ok {
dbUpdate.FailureMessages[i] = rejectedEvent.Message
dbUpdate.SetPublishedAts[i] = rejectedEvent.Permanent
continue
}
// It's not good if this path gets hit, but we'll handle it as if it
// was a temporary rejection.
dbUpdate.FailureMessages[i] = "tallyman did not include the event in the response"
dbUpdate.SetPublishedAts[i] = false
}
// Collate rejected events into a single map of ID to failure message for
// logging. We only want to log once.
// If all events failed, we'll log the overall error above.
if !allFailed {
rejectionReasonsForLog := make(map[string]string)
for i, id := range dbUpdate.IDs {
failureMessage := dbUpdate.FailureMessages[i]
if failureMessage == "" {
continue
}
setPublishedAt := dbUpdate.SetPublishedAts[i]
if setPublishedAt {
failureMessage = "permanently rejected: " + failureMessage
}
rejectionReasonsForLog[id] = failureMessage
}
if len(rejectionReasonsForLog) > 0 {
p.log.Warn(ctx, "tallyman rejected usage events", slog.F("count", len(rejectionReasonsForLog)), slog.F("event_failure_reasons", rejectionReasonsForLog))
}
}
err = p.db.UpdateUsageEventsPostPublish(ctx, dbUpdate)
if err != nil {
return 0, xerrors.Errorf("update usage events post publish: %w", err)
}
var returnErr error
if len(resp.RejectedEvents) > 0 {
returnErr = xerrors.New("some events were rejected by tallyman")
}
return len(resp.AcceptedEvents), returnErr
}
// getBestLicenseJWT returns the best license JWT to use for the request. The
// criteria is as follows:
// - The license must be valid and active (after nbf, before exp)
// - The license must have usage publishing enabled
// The most recently issued (iat) license is chosen.
//
// If no licenses are found or none have usage publishing enabled,
// errUsagePublishingDisabled is returned.
func (p *tallymanPublisher) getBestLicenseJWT(ctx context.Context) (string, error) {
licenses, err := p.db.GetUnexpiredLicenses(ctx)
if err != nil {
return "", xerrors.Errorf("get unexpired licenses: %w", err)
}
if len(licenses) == 0 {
return "", errUsagePublishingDisabled
}
type licenseJWTWithClaims struct {
Claims *license.Claims
Raw string
}
var bestLicense licenseJWTWithClaims
for _, dbLicense := range licenses {
claims, err := license.ParseClaims(dbLicense.JWT, p.licenseKeys)
if err != nil {
p.log.Warn(ctx, "failed to parse license claims", slog.F("license_id", dbLicense.ID), slog.Error(err))
continue
}
if claims.AccountType != license.AccountTypeSalesforce {
// Non-Salesforce accounts cannot be tracked as they do not have a
// trusted Salesforce opportunity ID encoded in the license.
continue
}
if !claims.PublishUsageData {
// Publishing is disabled.
continue
}
// Otherwise, if it's issued more recently, it's the best license.
// IssuedAt is verified to be non-nil in license.ParseClaims.
if bestLicense.Claims == nil || claims.IssuedAt.Time.After(bestLicense.Claims.IssuedAt.Time) {
bestLicense = licenseJWTWithClaims{
Claims: claims,
Raw: dbLicense.JWT,
}
}
}
if bestLicense.Raw == "" {
return "", errUsagePublishingDisabled
}
return bestLicense.Raw, nil
}
func (p *tallymanPublisher) sendPublishRequest(ctx context.Context, licenseJwt string, req TallymanIngestRequestV1) (TallymanIngestResponseV1, error) {
body, err := json.Marshal(req)
if err != nil {
return TallymanIngestResponseV1{}, err
}
r, err := http.NewRequestWithContext(ctx, http.MethodPost, p.ingestURL, bytes.NewReader(body))
if err != nil {
return TallymanIngestResponseV1{}, err
}
r.Header.Set(CoderLicenseJWTHeader, licenseJwt)
resp, err := p.httpClient.Do(r)
if err != nil {
return TallymanIngestResponseV1{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var errBody TallymanErrorV1
if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil {
errBody = TallymanErrorV1{
Message: fmt.Sprintf("could not decode error response body: %v", err),
}
}
return TallymanIngestResponseV1{}, xerrors.Errorf("unexpected status code %v, error: %s", resp.StatusCode, errBody.Message)
}
var respBody TallymanIngestResponseV1
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
return TallymanIngestResponseV1{}, xerrors.Errorf("decode response body: %w", err)
}
return respBody, nil
}
// Close implements Publisher.
func (p *tallymanPublisher) Close() error {
p.ctxCancel()
<-p.done
return nil
}
type TallymanErrorV1 struct {
Message string `json:"message"`
}
type TallymanIngestRequestV1 struct {
DeploymentID uuid.UUID `json:"deployment_id"`
Events []TallymanIngestEventV1 `json:"events"`
}
type TallymanIngestEventV1 struct {
ID string `json:"id"`
EventType agplusage.EventType `json:"event_type"`
EventData json.RawMessage `json:"event_data"`
CreatedAt time.Time `json:"created_at"`
}
type TallymanIngestResponseV1 struct {
AcceptedEvents []TallymanIngestAcceptedEventV1 `json:"accepted_events"`
RejectedEvents []TallymanIngestRejectedEventV1 `json:"rejected_events"`
}
type TallymanIngestAcceptedEventV1 struct {
ID string `json:"id"`
}
type TallymanIngestRejectedEventV1 struct {
ID string `json:"id"`
Message string `json:"message"`
Permanent bool `json:"permanent"`
}
+729
View File
@@ -0,0 +1,729 @@
package usage_test
import (
"context"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/mock/gomock"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
agplusage "github.com/coder/coder/v2/coderd/usage"
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/usage"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.GoleakOptions...)
}
// TestIntegration tests the inserter and publisher by running them with a real
// database.
func TestIntegration(t *testing.T) {
t.Parallel()
const eventCount = 3
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
db, _ := dbtestutil.NewDB(t)
clock := quartz.NewMock(t)
deploymentID, licenseJWT := configureDeployment(ctx, t, db)
now := time.Now()
var (
calls int
handler func(req usage.TallymanIngestRequestV1) any
)
ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any {
calls++
t.Logf("tallyman backend received call %d", calls)
assert.Equal(t, deploymentID, req.DeploymentID)
if handler == nil {
t.Errorf("handler is nil")
return usage.TallymanIngestResponseV1{}
}
return handler(req)
}))
inserter := usage.NewInserter(
usage.InserterWithClock(clock),
)
// Insert an old event that should never be published.
clock.Set(now.Add(-31 * 24 * time.Hour))
err := inserter.InsertDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{
Count: 31,
})
require.NoError(t, err)
// Insert the events we expect to be published.
clock.Set(now.Add(1 * time.Second))
for i := 0; i < eventCount; i++ {
clock.Advance(time.Second)
err := inserter.InsertDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{
Count: uint64(i + 1), // nolint:gosec // these numbers are tiny and will not overflow
})
require.NoErrorf(t, err, "collecting event %d", i)
}
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
)
defer publisher.Close()
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.AssertSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
tickerCall.MustRelease(ctx)
// The initial duration will always be some time between 5m and 17m.
require.GreaterOrEqual(t, tickerCall.Duration, 5*time.Minute)
require.LessOrEqual(t, tickerCall.Duration, 17*time.Minute)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Set up a trap for the ticker.Reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
// Configure the handler for the first publish. This handler will accept the
// first event, temporarily reject the second, and permanently reject the
// third.
var temporarilyRejectedEventID string
handler = func(req usage.TallymanIngestRequestV1) any {
// On the first call, accept the first event, temporarily reject the
// second, and permanently reject the third.
acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, 1)
rejectedEvents := make([]usage.TallymanIngestRejectedEventV1, 2)
if assert.Len(t, req.Events, eventCount) {
assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 1,
}), string(req.Events[0].EventData), "event data did not match for event %d", 0)
acceptedEvents[0].ID = req.Events[0].ID
temporarilyRejectedEventID = req.Events[1].ID
assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 2,
}), string(req.Events[1].EventData), "event data did not match for event %d", 1)
rejectedEvents[0].ID = req.Events[1].ID
rejectedEvents[0].Message = "temporarily rejected"
rejectedEvents[0].Permanent = false
assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 3,
}), string(req.Events[2].EventData), "event data did not match for event %d", 2)
rejectedEvents[1].ID = req.Events[2].ID
rejectedEvents[1].Message = "permanently rejected"
rejectedEvents[1].Permanent = true
}
return usage.TallymanIngestResponseV1{
AcceptedEvents: acceptedEvents,
RejectedEvents: rejectedEvents,
}
}
// Advance the clock to the initial tick, which should trigger the first
// publish, then wait for the reset call. The duration will always be 17m
// for resets (only the initial tick is variable).
clock.Advance(tickerCall.Duration)
tickerResetCall := tickerResetTrap.MustWait(ctx)
require.Equal(t, 17*time.Minute, tickerResetCall.Duration)
tickerResetCall.MustRelease(ctx)
// The publisher should have published the events once.
require.Equal(t, 1, calls)
// Set the handler for the next publish call. This call should only include
// the temporarily rejected event from earlier. This time we'll accept it.
handler = func(req usage.TallymanIngestRequestV1) any {
assert.Len(t, req.Events, 1)
acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, len(req.Events))
for i, event := range req.Events {
assert.Equal(t, temporarilyRejectedEventID, event.ID)
acceptedEvents[i].ID = event.ID
}
return usage.TallymanIngestResponseV1{
AcceptedEvents: acceptedEvents,
RejectedEvents: []usage.TallymanIngestRejectedEventV1{},
}
}
// Advance the clock to the next tick and wait for the reset call.
clock.Advance(tickerResetCall.Duration)
tickerResetCall = tickerResetTrap.MustWait(ctx)
tickerResetCall.MustRelease(ctx)
// The publisher should have published the events again.
require.Equal(t, 2, calls)
// There should be no more publish calls after this, so set the handler to
// nil.
handler = nil
// Advance the clock to the next tick.
clock.Advance(tickerResetCall.Duration)
tickerResetTrap.MustWait(ctx).MustRelease(ctx)
// No publish should have taken place since there are no more events to
// publish.
require.Equal(t, 2, calls)
require.NoError(t, publisher.Close())
}
func TestPublisherNoEligibleLicenses(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
// Configure the deployment manually.
deploymentID := uuid.New()
db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1)
var calls int
ingestURL := fakeServer(t, tallymanHandler(t, "", func(req usage.TallymanIngestRequestV1) any {
calls++
return usage.TallymanIngestResponseV1{
AcceptedEvents: []usage.TallymanIngestAcceptedEventV1{},
RejectedEvents: []usage.TallymanIngestRejectedEventV1{},
}
}))
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
)
defer publisher.Close()
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.RequireSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
tickerCall.MustRelease(ctx)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Mock zero licenses.
db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{}, nil).Times(1)
// Tick and wait for the reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
clock.Advance(tickerCall.Duration)
tickerResetCall := tickerResetTrap.MustWait(ctx)
tickerResetCall.MustRelease(ctx)
// The publisher should not have published the events.
require.Equal(t, 0, calls)
// Mock a single license with usage publishing disabled.
licenseJWT := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: false,
})
db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{
{
ID: 1,
JWT: licenseJWT,
UploadedAt: dbtime.Now(),
Exp: dbtime.Now().Add(48 * time.Hour), // fake
UUID: uuid.New(),
},
}, nil).Times(1)
// Tick and wait for the reset call.
clock.Advance(tickerResetCall.Duration)
tickerResetTrap.MustWait(ctx).MustRelease(ctx)
// The publisher should still not have published the events.
require.Equal(t, 0, calls)
}
// TestPublisherClaimExpiry tests the claim query to ensure that events are not
// claimed if they've recently been claimed by another publisher.
func TestPublisherClaimExpiry(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
db, _ := dbtestutil.NewDB(t)
clock := quartz.NewMock(t)
_, licenseJWT := configureDeployment(ctx, t, db)
now := time.Now()
var calls int
ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any {
calls++
return tallymanAcceptAllHandler(req)
}))
inserter := usage.NewInserter(
usage.InserterWithClock(clock),
)
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
usage.PublisherWithInitialDelay(17*time.Minute),
)
defer publisher.Close()
// Create an event that was claimed 1h-18m ago. The ticker has a forced
// delay of 17m in this test.
clock.Set(now)
err := inserter.InsertDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{
Count: 1,
})
require.NoError(t, err)
// Claim the event in the past. Claiming it this way via the database
// directly means it won't be marked as published or unclaimed.
events, err := db.SelectUsageEventsForPublishing(ctx, now.Add(-42*time.Minute))
require.NoError(t, err)
require.Len(t, events, 1)
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.RequireSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
require.Equal(t, 17*time.Minute, tickerCall.Duration)
tickerCall.MustRelease(ctx)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Set up a trap for the ticker.Reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
// Advance the clock to the initial tick, which should trigger the first
// publish, then wait for the reset call. The duration will always be 17m
// for resets (only the initial tick is variable).
clock.Advance(tickerCall.Duration)
tickerResetCall := tickerResetTrap.MustWait(ctx)
require.Equal(t, 17*time.Minute, tickerResetCall.Duration)
tickerResetCall.MustRelease(ctx)
// No events should have been published since none are eligible.
require.Equal(t, 0, calls)
// Advance the clock to the next tick and wait for the reset call.
clock.Advance(tickerResetCall.Duration)
tickerResetCall = tickerResetTrap.MustWait(ctx)
tickerResetCall.MustRelease(ctx)
// The publisher should have published the event, as it's now eligible.
require.Equal(t, 1, calls)
}
// TestPublisherMissingEvents tests that the publisher notices events that are
// not returned by the Tallyman server and marks them as temporarily rejected.
func TestPublisherMissingEvents(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
_, licenseJWT := configureMockDeployment(t, db)
clock := quartz.NewMock(t)
now := time.Now()
clock.Set(now)
var calls int
ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any {
calls++
return usage.TallymanIngestResponseV1{
AcceptedEvents: []usage.TallymanIngestAcceptedEventV1{},
RejectedEvents: []usage.TallymanIngestRejectedEventV1{},
}
}))
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
)
// Expect the publisher to call SelectUsageEventsForPublishing, followed by
// UpdateUsageEventsPostPublish.
events := []database.UsageEvent{
{
ID: uuid.New().String(),
EventType: string(agplusage.UsageEventTypeDCManagedAgentsV1),
EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 1,
})),
CreatedAt: now,
PublishedAt: sql.NullTime{},
PublishStartedAt: sql.NullTime{},
FailureMessage: sql.NullString{},
},
}
db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1)
db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error {
assert.Equal(t, []string{events[0].ID}, params.IDs)
assert.Equal(t, []string{"tallyman did not include the event in the response"}, params.FailureMessages)
assert.Equal(t, []bool{false}, params.SetPublishedAts)
return nil
},
).Times(1)
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.RequireSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
tickerCall.MustRelease(ctx)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Tick and wait for the reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
clock.Advance(tickerCall.Duration)
tickerResetTrap.MustWait(ctx).MustRelease(ctx)
// The publisher should have published the events once.
require.Equal(t, 1, calls)
require.NoError(t, publisher.Close())
}
func TestPublisherLicenseSelection(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
now := time.Now()
// Configure the deployment manually.
deploymentID := uuid.New()
db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1)
// Insert multiple licenses:
// 1. PublishUsageData false, type=salesforce, iat 30m ago (ineligible, publish not enabled)
// 2. PublishUsageData true, type=trial, iat 1h ago (ineligible, not salesforce)
// 3. PublishUsageData true, type=salesforce, iat 30m ago, exp 10m ago (ineligible, expired)
// 4. PublishUsageData true, type=salesforce, iat 1h ago (eligible)
// 5. PublishUsageData true, type=salesforce, iat 30m ago (eligible, and newer!)
badLicense1 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: false,
IssuedAt: now.Add(-30 * time.Minute),
})
badLicense2 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
IssuedAt: now.Add(-1 * time.Hour),
AccountType: "trial",
})
badLicense3 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
IssuedAt: now.Add(-30 * time.Minute),
ExpiresAt: now.Add(-10 * time.Minute),
})
badLicense4 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
IssuedAt: now.Add(-1 * time.Hour),
})
expectedLicense := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
IssuedAt: now.Add(-30 * time.Minute),
})
// GetUnexpiredLicenses is not supposed to return expired licenses, but for
// the purposes of this test we're going to do it anyway.
db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{
{
ID: 1,
JWT: badLicense1,
Exp: now.Add(48 * time.Hour), // fake times, the JWT should be checked
UUID: uuid.New(),
UploadedAt: now,
},
{
ID: 2,
JWT: badLicense2,
Exp: now.Add(48 * time.Hour),
UUID: uuid.New(),
UploadedAt: now,
},
{
ID: 3,
JWT: badLicense3,
Exp: now.Add(48 * time.Hour),
UUID: uuid.New(),
UploadedAt: now,
},
{
ID: 4,
JWT: badLicense4,
Exp: now.Add(48 * time.Hour),
UUID: uuid.New(),
UploadedAt: now,
},
{
ID: 5,
JWT: expectedLicense,
Exp: now.Add(48 * time.Hour),
UUID: uuid.New(),
UploadedAt: now,
},
}, nil)
called := false
ingestURL := fakeServer(t, tallymanHandler(t, expectedLicense, func(req usage.TallymanIngestRequestV1) any {
called = true
assert.Equal(t, deploymentID, req.DeploymentID)
return tallymanAcceptAllHandler(req)
}))
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
)
defer publisher.Close()
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.RequireSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
tickerCall.MustRelease(ctx)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Mock events to be published.
events := []database.UsageEvent{
{
ID: uuid.New().String(),
EventType: string(agplusage.UsageEventTypeDCManagedAgentsV1),
EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 1,
})),
},
}
db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1)
db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error {
assert.Equal(t, []string{events[0].ID}, params.IDs)
assert.Equal(t, []string{""}, params.FailureMessages)
assert.Equal(t, []bool{true}, params.SetPublishedAts)
return nil
},
).Times(1)
// Tick and wait for the reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
clock.Advance(tickerCall.Duration)
tickerResetTrap.MustWait(ctx).MustRelease(ctx)
// The publisher should have published the events once.
require.True(t, called)
}
func TestPublisherTallymanError(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
log := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
now := time.Now()
clock.Set(now)
_, licenseJWT := configureMockDeployment(t, db)
const errorMessage = "tallyman error"
var calls int
ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any {
calls++
return usage.TallymanErrorV1{
Message: errorMessage,
}
}))
publisher := usage.NewTallymanPublisher(ctx, log, db,
usage.PublisherWithClock(clock),
usage.PublisherWithIngestURL(ingestURL),
usage.PublisherWithLicenseKeys(coderdenttest.Keys),
)
defer publisher.Close()
// Start the publisher with a trap.
tickerTrap := clock.Trap().NewTicker()
defer tickerTrap.Close()
startErr := make(chan error)
go func() {
err := publisher.Start()
testutil.RequireSend(ctx, t, startErr, err)
}()
tickerCall := tickerTrap.MustWait(ctx)
tickerCall.MustRelease(ctx)
require.NoError(t, testutil.RequireReceive(ctx, t, startErr))
// Mock events to be published.
events := []database.UsageEvent{
{
ID: uuid.New().String(),
EventType: string(agplusage.UsageEventTypeDCManagedAgentsV1),
EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{
Count: 1,
})),
},
}
db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1)
db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error {
assert.Equal(t, []string{events[0].ID}, params.IDs)
assert.Contains(t, params.FailureMessages[0], errorMessage)
assert.Equal(t, []bool{false}, params.SetPublishedAts)
return nil
},
).Times(1)
// Tick and wait for the reset call.
tickerResetTrap := clock.Trap().TickerReset()
defer tickerResetTrap.Close()
clock.Advance(tickerCall.Duration)
tickerResetTrap.MustWait(ctx).MustRelease(ctx)
// The publisher should have published the events once.
require.Equal(t, 1, calls)
}
func jsoninate(t *testing.T, v any) string {
t.Helper()
if e, ok := v.(agplusage.Event); ok {
v = e.Fields()
}
buf, err := json.Marshal(v)
require.NoError(t, err)
return string(buf)
}
func configureDeployment(ctx context.Context, t *testing.T, db database.Store) (uuid.UUID, string) {
t.Helper()
deploymentID := uuid.New()
err := db.InsertDeploymentID(ctx, deploymentID.String())
require.NoError(t, err)
licenseRaw := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
})
_, err = db.InsertLicense(ctx, database.InsertLicenseParams{
UploadedAt: dbtime.Now(),
JWT: licenseRaw,
Exp: dbtime.Now().Add(48 * time.Hour),
UUID: uuid.New(),
})
require.NoError(t, err)
return deploymentID, licenseRaw
}
func configureMockDeployment(t *testing.T, db *dbmock.MockStore) (uuid.UUID, string) {
t.Helper()
deploymentID := uuid.New()
db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1)
licenseRaw := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{
PublishUsageData: true,
})
db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{
{
ID: 1,
UploadedAt: dbtime.Now(),
JWT: licenseRaw,
Exp: dbtime.Now().Add(48 * time.Hour),
UUID: uuid.New(),
},
}, nil)
return deploymentID, licenseRaw
}
func fakeServer(t *testing.T, handler http.Handler) string {
t.Helper()
server := httptest.NewServer(handler)
t.Cleanup(server.Close)
return server.URL
}
func tallymanHandler(t *testing.T, expectLicenseJWT string, handler func(req usage.TallymanIngestRequestV1) any) http.Handler {
t.Helper()
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
t.Helper()
licenseJWT := r.Header.Get(usage.CoderLicenseJWTHeader)
if expectLicenseJWT != "" && !assert.Equal(t, expectLicenseJWT, licenseJWT, "license JWT in request did not match") {
rw.WriteHeader(http.StatusUnauthorized)
err := json.NewEncoder(rw).Encode(usage.TallymanErrorV1{
Message: "license JWT in request did not match",
})
require.NoError(t, err)
return
}
var req usage.TallymanIngestRequestV1
err := json.NewDecoder(r.Body).Decode(&req)
require.NoError(t, err)
resp := handler(req)
switch resp.(type) {
case usage.TallymanErrorV1:
rw.WriteHeader(http.StatusInternalServerError)
default:
rw.WriteHeader(http.StatusOK)
}
err = json.NewEncoder(rw).Encode(resp)
require.NoError(t, err)
})
}
func tallymanAcceptAllHandler(req usage.TallymanIngestRequestV1) usage.TallymanIngestResponseV1 {
acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, len(req.Events))
for i, event := range req.Events {
acceptedEvents[i].ID = event.ID
}
return usage.TallymanIngestResponseV1{
AcceptedEvents: acceptedEvents,
RejectedEvents: []usage.TallymanIngestRejectedEventV1{},
}
}
+5
View File
@@ -159,6 +159,11 @@ export const RBACResourceActions: Partial<
use: "use the template to initially create a workspace, then workspace lifecycle permissions take over",
view_insights: "view insights",
},
usage_event: {
create: "create a usage event",
read: "read usage events",
update: "update usage events",
},
user: {
create: "create a new user",
delete: "delete an existing user",
+2
View File
@@ -2393,6 +2393,7 @@ export type RBACResource =
| "system"
| "tailnet_coordinator"
| "template"
| "usage_event"
| "user"
| "user_secret"
| "webpush_subscription"
@@ -2434,6 +2435,7 @@ export const RBACResources: RBACResource[] = [
"system",
"tailnet_coordinator",
"template",
"usage_event",
"user",
"user_secret",
"webpush_subscription",