mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
36665e17b2
This PR adds a `WatchAllWorkspaces` function with a `watch-all-workspaces` endpoint, which can be used to listen on a single global pubsub channel for _all_ workspace build updates, and makes use of it in the autostart scaletest. This eliminates the need for a separate workspace watch pubsub channel _per_ workspace, each of which carries its own auth overhead. This is especially relevant in situations such as the autostart scaletest, where we need to start/stop a set of workspaces before we can configure their autostart config. The overhead of all those watch requests skews the scaletest results and makes it harder to reason about the performance of the autostart feature itself. The autostart scaletest also no longer generates its own metrics, nor does it wait for all the workspaces to actually start via autostart. We should update the scaletest dashboard after both PRs are merged to measure autostart performance via the new metrics. The new function/endpoint and its usage in the autostart scaletest are gated behind an experiment feature flag. We should discuss whether we want to enable the endpoint in prod by default; if so, we can remove the experiment. --------- Signed-off-by: Callum Styan <callumstyan@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Callum Styan <callum@coder.com>
112 lines
4.0 KiB
Go
112 lines
4.0 KiB
Go
package wspubsub
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/coder/coder/v2/coderd/database/pubsub"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
)
|
|
|
|
// AllWorkspaceEventChannel is a global pubsub channel that receives events for
// all workspaces. This is useful when you need to watch N workspaces without
// creating N separate subscriptions.
//
// PublishWorkspaceBuildUpdate publishes JSON-encoded
// codersdk.WorkspaceBuildUpdate messages on this channel, and
// HandleWorkspaceBuildUpdate decodes them on the subscriber side.
const AllWorkspaceEventChannel = "workspace_updates:all"
|
|
|
|
// HandleWorkspaceBuildUpdate wraps a callback to parse WorkspaceBuildUpdate
|
|
// messages from the pubsub.
|
|
func HandleWorkspaceBuildUpdate(cb func(ctx context.Context, payload codersdk.WorkspaceBuildUpdate, err error)) func(ctx context.Context, message []byte, err error) {
|
|
return func(ctx context.Context, message []byte, err error) {
|
|
if err != nil {
|
|
cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("workspace build update pubsub: %w", err))
|
|
return
|
|
}
|
|
var payload codersdk.WorkspaceBuildUpdate
|
|
if err := json.Unmarshal(message, &payload); err != nil {
|
|
cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("unmarshal workspace build update: %w", err))
|
|
return
|
|
}
|
|
cb(ctx, payload, nil)
|
|
}
|
|
}
|
|
|
|
// PublishWorkspaceBuildUpdate is a helper to publish a workspace build update
|
|
// to the AllWorkspaceEventChannel. This should be called when a build
|
|
// completes (succeeds, fails, or is canceled).
|
|
func PublishWorkspaceBuildUpdate(_ context.Context, ps pubsub.Pubsub, update codersdk.WorkspaceBuildUpdate) error {
|
|
msg, err := json.Marshal(update)
|
|
if err != nil {
|
|
return xerrors.Errorf("marshal workspace build update: %w", err)
|
|
}
|
|
if err := ps.Publish(AllWorkspaceEventChannel, msg); err != nil {
|
|
return xerrors.Errorf("publish workspace build update: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WorkspaceEventChannel can be used to subscribe to events for
|
|
// workspaces owned by the provided user ID.
|
|
func WorkspaceEventChannel(ownerID uuid.UUID) string {
|
|
return fmt.Sprintf("workspace_owner:%s", ownerID)
|
|
}
|
|
|
|
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
|
|
return func(ctx context.Context, message []byte, err error) {
|
|
if err != nil {
|
|
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
|
|
return
|
|
}
|
|
var payload WorkspaceEvent
|
|
if err := json.Unmarshal(message, &payload); err != nil {
|
|
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
|
|
return
|
|
}
|
|
if err := payload.Validate(); err != nil {
|
|
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
|
|
return
|
|
}
|
|
cb(ctx, payload, err)
|
|
}
|
|
}
|
|
|
|
// WorkspaceEvent is the JSON payload that HandleWorkspaceEvent decodes and
// validates. It identifies the affected workspace, what kind of change
// occurred, and, for agent events, which agent was involved.
type WorkspaceEvent struct {
	// Kind identifies what changed. Validate requires it to be non-empty.
	Kind WorkspaceEventKind `json:"kind"`
	// WorkspaceID identifies the affected workspace. Validate requires it to
	// differ from uuid.Nil.
	WorkspaceID uuid.UUID `json:"workspace_id" format:"uuid"`
	// AgentID is only set for WorkspaceEventKindAgent* events
	// (excluding AgentTimeout). It is omitted from JSON when nil.
	// NOTE(review): Validate only enforces its presence for
	// WorkspaceEventKindAgentLifecycleUpdate.
	AgentID *uuid.UUID `json:"agent_id,omitempty" format:"uuid"`
}
|
|
|
|
// WorkspaceEventKind identifies the kind of change a WorkspaceEvent reports.
// Its string value is what appears in the "kind" field of the JSON-encoded
// WorkspaceEvent.
type WorkspaceEventKind string

// Known WorkspaceEventKind values.
const (
	// Workspace-level kinds.
	WorkspaceEventKindStateChange     WorkspaceEventKind = "state_change"
	WorkspaceEventKindStatsUpdate     WorkspaceEventKind = "stats_update"
	WorkspaceEventKindMetadataUpdate  WorkspaceEventKind = "mtd_update"
	WorkspaceEventKindAppHealthUpdate WorkspaceEventKind = "app_health"

	// Agent-level kinds. WorkspaceEvent.AgentID is expected to be set for
	// these, except WorkspaceEventKindAgentTimeout. Validate only enforces
	// that for WorkspaceEventKindAgentLifecycleUpdate.
	WorkspaceEventKindAgentLifecycleUpdate  WorkspaceEventKind = "agt_lifecycle_update"
	WorkspaceEventKindAgentConnectionUpdate WorkspaceEventKind = "agt_connection_update"
	WorkspaceEventKindAgentFirstLogs        WorkspaceEventKind = "agt_first_logs"
	WorkspaceEventKindAgentLogsOverflow     WorkspaceEventKind = "agt_logs_overflow"
	WorkspaceEventKindAgentTimeout          WorkspaceEventKind = "agt_timeout"
	WorkspaceEventKindAgentAppStatusUpdate  WorkspaceEventKind = "agt_app_status_update"
)
|
|
|
|
func (w *WorkspaceEvent) Validate() error {
|
|
if w.WorkspaceID == uuid.Nil {
|
|
return xerrors.New("workspaceID must be set")
|
|
}
|
|
if w.Kind == "" {
|
|
return xerrors.New("kind must be set")
|
|
}
|
|
if w.Kind == WorkspaceEventKindAgentLifecycleUpdate && w.AgentID == nil {
|
|
return xerrors.New("agentID must be set for Agent events")
|
|
}
|
|
return nil
|
|
}
|