diff --git a/agent/agent.go b/agent/agent.go index 33ad83c804..33c7d3b6f6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,6 +40,7 @@ import ( "cdr.dev/slog/v3" "github.com/coder/clistat" "github.com/coder/coder/v2/agent/agentcontainers" + "github.com/coder/coder/v2/agent/agentcontext" "github.com/coder/coder/v2/agent/agentcontextconfig" "github.com/coder/coder/v2/agent/agentexec" "github.com/coder/coder/v2/agent/agentfiles" @@ -129,6 +130,15 @@ type Client interface { ConnectRPC29WithRole(ctx context.Context, role string) ( proto.DRPCAgentClient29, tailnetproto.DRPCTailnetClient28, error, ) + ConnectRPC210(ctx context.Context) ( + proto.DRPCAgentClient210, tailnetproto.DRPCTailnetClient28, error, + ) + // ConnectRPC210WithRole is like ConnectRPC210 but sends an explicit + // role query parameter to the server. The workspace agent should + // use role "agent" to enable connection monitoring. + ConnectRPC210WithRole(ctx context.Context, role string) ( + proto.DRPCAgentClient210, tailnetproto.DRPCTailnetClient28, error, + ) tailnet.DERPMapRewriter agentsdk.RefreshableSessionTokenProvider } @@ -334,6 +344,8 @@ type agent struct { mcpManager *agentmcp.Manager mcpAPI *agentmcp.API contextConfigAPI *agentcontextconfig.API + contextManager *agentcontext.Manager + contextAPI *agentcontext.API socketServerEnabled bool socketPath string @@ -431,6 +443,37 @@ func (a *agent) init() { return "" }, a.contextConfig) a.mcpAPI = agentmcp.NewAPI(a.logger.Named("mcp"), a.mcpManager, a.contextConfigAPI.MCPConfigFiles) + + // agentcontext.Manager is the new consolidated resolver, + // watcher, and pusher. It coexists with contextConfigAPI + // and the MCP manager during rollout. Initial sources are + // seeded from the existing CODER_AGENT_EXP_* env vars and + // from the agent's working directory at scan time. + workingDirFn := func() string { + if m := a.manifest.Load(); m != nil { + return m.Directory + } + return "" + } + ctxMgr, ctxMgrErr := agentcontext.NewManager(agentcontext.ManagerOptions{ + Logger: a.logger.Named("agentcontext"), + Clock: a.clock, + WorkingDir: workingDirFn, + BuiltinRoots: defaultContextRoots(a.contextConfig, workingDirFn), + InitialSources: initialContextSources(a.contextConfig, workingDirFn), + AllowedRoots: defaultContextAllowedRoots(workingDirFn), + }) + if ctxMgrErr != nil { + // NewManager only errors on programmer mistakes today. + // Log loudly so a future regression surfaces fast, and + // fall back to a no-op manager so the rest of init can + // proceed. + a.logger.Critical(a.gracefulCtx, "agentcontext manager init failed", slog.Error(ctxMgrErr)) + } + a.contextManager = ctxMgr + if a.contextManager != nil { + a.contextAPI = agentcontext.NewAPI(a.contextManager) + } a.reconnectingPTYServer = reconnectingpty.NewServer( a.logger.Named("reconnecting-pty"), a.sshServer, @@ -447,6 +490,18 @@ func (a *agent) init() { a.initSocketServer() a.startBoundaryLogProxyServer() + // Start the agentcontext manager's resolver/watcher loop. + // It runs for the lifetime of the agent and is closed in + // agent.Close. The push goroutine is started per-connection + // inside run() so it picks up the right drpc client. + if a.contextManager != nil { + go func() { + if err := a.contextManager.Run(a.gracefulCtx); err != nil && !errors.Is(err, context.Canceled) { + a.logger.Warn(a.gracefulCtx, "agentcontext manager run exited", slog.Error(err)) + } + }() + } + go a.runLoop() } @@ -1071,7 +1126,7 @@ func (a *agent) run() (retErr error) { // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs. // We pass role "agent" to enable connection monitoring on the server, which tracks // the agent's connectivity state (first_connected_at, last_connected_at, disconnected_at). - aAPI, tAPI, err := a.client.ConnectRPC29WithRole(a.hardCtx, "agent") + aAPI, tAPI, err := a.client.ConnectRPC210WithRole(a.hardCtx, "agent") if err != nil { return err } @@ -1165,6 +1220,21 @@ func (a *agent) run() (retErr error) { // gracefulShutdownBehaviorRemain. connMan.startAgentAPI("report connections", gracefulShutdownBehaviorRemain, a.reportConnectionsLoop) + // Push resolved workspace context (instructions, skills, MCP + // configs, MCP server tool lists) to coderd. The push loop + // uses gracefulShutdownBehaviorStop because the snapshot is + // only useful while chats are alive, and a stale snapshot at + // shutdown costs nothing. + if a.contextManager != nil { + connMan.startAgentAPI210("push context state", gracefulShutdownBehaviorStop, + func(ctx context.Context, aAPI proto.DRPCAgentClient210) error { + pusher := agentcontext.NewDRPCPusher(aAPI) + return a.contextManager.RunPush(ctx, pusher, agentcontext.PushOptions{ + Logger: a.logger.Named("agentcontext-push"), + }) + }) + } + // channels to sync goroutines below // handle manifest // | @@ -1312,6 +1382,17 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, manifestOK.complete(nil) sentResult = true + // Manifest just landed; the agentcontext manager now has + // a working directory to scan and a known set of scan + // roots. Trigger a resync so the snapshot reflects the + // workspace immediately instead of waiting for the next + // filesystem event. + if a.contextManager != nil { + if _, resyncErr := a.contextManager.Resync(ctx); resyncErr != nil { + a.logger.Debug(ctx, "agentcontext resync after manifest failed", slog.Error(resyncErr)) + } + } + // Write secret files after signaling manifest readiness so that network // initialization (which depends on manifestOK) starts as soon as // possible. This creates a theoretical race where an SSH session that @@ -2265,6 +2346,12 @@ func (a *agent) Close() error { a.logger.Error(a.hardCtx, "mcp manager close", slog.Error(err)) } + if a.contextManager != nil { + if err := a.contextManager.Close(); err != nil { + a.logger.Error(a.hardCtx, "agentcontext manager close", slog.Error(err)) + } + } + if a.boundaryLogProxy != nil { err = a.boundaryLogProxy.Close() if err != nil { @@ -2400,7 +2487,7 @@ const ( type apiConnRoutineManager struct { logger slog.Logger - aAPI proto.DRPCAgentClient28 + aAPI proto.DRPCAgentClient210 tAPI tailnetproto.DRPCTailnetClient28 eg *errgroup.Group stopCtx context.Context @@ -2409,7 +2496,7 @@ type apiConnRoutineManager struct { func newAPIConnRoutineManager( gracefulCtx, hardCtx context.Context, logger slog.Logger, - aAPI proto.DRPCAgentClient28, tAPI tailnetproto.DRPCTailnetClient28, + aAPI proto.DRPCAgentClient210, tAPI tailnetproto.DRPCTailnetClient28, ) *apiConnRoutineManager { // routines that remain in operation during graceful shutdown use the remainCtx. They'll still // exit if the errgroup hits an error, which usually means a problem with the conn. @@ -2466,6 +2553,35 @@ func (a *apiConnRoutineManager) startAgentAPI( }) } +// startAgentAPI210 is identical to startAgentAPI but passes the +// full v2.10 Agent API client. Use it for routines that need +// RPCs introduced after v2.8 (notably PushContextState). +func (a *apiConnRoutineManager) startAgentAPI210( + name string, behavior gracefulShutdownBehavior, + f func(context.Context, proto.DRPCAgentClient210) error, +) { + logger := a.logger.With(slog.F("name", name)) + var ctx context.Context + switch behavior { + case gracefulShutdownBehaviorStop: + ctx = a.stopCtx + case gracefulShutdownBehaviorRemain: + ctx = a.remainCtx + default: + panic("unknown behavior") + } + a.eg.Go(func() error { + logger.Debug(ctx, "starting agent routine") + err := f(ctx, a.aAPI) + err = shouldPropagateError(ctx, logger, err) + logger.Debug(ctx, "routine exited", slog.Error(err)) + if err != nil { + return xerrors.Errorf("error in routine %s: %w", name, err) + } + return nil + }) +} + // startTailnetAPI starts a routine that uses the Tailnet API. c.f. startAgentAPI which is the same // but for the Agent API. func (a *apiConnRoutineManager) startTailnetAPI( diff --git a/agent/agent_context_test.go b/agent/agent_context_test.go new file mode 100644 index 0000000000..ccab590368 --- /dev/null +++ b/agent/agent_context_test.go @@ -0,0 +1,70 @@ +package agent_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/agent" + "github.com/coder/coder/v2/agent/agentcontextconfig" + "github.com/coder/coder/v2/agent/agenttest" + agentproto "github.com/coder/coder/v2/agent/proto" + "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/coder/coder/v2/testutil" +) + +// TestAgent_ContextStatePushed verifies the agent's +// agentcontext.Manager pushes its initial Snapshot to coderd +// over the v2.10 PushContextState RPC during a normal boot. +// +// The test does not depend on the chatd side; it asserts only +// that the snapshot reaches the FakeAgentAPI, that it carries +// the initial flag, and that it includes the seeded AGENTS.md. +func TestAgent_ContextStatePushed(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + require.NoError(t, + os.WriteFile(filepath.Join(dir, "AGENTS.md"), []byte("test rules"), 0o600)) + + //nolint:dogsled // setupAgent returns a wide tuple; we only care about the client. + _, client, _, _, _ := setupAgent(t, + agentsdk.Manifest{Directory: dir}, + 0, + func(_ *agenttest.Client, opts *agent.Options) { + opts.ContextConfig = agentcontextconfig.Config{} + }, + ) + + // The first push is the initial empty-workspace snapshot + // because the manifest has not been fetched yet. Wait for a + // later push that includes the seeded AGENTS.md. + var pushes []*agentproto.PushContextStateRequest + require.Eventually(t, func() bool { + pushes = client.ContextStatePushes() + for _, push := range pushes { + for _, r := range push.GetResources() { + if r.GetKind() == agentproto.ContextResource_INSTRUCTION_FILE && + filepath.Base(r.GetSource()) == "AGENTS.md" { + return true + } + } + } + return false + }, testutil.WaitMedium, testutil.IntervalFast, + "expected the seeded AGENTS.md to appear in a snapshot push; got %d pushes", len(pushes)) + + require.NotEmpty(t, pushes) + first := pushes[0] + assert.True(t, first.GetInitial(), "first push must carry Initial=true") + assert.Equal(t, uint64(1), first.GetSchemaVersion(), "schema_version must be the v1 wire shape") + assert.NotEmpty(t, first.GetAggregateHash(), "aggregate_hash must be populated") + + // Subsequent pushes must not be Initial. + for _, p := range pushes[1:] { + assert.False(t, p.GetInitial(), "only the first push must be Initial") + } +} diff --git a/agent/agentcontext_seed.go b/agent/agentcontext_seed.go new file mode 100644 index 0000000000..0fb93bcce0 --- /dev/null +++ b/agent/agentcontext_seed.go @@ -0,0 +1,85 @@ +package agent + +import ( + "github.com/coder/coder/v2/agent/agentcontext" + "github.com/coder/coder/v2/agent/agentcontextconfig" +) + +// defaultContextRoots returns the built-in scan roots layered +// in front of any user-added sources. These mirror the paths +// the existing agentcontextconfig API resolves at every chat +// hydrate so the new resolver covers the same surface area. +// +// The slice is intentionally tolerant of missing entries; the +// resolver silently skips canonicalization failures and +// non-existent paths. +func defaultContextRoots(_ agentcontextconfig.Config, _ func() string) []string { + roots := make([]string, 0, 8) + + // Working directory is added by the manager itself via the + // WorkingDir option, so we do not include it here. + + // User home Coder config (~/.coder, ~/.coder/skills). + roots = append(roots, "~/.coder", "~/.coder/skills") + + // Claude Code plugin cache, picked up by the plugin RFC + // follow-up. v1 ignores plugin manifests but watching the + // directory is harmless and prevents a surprise dirty bit + // when the resolver eventually classifies them. + roots = append(roots, "~/.claude/plugins/cache") + + // Project-relative ".agents/skills" requires a working + // directory to anchor. We let the manager append the + // working directory itself, and the resolver picks up + // nested ".agents/skills" automatically. + + return roots +} + +// initialContextSources translates the boot-time +// CODER_AGENT_EXP_*_DIRS env vars into agentcontext.Source +// entries. This preserves the "set it on the template" workflow +// while the user-facing CLI for source CRUD ships in a follow-up. +func initialContextSources(cfg agentcontextconfig.Config, workingDir func() string) []agentcontext.Source { + base := "" + if workingDir != nil { + base = workingDir() + } + + seen := make(map[string]struct{}) + var sources []agentcontext.Source + add := func(path string) { + if path == "" { + return + } + if _, ok := seen[path]; ok { + return + } + seen[path] = struct{}{} + sources = append(sources, agentcontext.Source{Path: path}) + } + for _, p := range agentcontextconfig.ResolvePaths(cfg.InstructionsDirs, base) { + add(p) + } + for _, p := range agentcontextconfig.ResolvePaths(cfg.SkillsDirs, base) { + add(p) + } + for _, p := range agentcontextconfig.ResolvePaths(cfg.MCPConfigFiles, base) { + add(p) + } + return sources +} + +// defaultContextAllowedRoots returns the allow-list applied to +// runtime AddSource calls. The set matches the RFC's authorization +// section: the home directory's Coder + Claude config trees plus +// the workspace's working directory. +func defaultContextAllowedRoots(workingDir func() string) []string { + roots := []string{"~", "~/.coder", "~/.claude"} + if workingDir != nil { + if wd := workingDir(); wd != "" { + roots = append(roots, wd) + } + } + return roots +} diff --git a/agent/agenttest/client.go b/agent/agenttest/client.go index 474469d7ff..e748dbe6fc 100644 --- a/agent/agenttest/client.go +++ b/agent/agenttest/client.go @@ -146,6 +146,30 @@ func (c *Client) ConnectRPC29WithRole(ctx context.Context, _ string) ( return c.ConnectRPC29(ctx) } +func (c *Client) ConnectRPC210(ctx context.Context) ( + agentproto.DRPCAgentClient210, proto.DRPCTailnetClient28, error, +) { + aAPI, tAPI, err := c.ConnectRPC29(ctx) + if err != nil { + return nil, nil, err + } + // The concrete drpcAgentClient implements every method on + // the generated DRPCAgentClient interface, including + // PushContextState, so the assertion always succeeds for + // the fixture's own connections. + client, ok := aAPI.(agentproto.DRPCAgentClient210) + if !ok { + return nil, nil, xerrors.Errorf("agenttest: connection does not implement DRPCAgentClient210; got %T", aAPI) + } + return client, tAPI, nil +} + +func (c *Client) ConnectRPC210WithRole(ctx context.Context, _ string) ( + agentproto.DRPCAgentClient210, proto.DRPCTailnetClient28, error, +) { + return c.ConnectRPC210(ctx) +} + func (c *Client) ConnectRPC29(ctx context.Context) ( agentproto.DRPCAgentClient29, proto.DRPCTailnetClient28, error, ) { @@ -227,6 +251,12 @@ func (c *Client) GetSubAgentApps(id uuid.UUID) ([]*agentproto.CreateSubAgentRequ return c.fakeAgentAPI.GetSubAgentApps(id) } +// ContextStatePushes returns every PushContextState request the +// agent has issued to the fake server so far. +func (c *Client) ContextStatePushes() []*agentproto.PushContextStateRequest { + return c.fakeAgentAPI.ContextStatePushes() +} + type FakeAgentAPI struct { sync.Mutex t testing.TB @@ -249,12 +279,34 @@ type FakeAgentAPI struct { getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error) getResourcesMonitoringConfigurationFunc func() (*agentproto.GetResourcesMonitoringConfigurationResponse, error) pushResourcesMonitoringUsageFunc func(*agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error) + + contextStatePushes []*agentproto.PushContextStateRequest } func (*FakeAgentAPI) UpdateAppStatus(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) { panic("unimplemented") } +// PushContextState records the incoming snapshot and returns +// Accepted=true. Tests that need to assert against the captured +// pushes can read them via ContextStatePushes. +func (f *FakeAgentAPI) PushContextState(_ context.Context, req *agentproto.PushContextStateRequest) (*agentproto.PushContextStateResponse, error) { + f.Lock() + defer f.Unlock() + f.contextStatePushes = append(f.contextStatePushes, req) + return &agentproto.PushContextStateResponse{Accepted: true}, nil +} + +// ContextStatePushes returns a snapshot of every +// PushContextState request received so far. +func (f *FakeAgentAPI) ContextStatePushes() []*agentproto.PushContextStateRequest { + f.Lock() + defer f.Unlock() + out := make([]*agentproto.PushContextStateRequest, len(f.contextStatePushes)) + copy(out, f.contextStatePushes) + return out +} + func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) { return f.manifest, nil } diff --git a/agent/api.go b/agent/api.go index 0346805528..91f575675f 100644 --- a/agent/api.go +++ b/agent/api.go @@ -35,6 +35,9 @@ func (a *agent) apiHandler() http.Handler { r.Mount("/api/v0/desktop", a.desktopAPI.Routes()) r.Mount("/api/v0/mcp", a.mcpAPI.Routes()) r.Mount("/api/v0/context-config", a.contextConfigAPI.Routes()) + if a.contextAPI != nil { + r.Mount("/api/v0/context", a.contextAPI.Routes()) + } if a.devcontainers { r.Mount("/api/v0/containers", a.containerAPI.Routes())