diff --git a/scaletest/taskstatus/client.go b/scaletest/taskstatus/client.go index 48e934e5dd..0ddc7b8627 100644 --- a/scaletest/taskstatus/client.go +++ b/scaletest/taskstatus/client.go @@ -9,6 +9,7 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog/v3" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/quartz" @@ -41,15 +42,20 @@ type client interface { initialize(logger slog.Logger) } -// appStatusPatcher abstracts the details of using agentsdk.Client for updating app status. -// This interface is separate from client because it requires an agent token which is only -// available after creating an external workspace. -type appStatusPatcher interface { - // patchAppStatus updates the status of a workspace app. - patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error +// appStatusUpdater abstracts the details of updating app status via the +// Agent dRPC API. This interface is separate from client because it +// requires an agent token which is only available after creating an +// external workspace. +type appStatusUpdater interface { + // updateAppStatus sends a status update for a workspace app. + updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error - // initialize sets up the patcher with the provided logger and agent token. - initialize(logger slog.Logger, agentToken string) + // initialize establishes the dRPC connection using the provided + // agent token. Must be called before updateAppStatus. + initialize(ctx context.Context, logger slog.Logger, agentToken string) error + + // close cleanly shuts down the underlying dRPC connection. + close() error } // sdkClient is the concrete implementation of the client interface using @@ -103,42 +109,57 @@ func (c *sdkClient) initialize(logger slog.Logger) { c.coderClient.SetLogBodies(true) } -// sdkAppStatusPatcher is the concrete implementation of the appStatusPatcher interface -// using agentsdk.Client. -type sdkAppStatusPatcher struct { - agentClient *agentsdk.Client - url *url.URL - httpClient *http.Client +// sdkAppStatusUpdater is the concrete implementation of the +// appStatusUpdater interface. It dials the Agent dRPC endpoint once +// during initialize and reuses the connection for all subsequent +// UpdateAppStatus calls. +type sdkAppStatusUpdater struct { + drpcClient agentproto.DRPCAgentClient28 + url *url.URL + httpClient *http.Client } -// newAppStatusPatcher creates a new appStatusPatcher implementation. -func newAppStatusPatcher(client *codersdk.Client) appStatusPatcher { - return &sdkAppStatusPatcher{ +// newAppStatusUpdater creates a new appStatusUpdater implementation. +func newAppStatusUpdater(client *codersdk.Client) appStatusUpdater { + return &sdkAppStatusUpdater{ url: client.URL, httpClient: client.HTTPClient, } } -func (p *sdkAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error { - if p.agentClient == nil { - panic("agentClient not initialized - call initialize first") +func (u *sdkAppStatusUpdater) updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error { + if u.drpcClient == nil { + return xerrors.New("dRPC client not initialized - call initialize first") } - return p.agentClient.PatchAppStatus(ctx, req) + _, err := u.drpcClient.UpdateAppStatus(ctx, req) + return err } -func (p *sdkAppStatusPatcher) initialize(logger slog.Logger, agentToken string) { - // Create and configure the agent client with the provided token - p.agentClient = agentsdk.New( - p.url, +func (u *sdkAppStatusUpdater) close() error { + if u.drpcClient == nil { + return nil + } + return u.drpcClient.DRPCConn().Close() +} + +func (u *sdkAppStatusUpdater) initialize(ctx context.Context, logger slog.Logger, agentToken string) error { + agentClient := agentsdk.New( + u.url, agentsdk.WithFixedToken(agentToken), - codersdk.WithHTTPClient(p.httpClient), + codersdk.WithHTTPClient(u.httpClient), codersdk.WithLogger(logger), codersdk.WithLogBodies(), ) + drpcClient, _, err := agentClient.ConnectRPC28WithRole(ctx, "") + if err != nil { + return xerrors.Errorf("connect to agent dRPC endpoint: %w", err) + } + u.drpcClient = drpcClient + return nil } // Ensure sdkClient implements the client interface. var _ client = (*sdkClient)(nil) -// Ensure sdkAppStatusPatcher implements the appStatusPatcher interface. -var _ appStatusPatcher = (*sdkAppStatusPatcher)(nil) +// Ensure sdkAppStatusUpdater implements the appStatusUpdater interface. +var _ appStatusUpdater = (*sdkAppStatusUpdater)(nil) diff --git a/scaletest/taskstatus/run.go b/scaletest/taskstatus/run.go index c727e86349..cc569b169f 100644 --- a/scaletest/taskstatus/run.go +++ b/scaletest/taskstatus/run.go @@ -13,8 +13,8 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/sloghuman" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/scaletest/harness" "github.com/coder/coder/v2/scaletest/loadtestutil" "github.com/coder/quartz" @@ -30,7 +30,7 @@ type createExternalWorkspaceResult struct { type Runner struct { client client - patcher appStatusPatcher + updater appStatusUpdater cfg Config logger slog.Logger @@ -55,7 +55,7 @@ var ( func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner { return &Runner{ client: newClient(coderClient), - patcher: newAppStatusPatcher(coderClient), + updater: newAppStatusUpdater(coderClient), cfg: cfg, clock: quartz.NewReal(), reportTimes: make(map[int]time.Time), @@ -96,9 +96,17 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error { r.workspaceID = result.workspaceID r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID)) - // Initialize the patcher with the agent token - r.patcher.initialize(r.logger, result.agentToken) - r.logger.Info(ctx, "initialized app status patcher with agent token") + // Establish the dRPC connection using the agent token. + if err := r.updater.initialize(ctx, r.logger, result.agentToken); err != nil { + r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc() + return xerrors.Errorf("initialize app status updater: %w", err) + } + defer func() { + if err := r.updater.close(); err != nil { + r.logger.Error(ctx, "failed to close app status updater", slog.Error(err)) + } + }() + r.logger.Info(ctx, "initialized app status updater with agent token") workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx) defer cancelWorkspaceUpdates() @@ -227,11 +235,11 @@ func (r *Runner) reportTaskStatus(ctx context.Context) error { } r.mu.Unlock() - err := r.patcher.patchAppStatus(ctx, agentsdk.PatchAppStatus{ - AppSlug: r.cfg.AppSlug, + err := r.updater.updateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{ + Slug: r.cfg.AppSlug, Message: statusUpdatePrefix + strconv.Itoa(msgNo), - State: codersdk.WorkspaceAppStatusStateWorking, - URI: "https://example.com/example-status/", + State: agentproto.UpdateAppStatusRequest_WORKING, + Uri: "https://example.com/example-status/", }) if err != nil { r.logger.Error(ctx, "failed to report task status", slog.Error(err)) diff --git a/scaletest/taskstatus/run_internal_test.go b/scaletest/taskstatus/run_internal_test.go index 694331fffc..118330e43f 100644 --- a/scaletest/taskstatus/run_internal_test.go +++ b/scaletest/taskstatus/run_internal_test.go @@ -15,8 +15,8 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/sloghuman" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/testutil" "github.com/coder/quartz" ) @@ -115,43 +115,46 @@ func (m *fakeClient) deleteWorkspace(ctx context.Context, workspaceID uuid.UUID) return nil } -// fakeAppStatusPatcher implements the appStatusPatcher interface for testing -type fakeAppStatusPatcher struct { +// fakeAppStatusUpdater implements the appStatusUpdater interface for testing. +type fakeAppStatusUpdater struct { t *testing.T logger slog.Logger agentToken string // Channels for controlling the behavior - patchStatusCalls chan agentsdk.PatchAppStatus - patchStatusErrors chan error + updateStatusCalls chan *agentproto.UpdateAppStatusRequest + updateStatusErrors chan error } -func newFakeAppStatusPatcher(t *testing.T) *fakeAppStatusPatcher { - return &fakeAppStatusPatcher{ - t: t, - patchStatusCalls: make(chan agentsdk.PatchAppStatus), - patchStatusErrors: make(chan error, 1), +func newFakeAppStatusUpdater(t *testing.T) *fakeAppStatusUpdater { + return &fakeAppStatusUpdater{ + t: t, + updateStatusCalls: make(chan *agentproto.UpdateAppStatusRequest), + updateStatusErrors: make(chan error, 1), } } -func (p *fakeAppStatusPatcher) initialize(logger slog.Logger, agentToken string) { - p.logger = logger - p.agentToken = agentToken +func (u *fakeAppStatusUpdater) initialize(_ context.Context, logger slog.Logger, agentToken string) error { + u.logger = logger + u.agentToken = agentToken + return nil } -func (p *fakeAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error { - assert.NotEmpty(p.t, p.agentToken) - p.logger.Debug(ctx, "called fake PatchAppStatus", slog.F("req", req)) - // Send the request to the channel so tests can verify it +func (*fakeAppStatusUpdater) close() error { + return nil +} + +func (u *fakeAppStatusUpdater) updateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) error { + assert.NotEmpty(u.t, u.agentToken) + u.logger.Debug(ctx, "called fake UpdateAppStatus", slog.F("req", req)) select { - case p.patchStatusCalls <- req: + case u.updateStatusCalls <- req: case <-ctx.Done(): return ctx.Err() } - // Check if there's an error to return select { - case err := <-p.patchStatusErrors: + case err := <-u.updateStatusErrors: return err default: return nil @@ -165,7 +168,7 @@ func TestRunner_Run(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -190,7 +193,7 @@ func TestRunner_Run(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -224,17 +227,17 @@ func TestRunner_Run(t *testing.T) { // Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off. reportTickerTrap.MustWait(ctx).MustRelease(ctx) - // at this point, the patcher must be initialized - require.Equal(t, testAgentToken, fPatcher.agentToken) + // at this point, the updater must be initialized + require.Equal(t, testAgentToken, fUpdater.agentToken) updateDelay := time.Duration(0) for i := 0; i < 4; i++ { tickWaiter := mClock.Advance((10 * time.Second) - updateDelay) - patchCall := testutil.RequireReceive(ctx, t, fPatcher.patchStatusCalls) - require.Equal(t, appSlug, patchCall.AppSlug) - require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message) - require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State) + updateCall := testutil.RequireReceive(ctx, t, fUpdater.updateStatusCalls) + require.Equal(t, appSlug, updateCall.Slug) + require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), updateCall.Message) + require.Equal(t, agentproto.UpdateAppStatusRequest_WORKING, updateCall.State) tickWaiter.MustWait(ctx) // Send workspace update 1, 2, 3, or 4 seconds after the report @@ -287,7 +290,7 @@ func TestRunner_RunMissedUpdate(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -312,7 +315,7 @@ func TestRunner_RunMissedUpdate(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -349,10 +352,10 @@ func TestRunner_RunMissedUpdate(t *testing.T) { updateDelay := time.Duration(0) for i := 0; i < 4; i++ { tickWaiter := mClock.Advance((10 * time.Second) - updateDelay) - patchCall := testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls) - require.Equal(t, appSlug, patchCall.AppSlug) - require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message) - require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State) + updateCall := testutil.RequireReceive(testCtx, t, fUpdater.updateStatusCalls) + require.Equal(t, appSlug, updateCall.Slug) + require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), updateCall.Message) + require.Equal(t, agentproto.UpdateAppStatusRequest_WORKING, updateCall.State) tickWaiter.MustWait(testCtx) // Send workspace update 1, 2, 3, or 4 seconds after the report @@ -412,7 +415,7 @@ func TestRunner_Run_WithErrors(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -437,7 +440,7 @@ func TestRunner_Run_WithErrors(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -467,8 +470,8 @@ func TestRunner_Run_WithErrors(t *testing.T) { for i := 0; i < 4; i++ { tickWaiter := mClock.Advance(10 * time.Second) - testutil.RequireSend(testCtx, t, fPatcher.patchStatusErrors, xerrors.New("a bad thing happened")) - _ = testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls) + testutil.RequireSend(testCtx, t, fUpdater.updateStatusErrors, xerrors.New("a bad thing happened")) + _ = testutil.RequireReceive(testCtx, t, fUpdater.updateStatusCalls) tickWaiter.MustWait(testCtx) } @@ -513,7 +516,7 @@ func TestRunner_Run_BuildFailed(t *testing.T) { mClock := quartz.NewMock(t) fClient := newFakeClient(t) - fPatcher := newFakeAppStatusPatcher(t) + fUpdater := newFakeAppStatusUpdater(t) templateID := uuid.UUID{5, 6, 7, 8} workspaceName := "test-workspace" appSlug := "test-app" @@ -538,7 +541,7 @@ func TestRunner_Run_BuildFailed(t *testing.T) { } runner := &Runner{ client: fClient, - patcher: fPatcher, + updater: fUpdater, cfg: cfg, clock: mClock, reportTimes: make(map[int]time.Time), @@ -671,7 +674,7 @@ func TestRunner_Cleanup(t *testing.T) { runner := &Runner{ client: fakeClient, - patcher: newFakeAppStatusPatcher(t), + updater: newFakeAppStatusUpdater(t), cfg: cfg, clock: quartz.NewMock(t), }