fix: wait for build in task status load generator (#20800)

Wait for the External workspace build job to complete before attempting to pull its credentials from Coder. This resolves a race in the load generator.
This commit is contained in:
Spike Curtis
2025-11-19 10:35:31 +04:00
committed by GitHub
parent 0bbb7dd0a3
commit 8ee6e9457e
3 changed files with 283 additions and 61 deletions
+21 -40
View File
@@ -11,14 +11,9 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/quartz"
)
// createExternalWorkspaceResult contains the results from creating an external workspace.
// It pairs the ID of the created workspace with the token of the external
// agent whose credentials were fetched for it.
type createExternalWorkspaceResult struct {
// WorkspaceID is the ID of the workspace that was created.
WorkspaceID uuid.UUID
// AgentToken is the AgentToken field of the external agent credentials
// retrieved for the first external agent in the workspace.
AgentToken string
}
// client abstracts the details of using codersdk.Client for workspace operations.
// This interface allows for easier testing by enabling mock implementations and
// provides a cleaner separation of concerns.
@@ -27,9 +22,14 @@ type createExternalWorkspaceResult struct {
// 1. Create the client with newClient(coderClient)
// 2. Configure logging when the io.Writer is available in Run()
type client interface {
// createExternalWorkspace creates an external workspace and returns the workspace ID
// and agent token for the first external agent found in the workspace resources.
createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error)
// CreateUserWorkspace creates a workspace for a user.
CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error)
// WorkspaceByOwnerAndName retrieves a workspace by owner and name.
WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error)
// WorkspaceExternalAgentCredentials retrieves credentials for an external agent.
WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error)
// watchWorkspace watches for updates to a workspace.
watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)
@@ -56,48 +56,28 @@ type appStatusPatcher interface {
// codersdk.Client.
type sdkClient struct {
// coderClient is the wrapped SDK client that the methods delegate to.
coderClient *codersdk.Client
// clock is set to quartz.NewReal() by newClient.
clock quartz.Clock
// logger is assigned in initialize; it is the zero value until then.
logger slog.Logger
}
// newClient wraps the given codersdk.Client in an sdkClient backed by a real
// clock and returns it as a client. Logging is not configured here; that
// happens later through initialize.
func newClient(coderClient *codersdk.Client) client {
	impl := &sdkClient{clock: quartz.NewReal()}
	impl.coderClient = coderClient
	return impl
}
func (c *sdkClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
// Create the workspace
workspace, err := c.coderClient.CreateUserWorkspace(ctx, codersdk.Me, req)
if err != nil {
return createExternalWorkspaceResult{}, err
}
// CreateUserWorkspace delegates to the wrapped codersdk.Client, passing ctx,
// userID and req through unchanged and returning its result as-is.
func (c *sdkClient) CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error) {
return c.coderClient.CreateUserWorkspace(ctx, userID, req)
}
// Get the workspace with latest build details
workspace, err = c.coderClient.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{})
if err != nil {
return createExternalWorkspaceResult{}, err
}
// WorkspaceByOwnerAndName delegates to the wrapped codersdk.Client, passing
// owner, name and params through unchanged and returning its result as-is.
func (c *sdkClient) WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error) {
return c.coderClient.WorkspaceByOwnerAndName(ctx, owner, name, params)
}
// Find external agents in resources
for _, resource := range workspace.LatestBuild.Resources {
if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 {
continue
}
// Get credentials for the first agent
agent := resource.Agents[0]
credentials, err := c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspace.ID, agent.Name)
if err != nil {
return createExternalWorkspaceResult{}, err
}
return createExternalWorkspaceResult{
WorkspaceID: workspace.ID,
AgentToken: credentials.AgentToken,
}, nil
}
return createExternalWorkspaceResult{}, xerrors.Errorf("no external agent found in workspace")
// WorkspaceExternalAgentCredentials delegates to the wrapped codersdk.Client,
// passing workspaceID and agentName through unchanged and returning its
// result as-is.
func (c *sdkClient) WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) {
return c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspaceID, agentName)
}
func (c *sdkClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
@@ -118,6 +98,7 @@ func (c *sdkClient) deleteWorkspace(ctx context.Context, workspaceID uuid.UUID)
// initialize stores logger on the sdkClient and installs the same logger on
// the wrapped codersdk.Client.
func (c *sdkClient) initialize(logger slog.Logger) {
// Configure the coder client logging
c.logger = logger
c.coderClient.SetLogger(logger)
// NOTE(review): presumably turns on request/response body logging in the
// SDK client — confirm against codersdk.Client.SetLogBodies.
c.coderClient.SetLogBodies(true)
}
+88 -7
View File
@@ -23,6 +23,12 @@ import (
const statusUpdatePrefix = "scaletest status update:"
// createExternalWorkspaceResult contains the results from creating an external workspace.
// It is produced by Runner.createExternalWorkspace and consumed by Run.
type createExternalWorkspaceResult struct {
// workspaceID is the ID of the workspace whose build was awaited; Run
// stores it as the runner's workspace ID.
workspaceID uuid.UUID
// agentToken is the AgentToken from the external agent credentials; Run
// passes it to the app status patcher.
agentToken string
}
type Runner struct {
client client
patcher appStatusPatcher
@@ -65,6 +71,10 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
}
}()
// ensure these labels are initialized, so we see the time series right away in prometheus.
r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
logs = loadtestutil.NewSyncWriter(logs)
r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
r.client.initialize(r.logger)
@@ -74,26 +84,23 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
slog.F("template_id", r.cfg.TemplateID),
slog.F("workspace_name", r.cfg.WorkspaceName))
result, err := r.client.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{
result, err := r.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{
TemplateID: r.cfg.TemplateID,
Name: r.cfg.WorkspaceName,
})
if err != nil {
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
return xerrors.Errorf("create external workspace: %w", err)
}
// Set the workspace ID
r.workspaceID = result.WorkspaceID
r.workspaceID = result.workspaceID
r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID))
// Initialize the patcher with the agent token
r.patcher.initialize(r.logger, result.AgentToken)
r.patcher.initialize(r.logger, result.agentToken)
r.logger.Info(ctx, "initialized app status patcher with agent token")
// ensure these labels are initialized, so we see the time series right away in prometheus.
r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
defer cancelWorkspaceUpdates()
workspaceUpdatesResult := make(chan error, 1)
@@ -257,3 +264,77 @@ func parseStatusMessage(message string) (int, bool) {
}
return msgNo, true
}
// createExternalWorkspace creates an external workspace, waits for its build
// job to finish, and returns the workspace ID together with the agent token of
// the first external agent found in the build's resources.
//
// The build is polled every 30 seconds using r.clock. Polling ends when the
// provisioner job succeeds, fails, is canceled, or reports a status this
// function does not recognize. A failed poll request is logged and retried on
// the next tick instead of aborting the wait. ctx is handed to the ticker, so
// the wait is also bounded by ctx (see the quartz TickerFunc documentation for
// the exact error returned in that case).
//
// An error is returned if the workspace cannot be created, the build does not
// succeed, no resource of type "coder_external_agent" with at least one agent
// exists, or the agent's credentials cannot be fetched.
func (r *Runner) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
	// Create the workspace
	workspace, err := r.client.CreateUserWorkspace(ctx, codersdk.Me, req)
	if err != nil {
		return createExternalWorkspaceResult{}, xerrors.Errorf("create user workspace: %w", err)
	}

	r.logger.Info(ctx, "waiting for workspace build to complete",
		slog.F("workspace_name", workspace.Name),
		slog.F("workspace_id", workspace.ID))

	// Poll the workspace until the build is complete. finalWorkspace is only
	// written by the ticker callback and only read after Wait() has returned.
	var finalWorkspace codersdk.Workspace
	// errBuildComplete is a sentinel used to stop the ticker on success; it is
	// never surfaced to the caller.
	errBuildComplete := xerrors.New("build complete")
	waiter := r.clock.TickerFunc(ctx, 30*time.Second, func() error {
		// Get the workspace with latest build details. Distinct names are used
		// here so the outer workspace/err are not shadowed inside the closure.
		polled, pollErr := r.client.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{})
		if pollErr != nil {
			// Transient poll failures are tolerated: log and try again on the
			// next tick.
			r.logger.Error(ctx, "failed to poll workspace while waiting for build to complete", slog.Error(pollErr))
			return nil
		}

		jobStatus := polled.LatestBuild.Job.Status
		r.logger.Debug(ctx, "checking workspace build status",
			slog.F("status", jobStatus),
			slog.F("build_id", polled.LatestBuild.ID))

		switch jobStatus {
		case codersdk.ProvisionerJobSucceeded:
			// Build succeeded
			r.logger.Info(ctx, "workspace build succeeded")
			finalWorkspace = polled
			return errBuildComplete
		case codersdk.ProvisionerJobFailed:
			return xerrors.Errorf("workspace build failed: %s", polled.LatestBuild.Job.Error)
		case codersdk.ProvisionerJobCanceled:
			return xerrors.New("workspace build was canceled")
		case codersdk.ProvisionerJobPending, codersdk.ProvisionerJobRunning, codersdk.ProvisionerJobCanceling:
			// Still in progress, continue polling
			return nil
		default:
			return xerrors.Errorf("unexpected job status: %s", jobStatus)
		}
	}, "createExternalWorkspace")

	err = waiter.Wait()
	if err != nil && !xerrors.Is(err, errBuildComplete) {
		return createExternalWorkspaceResult{}, xerrors.Errorf("wait for build completion: %w", err)
	}

	// Find external agents in resources
	for _, resource := range finalWorkspace.LatestBuild.Resources {
		if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 {
			continue
		}

		// Get credentials for the first agent
		agent := resource.Agents[0]
		credentials, err := r.client.WorkspaceExternalAgentCredentials(ctx, finalWorkspace.ID, agent.Name)
		if err != nil {
			return createExternalWorkspaceResult{}, xerrors.Errorf("get credentials for external agent %q: %w", agent.Name, err)
		}

		return createExternalWorkspaceResult{
			workspaceID: finalWorkspace.ID,
			agentToken:  credentials.AgentToken,
		}, nil
	}

	return createExternalWorkspaceResult{}, xerrors.New("no external agent found in workspace")
}
+174 -14
View File
@@ -28,13 +28,17 @@ type fakeClient struct {
logger slog.Logger
// Channels for controlling the behavior
workspaceUpdatesCh chan codersdk.Workspace
workspaceUpdatesCh chan codersdk.Workspace
workspaceByOwnerAndNameStatus chan codersdk.ProvisionerJobStatus
workspaceByOwnerAndNameErrors chan error
}
func newFakeClient(t *testing.T) *fakeClient {
return &fakeClient{
t: t,
workspaceUpdatesCh: make(chan codersdk.Workspace),
t: t,
workspaceUpdatesCh: make(chan codersdk.Workspace),
workspaceByOwnerAndNameStatus: make(chan codersdk.ProvisionerJobStatus),
workspaceByOwnerAndNameErrors: make(chan error, 1),
}
}
@@ -47,14 +51,62 @@ func (m *fakeClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID)
return m.workspaceUpdatesCh, nil
}
const testAgentToken = "test-agent-token"
// Fixed values shared by the fake client and the tests.
const (
// testAgentToken is the token the fake credentials endpoint returns.
testAgentToken = "test-agent-token"
// testAgentName is the name of the single agent in the fake workspace.
testAgentName = "test-agent"
// testWorkspaceName is the name of the fake workspace.
testWorkspaceName = "test-workspace"
)
func (m *fakeClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
m.logger.Debug(ctx, "called fake CreateExternalWorkspace", slog.F("req", req))
// Return a fake workspace ID and token for testing
return createExternalWorkspaceResult{
WorkspaceID: uuid.UUID{1, 2, 3, 4}, // Fake workspace ID
AgentToken: testAgentToken,
// Fixed IDs used by workspaceWithJobStatus when building fake workspaces.
var (
// testWorkspaceID is the ID of the fake workspace.
testWorkspaceID = uuid.UUID{1, 2, 3, 4}
// testBuildID is the ID of the fake workspace's latest build.
testBuildID = uuid.UUID{5, 6, 7, 8}
)
// workspaceWithJobStatus builds the fake workspace used by the tests. The
// workspace carries the fixed test ID and name, and its latest build has the
// fixed test build ID, a provisioner job in the given status, and exactly one
// "coder_external_agent" resource holding a single agent named testAgentName.
func workspaceWithJobStatus(status codersdk.ProvisionerJobStatus) codersdk.Workspace {
	agent := codersdk.WorkspaceAgent{Name: testAgentName}
	resource := codersdk.WorkspaceResource{
		Type:   "coder_external_agent",
		Agents: []codersdk.WorkspaceAgent{agent},
	}
	build := codersdk.WorkspaceBuild{
		ID:        testBuildID,
		Job:       codersdk.ProvisionerJob{Status: status},
		Resources: []codersdk.WorkspaceResource{resource},
	}
	return codersdk.Workspace{
		ID:          testWorkspaceID, // Fake workspace ID
		Name:        testWorkspaceName,
		LatestBuild: build,
	}
}
// CreateUserWorkspace is the fake workspace-creation call. It logs the request
// and always succeeds, returning the fake workspace with its build job in the
// pending state.
func (m *fakeClient) CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error) {
m.logger.Debug(ctx, "called fake CreateUserWorkspace", slog.F("user_id", userID), slog.F("req", req))
return workspaceWithJobStatus(codersdk.ProvisionerJobPending), nil
}
// WorkspaceByOwnerAndName is the fake poll endpoint the runner hits while it
// waits for the workspace build.
//
// Each call blocks until the test sends a job status on
// workspaceByOwnerAndNameStatus. If ctx is done first, the call returns
// ctx.Err() instead of blocking forever, so a cancelled or failed test does
// not leave the polling goroutine stuck. Once a status has been received, an
// error queued on workspaceByOwnerAndNameErrors takes precedence: it is
// returned with an empty workspace and the received status is discarded.
// Otherwise the fake workspace with that job status is returned.
func (m *fakeClient) WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error) {
	m.logger.Debug(ctx, "called fake WorkspaceByOwnerAndName", slog.F("owner", owner), slog.F("name", name))

	var status codersdk.ProvisionerJobStatus
	select {
	case <-ctx.Done():
		return codersdk.Workspace{}, ctx.Err()
	case status = <-m.workspaceByOwnerAndNameStatus:
	}

	// Non-blocking check for an injected error.
	select {
	case err := <-m.workspaceByOwnerAndNameErrors:
		return codersdk.Workspace{}, err
	default:
		return workspaceWithJobStatus(status), nil
	}
}
// WorkspaceExternalAgentCredentials is the fake credentials endpoint. It logs
// the workspace ID and agent name it was called with and always succeeds,
// returning credentials whose AgentToken is testAgentToken.
func (m *fakeClient) WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) {
m.logger.Debug(ctx, "called fake WorkspaceExternalAgentCredentials", slog.F("workspace_id", workspaceID), slog.F("agent_name", agentName))
// Return fake credentials for testing
return codersdk.ExternalAgentCredentials{
AgentToken: testAgentToken,
}, nil
}
@@ -145,10 +197,12 @@ func TestRunner_Run(t *testing.T) {
reportTimes: make(map[int]time.Time),
}
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer tickerTrap.Close()
reportTickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer reportTickerTrap.Close()
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
defer sinceTrap.Close()
buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace")
defer buildTickerTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
@@ -156,6 +210,12 @@ func TestRunner_Run(t *testing.T) {
runErr <- runner.Run(ctx, "test-runner", testutil.NewTestLogWriter(t))
}()
// complete the build
buildTickerTrap.MustWait(ctx).MustRelease(ctx)
w := mClock.Advance(30 * time.Second)
testutil.RequireSend(ctx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded)
w.MustWait(ctx)
// Wait for the runner to connect and watch workspace
connectedWaitGroup.Wait()
@@ -163,7 +223,7 @@ func TestRunner_Run(t *testing.T) {
close(startReporting)
// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
tickerTrap.MustWait(ctx).MustRelease(ctx)
reportTickerTrap.MustWait(ctx).MustRelease(ctx)
// at this point, the patcher must be initialized
require.Equal(t, testAgentToken, fPatcher.agentToken)
@@ -263,6 +323,8 @@ func TestRunner_RunMissedUpdate(t *testing.T) {
defer tickerTrap.Close()
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
defer sinceTrap.Close()
buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace")
defer buildTickerTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
@@ -270,6 +332,12 @@ func TestRunner_RunMissedUpdate(t *testing.T) {
runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
}()
// complete the build
buildTickerTrap.MustWait(testCtx).MustRelease(testCtx)
w := mClock.Advance(30 * time.Second)
testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded)
w.MustWait(testCtx)
// Wait for the runner to connect and watch workspace
connectedWaitGroup.Wait()
@@ -378,13 +446,20 @@ func TestRunner_Run_WithErrors(t *testing.T) {
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer tickerTrap.Close()
buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace")
defer buildTickerTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
go func() {
runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
}()
// complete the build
buildTickerTrap.MustWait(testCtx).MustRelease(testCtx)
w := mClock.Advance(30 * time.Second)
testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded)
w.MustWait(testCtx)
connectedWaitGroup.Wait()
close(startReporting)
@@ -430,6 +505,91 @@ func TestRunner_Run_WithErrors(t *testing.T) {
assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found")
}
// TestRunner_Run_BuildFailed checks the failure path of the build wait: when
// the polled workspace reports a failed provisioner job, Run must return an
// error containing "workspace build failed", the report-task-status error
// counter must be 1, and the missing-status-updates counter must be present
// with value 0.
func TestRunner_Run_BuildFailed(t *testing.T) {
t.Parallel()
testCtx := testutil.Context(t, testutil.WaitShort)
runCtx, cancel := context.WithCancel(testCtx)
defer cancel()
// The mock clock lets the test decide when the build-poll ticker fires.
mClock := quartz.NewMock(t)
fClient := newFakeClient(t)
fPatcher := newFakeAppStatusPatcher(t)
templateID := uuid.UUID{5, 6, 7, 8}
workspaceName := "test-workspace"
appSlug := "test-app"
// A fresh registry per test, so the gathered metrics below belong to this
// run only.
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg, "test")
connectedWaitGroup := &sync.WaitGroup{}
connectedWaitGroup.Add(1)
// startReporting is never closed in this test; the run is expected to fail
// before reporting would begin.
startReporting := make(chan struct{})
cfg := Config{
TemplateID: templateID,
WorkspaceName: workspaceName,
AppSlug: appSlug,
ConnectedWaitGroup: connectedWaitGroup,
StartReporting: startReporting,
ReportStatusPeriod: 10 * time.Second,
ReportStatusDuration: 35 * time.Second,
Metrics: metrics,
MetricLabelValues: []string{"test"},
}
runner := &Runner{
client: fClient,
patcher: fPatcher,
cfg: cfg,
clock: mClock,
reportTimes: make(map[int]time.Time),
}
// Trap the ticker tagged "createExternalWorkspace" (the build poll) so the
// test can synchronize with its creation before advancing the clock.
buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace")
defer buildTickerTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
go func() {
runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
}()
// Drive a single build poll: release the trapped ticker, advance the mock
// clock by the 30s poll interval, and answer the resulting poll with a
// failed job status.
buildTickerTrap.MustWait(testCtx).MustRelease(testCtx)
w := mClock.Advance(30 * time.Second)
testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobFailed)
w.MustWait(testCtx)
// NOTE(review): this relies on Run releasing ConnectedWaitGroup even when
// workspace creation fails — presumably in its deferred cleanup; confirm.
connectedWaitGroup.Wait()
// Wait for the runner to complete
err := testutil.RequireReceive(testCtx, t, runErr)
require.ErrorContains(t, err, "workspace build failed")
// Verify metrics were updated correctly
metricFamilies, err := reg.Gather()
require.NoError(t, err)
var missingUpdatesFound bool
var reportTaskStatusErrorsFound bool
for _, mf := range metricFamilies {
switch mf.GetName() {
case "coderd_scaletest_missing_status_updates_total":
// Must exist with value 0: no status updates were ever attempted.
missingUpdatesFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(0), counter.GetValue())
case "coderd_scaletest_report_task_status_errors_total":
// Exactly one error: Run increments this counter when creating the
// external workspace fails.
reportTaskStatusErrorsFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(1), counter.GetValue())
}
}
assert.True(t, missingUpdatesFound, "missing updates metric not found")
assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found")
}
func TestParseStatusMessage(t *testing.T) {
t.Parallel()