diff --git a/cli/exp_scaletest.go b/cli/exp_scaletest.go index 1d124a2dcc..d46cca9b58 100644 --- a/cli/exp_scaletest.go +++ b/cli/exp_scaletest.go @@ -1732,19 +1732,18 @@ const ( func (r *RootCmd) scaletestAutostart() *serpent.Command { var ( - workspaceCount int64 - workspaceJobTimeout time.Duration - autostartDelay time.Duration - autostartTimeout time.Duration - template string - noCleanup bool + workspaceCount int64 + workspaceJobTimeout time.Duration + autostartBuildTimeout time.Duration + autostartDelay time.Duration + template string + noCleanup bool parameterFlags workspaceParameterFlags tracingFlags = &scaletestTracingFlags{} timeoutStrategy = &timeoutFlags{} cleanupStrategy = newScaletestCleanupStrategy() output = &scaletestOutputFlags{} - prometheusFlags = &scaletestPrometheusFlags{} ) cmd := &serpent.Command{ @@ -1772,7 +1771,7 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { outputs, err := output.parse() if err != nil { - return xerrors.Errorf("could not parse --output flags") + return xerrors.Errorf("parse output flags: %w", err) } tpl, err := parseTemplate(ctx, client, me.OrganizationIDs, template) @@ -1803,15 +1802,41 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { } tracer := tracerProvider.Tracer(scaletestTracerName) - reg := prometheus.NewRegistry() - metrics := autostart.NewMetrics(reg) - setupBarrier := new(sync.WaitGroup) setupBarrier.Add(int(workspaceCount)) - th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy()) + // The workspace-build-updates experiment must be enabled to use + // the centralized pubsub channel for coordinating workspace builds. 
+ experiments, err := client.Experiments(ctx) + if err != nil { + return xerrors.Errorf("get experiments: %w", err) + } + if !experiments.Enabled(codersdk.ExperimentWorkspaceBuildUpdates) { + return xerrors.New("the workspace-build-updates experiment must be enabled to run the autostart scaletest") + } + + workspaceNames := make([]string, 0, workspaceCount) + resultSink := make(chan autostart.RunResult, workspaceCount) for i := range workspaceCount { id := strconv.Itoa(int(i)) + workspaceNames = append(workspaceNames, loadtestutil.GenerateDeterministicWorkspaceName(id)) + } + dispatcher := autostart.NewWorkspaceDispatcher(workspaceNames) + + decoder, err := client.WatchAllWorkspaceBuilds(ctx) + if err != nil { + return xerrors.Errorf("watch all workspace builds: %w", err) + } + defer decoder.Close() + + // Start the dispatcher. It will run in a goroutine and automatically + // close all workspace channels when the build updates channel closes. + dispatcher.Start(ctx, decoder.Chan()) + + th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy()) + for workspaceName, buildUpdatesChannel := range dispatcher.Channels { + id := strings.TrimPrefix(workspaceName, loadtestutil.ScaleTestPrefix+"-") + config := autostart.Config{ User: createusers.Config{ OrganizationID: me.OrganizationIDs[0], @@ -1821,13 +1846,16 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { Request: codersdk.CreateWorkspaceRequest{ TemplateID: tpl.ID, RichParameterValues: richParameters, + // Use deterministic workspace name so we can pre-create the channel. 
+ Name: workspaceName, }, }, - WorkspaceJobTimeout: workspaceJobTimeout, - AutostartDelay: autostartDelay, - AutostartTimeout: autostartTimeout, - Metrics: metrics, - SetupBarrier: setupBarrier, + WorkspaceJobTimeout: workspaceJobTimeout, + AutostartBuildTimeout: autostartBuildTimeout, + AutostartDelay: autostartDelay, + SetupBarrier: setupBarrier, + BuildUpdates: buildUpdatesChannel, + ResultSink: resultSink, } if err := config.Validate(); err != nil { return xerrors.Errorf("validate config: %w", err) @@ -1849,18 +1877,11 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { th.AddRun(autostartTestName, id, runner) } - logger := inv.Logger - prometheusSrvClose := ServeHandler(ctx, logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), prometheusFlags.Address, "prometheus") - defer prometheusSrvClose() - defer func() { _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") if err := closeTracing(ctx); err != nil { _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) } - // Wait for prometheus metrics to be scraped - _, _ = fmt.Fprintf(inv.Stderr, "Waiting %s for prometheus metrics to be scraped\n", prometheusFlags.Wait) - <-time.After(prometheusFlags.Wait) }() _, _ = fmt.Fprintln(inv.Stderr, "Running autostart load test...") @@ -1871,31 +1892,40 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err) } - // If the command was interrupted, skip stats. - if notifyCtx.Err() != nil { - return notifyCtx.Err() + // Collect all metrics from the channel. 
+ close(resultSink) + var runResults []autostart.RunResult + for r := range resultSink { + runResults = append(runResults, r) } res := th.Results() - for _, o := range outputs { - err = o.write(res, inv.Stdout) - if err != nil { - return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err) + if res.TotalFail > 0 { + return xerrors.New("load test failed, see above for more details") + } + + _, _ = fmt.Fprintf(inv.Stderr, "\nAll %d autostart builds completed successfully (elapsed: %s)\n", res.TotalRuns, time.Duration(res.Elapsed).Round(time.Millisecond)) + + if len(runResults) > 0 { + results := autostart.NewRunResults(runResults) + for _, out := range outputs { + if err := out.write(results.ToHarnessResults(), inv.Stdout); err != nil { + return xerrors.Errorf("write output: %w", err) + } } } if !noCleanup { _, _ = fmt.Fprintln(inv.Stderr, "\nCleaning up...") - cleanupCtx, cleanupCancel := cleanupStrategy.toContext(ctx) + cleanupCtx, cleanupCancel := cleanupStrategy.toContext(context.Background()) defer cleanupCancel() err = th.Cleanup(cleanupCtx) if err != nil { return xerrors.Errorf("cleanup tests: %w", err) } - } - - if res.TotalFail > 0 { - return xerrors.New("load test failed, see above for more details") + _, _ = fmt.Fprintln(inv.Stderr, "Cleanup complete") + } else { + _, _ = fmt.Fprintln(inv.Stderr, "\nSkipping cleanup (--no-cleanup specified). Resources left running.") } return nil @@ -1918,6 +1948,13 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { Description: "Timeout for workspace jobs (e.g. build, start).", Value: serpent.DurationOf(&workspaceJobTimeout), }, + { + Flag: "autostart-build-timeout", + Env: "CODER_SCALETEST_AUTOSTART_BUILD_TIMEOUT", + Default: "15m", + Description: "Timeout for the autostart build to complete. 
Must be longer than workspace-job-timeout to account for queueing time in high-load scenarios.", + Value: serpent.DurationOf(&autostartBuildTimeout), + }, { Flag: "autostart-delay", Env: "CODER_SCALETEST_AUTOSTART_DELAY", @@ -1925,13 +1962,6 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { Description: "How long after all the workspaces have been stopped to schedule them to be started again.", Value: serpent.DurationOf(&autostartDelay), }, - { - Flag: "autostart-timeout", - Env: "CODER_SCALETEST_AUTOSTART_TIMEOUT", - Default: "5m", - Description: "Timeout for the autostart build to be initiated after the scheduled start time.", - Value: serpent.DurationOf(&autostartTimeout), - }, { Flag: "template", FlagShorthand: "t", @@ -1950,10 +1980,9 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { cmd.Options = append(cmd.Options, parameterFlags.cliParameters()...) tracingFlags.attach(&cmd.Options) + output.attach(&cmd.Options) timeoutStrategy.attach(&cmd.Options) cleanupStrategy.attach(&cmd.Options) - output.attach(&cmd.Options) - prometheusFlags.attach(&cmd.Options) return cmd } diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index b9113389a5..1a2619d637 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -1134,6 +1134,31 @@ const docTemplate = `{ } } }, + "/experimental/watch-all-workspacebuilds": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "produces": [ + "application/json" + ], + "tags": [ + "Workspaces" + ], + "summary": "Watch all workspace builds", + "operationId": "watch-all-workspace-builds", + "responses": { + "101": { + "description": "Switching Protocols" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, "/experiments": { "get": { "security": [ @@ -15226,7 +15251,8 @@ const docTemplate = `{ "web-push", "oauth2", "agents", - "mcp-server-http" + "mcp-server-http", + "workspace-build-updates" ], "x-enum-comments": { "ExperimentAgents": "Enables agent-powered chat functionality.", @@ 
-15236,6 +15262,7 @@ const docTemplate = `{ "ExperimentNotifications": "Sends notifications via SMTP and webhooks following certain events.", "ExperimentOAuth2": "Enables OAuth2 provider functionality.", "ExperimentWebPush": "Enables web push notifications through the browser.", + "ExperimentWorkspaceBuildUpdates": "Enables publishing workspace build updates to the all builds pubsub channel.", "ExperimentWorkspaceUsage": "Enables the new workspace usage tracking." }, "x-enum-descriptions": [ @@ -15246,7 +15273,8 @@ const docTemplate = `{ "Enables web push notifications through the browser.", "Enables OAuth2 provider functionality.", "Enables agent-powered chat functionality.", - "Enables the MCP HTTP server functionality." + "Enables the MCP HTTP server functionality.", + "Enables publishing workspace build updates to the all builds pubsub channel." ], "x-enum-varnames": [ "ExperimentExample", @@ -15256,7 +15284,8 @@ const docTemplate = `{ "ExperimentWebPush", "ExperimentOAuth2", "ExperimentAgents", - "ExperimentMCPServerHTTP" + "ExperimentMCPServerHTTP", + "ExperimentWorkspaceBuildUpdates" ] }, "codersdk.ExternalAPIKeyScopes": { diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 911664bc15..69576767ad 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -983,6 +983,27 @@ } } }, + "/experimental/watch-all-workspacebuilds": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "produces": ["application/json"], + "tags": ["Workspaces"], + "summary": "Watch all workspace builds", + "operationId": "watch-all-workspace-builds", + "responses": { + "101": { + "description": "Switching Protocols" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, "/experiments": { "get": { "security": [ @@ -13744,7 +13765,8 @@ "web-push", "oauth2", "agents", - "mcp-server-http" + "mcp-server-http", + "workspace-build-updates" ], "x-enum-comments": { "ExperimentAgents": "Enables agent-powered chat functionality.", @@ 
-13754,6 +13776,7 @@ "ExperimentNotifications": "Sends notifications via SMTP and webhooks following certain events.", "ExperimentOAuth2": "Enables OAuth2 provider functionality.", "ExperimentWebPush": "Enables web push notifications through the browser.", + "ExperimentWorkspaceBuildUpdates": "Enables publishing workspace build updates to the all builds pubsub channel.", "ExperimentWorkspaceUsage": "Enables the new workspace usage tracking." }, "x-enum-descriptions": [ @@ -13764,7 +13787,8 @@ "Enables web push notifications through the browser.", "Enables OAuth2 provider functionality.", "Enables agent-powered chat functionality.", - "Enables the MCP HTTP server functionality." + "Enables the MCP HTTP server functionality.", + "Enables publishing workspace build updates to the all builds pubsub channel." ], "x-enum-varnames": [ "ExperimentExample", @@ -13774,7 +13798,8 @@ "ExperimentWebPush", "ExperimentOAuth2", "ExperimentAgents", - "ExperimentMCPServerHTTP" + "ExperimentMCPServerHTTP", + "ExperimentWorkspaceBuildUpdates" ] }, "codersdk.ExternalAPIKeyScopes": { diff --git a/coderd/coderd.go b/coderd/coderd.go index 4bd590753e..5699da43d9 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -1204,6 +1204,13 @@ func New(options *Options) *API { // MCP HTTP transport endpoint with mandatory authentication r.Mount("/http", api.mcpHTTPHandler()) }) + r.Route("/watch-all-workspacebuilds", func(r chi.Router) { + r.Use( + apiKeyMiddleware, + httpmw.RequireExperiment(api.Experiments, codersdk.ExperimentWorkspaceBuildUpdates), + ) + r.Get("/", api.watchAllWorkspaceBuilds) + }) }) r.Route("/api/v2", func(r chi.Router) { diff --git a/coderd/provisionerdserver/provisionerdserver.go b/coderd/provisionerdserver/provisionerdserver.go index af60397df9..7fcdc26060 100644 --- a/coderd/provisionerdserver/provisionerdserver.go +++ b/coderd/provisionerdserver/provisionerdserver.go @@ -1289,6 +1289,21 @@ func (s *server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*proto. 
if err != nil { return nil, xerrors.Errorf("publish workspace update: %w", err) } + + // Publish workspace build update to the all builds channel if the experiment is enabled. + if s.Experiments.Enabled(codersdk.ExperimentWorkspaceBuildUpdates) { + err = wspubsub.PublishWorkspaceBuildUpdate(ctx, s.Pubsub, codersdk.WorkspaceBuildUpdate{ + WorkspaceID: workspace.ID, + WorkspaceName: workspace.Name, + BuildID: build.ID, + Transition: string(build.Transition), + JobStatus: string(database.ProvisionerJobStatusFailed), + BuildNumber: build.BuildNumber, + }) + if err != nil { + s.Logger.Warn(ctx, "failed to publish workspace build update", slog.Error(err)) + } + } case *proto.FailedJob_TemplateImport_: } @@ -2489,6 +2504,21 @@ func (s *server) completeWorkspaceBuildJob(ctx context.Context, job database.Pro return xerrors.Errorf("update workspace: %w", err) } + // Publish workspace build update to the all builds channel if the experiment is enabled. + if s.Experiments.Enabled(codersdk.ExperimentWorkspaceBuildUpdates) { + err = wspubsub.PublishWorkspaceBuildUpdate(ctx, s.Pubsub, codersdk.WorkspaceBuildUpdate{ + WorkspaceID: workspace.ID, + WorkspaceName: workspace.Name, + BuildID: workspaceBuild.ID, + Transition: string(workspaceBuild.Transition), + JobStatus: string(database.ProvisionerJobStatusSucceeded), + BuildNumber: workspaceBuild.BuildNumber, + }) + if err != nil { + s.Logger.Warn(ctx, "failed to publish workspace build update", slog.Error(err)) + } + } + if input.PrebuiltWorkspaceBuildStage == sdkproto.PrebuiltWorkspaceBuildStage_CLAIM { s.Logger.Info(ctx, "workspace prebuild successfully claimed by user", slog.F("workspace_id", workspace.ID)) diff --git a/coderd/workspacebuilds.go b/coderd/workspacebuilds.go index 80602102c7..453333d1c1 100644 --- a/coderd/workspacebuilds.go +++ b/coderd/workspacebuilds.go @@ -766,6 +766,21 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques WorkspaceID: workspace.ID, }) + // Publish workspace build 
update to the all builds channel if the experiment is enabled. + if api.Experiments.Enabled(codersdk.ExperimentWorkspaceBuildUpdates) { + err = wspubsub.PublishWorkspaceBuildUpdate(ctx, api.Pubsub, codersdk.WorkspaceBuildUpdate{ + WorkspaceID: workspace.ID, + WorkspaceName: workspace.Name, + BuildID: workspaceBuild.ID, + Transition: string(workspaceBuild.Transition), + JobStatus: string(database.ProvisionerJobStatusCanceled), + BuildNumber: workspaceBuild.BuildNumber, + }) + if err != nil { + api.Logger.Warn(ctx, "failed to publish workspace build update", slog.Error(err)) + } + } + httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{ Message: "Job has been marked as canceled...", }) diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 4c1359f9de..76b42b36ec 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -43,6 +43,8 @@ import ( "github.com/coder/coder/v2/coderd/wspubsub" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/coder/coder/v2/codersdk/wsjson" + "github.com/coder/websocket" ) var ( @@ -2175,6 +2177,78 @@ func (api *API) watchWorkspace( } } +// @Summary Watch all workspace builds +// @ID watch-all-workspace-builds +// @Security CoderSessionToken +// @Produce json +// @Tags Workspaces +// @Success 101 +// @Router /experimental/watch-all-workspacebuilds [get] +// @x-apidocgen {"skip": true} +func (api *API) watchAllWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Buffer enough updates to avoid blocking the pubsub callback while we're + // accepting the WebSocket connection. Accepting the connection signals to + // the client that the server is subscribed and ready to forward events. 
+ updates := make(chan codersdk.WorkspaceBuildUpdate, 256) + + cancelSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.AllWorkspaceEventChannel, + wspubsub.HandleWorkspaceBuildUpdate( + func(_ context.Context, update codersdk.WorkspaceBuildUpdate, err error) { + if err != nil { + api.Logger.Warn(ctx, "workspace build update subscription error", slog.Error(err)) + return + } + select { + case updates <- update: + default: + api.Logger.Warn(ctx, "workspace build update dropped, client too slow") + } + })) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error subscribing to workspace build events.", + Detail: err.Error(), + }) + return + } + defer cancelSubscribe() + + conn, err := websocket.Accept(rw, r, nil) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Failed to accept WebSocket.", + Detail: err.Error(), + }) + return + } + defer conn.Close(websocket.StatusNormalClosure, "done") + + // CloseRead starts a goroutine to read and discard messages from the client, + // including Pong messages sent in response to our Ping heartbeats. 
+ _ = conn.CloseRead(context.Background()) + + ctx, cancel := context.WithCancel(ctx) + go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn) + defer cancel() + + enc := wsjson.NewEncoder[codersdk.WorkspaceBuildUpdate](conn, websocket.MessageText) + for { + select { + case <-ctx.Done(): + return + case update, ok := <-updates: + if !ok { + return + } + if err := enc.Encode(update); err != nil { + return + } + } + } +} + // @Summary Get workspace timings by ID // @ID get-workspace-timings-by-id // @Security CoderSessionToken diff --git a/coderd/workspaces_test.go b/coderd/workspaces_test.go index 8acc8f1b5f..98f3f1e0a0 100644 --- a/coderd/workspaces_test.go +++ b/coderd/workspaces_test.go @@ -3674,6 +3674,113 @@ func TestWorkspaceWatcher(t *testing.T) { wait("second is for the build cancel", nil) } +func TestWatchAllWorkspaceBuilds(t *testing.T) { + t.Parallel() + + // Enable the workspace build updates experiment. + client, closer := coderdtest.NewWithProvisionerCloser(t, &coderdtest.Options{ + IncludeProvisionerDaemon: true, + DeploymentValues: coderdtest.DeploymentValues(t, func(dv *codersdk.DeploymentValues) { + dv.Experiments = []string{string(codersdk.ExperimentWorkspaceBuildUpdates)} + }), + }) + defer closer.Close() + user := coderdtest.CreateFirstUser(t, client) + + // Create a simple template version. 
+ version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionPlan: echo.PlanComplete, + ProvisionGraph: []*proto.Response{{ + Type: &proto.Response_Graph{ + Graph: &proto.GraphComplete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + }}, + }, + }, + }}, + }) + coderdtest.AwaitTemplateVersionJobCompleted(t, client, version.ID) + template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + // Subscribe to all workspace build updates via WebSocket BEFORE creating workspaces + // so we can use it to wait for the initial builds. + decoder, err := client.WatchAllWorkspaceBuilds(ctx) + require.NoError(t, err) + defer decoder.Close() + + updates := decoder.Chan() + logger := testutil.Logger(t).Named(t.Name()) + + // Helper to wait for a specific update. + waitForUpdate := func(event string, workspaceID uuid.UUID, expectedTransition, expectedStatus string) codersdk.WorkspaceBuildUpdate { + t.Helper() + for { + select { + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for event", event) + return codersdk.WorkspaceBuildUpdate{} + case update, ok := <-updates: + if !ok { + require.FailNow(t, "updates channel closed", event) + return codersdk.WorkspaceBuildUpdate{} + } + logger.Info(ctx, "received workspace build update", + slog.F("event", event), + slog.F("workspace_id", update.WorkspaceID), + slog.F("build_id", update.BuildID), + slog.F("transition", update.Transition), + slog.F("job_status", update.JobStatus), + slog.F("build_number", update.BuildNumber)) + if update.WorkspaceID == workspaceID && update.Transition == expectedTransition && update.JobStatus == expectedStatus { + return update + } + // Keep waiting if this isn't the update we're looking for. 
+ logger.Info(ctx, "skipping update, not matching expected", + slog.F("expected_workspace_id", workspaceID), + slog.F("expected_transition", expectedTransition), + slog.F("expected_status", expectedStatus)) + } + } + } + + // Create two workspaces and wait for their initial builds via the WebSocket channel. + workspace1 := coderdtest.CreateWorkspace(t, client, template.ID) + update := waitForUpdate("workspace1 initial build", workspace1.ID, "start", "succeeded") + require.Equal(t, workspace1.ID, update.WorkspaceID) + require.Equal(t, int32(1), update.BuildNumber) + + workspace2 := coderdtest.CreateWorkspace(t, client, template.ID) + update = waitForUpdate("workspace2 initial build", workspace2.ID, "start", "succeeded") + require.Equal(t, workspace2.ID, update.WorkspaceID) + require.Equal(t, int32(1), update.BuildNumber) + + // Stop workspace 1. + _ = coderdtest.CreateWorkspaceBuild(t, client, workspace1, database.WorkspaceTransitionStop) + update = waitForUpdate("workspace1 stop", workspace1.ID, "stop", "succeeded") + require.Equal(t, workspace1.ID, update.WorkspaceID) + + // Stop workspace 2. + _ = coderdtest.CreateWorkspaceBuild(t, client, workspace2, database.WorkspaceTransitionStop) + update = waitForUpdate("workspace2 stop", workspace2.ID, "stop", "succeeded") + require.Equal(t, workspace2.ID, update.WorkspaceID) + + // Start workspace 1 again. + _ = coderdtest.CreateWorkspaceBuild(t, client, workspace1, database.WorkspaceTransitionStart) + update = waitForUpdate("workspace1 start", workspace1.ID, "start", "succeeded") + require.Equal(t, workspace1.ID, update.WorkspaceID) + + // Start workspace 2 again. 
+ _ = coderdtest.CreateWorkspaceBuild(t, client, workspace2, database.WorkspaceTransitionStart) + update = waitForUpdate("workspace2 start", workspace2.ID, "start", "succeeded") + require.Equal(t, workspace2.ID, update.WorkspaceID) +} + func mustLocation(t *testing.T, location string) *time.Location { t.Helper() loc, err := time.LoadLocation(location) diff --git a/coderd/wspubsub/wspubsub.go b/coderd/wspubsub/wspubsub.go index 1175ce5830..c648022e1d 100644 --- a/coderd/wspubsub/wspubsub.go +++ b/coderd/wspubsub/wspubsub.go @@ -7,8 +7,47 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/codersdk" ) +// AllWorkspaceEventChannel is a global channel that receives events for all +// workspaces. This is useful when you need to watch N workspaces without +// creating N separate subscriptions. +const AllWorkspaceEventChannel = "workspace_updates:all" + +// HandleWorkspaceBuildUpdate wraps a callback to parse WorkspaceBuildUpdate +// messages from the pubsub. +func HandleWorkspaceBuildUpdate(cb func(ctx context.Context, payload codersdk.WorkspaceBuildUpdate, err error)) func(ctx context.Context, message []byte, err error) { + return func(ctx context.Context, message []byte, err error) { + if err != nil { + cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("workspace build update pubsub: %w", err)) + return + } + var payload codersdk.WorkspaceBuildUpdate + if err := json.Unmarshal(message, &payload); err != nil { + cb(ctx, codersdk.WorkspaceBuildUpdate{}, xerrors.Errorf("unmarshal workspace build update: %w", err)) + return + } + cb(ctx, payload, nil) + } +} + +// PublishWorkspaceBuildUpdate is a helper to publish a workspace build update +// to the AllWorkspaceEventChannel. This should be called when a build +// completes (succeeds, fails, or is canceled). 
+func PublishWorkspaceBuildUpdate(_ context.Context, ps pubsub.Pubsub, update codersdk.WorkspaceBuildUpdate) error { + msg, err := json.Marshal(update) + if err != nil { + return xerrors.Errorf("marshal workspace build update: %w", err) + } + if err := ps.Publish(AllWorkspaceEventChannel, msg); err != nil { + return xerrors.Errorf("publish workspace build update: %w", err) + } + return nil +} + // WorkspaceEventChannel can be used to subscribe to events for // workspaces owned by the provided user ID. func WorkspaceEventChannel(ownerID uuid.UUID) string { diff --git a/codersdk/deployment.go b/codersdk/deployment.go index 534ed5f4c9..cbe582f806 100644 --- a/codersdk/deployment.go +++ b/codersdk/deployment.go @@ -4298,14 +4298,15 @@ type Experiment string const ( // Add new experiments here! - ExperimentExample Experiment = "example" // This isn't used for anything. - ExperimentAutoFillParameters Experiment = "auto-fill-parameters" // This should not be taken out of experiments until we have redesigned the feature. - ExperimentNotifications Experiment = "notifications" // Sends notifications via SMTP and webhooks following certain events. - ExperimentWorkspaceUsage Experiment = "workspace-usage" // Enables the new workspace usage tracking. - ExperimentWebPush Experiment = "web-push" // Enables web push notifications through the browser. - ExperimentOAuth2 Experiment = "oauth2" // Enables OAuth2 provider functionality. - ExperimentAgents Experiment = "agents" // Enables agent-powered chat functionality. - ExperimentMCPServerHTTP Experiment = "mcp-server-http" // Enables the MCP HTTP server functionality. + ExperimentExample Experiment = "example" // This isn't used for anything. + ExperimentAutoFillParameters Experiment = "auto-fill-parameters" // This should not be taken out of experiments until we have redesigned the feature. + ExperimentNotifications Experiment = "notifications" // Sends notifications via SMTP and webhooks following certain events. 
+ ExperimentWorkspaceUsage Experiment = "workspace-usage" // Enables the new workspace usage tracking. + ExperimentWebPush Experiment = "web-push" // Enables web push notifications through the browser. + ExperimentOAuth2 Experiment = "oauth2" // Enables OAuth2 provider functionality. + ExperimentAgents Experiment = "agents" // Enables agent-powered chat functionality. + ExperimentMCPServerHTTP Experiment = "mcp-server-http" // Enables the MCP HTTP server functionality. + ExperimentWorkspaceBuildUpdates Experiment = "workspace-build-updates" // Enables publishing workspace build updates to the all builds pubsub channel. ) func (e Experiment) DisplayName() string { @@ -4326,6 +4327,8 @@ func (e Experiment) DisplayName() string { return "Agents" case ExperimentMCPServerHTTP: return "MCP HTTP Server Functionality" + case ExperimentWorkspaceBuildUpdates: + return "Workspace Build Updates Channel" default: // Split on hyphen and convert to title case // e.g. "web-push" -> "Web Push", "mcp-server-http" -> "Mcp Server Http" @@ -4344,6 +4347,7 @@ var ExperimentsKnown = Experiments{ ExperimentOAuth2, ExperimentAgents, ExperimentMCPServerHTTP, + ExperimentWorkspaceBuildUpdates, } // ExperimentsSafe should include all experiments that are safe for diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index bf588722b8..75e5e1ace8 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/http/cookiejar" "strings" "time" @@ -13,6 +14,8 @@ import ( "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/tracing" + "github.com/coder/coder/v2/codersdk/wsjson" + "github.com/coder/websocket" ) type AutomaticUpdates string @@ -788,6 +791,62 @@ func (c *Client) WorkspaceExternalAgentCredentials(ctx context.Context, workspac return credentials, json.NewDecoder(res.Body).Decode(&credentials) } +// WorkspaceBuildUpdate contains information about a workspace build state change. 
+// This is published via the /watch-all-workspacebuilds WebSocket endpoint when the +// workspace-build-updates experiment is enabled. +type WorkspaceBuildUpdate struct { + WorkspaceID uuid.UUID `json:"workspace_id" format:"uuid"` + WorkspaceName string `json:"workspace_name"` + BuildID uuid.UUID `json:"build_id" format:"uuid"` + // Transition is the workspace transition type: "start", "stop", or "delete". + Transition string `json:"transition"` + // JobStatus is the provisioner job status: "pending", "running", + // "succeeded", "canceling", "canceled", or "failed". + JobStatus string `json:"job_status"` + BuildNumber int32 `json:"build_number"` +} + +// WatchAllWorkspaceBuilds watches for workspace build updates across all workspaces. +// This requires the workspace-build-updates experiment to be enabled. +// The returned decoder should be closed by calling Close() when done to properly +// clean up the WebSocket connection. +func (c *Client) WatchAllWorkspaceBuilds(ctx context.Context) (*wsjson.Decoder[WorkspaceBuildUpdate], error) { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + + serverURL, err := c.URL.Parse("/api/experimental/watch-all-workspacebuilds") + if err != nil { + return nil, xerrors.Errorf("parse url: %w", err) + } + + jar, err := cookiejar.New(nil) + if err != nil { + return nil, xerrors.Errorf("create cookie jar: %w", err) + } + jar.SetCookies(serverURL, []*http.Cookie{{ + Name: SessionTokenCookie, + Value: c.SessionToken(), + }}) + httpClient := &http.Client{ + Jar: jar, + Transport: c.HTTPClient.Transport, + } + + conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{ + HTTPClient: httpClient, + CompressionMode: websocket.CompressionDisabled, + }) + if err != nil { + if res == nil { + return nil, err + } + return nil, ReadBodyAsError(res) + } + + d := wsjson.NewDecoder[WorkspaceBuildUpdate](conn, websocket.MessageText, c.logger) + return d, nil +} + // WorkspaceAvailableUsers returns users available for 
workspace creation. // This is used to populate the owner dropdown when creating workspaces for // other users. diff --git a/docs/reference/api/schemas.md b/docs/reference/api/schemas.md index 695395553d..3b563d3bb9 100644 --- a/docs/reference/api/schemas.md +++ b/docs/reference/api/schemas.md @@ -3998,9 +3998,9 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o #### Enumerated Values -| Value(s) | -|--------------------------------------------------------------------------------------------------------------------------| -| `agents`, `auto-fill-parameters`, `example`, `mcp-server-http`, `notifications`, `oauth2`, `web-push`, `workspace-usage` | +| Value(s) | +|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `agents`, `auto-fill-parameters`, `example`, `mcp-server-http`, `notifications`, `oauth2`, `web-push`, `workspace-build-updates`, `workspace-usage` | ## codersdk.ExternalAPIKeyScopes diff --git a/scaletest/autostart/config.go b/scaletest/autostart/config.go index ad804a0b89..15757f22e8 100644 --- a/scaletest/autostart/config.go +++ b/scaletest/autostart/config.go @@ -29,15 +29,24 @@ type Config struct { // to schedule them to be started again. AutostartDelay time.Duration `json:"autostart_delay"` - // AutostartTimeout is how long to wait for the autostart build to be - // initiated after the scheduled time. - AutostartTimeout time.Duration `json:"autostart_timeout"` - - Metrics *Metrics `json:"-"` + // AutostartBuildTimeout is how long to wait for the autostart build to + // complete after it has been triggered. This should be longer than + // WorkspaceJobTimeout to account for potential queueing time in high-load + // scenarios where provisioner capacity is limited. 
+ AutostartBuildTimeout time.Duration `json:"autostart_build_timeout"` // SetupBarrier is used to ensure all runners own stopped workspaces // before setting the autostart schedule on each. SetupBarrier *sync.WaitGroup `json:"-"` + + // BuildUpdates is a channel that receives workspace build updates for + // this specific workspace. The channel is pre-created and keyed by the + // deterministic workspace name. + BuildUpdates <-chan codersdk.WorkspaceBuildUpdate `json:"-"` + + // ResultSink is a channel where the runner sends its result upon completion. + // This allows the CLI to aggregate results from all concurrent runners. + ResultSink chan<- RunResult `json:"-"` } func (c Config) Validate() error { @@ -55,6 +64,10 @@ func (c Config) Validate() error { return xerrors.New("setup barrier must be set") } + if c.BuildUpdates == nil { + return xerrors.New("build updates channel must be set") + } + if c.WorkspaceJobTimeout <= 0 { return xerrors.New("workspace_job_timeout must be greater than 0") } @@ -63,12 +76,13 @@ func (c Config) Validate() error { return xerrors.New("autostart_delay must be at least 2 minutes") } - if c.AutostartTimeout <= 0 { - return xerrors.New("autostart_timeout must be greater than 0") + if c.AutostartBuildTimeout <= 0 { + return xerrors.New("autostart_build_timeout must be greater than 0") } - if c.Metrics == nil { - return xerrors.New("metrics must be set") + if c.AutostartBuildTimeout <= c.WorkspaceJobTimeout { + return xerrors.Errorf("autostart_build_timeout (%s) must be greater than workspace_job_timeout (%s) to account for scheduling delay and queueing time", + c.AutostartBuildTimeout, c.WorkspaceJobTimeout) } return nil diff --git a/scaletest/autostart/dispatcher.go b/scaletest/autostart/dispatcher.go new file mode 100644 index 0000000000..e563f53c4a --- /dev/null +++ b/scaletest/autostart/dispatcher.go @@ -0,0 +1,52 @@ +package autostart + +import ( + "context" + + "github.com/coder/coder/v2/codersdk" +) + +// WorkspaceDispatcher 
manages the distribution of workspace build updates from +// a single source channel to multiple per-workspace channels. +type WorkspaceDispatcher struct { + // Channels maps workspace names to their respective update channels. + Channels map[string]chan codersdk.WorkspaceBuildUpdate +} + +// NewWorkspaceDispatcher creates a new dispatcher for the given workspace names. +// Each workspace gets a buffered channel that can hold all expected updates during +// the autostart test lifecycle: +// - initial build (~3 updates: pending, running, succeeded) +// - stop build (~3 updates: pending, running, succeeded) +// - autostart build (~3 updates: pending, running, succeeded) +// Total: ~9 updates. We use a buffer of 16 to provide headroom for timing variations. +func NewWorkspaceDispatcher(workspaceNames []string) *WorkspaceDispatcher { + channels := make(map[string]chan codersdk.WorkspaceBuildUpdate, len(workspaceNames)) + for _, name := range workspaceNames { + channels[name] = make(chan codersdk.WorkspaceBuildUpdate, 16) + } + return &WorkspaceDispatcher{ + Channels: channels, + } +} + +// Start begins listening for workspace build updates and dispatching them to +// the appropriate workspace channels. It runs in a goroutine and returns +// immediately. When the source channel closes, all workspace channels are +// closed automatically; if ctx ends mid-dispatch they may be left open. 
+func (d *WorkspaceDispatcher) Start(ctx context.Context, source <-chan codersdk.WorkspaceBuildUpdate) { + go func() { + for update := range source { + if ch, ok := d.Channels[update.WorkspaceName]; ok { + select { + case ch <- update: + case <-ctx.Done(): + return + } + } + } + for _, ch := range d.Channels { + close(ch) + } + }() +} diff --git a/scaletest/autostart/dispatcher_test.go b/scaletest/autostart/dispatcher_test.go new file mode 100644 index 0000000000..03ab024211 --- /dev/null +++ b/scaletest/autostart/dispatcher_test.go @@ -0,0 +1,204 @@ +package autostart_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/scaletest/autostart" + "github.com/coder/coder/v2/testutil" +) + +func TestWorkspaceDispatcher(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + // Create test workspace names. + workspaceNames := []string{"workspace-1", "workspace-2", "workspace-3"} + + // Create dispatcher. + dispatcher := autostart.NewWorkspaceDispatcher(workspaceNames) + require.Len(t, dispatcher.Channels, 3) + + // Create source channel for updates. + source := make(chan codersdk.WorkspaceBuildUpdate, 10) + + // Start the dispatcher. + dispatcher.Start(ctx, source) + + // Send updates for each workspace. 
+ updates := []codersdk.WorkspaceBuildUpdate{ + { + WorkspaceName: "workspace-1", + Transition: "start", + JobStatus: "pending", + BuildNumber: 1, + }, + { + WorkspaceName: "workspace-2", + Transition: "start", + JobStatus: "running", + BuildNumber: 1, + }, + { + WorkspaceName: "workspace-3", + Transition: "start", + JobStatus: "succeeded", + BuildNumber: 1, + }, + { + WorkspaceName: "workspace-1", + Transition: "start", + JobStatus: "succeeded", + BuildNumber: 1, + }, + } + + for _, update := range updates { + source <- update + } + + // Verify each workspace receives its updates. + receivedWorkspace1 := <-dispatcher.Channels["workspace-1"] + require.Equal(t, "workspace-1", receivedWorkspace1.WorkspaceName) + require.Equal(t, "pending", receivedWorkspace1.JobStatus) + + receivedWorkspace2 := <-dispatcher.Channels["workspace-2"] + require.Equal(t, "workspace-2", receivedWorkspace2.WorkspaceName) + require.Equal(t, "running", receivedWorkspace2.JobStatus) + + receivedWorkspace3 := <-dispatcher.Channels["workspace-3"] + require.Equal(t, "workspace-3", receivedWorkspace3.WorkspaceName) + require.Equal(t, "succeeded", receivedWorkspace3.JobStatus) + + // workspace-1 should have another update. + receivedWorkspace1Again := <-dispatcher.Channels["workspace-1"] + require.Equal(t, "workspace-1", receivedWorkspace1Again.WorkspaceName) + require.Equal(t, "succeeded", receivedWorkspace1Again.JobStatus) + + // Close the source channel. + close(source) + + // All workspace channels should close. + for name, ch := range dispatcher.Channels { + select { + case _, ok := <-ch: + require.False(t, ok, "channel for %s should be closed", name) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for channel %s to close", name) + } + } +} + +func TestWorkspaceDispatcher_UnknownWorkspace(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + // Create dispatcher with known workspaces. 
+ workspaceNames := []string{"workspace-1", "workspace-2"} + dispatcher := autostart.NewWorkspaceDispatcher(workspaceNames) + + // Create source channel. + source := make(chan codersdk.WorkspaceBuildUpdate, 10) + + // Start the dispatcher. + dispatcher.Start(ctx, source) + + // Send update for unknown workspace - should be ignored. + source <- codersdk.WorkspaceBuildUpdate{ + WorkspaceName: "unknown-workspace", + Transition: "start", + JobStatus: "pending", + BuildNumber: 1, + } + + // Send update for known workspace. + source <- codersdk.WorkspaceBuildUpdate{ + WorkspaceName: "workspace-1", + Transition: "start", + JobStatus: "succeeded", + BuildNumber: 1, + } + + // workspace-1 should receive its update. + received := <-dispatcher.Channels["workspace-1"] + require.Equal(t, "workspace-1", received.WorkspaceName) + require.Equal(t, "succeeded", received.JobStatus) + + // Close source and verify channels close. + close(source) + + for name, ch := range dispatcher.Channels { + select { + case _, ok := <-ch: + require.False(t, ok, "channel for %s should be closed", name) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for channel %s to close", name) + } + } +} + +func TestWorkspaceDispatcher_ContextCancellation(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + + // Create dispatcher. + workspaceNames := []string{"workspace-1"} + dispatcher := autostart.NewWorkspaceDispatcher(workspaceNames) + + // Create source channel. + source := make(chan codersdk.WorkspaceBuildUpdate, 10) + + // Start the dispatcher. + dispatcher.Start(ctx, source) + + // Fill up the channel buffer. + for i := int32(0); i < 20; i++ { + source <- codersdk.WorkspaceBuildUpdate{ + WorkspaceID: uuid.New(), + WorkspaceName: "workspace-1", + Transition: "start", + JobStatus: "pending", + BuildNumber: i, + } + } + + // Cancel context - dispatcher should stop trying to send. + cancel() + + // Give dispatcher time to react to cancellation. 
+ time.Sleep(100 * time.Millisecond) + + // Dispatcher goroutine should have stopped, so closing source shouldn't deadlock. + close(source) + + // Channels might not be closed yet since source was closed after cancellation, + // but the important thing is that we don't deadlock. + // Just drain the channel if there's anything. + drained := 0 + for { + select { + case _, ok := <-dispatcher.Channels["workspace-1"]: + if !ok { + // Channel closed. + return + } + drained++ + if drained > 100 { + t.Fatal("drained too many messages, dispatcher not respecting context cancellation") + } + case <-time.After(time.Second): + // Timeout is OK - channel may or may not be closed. + return + } + } +} diff --git a/scaletest/autostart/metrics.go b/scaletest/autostart/metrics.go deleted file mode 100644 index d1ff94e789..0000000000 --- a/scaletest/autostart/metrics.go +++ /dev/null @@ -1,65 +0,0 @@ -package autostart - -import ( - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -type Metrics struct { - AutostartJobCreationLatencySeconds prometheus.HistogramVec - AutostartJobAcquiredLatencySeconds prometheus.HistogramVec - AutostartTotalLatencySeconds prometheus.HistogramVec - AutostartErrorsTotal prometheus.CounterVec -} - -func NewMetrics(reg prometheus.Registerer) *Metrics { - m := &Metrics{ - AutostartJobCreationLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "coderd", - Subsystem: "scaletest", - Name: "autostart_job_creation_latency_seconds", - Help: "Time from when the workspace is scheduled to be autostarted to when the autostart job has been created.", - }, []string{"username", "workspace_name"}), - AutostartJobAcquiredLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "coderd", - Subsystem: "scaletest", - Name: "autostart_job_acquired_latency_seconds", - Help: "Time from when the workspace is scheduled to be autostarted to when the job has been acquired by a provisioner daemon.", - }, 
[]string{"username", "workspace_name"}), - AutostartTotalLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "coderd", - Subsystem: "scaletest", - Name: "autostart_total_latency_seconds", - Help: "Time from when the workspace is scheduled to be autostarted to when the autostart build has finished.", - }, []string{"username", "workspace_name"}), - AutostartErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "coderd", - Subsystem: "scaletest", - Name: "autostart_errors_total", - Help: "Total number of autostart errors", - }, []string{"username", "action"}), - } - - reg.MustRegister(m.AutostartTotalLatencySeconds) - reg.MustRegister(m.AutostartJobCreationLatencySeconds) - reg.MustRegister(m.AutostartJobAcquiredLatencySeconds) - reg.MustRegister(m.AutostartErrorsTotal) - return m -} - -func (m *Metrics) RecordCompletion(elapsed time.Duration, username string, workspace string) { - m.AutostartTotalLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) -} - -func (m *Metrics) RecordJobCreation(elapsed time.Duration, username string, workspace string) { - m.AutostartJobCreationLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) -} - -func (m *Metrics) RecordJobAcquired(elapsed time.Duration, username string, workspace string) { - m.AutostartJobAcquiredLatencySeconds.WithLabelValues(username, workspace).Observe(elapsed.Seconds()) -} - -func (m *Metrics) AddError(username string, action string) { - m.AutostartErrorsTotal.WithLabelValues(username, action).Inc() -} diff --git a/scaletest/autostart/output.go b/scaletest/autostart/output.go new file mode 100644 index 0000000000..bcad5266f7 --- /dev/null +++ b/scaletest/autostart/output.go @@ -0,0 +1,225 @@ +package autostart + +import ( + "encoding/json" + "fmt" + "io" + "sort" + "time" + + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/scaletest/harness" +) + +// RunResults contains the aggregated metrics from all 
autostart test runs. +type RunResults struct { + TotalRuns int + SuccessfulRuns int + FailedRuns int + + // Individual run results. + Runs []RunResult + + // Aggregate latency statistics (end-to-end). + EndToEndLatencyP50 time.Duration + EndToEndLatencyP95 time.Duration + EndToEndLatencyP99 time.Duration + + // Aggregate latency statistics (trigger to completion). + TriggerToCompletionP50 time.Duration + TriggerToCompletionP95 time.Duration + TriggerToCompletionP99 time.Duration +} + +// NewRunResults creates a RunResults from a slice of RunResult. +func NewRunResults(runs []RunResult) RunResults { + results := RunResults{ + TotalRuns: len(runs), + Runs: runs, + } + + var ( + endToEndLatencies []time.Duration + triggerToCompletionLatencies []time.Duration + ) + + for _, run := range runs { + if run.Success { + results.SuccessfulRuns++ + endToEndLatencies = append(endToEndLatencies, run.EndToEndLatency()) + triggerToCompletionLatencies = append(triggerToCompletionLatencies, run.TriggerToCompletionLatency()) + } else { + results.FailedRuns++ + } + } + + // Calculate percentiles for end-to-end latency. + if len(endToEndLatencies) > 0 { + sort.Slice(endToEndLatencies, func(i, j int) bool { + return endToEndLatencies[i] < endToEndLatencies[j] + }) + results.EndToEndLatencyP50 = percentile(endToEndLatencies, 0.50) + results.EndToEndLatencyP95 = percentile(endToEndLatencies, 0.95) + results.EndToEndLatencyP99 = percentile(endToEndLatencies, 0.99) + } + + // Calculate percentiles for trigger to completion latency. 
+ if len(triggerToCompletionLatencies) > 0 { + sort.Slice(triggerToCompletionLatencies, func(i, j int) bool { + return triggerToCompletionLatencies[i] < triggerToCompletionLatencies[j] + }) + results.TriggerToCompletionP50 = percentile(triggerToCompletionLatencies, 0.50) + results.TriggerToCompletionP95 = percentile(triggerToCompletionLatencies, 0.95) + results.TriggerToCompletionP99 = percentile(triggerToCompletionLatencies, 0.99) + } + + return results +} + +// percentile calculates the percentile value from a sorted slice of durations. +func percentile(sorted []time.Duration, p float64) time.Duration { + if len(sorted) == 0 { + return 0 + } + index := int(float64(len(sorted)-1) * p) + if index < 0 { + index = 0 + } + if index >= len(sorted) { + index = len(sorted) - 1 + } + return sorted[index] +} + +// PrintText writes the results in a human-readable text format. +func (r RunResults) PrintText(w io.Writer) { + _, _ = fmt.Fprintf(w, "Autostart Scale Test Results\n") + _, _ = fmt.Fprintf(w, "=============================\n\n") + + _, _ = fmt.Fprintf(w, "Total Runs: %d\n", r.TotalRuns) + _, _ = fmt.Fprintf(w, "Successful: %d\n", r.SuccessfulRuns) + _, _ = fmt.Fprintf(w, "Failed: %d\n\n", r.FailedRuns) + + if r.SuccessfulRuns > 0 { + _, _ = fmt.Fprintf(w, "End-to-End Latency (Config → Completion)\n") + _, _ = fmt.Fprintf(w, "-----------------------------------------\n") + _, _ = fmt.Fprintf(w, "P50: %v\n", r.EndToEndLatencyP50.Round(time.Millisecond)) + _, _ = fmt.Fprintf(w, "P95: %v\n", r.EndToEndLatencyP95.Round(time.Millisecond)) + _, _ = fmt.Fprintf(w, "P99: %v\n\n", r.EndToEndLatencyP99.Round(time.Millisecond)) + + _, _ = fmt.Fprintf(w, "Trigger to Completion Latency (Scheduled Time → Completion)\n") + _, _ = fmt.Fprintf(w, "------------------------------------------------------------\n") + _, _ = fmt.Fprintf(w, "P50: %v\n", r.TriggerToCompletionP50.Round(time.Millisecond)) + _, _ = fmt.Fprintf(w, "P95: %v\n", r.TriggerToCompletionP95.Round(time.Millisecond)) 
+ _, _ = fmt.Fprintf(w, "P99: %v\n\n", r.TriggerToCompletionP99.Round(time.Millisecond)) + } + + if r.FailedRuns > 0 { + _, _ = fmt.Fprintf(w, "Failed Runs\n") + _, _ = fmt.Fprintf(w, "-----------\n") + for _, run := range r.Runs { + if !run.Success { + _, _ = fmt.Fprintf(w, "- %s (%s): %s\n", run.WorkspaceName, run.WorkspaceID, run.Error) + } + } + } +} + +// MarshalJSON implements json.Marshaler to provide custom JSON output. +func (r RunResults) MarshalJSON() ([]byte, error) { + // Convert durations to milliseconds for JSON output. + type jsonResults struct { + TotalRuns int `json:"total_runs"` + SuccessfulRuns int `json:"successful_runs"` + FailedRuns int `json:"failed_runs"` + + EndToEndLatencyP50MS int64 `json:"end_to_end_latency_p50_ms"` + EndToEndLatencyP95MS int64 `json:"end_to_end_latency_p95_ms"` + EndToEndLatencyP99MS int64 `json:"end_to_end_latency_p99_ms"` + + TriggerToCompletionP50MS int64 `json:"trigger_to_completion_p50_ms"` + TriggerToCompletionP95MS int64 `json:"trigger_to_completion_p95_ms"` + TriggerToCompletionP99MS int64 `json:"trigger_to_completion_p99_ms"` + + Runs []struct { + WorkspaceID string `json:"workspace_id"` + WorkspaceName string `json:"workspace_name"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` + + EndToEndLatencyMS int64 `json:"end_to_end_latency_ms"` + TriggerToCompletionMS int64 `json:"trigger_to_completion_ms"` + } `json:"runs"` + } + + jr := jsonResults{ + TotalRuns: r.TotalRuns, + SuccessfulRuns: r.SuccessfulRuns, + FailedRuns: r.FailedRuns, + + EndToEndLatencyP50MS: r.EndToEndLatencyP50.Milliseconds(), + EndToEndLatencyP95MS: r.EndToEndLatencyP95.Milliseconds(), + EndToEndLatencyP99MS: r.EndToEndLatencyP99.Milliseconds(), + + TriggerToCompletionP50MS: r.TriggerToCompletionP50.Milliseconds(), + TriggerToCompletionP95MS: r.TriggerToCompletionP95.Milliseconds(), + TriggerToCompletionP99MS: r.TriggerToCompletionP99.Milliseconds(), + } + + for _, run := range r.Runs { + jr.Runs = append(jr.Runs, 
struct { + WorkspaceID string `json:"workspace_id"` + WorkspaceName string `json:"workspace_name"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` + + EndToEndLatencyMS int64 `json:"end_to_end_latency_ms"` + TriggerToCompletionMS int64 `json:"trigger_to_completion_ms"` + }{ + WorkspaceID: run.WorkspaceID.String(), + WorkspaceName: run.WorkspaceName, + Success: run.Success, + Error: run.Error, + + EndToEndLatencyMS: run.EndToEndLatency().Milliseconds(), + TriggerToCompletionMS: run.TriggerToCompletionLatency().Milliseconds(), + }) + } + + return json.Marshal(jr) +} + +// ToHarnessResults converts autostart-specific results into the standard +// harness.Results format for use with existing output functions. +func (r RunResults) ToHarnessResults() harness.Results { + harnessRuns := make(map[string]harness.RunResult) + + for i, run := range r.Runs { + id := fmt.Sprintf("%d", i) + var err error + if !run.Success { + err = xerrors.New(run.Error) + } + + harnessRuns[id] = harness.RunResult{ + FullID: fmt.Sprintf("autostart/%s", run.WorkspaceName), + TestName: "autostart", + ID: id, + Error: err, + Metrics: map[string]any{ + "end_to_end_latency_seconds": run.EndToEndLatency().Seconds(), + "trigger_to_completion_seconds": run.TriggerToCompletionLatency().Seconds(), + "workspace_id": run.WorkspaceID.String(), + "workspace_name": run.WorkspaceName, + }, + } + } + + return harness.Results{ + TotalRuns: r.TotalRuns, + TotalPass: r.SuccessfulRuns, + TotalFail: r.FailedRuns, + Runs: harnessRuns, + } +} diff --git a/scaletest/autostart/output_test.go b/scaletest/autostart/output_test.go new file mode 100644 index 0000000000..b252faea9f --- /dev/null +++ b/scaletest/autostart/output_test.go @@ -0,0 +1,95 @@ +package autostart_test + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/scaletest/autostart" +) + +func TestRunResult(t *testing.T) { + t.Parallel() + + configTime := 
time.Now().UTC() + scheduledTime := configTime.Add(2 * time.Minute) + completionTime := scheduledTime.Add(30 * time.Second) + + result := autostart.RunResult{ + WorkspaceID: uuid.New(), + WorkspaceName: "test-workspace", + ConfigTime: configTime, + ScheduledTime: scheduledTime, + CompletionTime: completionTime, + Success: true, + } + + // Test end-to-end latency. + endToEnd := result.EndToEndLatency() + expectedEndToEnd := 2*time.Minute + 30*time.Second + require.Equal(t, expectedEndToEnd, endToEnd) + + // Test trigger to completion latency. + triggerToCompletion := result.TriggerToCompletionLatency() + expectedTriggerToCompletion := 30 * time.Second + require.Equal(t, expectedTriggerToCompletion, triggerToCompletion) +} + +func TestRunResults(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + runs := []autostart.RunResult{ + { + WorkspaceID: uuid.New(), + WorkspaceName: "workspace-1", + ConfigTime: now, + ScheduledTime: now.Add(1 * time.Minute), + CompletionTime: now.Add(1*time.Minute + 10*time.Second), + Success: true, + }, + { + WorkspaceID: uuid.New(), + WorkspaceName: "workspace-2", + ConfigTime: now, + ScheduledTime: now.Add(1 * time.Minute), + CompletionTime: now.Add(1*time.Minute + 20*time.Second), + Success: true, + }, + { + WorkspaceID: uuid.New(), + WorkspaceName: "workspace-3", + ConfigTime: now, + ScheduledTime: now.Add(1 * time.Minute), + CompletionTime: now.Add(1*time.Minute + 30*time.Second), + Success: true, + }, + { + WorkspaceID: uuid.New(), + WorkspaceName: "workspace-4", + Success: false, + Error: "build failed", + }, + } + + results := autostart.NewRunResults(runs) + + require.Equal(t, 4, results.TotalRuns) + require.Equal(t, 3, results.SuccessfulRuns) + require.Equal(t, 1, results.FailedRuns) + + // Verify percentiles are calculated correctly. + // P50 should be the middle value (20s). + require.Equal(t, 20*time.Second, results.TriggerToCompletionP50) + // With 3 values, P95 is at index int((3-1)*0.95) = 1, which is 20s. 
+ require.Equal(t, 20*time.Second, results.TriggerToCompletionP95) + // P99 is also at index int((3-1)*0.99) = 1, which is 20s. + require.Equal(t, 20*time.Second, results.TriggerToCompletionP99) + + // End-to-end latencies should include the 1 minute delay. + require.Equal(t, 1*time.Minute+20*time.Second, results.EndToEndLatencyP50) + require.Equal(t, 1*time.Minute+20*time.Second, results.EndToEndLatencyP95) + require.Equal(t, 1*time.Minute+20*time.Second, results.EndToEndLatencyP99) +} diff --git a/scaletest/autostart/result.go b/scaletest/autostart/result.go new file mode 100644 index 0000000000..b0a7d2d664 --- /dev/null +++ b/scaletest/autostart/result.go @@ -0,0 +1,47 @@ +package autostart + +import ( + "time" + + "github.com/google/uuid" +) + +// RunResult captures timing and outcome information for a single autostart +// test run. +type RunResult struct { + // WorkspaceID is the ID of the workspace that was tested. + WorkspaceID uuid.UUID + // WorkspaceName is the name of the workspace that was tested. + WorkspaceName string + + // ConfigTime is when UpdateWorkspaceAutostart was called to set the + // autostart schedule. + ConfigTime time.Time + // ScheduledTime is the time the workspace was scheduled to autostart. + ScheduledTime time.Time + // CompletionTime is when the autostart build completed successfully. + CompletionTime time.Time + + // Success indicates whether the autostart build completed successfully. + Success bool + // Error contains the error message if Success is false. + Error string +} + +// EndToEndLatency returns the total time from setting the autostart config +// to the autostart build completing. +func (r RunResult) EndToEndLatency() time.Duration { + if r.ConfigTime.IsZero() || r.CompletionTime.IsZero() { + return 0 + } + return r.CompletionTime.Sub(r.ConfigTime) +} + +// TriggerToCompletionLatency returns the time from the scheduled autostart +// time to completion. This includes queueing time plus build execution time. 
+func (r RunResult) TriggerToCompletionLatency() time.Duration { + if r.ScheduledTime.IsZero() || r.CompletionTime.IsZero() { + return 0 + } + return r.CompletionTime.Sub(r.ScheduledTime) +} diff --git a/scaletest/autostart/run.go b/scaletest/autostart/run.go index 8b851eeeba..755280f4f3 100644 --- a/scaletest/autostart/run.go +++ b/scaletest/autostart/run.go @@ -24,10 +24,6 @@ type Runner struct { createUserRunner *createusers.Runner workspacebuildRunner *workspacebuild.Runner - - autostartTotalLatency time.Duration - autostartJobCreationLatency time.Duration - autostartJobAcquiredLatency time.Duration } func NewRunner(client *codersdk.Client, cfg Config) *Runner { @@ -38,15 +34,21 @@ func NewRunner(client *codersdk.Client, cfg Config) *Runner { } var ( - _ harness.Runnable = &Runner{} - _ harness.Cleanable = &Runner{} - _ harness.Collectable = &Runner{} + _ harness.Runnable = &Runner{} + _ harness.Cleanable = &Runner{} ) func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { + _, err := r.RunReturningResult(ctx, id, logs) + return err +} + +func (r *Runner) RunReturningResult(ctx context.Context, id string, logs io.Writer) (RunResult, error) { ctx, span := tracing.StartSpan(ctx) defer span.End() + result := RunResult{} + reachedBarrier := false defer func() { if !reachedBarrier { @@ -62,8 +64,7 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { r.createUserRunner = createusers.NewRunner(r.client, r.cfg.User) newUserAndToken, err := r.createUserRunner.RunReturningUser(ctx, id, logs) if err != nil { - r.cfg.Metrics.AddError("", "create_user") - return xerrors.Errorf("create user: %w", err) + return result, xerrors.Errorf("create user: %w", err) } newUser := newUserAndToken.User @@ -78,36 +79,31 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { workspaceBuildConfig := r.cfg.Workspace workspaceBuildConfig.OrganizationID = r.cfg.User.OrganizationID workspaceBuildConfig.UserID = 
newUser.ID.String() - // We'll wait for the build ourselves to avoid multiple API requests + // We'll wait for the build ourselves to avoid multiple API requests. workspaceBuildConfig.NoWaitForBuild = true workspaceBuildConfig.NoWaitForAgents = true r.workspacebuildRunner = workspacebuild.NewRunner(newUserClient, workspaceBuildConfig) workspace, err := r.workspacebuildRunner.RunReturningWorkspace(ctx, id, logs) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "create_workspace") - return xerrors.Errorf("create workspace: %w", err) + return result, xerrors.Errorf("create workspace: %w", err) } - watchCtx, cancel := context.WithCancel(ctx) + result.WorkspaceID = workspace.ID + result.WorkspaceName = workspace.Name + + buildUpdates := r.cfg.BuildUpdates + + createWorkspaceCtx, cancel := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) defer cancel() - workspaceUpdates, err := newUserClient.WatchWorkspace(watchCtx, workspace.ID) + + logger.Info(ctx, "waiting for initial workspace build", slog.F("workspace_name", workspace.Name), slog.F("workspace_id", workspace.ID.String())) + err = waitForBuild(createWorkspaceCtx, logger, buildUpdates, codersdk.WorkspaceTransitionStart) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "watch_workspace") - return xerrors.Errorf("watch workspace: %w", err) + return result, xerrors.Errorf("wait for initial workspace build (workspace=%s, id=%s): %w", workspace.Name, workspace.ID, err) } - createWorkspaceCtx, cancel2 := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) - defer cancel2() - - err = waitForWorkspaceUpdate(createWorkspaceCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { - return ws.LatestBuild.Transition == codersdk.WorkspaceTransitionStart && - ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded - }) - if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "wait_for_initial_build") - return xerrors.Errorf("timeout waiting for initial workspace build to complete: %w", err) 
- } + logger.Info(ctx, "workspace started successfully", slog.F("workspace_name", workspace.Name)) logger.Info(ctx, "stopping workspace", slog.F("workspace_name", workspace.Name)) @@ -115,20 +111,15 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { Transition: codersdk.WorkspaceTransitionStop, }) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "create_stop_build") - return xerrors.Errorf("create stop build: %w", err) + return result, xerrors.Errorf("create stop build: %w", err) } - stopBuildCtx, cancel3 := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) - defer cancel3() + stopBuildCtx, cancel := context.WithTimeout(ctx, r.cfg.WorkspaceJobTimeout) + defer cancel() - err = waitForWorkspaceUpdate(stopBuildCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { - return ws.LatestBuild.Transition == codersdk.WorkspaceTransitionStop && - ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded - }) + err = waitForBuild(stopBuildCtx, logger, buildUpdates, codersdk.WorkspaceTransitionStop) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "wait_for_stop_build") - return xerrors.Errorf("timeout waiting for stop build to complete: %w", err) + return result, xerrors.Errorf("wait for stop build: %w", err) } logger.Info(ctx, "workspace stopped successfully", slog.F("workspace_name", workspace.Name)) @@ -139,75 +130,101 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { r.cfg.SetupBarrier.Wait() logger.Info(ctx, "all runners reached barrier, proceeding with autostart schedule") + // Schedule the workspace to autostart. 
testStartTime := time.Now().UTC() autostartTime := testStartTime.Add(r.cfg.AutostartDelay).Round(time.Minute) schedule := fmt.Sprintf("CRON_TZ=UTC %d %d * * *", autostartTime.Minute(), autostartTime.Hour()) logger.Info(ctx, "setting autostart schedule for workspace", slog.F("workspace_name", workspace.Name), slog.F("schedule", schedule)) + // Record the time we set the autostart configuration. + result.ConfigTime = time.Now().UTC() + result.ScheduledTime = autostartTime + err = newUserClient.UpdateWorkspaceAutostart(ctx, workspace.ID, codersdk.UpdateWorkspaceAutostartRequest{ Schedule: &schedule, }) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "update_workspace_autostart") - return xerrors.Errorf("update workspace autostart: %w", err) + return result, xerrors.Errorf("update workspace autostart: %w", err) } - logger.Info(ctx, "waiting for workspace to autostart", slog.F("workspace_name", workspace.Name)) + logger.Info(ctx, "autostart schedule configured successfully", + slog.F("workspace_name", workspace.Name), + slog.F("schedule", schedule), + slog.F("autostart_time", autostartTime), + slog.F("time_until_autostart", time.Until(autostartTime).Round(time.Second))) - autostartInitiateCtx, cancel4 := context.WithDeadline(ctx, autostartTime.Add(r.cfg.AutostartDelay)) - defer cancel4() + // Wait for the autostart build to complete. The build won't start until + // the scheduled time, so we use AutostartBuildTimeout which should account + // for: time until scheduled start + queueing time + build execution time. 
+ autostartBuildCtx, cancel := context.WithTimeout(ctx, r.cfg.AutostartBuildTimeout) + defer cancel() - logger.Info(ctx, "listening for workspace updates to detect autostart build") + logger.Info(ctx, "waiting for autostart build to trigger and complete", + slog.F("workspace_name", workspace.Name), + slog.F("timeout", r.cfg.AutostartBuildTimeout)) - err = waitForWorkspaceUpdate(autostartInitiateCtx, logger, workspaceUpdates, func(ws codersdk.Workspace) bool { - if ws.LatestBuild.Transition != codersdk.WorkspaceTransitionStart { - return false - } - - // The job has been created, but it might be pending - if r.autostartJobCreationLatency == 0 { - r.autostartJobCreationLatency = time.Since(autostartTime) - r.cfg.Metrics.RecordJobCreation(r.autostartJobCreationLatency, newUser.Username, workspace.Name) - } - - if ws.LatestBuild.Job.Status == codersdk.ProvisionerJobRunning || - ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded { - // Job is no longer pending, but it might not have finished - if r.autostartJobAcquiredLatency == 0 { - r.autostartJobAcquiredLatency = time.Since(autostartTime) - r.cfg.Metrics.RecordJobAcquired(r.autostartJobAcquiredLatency, newUser.Username, workspace.Name) - } - return ws.LatestBuild.Job.Status == codersdk.ProvisionerJobSucceeded - } - - return false - }) + err = waitForBuild(autostartBuildCtx, logger, buildUpdates, codersdk.WorkspaceTransitionStart) if err != nil { - r.cfg.Metrics.AddError(newUser.Username, "wait_for_autostart_build") - return xerrors.Errorf("timeout waiting for autostart build to be created: %w", err) + result.Success = false + result.Error = err.Error() + if r.cfg.ResultSink != nil { + select { + case r.cfg.ResultSink <- result: + default: + } + } + return result, xerrors.Errorf("wait for autostart build: %w", err) } - r.autostartTotalLatency = time.Since(autostartTime) + // Record the completion time. 
+ result.CompletionTime = time.Now().UTC() + result.Success = true - logger.Info(ctx, "autostart workspace build complete", slog.F("duration", r.autostartTotalLatency)) - r.cfg.Metrics.RecordCompletion(r.autostartTotalLatency, newUser.Username, workspace.Name) + logger.Info(ctx, "autostart build completed successfully", slog.F("workspace_name", workspace.Name)) - return nil + if r.cfg.ResultSink != nil { + select { + case r.cfg.ResultSink <- result: + default: + // Non-blocking send - if the channel is full, skip it. + } + } + + return result, nil } -func waitForWorkspaceUpdate(ctx context.Context, logger slog.Logger, updates <-chan codersdk.Workspace, shouldBreak func(codersdk.Workspace) bool) error { +// waitForBuild waits for a build with the given transition to reach a +// terminal state. It returns nil on success, or an error if the build +// fails, is canceled, or the context expires. If an unexpected transition +// is received, it returns an error immediately. +func waitForBuild(ctx context.Context, logger slog.Logger, updates <-chan codersdk.WorkspaceBuildUpdate, transition codersdk.WorkspaceTransition) error { for { select { case <-ctx.Done(): return ctx.Err() - case updatedWorkspace, ok := <-updates: + case update, ok := <-updates: if !ok { - return xerrors.New("workspace updates channel closed") + return xerrors.New("build updates channel closed") } - logger.Debug(ctx, "received workspace update", slog.F("update", updatedWorkspace)) - if shouldBreak(updatedWorkspace) { + logger.Debug(ctx, "received build update", + slog.F("transition", update.Transition), + slog.F("job_status", update.JobStatus), + slog.F("build_number", update.BuildNumber)) + + if update.Transition != string(transition) { + return xerrors.Errorf("unexpected transition: expected %s, got %s (build_number=%d)", transition, update.Transition, update.BuildNumber) + } + switch codersdk.ProvisionerJobStatus(update.JobStatus) { + case codersdk.ProvisionerJobSucceeded: return nil + case 
codersdk.ProvisionerJobFailed: + return xerrors.Errorf("workspace build failed (transition=%s, build_number=%d)", update.Transition, update.BuildNumber) + case codersdk.ProvisionerJobCanceled: + return xerrors.Errorf("workspace build canceled (transition=%s, build_number=%d)", update.Transition, update.BuildNumber) + default: + // Intermediate states (pending, running, canceling) + // are expected; keep waiting. } } } @@ -230,17 +247,3 @@ func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error { return nil } - -const ( - AutostartTotalLatencyMetric = "autostart_total_latency_seconds" - AutostartJobCreationLatencyMetric = "autostart_job_creation_latency_seconds" - AutostartJobAcquiredLatencyMetric = "autostart_job_acquired_latency_seconds" -) - -func (r *Runner) GetMetrics() map[string]any { - return map[string]any{ - AutostartTotalLatencyMetric: r.autostartTotalLatency.Seconds(), - AutostartJobCreationLatencyMetric: r.autostartJobCreationLatency.Seconds(), - AutostartJobAcquiredLatencyMetric: r.autostartJobAcquiredLatency.Seconds(), - } -} diff --git a/scaletest/autostart/run_test.go b/scaletest/autostart/run_test.go index 6fb23b47c9..0f630d8985 100644 --- a/scaletest/autostart/run_test.go +++ b/scaletest/autostart/run_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -18,6 +17,7 @@ import ( "github.com/coder/coder/v2/provisionersdk/proto" "github.com/coder/coder/v2/scaletest/autostart" "github.com/coder/coder/v2/scaletest/createusers" + "github.com/coder/coder/v2/scaletest/loadtestutil" "github.com/coder/coder/v2/scaletest/workspacebuild" "github.com/coder/coder/v2/testutil" ) @@ -28,7 +28,8 @@ func TestRun(t *testing.T) { autoStartDelay := 2 * time.Minute // Faking a workspace autostart schedule start time at the coderd level - // is difficult and error-prone. + // is difficult and error-prone. 
This test verifies the setup phase only + // (creating workspaces, stopping them, and configuring autostart schedules). t.Skip("This test takes several minutes to run, and is intended as a manual regression test") ctx := testutil.Context(t, time.Minute*3) @@ -36,6 +37,9 @@ func TestRun(t *testing.T) { client := coderdtest.New(t, &coderdtest.Options{ IncludeProvisionerDaemon: true, AutobuildTicker: time.NewTicker(time.Second * 1).C, + DeploymentValues: coderdtest.DeploymentValues(t, func(dv *codersdk.DeploymentValues) { + dv.Experiments = []string{string(codersdk.ExperimentWorkspaceBuildUpdates)} + }), }) user := coderdtest.CreateFirstUser(t, client) @@ -74,12 +78,42 @@ func TestRun(t *testing.T) { barrier := new(sync.WaitGroup) barrier.Add(numUsers) - metrics := autostart.NewMetrics(prometheus.NewRegistry()) + + // Pre-create channels for each workspace keyed by deterministic name. + workspaceChannels := make(map[string]chan codersdk.WorkspaceBuildUpdate) + for i := range numUsers { + id := strconv.Itoa(i) + workspaceName := loadtestutil.GenerateDeterministicWorkspaceName(id) + workspaceChannels[workspaceName] = make(chan codersdk.WorkspaceBuildUpdate, 16) + } + + // Start watching all workspace builds. + decoder, err := client.WatchAllWorkspaceBuilds(ctx) + require.NoError(t, err) + defer decoder.Close() + + // Start the dispatcher goroutine. 
+ go func() { + for update := range decoder.Chan() { + if ch, ok := workspaceChannels[update.WorkspaceName]; ok { + select { + case ch <- update: + case <-ctx.Done(): + return + } + } + } + for _, ch := range workspaceChannels { + close(ch) + } + }() eg, runCtx := errgroup.WithContext(ctx) runners := make([]*autostart.Runner, 0, numUsers) for i := range numUsers { + id := strconv.Itoa(i) + workspaceName := loadtestutil.GenerateDeterministicWorkspaceName(id) cfg := autostart.Config{ User: createusers.Config{ OrganizationID: user.OrganizationID, @@ -88,14 +122,14 @@ func TestRun(t *testing.T) { OrganizationID: user.OrganizationID, Request: codersdk.CreateWorkspaceRequest{ TemplateID: template.ID, + Name: workspaceName, }, NoWaitForAgents: true, }, WorkspaceJobTimeout: testutil.WaitMedium, AutostartDelay: autoStartDelay, - AutostartTimeout: testutil.WaitShort, - Metrics: metrics, SetupBarrier: barrier, + BuildUpdates: workspaceChannels[workspaceName], } err := cfg.Validate() require.NoError(t, err) @@ -107,7 +141,7 @@ func TestRun(t *testing.T) { }) } - err := eg.Wait() + err = eg.Wait() require.NoError(t, err) users, err := client.Users(ctx, codersdk.UsersRequest{}) @@ -118,10 +152,11 @@ func TestRun(t *testing.T) { require.NoError(t, err) require.Len(t, workspaces.Workspaces, numUsers) // one workspace per user - // Verify that workspaces have autostart schedules set and are running + // Verify that workspaces have autostart schedules set and are stopped + // (the test exits after configuring autostart, before it triggers). 
for _, workspace := range workspaces.Workspaces { require.NotNil(t, workspace.AutostartSchedule) - require.Equal(t, codersdk.WorkspaceTransitionStart, workspace.LatestBuild.Transition) + require.Equal(t, codersdk.WorkspaceTransitionStop, workspace.LatestBuild.Transition) require.Equal(t, codersdk.ProvisionerJobSucceeded, workspace.LatestBuild.Job.Status) } @@ -141,18 +176,4 @@ func TestRun(t *testing.T) { users, err = client.Users(ctx, codersdk.UsersRequest{}) require.NoError(t, err) require.Len(t, users.Users, 1) // owner - - for _, runner := range runners { - metrics := runner.GetMetrics() - require.Contains(t, metrics, autostart.AutostartTotalLatencyMetric) - latency, ok := metrics[autostart.AutostartTotalLatencyMetric].(float64) - require.True(t, ok) - jobCreationLatency, ok := metrics[autostart.AutostartJobCreationLatencyMetric].(float64) - require.True(t, ok) - jobAcquiredLatency, ok := metrics[autostart.AutostartJobAcquiredLatencyMetric].(float64) - require.True(t, ok) - require.Greater(t, latency, float64(0)) - require.Greater(t, jobCreationLatency, float64(0)) - require.Greater(t, jobAcquiredLatency, float64(0)) - } } diff --git a/scaletest/loadtestutil/names.go b/scaletest/loadtestutil/names.go index f29ded1578..68d528b156 100644 --- a/scaletest/loadtestutil/names.go +++ b/scaletest/loadtestutil/names.go @@ -42,6 +42,15 @@ func GenerateWorkspaceName(id string) (name string, err error) { return fmt.Sprintf("%s-%s-%s", ScaleTestPrefix, randStr, id), nil } +// GenerateDeterministicWorkspaceName generates a deterministic workspace name +// for scale testing without a random component. This is useful when the +// workspace name needs to be known before the workspace is created, such as +// for pre-creating channels keyed by workspace name. 
+// The workspace name follows the pattern: scaletest-<id> +func GenerateDeterministicWorkspaceName(id string) string { + return fmt.Sprintf("%s-%s", ScaleTestPrefix, id) +} + // IsScaleTestUser checks if a username indicates it was created for scale testing. func IsScaleTestUser(username, email string) bool { return strings.HasPrefix(username, ScaleTestPrefix+"-") || diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index df3f16c74b..c2163368de 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -2752,6 +2752,7 @@ export type Experiment = | "notifications" | "oauth2" | "web-push" + | "workspace-build-updates" | "workspace-usage"; export const Experiments: Experiment[] = [ @@ -2762,6 +2763,7 @@ export const Experiments: Experiment[] = [ "notifications", "oauth2", "web-push", + "workspace-build-updates", "workspace-usage", ]; @@ -7744,6 +7746,28 @@ export interface WorkspaceBuildTimings { readonly agent_connection_timings: readonly AgentConnectionTiming[]; } +// From codersdk/workspaces.go +/** + * WorkspaceBuildUpdate contains information about a workspace build state change. + * This is published via the /watch-all-workspacebuilds SSE endpoint when the + * workspace-build-updates experiment is enabled. + */ +export interface WorkspaceBuildUpdate { + readonly workspace_id: string; + readonly workspace_name: string; + readonly build_id: string; + /** + * Transition is the workspace transition type: "start", "stop", or "delete". + */ + readonly transition: string; + /** + * JobStatus is the provisioner job status: "pending", "running", + * "succeeded", "canceling", "canceled", or "failed". + */ + readonly job_status: string; + readonly build_number: number; +} + // From codersdk/workspaces.go export interface WorkspaceBuildsRequest extends Pagination { readonly since?: string;