Files
coder/coderd/wspubsub/wspubsub.go
T
Callum Styan 36665e17b2 feat: add WatchAllWorkspaceBuilds endpoint for autostart scaletests (#22057)
This PR adds a `WatchAllWorkspaces` function with `watch-all-workspaces`
endpoint, which can be used to listen on a single global pubsub channel
for _all_ workspace build updates, and makes use of it in the autostart
scaletest.

This negates the need to use a workspace watch pubsub channel _per_
workspace, which has auth overhead associated with each call. This is
especially relevant in situations such as the autostart scaletest, where
we need to start/stop a set of workspaces before we can configure their
autostart config. The overhead associated with all the watch requests
skews the scaletest results and makes it harder to reason about the
performance of the autostart feature itself.

The autostart scaletest also no longer generates its own metrics nor
does it wait for all the workspaces to actually start via autostart. We
should update the scaletest dashboard after both PRs are merged to
measure autostart performance via the new metrics.



The new function/endpoint and its usage in the autostart scaletest are
gated behind an experiment feature flag, this is something we should
discuss whether we want to enable the endpoint in prod by default or
not. If so, we can remove the experiment.

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Callum Styan <callum@coder.com>
2026-03-13 20:37:41 -07:00

112 lines
4.0 KiB
Go

package wspubsub
import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/codersdk"
)
// AllWorkspaceEventChannel is a global channel that receives events for all
// workspaces. This is useful when you need to watch N workspaces without
// creating N separate subscriptions.
const AllWorkspaceEventChannel = "workspace_updates:all"
// HandleWorkspaceBuildUpdate wraps a callback to parse WorkspaceBuildUpdate
// messages from the pubsub.
func HandleWorkspaceBuildUpdate(cb func(ctx context.Context, payload codersdk.WorkspaceBuildUpdate, err error)) func(ctx context.Context, message []byte, err error) {
return func(ctx context.Context, message []byte, err error) {
if err != nil {
cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("workspace build update pubsub: %w", err))
return
}
var payload codersdk.WorkspaceBuildUpdate
if err := json.Unmarshal(message, &payload); err != nil {
cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("unmarshal workspace build update: %w", err))
return
}
cb(ctx, payload, nil)
}
}
// PublishWorkspaceBuildUpdate is a helper to publish a workspace build update
// to the AllWorkspaceEventChannel. This should be called when a build
// completes (succeeds, fails, or is canceled).
func PublishWorkspaceBuildUpdate(_ context.Context, ps pubsub.Pubsub, update codersdk.WorkspaceBuildUpdate) error {
msg, err := json.Marshal(update)
if err != nil {
return xerrors.Errorf("marshal workspace build update: %w", err)
}
if err := ps.Publish(AllWorkspaceEventChannel, msg); err != nil {
return xerrors.Errorf("publish workspace build update: %w", err)
}
return nil
}
// WorkspaceEventChannel can be used to subscribe to events for
// workspaces owned by the provided user ID.
func WorkspaceEventChannel(ownerID uuid.UUID) string {
return fmt.Sprintf("workspace_owner:%s", ownerID)
}
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
return func(ctx context.Context, message []byte, err error) {
if err != nil {
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
return
}
var payload WorkspaceEvent
if err := json.Unmarshal(message, &payload); err != nil {
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
return
}
if err := payload.Validate(); err != nil {
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
return
}
cb(ctx, payload, err)
}
}
type WorkspaceEvent struct {
Kind WorkspaceEventKind `json:"kind"`
WorkspaceID uuid.UUID `json:"workspace_id" format:"uuid"`
// AgentID is only set for WorkspaceEventKindAgent* events
// (excluding AgentTimeout)
AgentID *uuid.UUID `json:"agent_id,omitempty" format:"uuid"`
}
type WorkspaceEventKind string
const (
WorkspaceEventKindStateChange WorkspaceEventKind = "state_change"
WorkspaceEventKindStatsUpdate WorkspaceEventKind = "stats_update"
WorkspaceEventKindMetadataUpdate WorkspaceEventKind = "mtd_update"
WorkspaceEventKindAppHealthUpdate WorkspaceEventKind = "app_health"
WorkspaceEventKindAgentLifecycleUpdate WorkspaceEventKind = "agt_lifecycle_update"
WorkspaceEventKindAgentConnectionUpdate WorkspaceEventKind = "agt_connection_update"
WorkspaceEventKindAgentFirstLogs WorkspaceEventKind = "agt_first_logs"
WorkspaceEventKindAgentLogsOverflow WorkspaceEventKind = "agt_logs_overflow"
WorkspaceEventKindAgentTimeout WorkspaceEventKind = "agt_timeout"
WorkspaceEventKindAgentAppStatusUpdate WorkspaceEventKind = "agt_app_status_update"
)
func (w *WorkspaceEvent) Validate() error {
if w.WorkspaceID == uuid.Nil {
return xerrors.New("workspaceID must be set")
}
if w.Kind == "" {
return xerrors.New("kind must be set")
}
if w.Kind == WorkspaceEventKindAgentLifecycleUpdate && w.AgentID == nil {
return xerrors.New("agentID must be set for Agent events")
}
return nil
}