mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat(agent): wire agentcontext into agent lifecycle
Boot the agentcontext.Manager during agent init, seed its sources from the existing CODER_AGENT_EXP_*_DIRS env vars, run its watcher/resolver loop in the graceful context, and tear it down in agent.Close. Mount the source CRUD and resync HTTP API at /api/v0/context. Connect to coderd via the new ConnectRPC210WithRole and push each snapshot via DRPCPusher. The push goroutine is registered through a new startAgentAPI210 helper that exposes the v2.10 client. When the manifest lands, trigger a Resync so the snapshot reflects the workspace directory immediately instead of waiting for the next filesystem event. Add agenttest.Client.ContextStatePushes and a FakeAgentAPI PushContextState implementation so tests can assert on the push traffic, plus an end-to-end test that boots a real agent against the fake and verifies AGENTS.md appears in a snapshot.
This commit is contained in:
+119
-3
@@ -40,6 +40,7 @@ import (
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/clistat"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers"
|
||||
"github.com/coder/coder/v2/agent/agentcontext"
|
||||
"github.com/coder/coder/v2/agent/agentcontextconfig"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentfiles"
|
||||
@@ -129,6 +130,15 @@ type Client interface {
|
||||
ConnectRPC29WithRole(ctx context.Context, role string) (
|
||||
proto.DRPCAgentClient29, tailnetproto.DRPCTailnetClient28, error,
|
||||
)
|
||||
ConnectRPC210(ctx context.Context) (
|
||||
proto.DRPCAgentClient210, tailnetproto.DRPCTailnetClient28, error,
|
||||
)
|
||||
// ConnectRPC210WithRole is like ConnectRPC210 but sends an explicit
|
||||
// role query parameter to the server. The workspace agent should
|
||||
// use role "agent" to enable connection monitoring.
|
||||
ConnectRPC210WithRole(ctx context.Context, role string) (
|
||||
proto.DRPCAgentClient210, tailnetproto.DRPCTailnetClient28, error,
|
||||
)
|
||||
tailnet.DERPMapRewriter
|
||||
agentsdk.RefreshableSessionTokenProvider
|
||||
}
|
||||
@@ -334,6 +344,8 @@ type agent struct {
|
||||
mcpManager *agentmcp.Manager
|
||||
mcpAPI *agentmcp.API
|
||||
contextConfigAPI *agentcontextconfig.API
|
||||
contextManager *agentcontext.Manager
|
||||
contextAPI *agentcontext.API
|
||||
|
||||
socketServerEnabled bool
|
||||
socketPath string
|
||||
@@ -431,6 +443,37 @@ func (a *agent) init() {
|
||||
return ""
|
||||
}, a.contextConfig)
|
||||
a.mcpAPI = agentmcp.NewAPI(a.logger.Named("mcp"), a.mcpManager, a.contextConfigAPI.MCPConfigFiles)
|
||||
|
||||
// agentcontext.Manager is the new consolidated resolver,
|
||||
// watcher, and pusher. It coexists with contextConfigAPI
|
||||
// and the MCP manager during rollout. Initial sources are
|
||||
// seeded from the existing CODER_AGENT_EXP_* env vars and
|
||||
// from the agent's working directory at scan time.
|
||||
workingDirFn := func() string {
|
||||
if m := a.manifest.Load(); m != nil {
|
||||
return m.Directory
|
||||
}
|
||||
return ""
|
||||
}
|
||||
ctxMgr, ctxMgrErr := agentcontext.NewManager(agentcontext.ManagerOptions{
|
||||
Logger: a.logger.Named("agentcontext"),
|
||||
Clock: a.clock,
|
||||
WorkingDir: workingDirFn,
|
||||
BuiltinRoots: defaultContextRoots(a.contextConfig, workingDirFn),
|
||||
InitialSources: initialContextSources(a.contextConfig, workingDirFn),
|
||||
AllowedRoots: defaultContextAllowedRoots(workingDirFn),
|
||||
})
|
||||
if ctxMgrErr != nil {
|
||||
// NewManager only errors on programmer mistakes today.
|
||||
// Log loudly so a future regression surfaces fast, and
|
||||
// fall back to a no-op manager so the rest of init can
|
||||
// proceed.
|
||||
a.logger.Critical(a.gracefulCtx, "agentcontext manager init failed", slog.Error(ctxMgrErr))
|
||||
}
|
||||
a.contextManager = ctxMgr
|
||||
if a.contextManager != nil {
|
||||
a.contextAPI = agentcontext.NewAPI(a.contextManager)
|
||||
}
|
||||
a.reconnectingPTYServer = reconnectingpty.NewServer(
|
||||
a.logger.Named("reconnecting-pty"),
|
||||
a.sshServer,
|
||||
@@ -447,6 +490,18 @@ func (a *agent) init() {
|
||||
a.initSocketServer()
|
||||
a.startBoundaryLogProxyServer()
|
||||
|
||||
// Start the agentcontext manager's resolver/watcher loop.
|
||||
// It runs for the lifetime of the agent and is closed in
|
||||
// agent.Close. The push goroutine is started per-connection
|
||||
// inside run() so it picks up the right drpc client.
|
||||
if a.contextManager != nil {
|
||||
go func() {
|
||||
if err := a.contextManager.Run(a.gracefulCtx); err != nil && !errors.Is(err, context.Canceled) {
|
||||
a.logger.Warn(a.gracefulCtx, "agentcontext manager run exited", slog.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go a.runLoop()
|
||||
}
|
||||
|
||||
@@ -1071,7 +1126,7 @@ func (a *agent) run() (retErr error) {
|
||||
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs.
|
||||
// We pass role "agent" to enable connection monitoring on the server, which tracks
|
||||
// the agent's connectivity state (first_connected_at, last_connected_at, disconnected_at).
|
||||
aAPI, tAPI, err := a.client.ConnectRPC29WithRole(a.hardCtx, "agent")
|
||||
aAPI, tAPI, err := a.client.ConnectRPC210WithRole(a.hardCtx, "agent")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1165,6 +1220,21 @@ func (a *agent) run() (retErr error) {
|
||||
// gracefulShutdownBehaviorRemain.
|
||||
connMan.startAgentAPI("report connections", gracefulShutdownBehaviorRemain, a.reportConnectionsLoop)
|
||||
|
||||
// Push resolved workspace context (instructions, skills, MCP
|
||||
// configs, MCP server tool lists) to coderd. The push loop
|
||||
// uses gracefulShutdownBehaviorStop because the snapshot is
|
||||
// only useful while chats are alive, and a stale snapshot at
|
||||
// shutdown costs nothing.
|
||||
if a.contextManager != nil {
|
||||
connMan.startAgentAPI210("push context state", gracefulShutdownBehaviorStop,
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient210) error {
|
||||
pusher := agentcontext.NewDRPCPusher(aAPI)
|
||||
return a.contextManager.RunPush(ctx, pusher, agentcontext.PushOptions{
|
||||
Logger: a.logger.Named("agentcontext-push"),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// channels to sync goroutines below
|
||||
// handle manifest
|
||||
// |
|
||||
@@ -1312,6 +1382,17 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
|
||||
manifestOK.complete(nil)
|
||||
sentResult = true
|
||||
|
||||
// Manifest just landed; the agentcontext manager now has
|
||||
// a working directory to scan and a known set of scan
|
||||
// roots. Trigger a resync so the snapshot reflects the
|
||||
// workspace immediately instead of waiting for the next
|
||||
// filesystem event.
|
||||
if a.contextManager != nil {
|
||||
if _, resyncErr := a.contextManager.Resync(ctx); resyncErr != nil {
|
||||
a.logger.Debug(ctx, "agentcontext resync after manifest failed", slog.Error(resyncErr))
|
||||
}
|
||||
}
|
||||
|
||||
// Write secret files after signaling manifest readiness so that network
|
||||
// initialization (which depends on manifestOK) starts as soon as
|
||||
// possible. This creates a theoretical race where an SSH session that
|
||||
@@ -2265,6 +2346,12 @@ func (a *agent) Close() error {
|
||||
a.logger.Error(a.hardCtx, "mcp manager close", slog.Error(err))
|
||||
}
|
||||
|
||||
if a.contextManager != nil {
|
||||
if err := a.contextManager.Close(); err != nil {
|
||||
a.logger.Error(a.hardCtx, "agentcontext manager close", slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if a.boundaryLogProxy != nil {
|
||||
err = a.boundaryLogProxy.Close()
|
||||
if err != nil {
|
||||
@@ -2400,7 +2487,7 @@ const (
|
||||
|
||||
type apiConnRoutineManager struct {
|
||||
logger slog.Logger
|
||||
aAPI proto.DRPCAgentClient28
|
||||
aAPI proto.DRPCAgentClient210
|
||||
tAPI tailnetproto.DRPCTailnetClient28
|
||||
eg *errgroup.Group
|
||||
stopCtx context.Context
|
||||
@@ -2409,7 +2496,7 @@ type apiConnRoutineManager struct {
|
||||
|
||||
func newAPIConnRoutineManager(
|
||||
gracefulCtx, hardCtx context.Context, logger slog.Logger,
|
||||
aAPI proto.DRPCAgentClient28, tAPI tailnetproto.DRPCTailnetClient28,
|
||||
aAPI proto.DRPCAgentClient210, tAPI tailnetproto.DRPCTailnetClient28,
|
||||
) *apiConnRoutineManager {
|
||||
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
|
||||
// exit if the errgroup hits an error, which usually means a problem with the conn.
|
||||
@@ -2466,6 +2553,35 @@ func (a *apiConnRoutineManager) startAgentAPI(
|
||||
})
|
||||
}
|
||||
|
||||
// startAgentAPI210 is identical to startAgentAPI but passes the
|
||||
// full v2.10 Agent API client. Use it for routines that need
|
||||
// RPCs introduced after v2.8 (notably PushContextState).
|
||||
func (a *apiConnRoutineManager) startAgentAPI210(
|
||||
name string, behavior gracefulShutdownBehavior,
|
||||
f func(context.Context, proto.DRPCAgentClient210) error,
|
||||
) {
|
||||
logger := a.logger.With(slog.F("name", name))
|
||||
var ctx context.Context
|
||||
switch behavior {
|
||||
case gracefulShutdownBehaviorStop:
|
||||
ctx = a.stopCtx
|
||||
case gracefulShutdownBehaviorRemain:
|
||||
ctx = a.remainCtx
|
||||
default:
|
||||
panic("unknown behavior")
|
||||
}
|
||||
a.eg.Go(func() error {
|
||||
logger.Debug(ctx, "starting agent routine")
|
||||
err := f(ctx, a.aAPI)
|
||||
err = shouldPropagateError(ctx, logger, err)
|
||||
logger.Debug(ctx, "routine exited", slog.Error(err))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error in routine %s: %w", name, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// startTailnetAPI starts a routine that uses the Tailnet API. c.f. startAgentAPI which is the same
|
||||
// but for the Agent API.
|
||||
func (a *apiConnRoutineManager) startTailnetAPI(
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
package agent_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/agent"
|
||||
"github.com/coder/coder/v2/agent/agentcontextconfig"
|
||||
"github.com/coder/coder/v2/agent/agenttest"
|
||||
agentproto "github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// TestAgent_ContextStatePushed verifies the agent's
|
||||
// agentcontext.Manager pushes its initial Snapshot to coderd
|
||||
// over the v2.10 PushContextState RPC during a normal boot.
|
||||
//
|
||||
// The test does not depend on the chatd side; it asserts only
|
||||
// that the snapshot reaches the FakeAgentAPI, that it carries
|
||||
// the initial flag, and that it includes the seeded AGENTS.md.
|
||||
func TestAgent_ContextStatePushed(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
require.NoError(t,
|
||||
os.WriteFile(filepath.Join(dir, "AGENTS.md"), []byte("test rules"), 0o600))
|
||||
|
||||
//nolint:dogsled // setupAgent returns a wide tuple; we only care about the client.
|
||||
_, client, _, _, _ := setupAgent(t,
|
||||
agentsdk.Manifest{Directory: dir},
|
||||
0,
|
||||
func(_ *agenttest.Client, opts *agent.Options) {
|
||||
opts.ContextConfig = agentcontextconfig.Config{}
|
||||
},
|
||||
)
|
||||
|
||||
// The first push is the initial empty-workspace snapshot
|
||||
// because the manifest has not been fetched yet. Wait for a
|
||||
// later push that includes the seeded AGENTS.md.
|
||||
var pushes []*agentproto.PushContextStateRequest
|
||||
require.Eventually(t, func() bool {
|
||||
pushes = client.ContextStatePushes()
|
||||
for _, push := range pushes {
|
||||
for _, r := range push.GetResources() {
|
||||
if r.GetKind() == agentproto.ContextResource_INSTRUCTION_FILE &&
|
||||
filepath.Base(r.GetSource()) == "AGENTS.md" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, testutil.WaitMedium, testutil.IntervalFast,
|
||||
"expected the seeded AGENTS.md to appear in a snapshot push; got %d pushes", len(pushes))
|
||||
|
||||
require.NotEmpty(t, pushes)
|
||||
first := pushes[0]
|
||||
assert.True(t, first.GetInitial(), "first push must carry Initial=true")
|
||||
assert.Equal(t, uint64(1), first.GetSchemaVersion(), "schema_version must be the v1 wire shape")
|
||||
assert.NotEmpty(t, first.GetAggregateHash(), "aggregate_hash must be populated")
|
||||
|
||||
// Subsequent pushes must not be Initial.
|
||||
for _, p := range pushes[1:] {
|
||||
assert.False(t, p.GetInitial(), "only the first push must be Initial")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"github.com/coder/coder/v2/agent/agentcontext"
|
||||
"github.com/coder/coder/v2/agent/agentcontextconfig"
|
||||
)
|
||||
|
||||
// defaultContextRoots returns the built-in scan roots layered
|
||||
// in front of any user-added sources. These mirror the paths
|
||||
// the existing agentcontextconfig API resolves at every chat
|
||||
// hydrate so the new resolver covers the same surface area.
|
||||
//
|
||||
// The slice is intentionally tolerant of missing entries; the
|
||||
// resolver silently skips canonicalization failures and
|
||||
// non-existent paths.
|
||||
func defaultContextRoots(_ agentcontextconfig.Config, _ func() string) []string {
|
||||
roots := make([]string, 0, 8)
|
||||
|
||||
// Working directory is added by the manager itself via the
|
||||
// WorkingDir option, so we do not include it here.
|
||||
|
||||
// User home Coder config (~/.coder, ~/.coder/skills).
|
||||
roots = append(roots, "~/.coder", "~/.coder/skills")
|
||||
|
||||
// Claude Code plugin cache, picked up by the plugin RFC
|
||||
// follow-up. v1 ignores plugin manifests but watching the
|
||||
// directory is harmless and prevents a surprise dirty bit
|
||||
// when the resolver eventually classifies them.
|
||||
roots = append(roots, "~/.claude/plugins/cache")
|
||||
|
||||
// Project-relative ".agents/skills" requires a working
|
||||
// directory to anchor. We let the manager append the
|
||||
// working directory itself, and the resolver picks up
|
||||
// nested ".agents/skills" automatically.
|
||||
|
||||
return roots
|
||||
}
|
||||
|
||||
// initialContextSources translates the boot-time
|
||||
// CODER_AGENT_EXP_*_DIRS env vars into agentcontext.Source
|
||||
// entries. This preserves the "set it on the template" workflow
|
||||
// while the user-facing CLI for source CRUD ships in a follow-up.
|
||||
func initialContextSources(cfg agentcontextconfig.Config, workingDir func() string) []agentcontext.Source {
|
||||
base := ""
|
||||
if workingDir != nil {
|
||||
base = workingDir()
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
var sources []agentcontext.Source
|
||||
add := func(path string) {
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
if _, ok := seen[path]; ok {
|
||||
return
|
||||
}
|
||||
seen[path] = struct{}{}
|
||||
sources = append(sources, agentcontext.Source{Path: path})
|
||||
}
|
||||
for _, p := range agentcontextconfig.ResolvePaths(cfg.InstructionsDirs, base) {
|
||||
add(p)
|
||||
}
|
||||
for _, p := range agentcontextconfig.ResolvePaths(cfg.SkillsDirs, base) {
|
||||
add(p)
|
||||
}
|
||||
for _, p := range agentcontextconfig.ResolvePaths(cfg.MCPConfigFiles, base) {
|
||||
add(p)
|
||||
}
|
||||
return sources
|
||||
}
|
||||
|
||||
// defaultContextAllowedRoots returns the allow-list applied to
|
||||
// runtime AddSource calls. The set matches the RFC's authorization
|
||||
// section: the home directory's Coder + Claude config trees plus
|
||||
// the workspace's working directory.
|
||||
func defaultContextAllowedRoots(workingDir func() string) []string {
|
||||
roots := []string{"~", "~/.coder", "~/.claude"}
|
||||
if workingDir != nil {
|
||||
if wd := workingDir(); wd != "" {
|
||||
roots = append(roots, wd)
|
||||
}
|
||||
}
|
||||
return roots
|
||||
}
|
||||
@@ -146,6 +146,30 @@ func (c *Client) ConnectRPC29WithRole(ctx context.Context, _ string) (
|
||||
return c.ConnectRPC29(ctx)
|
||||
}
|
||||
|
||||
func (c *Client) ConnectRPC210(ctx context.Context) (
|
||||
agentproto.DRPCAgentClient210, proto.DRPCTailnetClient28, error,
|
||||
) {
|
||||
aAPI, tAPI, err := c.ConnectRPC29(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// The concrete drpcAgentClient implements every method on
|
||||
// the generated DRPCAgentClient interface, including
|
||||
// PushContextState, so the assertion always succeeds for
|
||||
// the fixture's own connections.
|
||||
client, ok := aAPI.(agentproto.DRPCAgentClient210)
|
||||
if !ok {
|
||||
return nil, nil, xerrors.Errorf("agenttest: connection does not implement DRPCAgentClient210; got %T", aAPI)
|
||||
}
|
||||
return client, tAPI, nil
|
||||
}
|
||||
|
||||
func (c *Client) ConnectRPC210WithRole(ctx context.Context, _ string) (
|
||||
agentproto.DRPCAgentClient210, proto.DRPCTailnetClient28, error,
|
||||
) {
|
||||
return c.ConnectRPC210(ctx)
|
||||
}
|
||||
|
||||
func (c *Client) ConnectRPC29(ctx context.Context) (
|
||||
agentproto.DRPCAgentClient29, proto.DRPCTailnetClient28, error,
|
||||
) {
|
||||
@@ -227,6 +251,12 @@ func (c *Client) GetSubAgentApps(id uuid.UUID) ([]*agentproto.CreateSubAgentRequ
|
||||
return c.fakeAgentAPI.GetSubAgentApps(id)
|
||||
}
|
||||
|
||||
// ContextStatePushes returns every PushContextState request the
|
||||
// agent has issued to the fake server so far.
|
||||
func (c *Client) ContextStatePushes() []*agentproto.PushContextStateRequest {
|
||||
return c.fakeAgentAPI.ContextStatePushes()
|
||||
}
|
||||
|
||||
type FakeAgentAPI struct {
|
||||
sync.Mutex
|
||||
t testing.TB
|
||||
@@ -249,12 +279,34 @@ type FakeAgentAPI struct {
|
||||
getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
|
||||
getResourcesMonitoringConfigurationFunc func() (*agentproto.GetResourcesMonitoringConfigurationResponse, error)
|
||||
pushResourcesMonitoringUsageFunc func(*agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error)
|
||||
|
||||
contextStatePushes []*agentproto.PushContextStateRequest
|
||||
}
|
||||
|
||||
func (*FakeAgentAPI) UpdateAppStatus(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// PushContextState records the incoming snapshot and returns
|
||||
// Accepted=true. Tests that need to assert against the captured
|
||||
// pushes can read them via ContextStatePushes.
|
||||
func (f *FakeAgentAPI) PushContextState(_ context.Context, req *agentproto.PushContextStateRequest) (*agentproto.PushContextStateResponse, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
f.contextStatePushes = append(f.contextStatePushes, req)
|
||||
return &agentproto.PushContextStateResponse{Accepted: true}, nil
|
||||
}
|
||||
|
||||
// ContextStatePushes returns a snapshot of every
|
||||
// PushContextState request received so far.
|
||||
func (f *FakeAgentAPI) ContextStatePushes() []*agentproto.PushContextStateRequest {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
out := make([]*agentproto.PushContextStateRequest, len(f.contextStatePushes))
|
||||
copy(out, f.contextStatePushes)
|
||||
return out
|
||||
}
|
||||
|
||||
func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
|
||||
return f.manifest, nil
|
||||
}
|
||||
|
||||
@@ -35,6 +35,9 @@ func (a *agent) apiHandler() http.Handler {
|
||||
r.Mount("/api/v0/desktop", a.desktopAPI.Routes())
|
||||
r.Mount("/api/v0/mcp", a.mcpAPI.Routes())
|
||||
r.Mount("/api/v0/context-config", a.contextConfigAPI.Routes())
|
||||
if a.contextAPI != nil {
|
||||
r.Mount("/api/v0/context", a.contextAPI.Routes())
|
||||
}
|
||||
|
||||
if a.devcontainers {
|
||||
r.Mount("/api/v0/containers", a.containerAPI.Routes())
|
||||
|
||||
Reference in New Issue
Block a user