From fda181bb26e51f943c5aa7a3fca00a4349b153f0 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Wed, 4 Mar 2026 22:12:35 +0400 Subject: [PATCH] chore: modify task status scaletest to use Agent API dRPC (#22356) relates to #21335 Modifies our taskstatus scaletest load generator to use the dRPC connection to mimic what an actual running Task would do via the MCP server (c.f. PRs below this one in the stack). Disclosure: I used AI to generate large portions of this PR, but hand-reviewed and tweaked. --- scaletest/taskstatus/client.go | 77 ++++++++++++-------- scaletest/taskstatus/run.go | 28 +++++--- scaletest/taskstatus/run_internal_test.go | 85 ++++++++++++----------- 3 files changed, 111 insertions(+), 79 deletions(-) diff --git a/scaletest/taskstatus/client.go b/scaletest/taskstatus/client.go index 48e934e5dd..0ddc7b8627 100644 --- a/scaletest/taskstatus/client.go +++ b/scaletest/taskstatus/client.go @@ -9,6 +9,7 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog/v3" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/quartz" @@ -41,15 +42,20 @@ type client interface { initialize(logger slog.Logger) } -// appStatusPatcher abstracts the details of using agentsdk.Client for updating app status. -// This interface is separate from client because it requires an agent token which is only -// available after creating an external workspace. -type appStatusPatcher interface { - // patchAppStatus updates the status of a workspace app. - patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error +// appStatusUpdater abstracts the details of updating app status via the +// Agent dRPC API. This interface is separate from client because it +// requires an agent token which is only available after creating an +// external workspace. +type appStatusUpdater interface { + // updateAppStatus sends a status update for a workspace app. + updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error - // initialize sets up the patcher with the provided logger and agent token. - initialize(logger slog.Logger, agentToken string) + // initialize establishes the dRPC connection using the provided + // agent token. Must be called before updateAppStatus. + initialize(ctx context.Context, logger slog.Logger, agentToken string) error + + // close cleanly shuts down the underlying dRPC connection. + close() error } // sdkClient is the concrete implementation of the client interface using @@ -103,42 +109,57 @@ func (c *sdkClient) initialize(logger slog.Logger) { c.coderClient.SetLogBodies(true) } -// sdkAppStatusPatcher is the concrete implementation of the appStatusPatcher interface -// using agentsdk.Client. -type sdkAppStatusPatcher struct { - agentClient *agentsdk.Client - url *url.URL - httpClient *http.Client +// sdkAppStatusUpdater is the concrete implementation of the +// appStatusUpdater interface. It dials the Agent dRPC endpoint once +// during initialize and reuses the connection for all subsequent +// UpdateAppStatus calls. +type sdkAppStatusUpdater struct { + drpcClient agentproto.DRPCAgentClient28 + url *url.URL + httpClient *http.Client } -// newAppStatusPatcher creates a new appStatusPatcher implementation. -func newAppStatusPatcher(client *codersdk.Client) appStatusPatcher { - return &sdkAppStatusPatcher{ +// newAppStatusUpdater creates a new appStatusUpdater implementation. +func newAppStatusUpdater(client *codersdk.Client) appStatusUpdater { + return &sdkAppStatusUpdater{ url: client.URL, httpClient: client.HTTPClient, } } -func (p *sdkAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error { - if p.agentClient == nil { - panic("agentClient not initialized - call initialize first") +func (u *sdkAppStatusUpdater) updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error { + if u.drpcClient == nil { + return xerrors.New("dRPC client not initialized - call initialize first") } - return p.agentClient.PatchAppStatus(ctx, req) + _, err := u.drpcClient.UpdateAppStatus(ctx, req) + return err } -func (p *sdkAppStatusPatcher) initialize(logger slog.Logger, agentToken string) { - // Create and configure the agent client with the provided token - p.agentClient = agentsdk.New( - p.url, +func (u *sdkAppStatusUpdater) close() error { + if u.drpcClient == nil { + return nil + } + return u.drpcClient.DRPCConn().Close() +} + +func (u *sdkAppStatusUpdater) initialize(ctx context.Context, logger slog.Logger, agentToken string) error { + agentClient := agentsdk.New( + u.url, agentsdk.WithFixedToken(agentToken), - codersdk.WithHTTPClient(p.httpClient), + codersdk.WithHTTPClient(u.httpClient), codersdk.WithLogger(logger), codersdk.WithLogBodies(), ) + drpcClient, _, err := agentClient.ConnectRPC28WithRole(ctx, "") + if err != nil { + return xerrors.Errorf("connect to agent dRPC endpoint: %w", err) + } + u.drpcClient = drpcClient + return nil } // Ensure sdkClient implements the client interface. var _ client = (*sdkClient)(nil) -// Ensure sdkAppStatusPatcher implements the appStatusPatcher interface. -var _ appStatusPatcher = (*sdkAppStatusPatcher)(nil) +// Ensure sdkAppStatusUpdater implements the appStatusUpdater interface. +var _ appStatusUpdater = (*sdkAppStatusUpdater)(nil) diff --git a/scaletest/taskstatus/run.go b/scaletest/taskstatus/run.go index c727e86349..cc569b169f 100644 --- a/scaletest/taskstatus/run.go +++ b/scaletest/taskstatus/run.go @@ -13,8 +13,8 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/sloghuman" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/scaletest/harness" "github.com/coder/coder/v2/scaletest/loadtestutil" "github.com/coder/quartz" @@ -30,7 +30,7 @@ type createExternalWorkspaceResult struct { type Runner struct { client client - patcher appStatusPatcher + updater appStatusUpdater cfg Config logger slog.Logger @@ -55,7 +55,7 @@ var ( func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner { return &Runner{ client: newClient(coderClient), - patcher: newAppStatusPatcher(coderClient), + updater: newAppStatusUpdater(coderClient), cfg: cfg, clock: quartz.NewReal(), reportTimes: make(map[int]time.Time), @@ -96,9 +96,17 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error { r.workspaceID = result.workspaceID r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID)) - // Initialize the patcher with the agent token - r.patcher.initialize(r.logger, result.agentToken) - r.logger.Info(ctx, "initialized app status patcher with agent token") + // Establish the dRPC connection using the agent token. + if err := r.updater.initialize(ctx, r.logger, result.agentToken); err != nil { + r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc() + return xerrors.Errorf("initialize app status updater: %w", err) + } + defer func() { + if err := r.updater.close(); err != nil { + r.logger.Error(ctx, "failed to close app status updater", slog.Error(err)) + } + }() + r.logger.Info(ctx, "initialized app status updater with agent token") workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx) defer cancelWorkspaceUpdates() @@ -227,11 +235,11 @@ func (r *Runner) reportTaskStatus(ctx context.Context) error { } r.mu.Unlock() - err := r.patcher.patchAppStatus(ctx, agentsdk.PatchAppStatus{ - AppSlug: r.cfg.AppSlug, + err := r.updater.updateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{ + Slug: r.cfg.AppSlug, Message: statusUpdatePrefix + strconv.Itoa(msgNo), - State: codersdk.WorkspaceAppStatusStateWorking, - URI: "https://example.com/example-status/", + State: agentproto.UpdateAppStatusRequest_WORKING, + Uri: "https://example.com/example-status/", }) if err != nil { r.logger.Error(ctx, "failed to report task status", slog.Error(err)) diff --git a/scaletest/taskstatus/run_internal_test.go b/scaletest/taskstatus/run_internal_test.go index 694331fffc..118330e43f 100644 --- a/scaletest/taskstatus/run_internal_test.go +++ b/scaletest/taskstatus/run_internal_test.go @@ -15,8 +15,8 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/sloghuman" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/testutil" "github.com/coder/quartz" ) @@ -115,43 +115,46 @@ func (m *fakeClient) deleteWorkspace(ctx context.Context, workspaceID uuid.UUID) return nil } -// fakeAppStatusPatcher implements the appStatusPatcher interface for testing -type fakeAppStatusPatcher struct { +// fakeAppStatusUpdater implements the appStatusUpdater interface for testing. +type fakeAppStatusUpdater struct { t *testing.T logger slog.Logger agentToken string // Channels for controlling the behavior - patchStatusCalls chan agentsdk.PatchAppStatus - patchStatusErrors chan error + updateStatusCalls chan *agentproto.UpdateAppStatusRequest + updateStatusErrors chan error } -func newFakeAppStatusPatcher(t *testing.T) *fakeAppStatusPatcher { - return &fakeAppStatusPatcher{ - t: t, - patchStatusCalls: make(chan agentsdk.PatchAppStatus), - patchStatusErrors: make(chan error, 1), +func newFakeAppStatusUpdater(t *testing.T) *fakeAppStatusUpdater { + return &fakeAppStatusUpdater{ + t: t, + updateStatusCalls: make(chan *agentproto.UpdateAppStatusRequest), + updateStatusErrors: make(chan error, 1), } } -func (p *fakeAppStatusPatcher) initialize(logger slog.Logger, agentToken string) { - p.logger = logger - p.agentToken = agentToken +func (u *fakeAppStatusUpdater) initialize(_ context.Context, logger slog.Logger, agentToken string) error { + u.logger = logger + u.agentToken = agentToken + return nil } -func (p *fakeAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error { - assert.NotEmpty(p.t, p.agentToken) - p.logger.Debug(ctx, "called fake PatchAppStatus", slog.F("req", req)) - // Send the request to the channel so tests can verify it +func (*fakeAppStatusUpdater) close() error { + return nil +} + +func (u *fakeAppStatusUpdater) updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error { + assert.NotEmpty(u.t, u.agentToken) + u.logger.Debug(ctx, "called fake UpdateAppStatus", slog.F("req", req)) select { - case p.patchStatusCalls <- req: + case u.updateStatusCalls <- req: case <-ctx.Done(): return ctx.Err() } - // Check if there's an error to return select { - case err := <-p.patchStatusErrors: + case err := <-u.updateStatusErrors: return err default: return nil @@ -165,7 +168,7 @@ func TestRunner_Run(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -190,7 +193,7 @@ func TestRunner_Run(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -224,17 +227,17 @@ func TestRunner_Run(t *testing.T) { // Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off. reportTickerTrap.MustWait(ctx).MustRelease(ctx) - // at this point, the patcher must be initialized - require.Equal(t, testAgentToken, fPatcher.agentToken) + // at this point, the updater must be initialized + require.Equal(t, testAgentToken, fUpdater.agentToken) updateDelay := time.Duration(0) for i := 0; i < 4; i++ { tickWaiter := mClock.Advance((10 * time.Second) - updateDelay) - patchCall := testutil.RequireReceive(ctx, t, fPatcher.patchStatusCalls) - require.Equal(t, appSlug, patchCall.AppSlug) - require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message) - require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State) + updateCall := testutil.RequireReceive(ctx, t, fUpdater.updateStatusCalls) + require.Equal(t, appSlug, updateCall.Slug) + require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), updateCall.Message) + require.Equal(t, agentproto.UpdateAppStatusRequest_WORKING, updateCall.State) tickWaiter.MustWait(ctx) // Send workspace update 1, 2, 3, or 4 seconds after the report @@ -287,7 +290,7 @@ func TestRunner_RunMissedUpdate(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -312,7 +315,7 @@ func TestRunner_RunMissedUpdate(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -349,10 +352,10 @@ func TestRunner_RunMissedUpdate(t *testing.T) { updateDelay := time.Duration(0) for i := 0; i < 4; i++ { tickWaiter := mClock.Advance((10 * time.Second) - updateDelay) - patchCall := testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls) - require.Equal(t, appSlug, patchCall.AppSlug) - require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message) - require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State) + updateCall := testutil.RequireReceive(testCtx, t, fUpdater.updateStatusCalls) + require.Equal(t, appSlug, updateCall.Slug) + require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), updateCall.Message) + require.Equal(t, agentproto.UpdateAppStatusRequest_WORKING, updateCall.State) tickWaiter.MustWait(testCtx) // Send workspace update 1, 2, 3, or 4 seconds after the report @@ -412,7 +415,7 @@ func TestRunner_Run_WithErrors(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -437,7 +440,7 @@ func TestRunner_Run_WithErrors(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -467,8 +470,8 @@ func TestRunner_Run_WithErrors(t *testing.T) { for i := 0; i < 4; i++ { tickWaiter := mClock.Advance(10 * time.Second) - testutil.RequireSend(testCtx, t, fPatcher.patchStatusErrors, xerrors.New("a bad thing happened")) - _ = testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls) + testutil.RequireSend(testCtx, t, fUpdater.updateStatusErrors, xerrors.New("a bad thing happened")) + _ = testutil.RequireReceive(testCtx, t, fUpdater.updateStatusCalls) tickWaiter.MustWait(testCtx) } @@ -513,7 +516,7 @@ func TestRunner_Run_BuildFailed(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -538,7 +541,7 @@ func TestRunner_Run_BuildFailed(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -671,7 +674,7 @@ func TestRunner_Cleanup(t *testing.T) { runner := &Runner{ client: fakeClient, - patcher: newFakeAppStatusPatcher(t), + updater: newFakeAppStatusUpdater(t), cfg: cfg, clock: quartz.NewMock(t), }