From 9d37f63fbd34a0b13475e522c30306ef8f7c14e9 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 27 May 2026 13:49:42 -0700 Subject: [PATCH] feat: report synthetic metadata from fake agents (#25166) Fake agents now fetch their manifest, spawn a single per-agent metadata goroutine, and emit batched BatchUpdateMetadata calls with 3072-byte base64 payloads so scaletest runs mirror the load shape of real agents. This matches what the current scaletest workspace template does for metadata. In the future we can extend the harness here to take in a config option for the metadata payload size. --------- Signed-off-by: Callum Styan Co-authored-by: Mux --- enterprise/cli/exp_scaletest_agentfake.go | 2 +- enterprise/scaletest/agentfake/agent.go | 199 ++++++++++++- enterprise/scaletest/agentfake/agent_test.go | 145 +++++++--- enterprise/scaletest/agentfake/manager.go | 32 +- .../scaletest/agentfake/manager_test.go | 273 +++++++----------- 5 files changed, 435 insertions(+), 216 deletions(-) diff --git a/enterprise/cli/exp_scaletest_agentfake.go b/enterprise/cli/exp_scaletest_agentfake.go index a6c2e88649..cbfca70897 100644 --- a/enterprise/cli/exp_scaletest_agentfake.go +++ b/enterprise/cli/exp_scaletest_agentfake.go @@ -68,7 +68,7 @@ func (r *RootCmd) scaletestAgentFake() *serpent.Command { } logger := inv.Logger - mgr := agentfake.NewManager(client, logger, agentfake.ManagerOptions{ + mgr := agentfake.NewManager(client.URL, client, logger, agentfake.ManagerOptions{ Template: template, Owner: owner, }) diff --git a/enterprise/scaletest/agentfake/agent.go b/enterprise/scaletest/agentfake/agent.go index b03ebde8bd..c18d8e0310 100644 --- a/enterprise/scaletest/agentfake/agent.go +++ b/enterprise/scaletest/agentfake/agent.go @@ -2,34 +2,97 @@ package agentfake import ( "context" + "encoding/base64" "net/url" + "strings" "time" + "github.com/google/uuid" "golang.org/x/xerrors" "google.golang.org/protobuf/types/known/timestamppb" "cdr.dev/slog/v3" 
"github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk/agentsdk" + tailnetproto "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/quartz" ) -const reconnectBackoff = 1 * time.Second +// rpcDialer is the subset of agentsdk.Client agentfake uses. Defined +// locally so tests can plug in *agent/agenttest.Client (or any other +// test double) without depending on the rest of the agentsdk.Client +// surface. +type rpcDialer interface { + ConnectRPC29WithRole(ctx context.Context, role string) ( + proto.DRPCAgentClient29, tailnetproto.DRPCTailnetClient28, error, + ) +} + +const ( + reconnectBackoff = 1 * time.Second + + // metadataTickInterval is the scheduler pulse for the per-agent metadata + // goroutine. Per-description cadence is enforced by tracking next-due + // timestamps; the ticker just wakes us up often enough to honor the + // shortest interval we expect (1s). + metadataTickInterval = 1 * time.Second + + // metadataValueBytes matches the payload size produced by the real + // scaletest template's metadata script (`dd if=/dev/urandom bs=3072 + // count=1 | base64`), so the synthetic load shape on the wire mirrors + // what a real agent emits. + metadataValueBytes = 3072 + + // metadataMinInterval is a floor applied to manifest-declared intervals + // to guard against a malformed manifest pinning the goroutine. + metadataMinInterval = 1 * time.Second +) // Agent is a single fake agent. It owns one workspace-agent auth token and one dRPC connection to coderd. type Agent struct { coderURL *url.URL token string logger slog.Logger + clock quartz.Clock + dialer rpcDialer // nil → built from coderURL+token in Run cancel context.CancelFunc } -func NewAgent(coderURL *url.URL, token string, logger slog.Logger) *Agent { - return &Agent{ +// Option configures an Agent. +type Option func(*Agent) + +// WithClock injects a clock for time-based operations. Defaults to +// quartz.NewReal(). 
Tests pass a *quartz.Mock to drive the metadata +// loop deterministically. The clock is per-agent so a future caller +// can give different agents slightly different cadences. +func WithClock(c quartz.Clock) Option { + return func(a *Agent) { + a.clock = c + } +} + +// WithDialer injects a custom RPC dialer. Defaults to a real +// agentsdk.Client built from coderURL + token. Tests use this to +// substitute *agent/agenttest.Client and avoid standing up a real +// coderd. +func WithDialer(d rpcDialer) Option { + return func(a *Agent) { + a.dialer = d + } +} + +func NewAgent(coderURL *url.URL, token string, logger slog.Logger, opts ...Option) *Agent { + a := &Agent{ coderURL: coderURL, token: token, logger: logger, + clock: quartz.NewReal(), } + for _, opt := range opts { + opt(a) + } + return a } // Run opens a dRPC websocket to coderd as the "agent" role and keeps it open until ctx is canceled or Close is called. @@ -42,7 +105,10 @@ func (a *Agent) Run(ctx context.Context) error { a.cancel = cancel defer a.cancel() - client := agentsdk.New(a.coderURL, agentsdk.WithFixedToken(a.token)) + client := a.dialer + if client == nil { + client = agentsdk.New(a.coderURL, agentsdk.WithFixedToken(a.token)) + } for { if err := runCtx.Err(); err != nil { return nil @@ -52,18 +118,20 @@ func (a *Agent) Run(ctx context.Context) error { a.logger.Warn(runCtx, "fake agent dRPC stream ended; reconnecting", slog.Error(err)) } + timer := a.clock.NewTimer(reconnectBackoff, "agentfake", "reconnect") select { case <-runCtx.Done(): + timer.Stop() return nil - case <-time.After(reconnectBackoff): + case <-timer.C: } } } // connectAndServe opens one dRPC websocket, announces lifecycle = READY, then blocks until ctx is canceled or the // connection is closed by either side. Returns the underlying error, if any. 
-func (a *Agent) connectAndServe(ctx context.Context, client *agentsdk.Client) error { - rpc, _, err := client.ConnectRPC28WithRole(ctx, "agent") +func (a *Agent) connectAndServe(ctx context.Context, client rpcDialer) error { + rpc, _, err := client.ConnectRPC29WithRole(ctx, "agent") if err != nil { return xerrors.Errorf("connect dRPC: %w", err) } @@ -87,6 +155,30 @@ func (a *Agent) connectAndServe(ctx context.Context, client *agentsdk.Client) er slog.Error(err)) } + // Fetch the agent manifest so we know which metadata descriptions the + // template declared. We synthesize values for each declared key at the + // declared interval. Failure here is non-fatal: a manifest fetch + // hiccup shouldn't tear the connection down, we just skip metadata + // for this session and let the next reconnect retry. + manifest, err := rpc.GetManifest(ctx, &proto.GetManifestRequest{}) + if err != nil { + if ctx.Err() == nil { + a.logger.Warn(ctx, "get manifest for metadata", slog.Error(err)) + } + } else if descs := manifest.GetMetadata(); len(descs) > 0 { + // Parse the workspace ID out of the manifest so we can embed it + // in the synthetic metadata payload below. If the manifest bytes + // are malformed (shouldn't happen in practice), fall back to + // uuid.Nil; the payload is still valid, just less identifiable. + workspaceID, idErr := uuid.FromBytes(manifest.GetWorkspaceId()) + if idErr != nil && ctx.Err() == nil { + a.logger.Warn(ctx, "parse workspace id from manifest; metadata payload will use uuid.Nil", + slog.Error(idErr)) + workspaceID = uuid.Nil + } + go a.runMetadata(ctx, rpc, workspaceID, descs) + } + select { case <-ctx.Done(): return nil @@ -95,6 +187,99 @@ func (a *Agent) connectAndServe(ctx context.Context, client *agentsdk.Client) er } } +// runMetadata sends synthetic values for every metadata description in the +// agent manifest, batching per-tick into a single BatchUpdateMetadata call. 
+// +// One goroutine per agent (not per description): a 1s ticker pulses and we +// track per-description next-due timestamps so each key reports at its own +// declared interval. The goroutine is scoped to the connection's ctx; on +// disconnect or shutdown it exits cleanly. +// +// The payload is a single fixed value, computed once: the workspace ID +// prepended to a constant padding so each metadata row in scaletest logs +// and the database is traceable back to the agent that emitted it. We +// intentionally do not vary the value per key or per tick; if a future +// scenario requires per-key/per-tick variation we can extend this then. +// +// Errors from BatchUpdateMetadata are logged and ignored. Tearing the +// connection down over a metadata RPC blip would be wasteful; real agents +// behave the same way (see agent.reportMetadata). +func (a *Agent) runMetadata(ctx context.Context, rpc proto.DRPCAgentClient29, workspaceID uuid.UUID, descs []*proto.WorkspaceAgentMetadata_Description) { + // Resolve declared intervals once, applying a floor so a malformed + // manifest can't spin us. Initialize all keys as immediately due so + // the first tick fires every description. + intervals := make([]time.Duration, len(descs)) + nextDue := make([]time.Time, len(descs)) + now := a.clock.Now() + for i, d := range descs { + // The Interval field on the proto is a durationpb.Duration but + // carries the raw int64 seconds value cast through time.Duration + // (see coderd/agentapi/manifest.go and agent/agent.go). Mirror the + // same recovery the real agent does so manifest-declared intervals + // of e.g. 10s are honored as 10s, not 10ns. 
+ intervalSeconds := int64(d.GetInterval().AsDuration()) + interval := time.Duration(intervalSeconds) * time.Second + if interval < metadataMinInterval { + interval = metadataMinInterval + } + intervals[i] = interval + nextDue[i] = now + } + + // Build the metadata payload once: prepend the workspace ID so + // scaletest log lines and DB rows are traceable back to the + // emitting agent, then pad out to metadataValueBytes so the wire + // shape (base64-encoded ~4096 chars) mirrors the real scaletest + // template's `dd if=/dev/urandom bs=3072 count=1 | base64` output. + // coderd truncates the stored value to 2048 chars (see + // coderd/agentapi/metadata.go maxValueLen), and the workspace ID + // lives in the first ~90 chars of the base64 output, so it + // survives truncation. + const tag = "fake-agent-metadata workspace=" + prefix := tag + workspaceID.String() + " " + padLen := metadataValueBytes - len(prefix) + if padLen < 0 { + padLen = 0 + } + value := base64.StdEncoding.EncodeToString([]byte(prefix + strings.Repeat("a", padLen))) + + // TickerFunc spawns its own goroutine that ticks until ctx is + // done and then stops the underlying ticker. We Wait on the + // returned Waiter so that runMetadata (itself running in the + // goroutine spawned by connectAndServe) stays alive for the + // connection's lifetime, matching the pre-refactor for/select + // shape. The Wait error is discarded: ticker exits are expected + // (ctx cancellation), and our tick func never returns a non-nil + // error of its own.
+ _ = a.clock.TickerFunc(ctx, metadataTickInterval, func() error { + now := a.clock.Now() + var batch []*proto.Metadata + for i, d := range descs { + if now.Before(nextDue[i]) { + continue + } + batch = append(batch, &proto.Metadata{ + Key: d.GetKey(), + Result: &proto.WorkspaceAgentMetadata_Result{ + CollectedAt: timestamppb.New(now), + Value: value, + }, + }) + nextDue[i] = now.Add(intervals[i]) + } + if len(batch) == 0 { + return nil + } + if _, err := rpc.BatchUpdateMetadata(ctx, &proto.BatchUpdateMetadataRequest{ + Metadata: batch, + }); err != nil && ctx.Err() == nil { + a.logger.Debug(ctx, "batch update metadata failed", + slog.Error(err)) + } + return nil + }, "agentfake", "runMetadata").Wait() +} + // Close stops the agent. Safe to call multiple times. func (a *Agent) Close() { if a.cancel != nil { diff --git a/enterprise/scaletest/agentfake/agent_test.go b/enterprise/scaletest/agentfake/agent_test.go index d01776f66d..5997ef7f33 100644 --- a/enterprise/scaletest/agentfake/agent_test.go +++ b/enterprise/scaletest/agentfake/agent_test.go @@ -2,64 +2,62 @@ package agentfake_test import ( "context" + "encoding/base64" "testing" + "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/coderdtest" - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbfake" + "github.com/coder/coder/v2/agent/agenttest" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/enterprise/scaletest/agentfake" + "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" ) // Assert that our fake agent routine establishes the drpc connection and sets its lifecycle status to Ready. 
func TestAgent_ConnectsAndReachesReady(t *testing.T) { t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - - client, db := coderdtest.NewWithDatabase(t, nil) - user := coderdtest.CreateFirstUser(t, client) - - r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ - OrganizationID: user.OrganizationID, - OwnerID: user.UserID, - }).WithAgent().Do() + ctx := testutil.Context(t, testutil.WaitShort) logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) - a := agentfake.NewAgent(client.URL, r.AgentToken, logger) - t.Cleanup(func() { a.Close() }) + agentID := uuid.New() + manifest := agentsdk.Manifest{ + AgentID: agentID, + WorkspaceID: uuid.New(), + } + statsCh := make(chan *agentproto.Stats, 1) + coord := tailnet.NewCoordinator(logger) + t.Cleanup(func() { _ = coord.Close() }) + dialer := agenttest.NewClient(t, logger, agentID, manifest, statsCh, coord) + t.Cleanup(dialer.Close) + + a := agentfake.NewAgent(nil, "", logger, agentfake.WithDialer(dialer)) + t.Cleanup(a.Close) runCtx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) runErr := make(chan error, 1) - go func() { - runErr <- a.Run(runCtx) - }() - - coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID). - WithContext(ctx). - Wait() + go func() { runErr <- a.Run(runCtx) }() + // The fake agent sends UpdateLifecycle(READY) once per dRPC + // connect; agenttest records every lifecycle update. 
require.Eventually(t, func() bool { - ws, err := client.Workspace(ctx, r.Workspace.ID) - if err != nil { - return false - } - for _, res := range ws.LatestBuild.Resources { - for _, agent := range res.Agents { - if agent.LifecycleState != codersdk.WorkspaceAgentLifecycleReady { - return false - } + for _, state := range dialer.GetLifecycleStates() { + if state == codersdk.WorkspaceAgentLifecycleReady { + return true } } - return true - }, testutil.WaitLong, testutil.IntervalFast, - "agent never reached Lifecycle=ready in workspace %s", r.Workspace.ID) + return false + }, testutil.WaitShort, testutil.IntervalFast, + "agent never reported Lifecycle=ready") // Cancel Run and confirm a clean exit (nil error, not ctx error). cancel() @@ -74,3 +72,84 @@ func TestAgent_ConnectsAndReachesReady(t *testing.T) { a.Close() a.Close() } + +// Assert that, when the workspace agent manifest declares metadata +// descriptions, the fake agent sends synthetic values for each key via +// BatchUpdateMetadata. The test drives the agent against +// agent/agenttest.Client (an in-process fake of the agent-side coderd +// API) rather than a real coderd, so the only quartz mock involved is +// the agentfake clock that drives the metadata ticker. +func TestAgent_SendsMetadata(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + + mClock := quartz.NewMock(t) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + + agentID := uuid.New() + manifest := agentsdk.Manifest{ + AgentID: agentID, + WorkspaceID: uuid.New(), + Metadata: []codersdk.WorkspaceAgentMetadataDescription{ + {Key: "01_meta", DisplayName: "Meta 01", Script: "noop", Interval: 1, Timeout: 10}, + {Key: "02_meta", DisplayName: "Meta 02", Script: "noop", Interval: 1, Timeout: 10}, + }, + } + + // statsCh and coord are required by agenttest.NewClient but + // unused by agentfake. 
The dialer is the stand-in for the real + // agentsdk.Client; it records every RPC the agent makes so we + // can assert against the metadata batch directly. + statsCh := make(chan *agentproto.Stats, 1) + coord := tailnet.NewCoordinator(logger) + t.Cleanup(func() { _ = coord.Close() }) + dialer := agenttest.NewClient(t, logger, agentID, manifest, statsCh, coord) + t.Cleanup(dialer.Close) + + a := agentfake.NewAgent(nil, "", logger, + agentfake.WithDialer(dialer), + agentfake.WithClock(mClock), + ) + t.Cleanup(a.Close) + + // Trap the agent's runMetadata TickerFunc registration so we know + // the goroutine is parked on the mock clock before we Advance. + // Otherwise Advance could race the goroutine startup and the + // first tick would be missed. + tickerTrap := mClock.Trap().TickerFunc("agentfake", "runMetadata") + defer tickerTrap.Close() + + runCtx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + runErr := make(chan error, 1) + go func() { runErr <- a.Run(runCtx) }() + + tickerTrap.MustWait(ctx).Release(ctx) + + // One tick fires runMetadata's tick func, which calls + // BatchUpdateMetadata against agenttest.Client. The fake records + // it synchronously in-process; no pubsub, batcher, or SSE involved.
+ mClock.Advance(time.Second).MustWait(ctx) + + require.Eventually(t, func() bool { + md := dialer.GetMetadata() + for _, key := range []string{"01_meta", "02_meta"} { + m, ok := md[key] + if !ok || m.Value == "" { + return false + } + if _, err := base64.StdEncoding.DecodeString(m.Value); err != nil { + return false + } + } + return true + }, testutil.WaitShort, testutil.IntervalFast) + + cancel() + select { + case err := <-runErr: + require.NoError(t, err, "Agent.Run returned unexpected error") + case <-ctx.Done(): + t.Fatalf("timed out waiting for Agent.Run to return: %v", ctx.Err()) + } +} diff --git a/enterprise/scaletest/agentfake/manager.go b/enterprise/scaletest/agentfake/manager.go index d03e48307b..69315e99c6 100644 --- a/enterprise/scaletest/agentfake/manager.go +++ b/enterprise/scaletest/agentfake/manager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net/http" + "net/url" "strconv" "sync" "time" @@ -17,6 +18,16 @@ import ( "github.com/coder/coder/v2/codersdk" ) +// ExternalAgentClient is the subset of *codersdk.Client the Manager +// uses to enumerate external-agent workspaces under a template and +// fetch each agent's auth token. *codersdk.Client satisfies this +// interface, so production callers pass their client directly; tests +// substitute a fake without standing up a real coderd. +type ExternalAgentClient interface { + Workspaces(ctx context.Context, filter codersdk.WorkspaceFilter) (codersdk.WorkspacesResponse, error) + WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) +} + const ( enumeratePageSize = 100 maxEnumerateRetries = 5 @@ -48,9 +59,10 @@ type ManagerOptions struct { // (via coder_external_agent tokens on workspaces matching opts.Template), then opens a dRPC stream per agent and keeps // them connected until ctx is canceled. 
type Manager struct { - client *codersdk.Client - logger slog.Logger - opts ManagerOptions + coderURL *url.URL + client ExternalAgentClient + logger slog.Logger + opts ManagerOptions mu sync.Mutex agents []*Agent @@ -58,12 +70,14 @@ type Manager struct { // NewManager returns an Agent Manager. The provided client must already be authenticated with sufficient privilege // to list workspaces by template and to call the enterprise-only WorkspaceExternalAgentCredentials endpoint -// (template-admin or higher; FeatureWorkspaceExternalAgent must be enabled). -func NewManager(client *codersdk.Client, logger slog.Logger, opts ManagerOptions) *Manager { +// (template-admin or higher; FeatureWorkspaceExternalAgent must be enabled). coderURL is the URL the spawned +// fake agents will dial. +func NewManager(coderURL *url.URL, client ExternalAgentClient, logger slog.Logger, opts ManagerOptions) *Manager { return &Manager{ - client: client, - logger: logger, - opts: opts, + coderURL: coderURL, + client: client, + logger: logger, + opts: opts, } } @@ -84,7 +98,7 @@ func (m *Manager) Run(ctx context.Context) error { agents := make([]*Agent, 0, len(tokens)) for i, ti := range tokens { - agents = append(agents, NewAgent(m.client.URL, ti.Token, + agents = append(agents, NewAgent(m.coderURL, ti.Token, m.logger.Named("agent-"+strconv.Itoa(i)))) } m.mu.Lock() diff --git a/enterprise/scaletest/agentfake/manager_test.go b/enterprise/scaletest/agentfake/manager_test.go index 598729909f..769a773b1f 100644 --- a/enterprise/scaletest/agentfake/manager_test.go +++ b/enterprise/scaletest/agentfake/manager_test.go @@ -2,76 +2,131 @@ package agentfake_test import ( "context" - "database/sql" + "net/http" + "net/url" "sort" "testing" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/xerrors" "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/coderdtest" - 
"github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbfake" "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" - "github.com/coder/coder/v2/enterprise/coderd/license" "github.com/coder/coder/v2/enterprise/scaletest/agentfake" - sdkproto "github.com/coder/coder/v2/provisionersdk/proto" "github.com/coder/coder/v2/testutil" ) -// Asserts the TokenInfo shape (workspace IDs, agent names, tokens) returned by the enumeration loop. +// fakeExternalAgentClient is an in-package fake for the +// ExternalAgentClient interface used by +// Manager.EnumerateExternalAgents. Tests populate workspaces / +// credentials / workspacesErr before calling the Manager. +type fakeExternalAgentClient struct { + // workspaces, in the order Workspaces() should return them. Each + // call returns up to filter.Limit entries starting at filter.Offset + // to model pagination, matching real coderd behavior. + workspaces []codersdk.Workspace + // credentials, keyed by "{workspaceID}/{agentName}". A nil entry + // causes WorkspaceExternalAgentCredentials to error with notFoundErr. + credentials map[string]codersdk.ExternalAgentCredentials + + // workspacesErr, if non-nil, is returned from every Workspaces call. 
+ workspacesErr error +} + +func (f *fakeExternalAgentClient) Workspaces(_ context.Context, filter codersdk.WorkspaceFilter) (codersdk.WorkspacesResponse, error) { + if f.workspacesErr != nil { + return codersdk.WorkspacesResponse{}, f.workspacesErr + } + start := filter.Offset + if start > len(f.workspaces) { + start = len(f.workspaces) + } + end := start + filter.Limit + if end > len(f.workspaces) { + end = len(f.workspaces) + } + page := f.workspaces[start:end] + return codersdk.WorkspacesResponse{ + Workspaces: page, + Count: len(f.workspaces), + }, nil +} + +func (f *fakeExternalAgentClient) WorkspaceExternalAgentCredentials(_ context.Context, wsID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) { + key := wsID.String() + "/" + agentName + creds, ok := f.credentials[key] + if !ok { + return codersdk.ExternalAgentCredentials{}, xerrors.Errorf("no credentials for %s", key) + } + return creds, nil +} + +// externalAgentWorkspace returns a codersdk.Workspace whose latest +// build has HasExternalAgent=true and one agent with the given name. +func externalAgentWorkspace(t *testing.T, name, agentName string) (codersdk.Workspace, uuid.UUID) { + t.Helper() + wsID := uuid.New() + agentID := uuid.New() + hasExternal := true + return codersdk.Workspace{ + ID: wsID, + Name: name, + LatestBuild: codersdk.WorkspaceBuild{ + HasExternalAgent: &hasExternal, + Resources: []codersdk.WorkspaceResource{{ + Name: "external", + Type: "coder_external_agent", + Agents: []codersdk.WorkspaceAgent{{ + ID: agentID, + Name: agentName, + }}, + }}, + }, + }, agentID +} + +// Asserts the TokenInfo shape (workspace IDs, agent names, tokens) +// returned by the enumeration loop given a fake client. 
func Test_Manager_EnumerateExternalAgents_returnsAllTokens(t *testing.T) { t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - - client, db, user := coderdenttest.NewWithDatabase(t, &coderdenttest.Options{ - LicenseOptions: &coderdenttest.LicenseOptions{ - Features: license.Features{ - codersdk.FeatureWorkspaceExternalAgent: 1, - }, - }, - }) + ctx := testutil.Context(t, testutil.WaitShort) const numWorkspaces = 3 - first := buildExternalAgentWorkspace(t, db, user, uuid.Nil) - templateID := first.Workspace.TemplateID - want := []agentfake.TokenInfo{{ - WorkspaceID: first.Workspace.ID, - WorkspaceName: first.Workspace.Name, - AgentID: first.Agents[0].ID, - AgentName: first.Agents[0].Name, - Token: first.AgentToken, - }} - for i := 1; i < numWorkspaces; i++ { - r := buildExternalAgentWorkspace(t, db, user, templateID) + workspaces := make([]codersdk.Workspace, 0, numWorkspaces) + credentials := map[string]codersdk.ExternalAgentCredentials{} + want := make([]agentfake.TokenInfo, 0, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + agentName := "external" + ws, agentID := externalAgentWorkspace(t, "ws-"+uuid.NewString(), agentName) + workspaces = append(workspaces, ws) + token := uuid.NewString() + credentials[ws.ID.String()+"/"+agentName] = codersdk.ExternalAgentCredentials{ + AgentToken: token, + } want = append(want, agentfake.TokenInfo{ - WorkspaceID: r.Workspace.ID, - WorkspaceName: r.Workspace.Name, - AgentID: r.Agents[0].ID, - AgentName: r.Agents[0].Name, - Token: r.AgentToken, + WorkspaceID: ws.ID, + WorkspaceName: ws.Name, + AgentID: agentID, + AgentName: agentName, + Token: token, }) } - tmpl, err := client.Template(ctx, templateID) - require.NoError(t, err) - + client := &fakeExternalAgentClient{workspaces: workspaces, credentials: credentials} + coderURL, _ := url.Parse("http://fake") logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) - m := agentfake.NewManager(client, logger, 
agentfake.ManagerOptions{Template: tmpl.Name}) + m := agentfake.NewManager(coderURL, client, logger, agentfake.ManagerOptions{Template: "tmpl"}) got, err := m.EnumerateExternalAgents(ctx) require.NoError(t, err) - // Order returned by coderd isn't guaranteed; sort both sides by WorkspaceID before comparing. sortTokenInfosByWorkspaceID(want) sortTokenInfosByWorkspaceID(got) - require.Equal(t, len(want), len(got), - "expected one TokenInfo per external-agent workspace under the template") + require.Equal(t, len(want), len(got), "expected one TokenInfo per external-agent workspace") for i := range want { assert.Equal(t, want[i].WorkspaceID, got[i].WorkspaceID, "WorkspaceID for entry %d", i) assert.Equal(t, want[i].AgentName, got[i].AgentName, "AgentName for entry %d", i) @@ -80,109 +135,25 @@ func Test_Manager_EnumerateExternalAgents_returnsAllTokens(t *testing.T) { } } -// Heavier-weight integration test for the agentfake harness: builds 5 external agents, sets up the client/Manager, -// and asserts that each of the agents the Manager sees via its enumeration function is properly connected and Ready. 
-func TestManager_FiveAgentsHeartbeat(t *testing.T) { - t.Parallel() - - ctx := testutil.Context(t, testutil.WaitLong) - - client, db, user := coderdenttest.NewWithDatabase(t, &coderdenttest.Options{ - LicenseOptions: &coderdenttest.LicenseOptions{ - Features: license.Features{ - codersdk.FeatureWorkspaceExternalAgent: 1, - }, - }, - }) - - const numAgents = 5 - first := buildExternalAgentWorkspace(t, db, user, uuid.Nil) - templateID := first.Workspace.TemplateID - workspaceIDs := []uuid.UUID{first.Workspace.ID} - for i := 1; i < numAgents; i++ { - r := buildExternalAgentWorkspace(t, db, user, templateID) - workspaceIDs = append(workspaceIDs, r.Workspace.ID) - } - - tmpl, err := client.Template(ctx, templateID) - require.NoError(t, err) - - logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) - manager := agentfake.NewManager(client, logger, agentfake.ManagerOptions{ - Template: tmpl.Name, - }) - t.Cleanup(func() { manager.Close() }) - - managerCtx, cancelManager := context.WithCancel(ctx) - t.Cleanup(cancelManager) - - managerErr := make(chan error, 1) - go func() { - managerErr <- manager.Run(managerCtx) - }() - - // Each workspace's agent must reach Connected. Share the outer test ctx (testutil.WaitLong) across all five waiters - // so the total wait is bounded. - for _, wsID := range workspaceIDs { - coderdtest.NewWorkspaceAgentWaiter(t, client, wsID).WithContext(ctx).Wait() - } - - // Each workspace's agent must also reach Lifecycle=ready. The fake sends UpdateLifecycle(READY) once per dRPC - // connect; coderd persists that and exposes it on the agent. 
- for _, wsID := range workspaceIDs { - require.Eventually(t, func() bool { - ws, err := client.Workspace(ctx, wsID) - if err != nil { - return false - } - for _, res := range ws.LatestBuild.Resources { - for _, agent := range res.Agents { - if agent.LifecycleState != codersdk.WorkspaceAgentLifecycleReady { - return false - } - } - } - return true - }, testutil.WaitLong, testutil.IntervalFast, - "agent never reached Lifecycle=ready in workspace %s", wsID) - } - - // Cleanly stop the Manager and confirm it exits without a non-context error. - cancelManager() - select { - case err := <-managerErr: - if err != nil { - t.Fatalf("Manager.Run returned unexpected error: %v", err) - } - case <-ctx.Done(): - t.Fatalf("timed out waiting for Manager.Run to return: %v", ctx.Err()) - } -} - -// Asserts that an authentication failure during enumeration produces a fatal error, so the retry loop in -// enumerateWithRetry surfaces it immediately rather than hammering endpoints with credentials that will never work. +// Asserts that an authentication failure during enumeration produces a +// fatal error, so the retry loop in enumerateWithRetry surfaces it +// immediately rather than hammering endpoints with credentials that +// will never work. func Test_Manager_EnumerateExternalAgents_invalidTokenIsFatal(t *testing.T) { t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - - client, db := coderdtest.NewWithDatabase(t, nil) - user := coderdtest.CreateFirstUser(t, client) - - r := buildExternalAgentWorkspace(t, db, user, uuid.Nil) - tmpl, err := client.Template(ctx, r.Workspace.TemplateID) - require.NoError(t, err) - - // Replace the client's session token with garbage to provoke a 401 from coderd's workspace-list endpoint. - // The Manager should surface that as a fatal error. 
- client.SetSessionToken("not-a-valid-session-token") + ctx := testutil.Context(t, testutil.WaitShort) + client := &fakeExternalAgentClient{ + workspacesErr: codersdk.NewError(http.StatusUnauthorized, codersdk.Response{Message: "unauthorized"}), + } + coderURL, _ := url.Parse("http://fake") logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) - m := agentfake.NewManager(client, logger, agentfake.ManagerOptions{Template: tmpl.Name}) + m := agentfake.NewManager(coderURL, client, logger, agentfake.ManagerOptions{Template: "tmpl"}) - _, err = m.EnumerateExternalAgents(ctx) + _, err := m.EnumerateExternalAgents(ctx) require.Error(t, err, "expected enumeration to fail with an invalid session token") require.True(t, agentfake.IsFatalEnumerationError(err), - "expected error to be classified as fatal so the harness exits and Kubernetes can restart it; got: %v", err) + "expected error to be classified as fatal; got: %v", err) } func sortTokenInfosByWorkspaceID(s []agentfake.TokenInfo) { @@ -190,33 +161,3 @@ func sortTokenInfosByWorkspaceID(s []agentfake.TokenInfo) { return s[i].WorkspaceID.String() < s[j].WorkspaceID.String() }) } - -// buildExternalAgentWorkspace creates one workspace with a coder_external_agent resource, an agent, and -// HasExternalAgent=true on the latest build. If templateID is uuid.Nil, dbfake mints a fresh template (and the caller -// can pass the returned Workspace.TemplateID into subsequent calls to share the template). -func buildExternalAgentWorkspace( - t *testing.T, - db database.Store, - user codersdk.CreateFirstUserResponse, - templateID uuid.UUID, -) dbfake.WorkspaceResponse { - t.Helper() - - ws := database.WorkspaceTable{ - OrganizationID: user.OrganizationID, - OwnerID: user.UserID, - } - if templateID != uuid.Nil { - ws.TemplateID = templateID - } - return dbfake.WorkspaceBuild(t, db, ws). - Seed(database.WorkspaceBuild{ - HasExternalAgent: sql.NullBool{Bool: true, Valid: true}, - }). 
- Resource(&sdkproto.Resource{ - Name: "external", - Type: "coder_external_agent", - }). - WithAgent(). - Do() -}