From 947b390c5a38853e86e7e0fbfb812dfaa64ebb1e Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 24 Feb 2026 20:28:50 +0200 Subject: [PATCH] fix: allow agent-reported final states, add SSE reconnection (#22286) When AgentAPI is configured, `WithTaskReporter` unconditionally overrides all self-reported states to `working`. The intent was to distrust the agent's `idle` and rely on the screen watcher, but the override also blocks `failure` and `complete`, which only the agent can produce (the screen watcher only knows `running`/`stable`). Tasks get stuck as `working` or `null` forever. Now only `idle` is overridden to `working`; `failure`, `complete`, and `working` pass through as-is. Also: - Remove misplaced unconditional `"Failed to watch screen events"` log that fired on every startup - Add SSE reconnection with exponential backoff (1s-30s) in `startWatcher` so it recovers from dropped connections instead of dying silently - Add `complete` to the `coder_report_task` tool enum, which the `coder/claude-code` registry module already instructs agents to use but was missing from the schema Refs coder/internal#1350 --- cli/exp_mcp.go | 95 +++++++++--------- cli/exp_mcp_test.go | 186 +++++++++++++++++++++++++++++++++++- codersdk/toolsdk/toolsdk.go | 5 +- 3 files changed, 238 insertions(+), 48 deletions(-) diff --git a/cli/exp_mcp.go b/cli/exp_mcp.go index dfeac3669e..73ec187879 100644 --- a/cli/exp_mcp.go +++ b/cli/exp_mcp.go @@ -10,6 +10,7 @@ import ( "path/filepath" "slices" "strings" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -23,6 +24,7 @@ import ( "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/toolsdk" + "github.com/coder/retry" "github.com/coder/serpent" ) @@ -539,7 +541,6 @@ func (r *RootCmd) mcpServer() *serpent.Command { defer cancel() defer srv.queue.Close() - cliui.Infof(inv.Stderr, "Failed to watch screen events") // Start the reporter, watcher, and server. These are all tied to the // lifetime of the MCP server, which is itself tied to the lifetime of the // AI agent. @@ -613,48 +614,51 @@ func (s *mcpServer) startReporter(ctx context.Context, inv *serpent.Invocation) } func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) { - eventsCh, errCh, err := s.aiAgentAPIClient.SubscribeEvents(ctx) - if err != nil { - cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err) - return - } go func() { - for { - select { - case <-ctx.Done(): - return - case event := <-eventsCh: - switch ev := event.(type) { - case agentapi.EventStatusChange: - // If the screen is stable, report idle. - state := codersdk.WorkspaceAppStatusStateWorking - if ev.Status == agentapi.StatusStable { - state = codersdk.WorkspaceAppStatusStateIdle - } - err := s.queue.Push(taskReport{ - state: state, - }) - if err != nil { - cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err) + for retrier := retry.New(time.Second, 30*time.Second); retrier.Wait(ctx); { + eventsCh, errCh, err := s.aiAgentAPIClient.SubscribeEvents(ctx) + if err == nil { + retrier.Reset() + loop: + for { + select { + case <-ctx.Done(): return - } - case agentapi.EventMessageUpdate: - if ev.Role == agentapi.RoleUser { - err := s.queue.Push(taskReport{ - messageID: &ev.Id, - state: codersdk.WorkspaceAppStatusStateWorking, - }) - if err != nil { - cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err) - return + case event := <-eventsCh: + switch ev := event.(type) { + case agentapi.EventStatusChange: + state := codersdk.WorkspaceAppStatusStateWorking + if ev.Status == agentapi.StatusStable { + state = codersdk.WorkspaceAppStatusStateIdle + } + err := s.queue.Push(taskReport{ + state: state, + }) + if err != nil { + cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err) + return + } + case agentapi.EventMessageUpdate: + if ev.Role == agentapi.RoleUser { + err := s.queue.Push(taskReport{ + messageID: &ev.Id, + state: codersdk.WorkspaceAppStatusStateWorking, + }) + if err != nil { + cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err) + return + } + } } + case err := <-errCh: + if !errors.Is(err, context.Canceled) { + cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err) + } + break loop } } - case err := <-errCh: - if !errors.Is(err, context.Canceled) { - cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err) - } - return + } else { + cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err) } } }() @@ -692,13 +696,14 @@ func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, in // Add tool dependencies. toolOpts := []func(*toolsdk.Deps){ toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs) error { - // The agent does not reliably report its status correctly. If AgentAPI - // is enabled, we will always set the status to "working" when we get an - // MCP message, and rely on the screen watcher to eventually catch the - // idle state. - state := codersdk.WorkspaceAppStatusStateWorking - if s.aiAgentAPIClient == nil { - state = codersdk.WorkspaceAppStatusState(args.State) + state := codersdk.WorkspaceAppStatusState(args.State) + // The agent does not reliably report idle, so when AgentAPI is + // enabled we override idle to working and let the screen watcher + // detect the real idle via StatusStable. Final states (failure, + // complete) are trusted from the agent since the screen watcher + // cannot produce them. + if s.aiAgentAPIClient != nil && state == codersdk.WorkspaceAppStatusStateIdle { + state = codersdk.WorkspaceAppStatusStateWorking } return s.queue.Push(taskReport{ link: args.Link, diff --git a/cli/exp_mcp_test.go b/cli/exp_mcp_test.go index 0a50a41e99..4ae2cf5753 100644 --- a/cli/exp_mcp_test.go +++ b/cli/exp_mcp_test.go @@ -921,7 +921,7 @@ func TestExpMcpReporter(t *testing.T) { }, }, }, - // We ignore the state from the agent and assume "working". + // We override idle from the agent to working, but trust final states. { name: "IgnoreAgentState", // AI agent reports that it is finished but the summary says it is doing @@ -953,6 +953,46 @@ func TestExpMcpReporter(t *testing.T) { Message: "finished", }, }, + // Agent reports failure; trusted even with AgentAPI enabled. + { + state: codersdk.WorkspaceAppStatusStateFailure, + summary: "something broke", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateFailure, + Message: "something broke", + }, + }, + // After failure, watcher reports stable -> idle. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "something broke", + }, + }, + }, + }, + // Final states pass through with AgentAPI enabled. + { + name: "AllowFinalStates", + tests: []test{ + { + state: codersdk.WorkspaceAppStatusStateWorking, + summary: "doing work", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "doing work", + }, + }, + // Agent reports complete; not overridden. + { + state: codersdk.WorkspaceAppStatusStateComplete, + summary: "all done", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateComplete, + Message: "all done", + }, + }, }, }, // When AgentAPI is not being used, we accept agent state updates as-is. @@ -1110,4 +1150,148 @@ func TestExpMcpReporter(t *testing.T) { <-cmdDone }) } + + t.Run("Reconnect", func(t *testing.T) { + t.Parallel() + + // Create a test deployment and workspace. + client, db := coderdtest.NewWithDatabase(t, nil) + user := coderdtest.CreateFirstUser(t, client) + client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID) + + r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + OrganizationID: user.OrganizationID, + OwnerID: user2.ID, + }).WithAgent(func(a []*proto.Agent) []*proto.Agent { + a[0].Apps = []*proto.App{ + { + Slug: "vscode", + }, + } + return a + }).Do() + + ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong)) + + // Watch the workspace for changes. + watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID) + require.NoError(t, err) + var lastAppStatus codersdk.WorkspaceAppStatus + nextUpdate := func() codersdk.WorkspaceAppStatus { + for { + select { + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for status update") + case w, ok := <-watcher: + require.True(t, ok, "watch channel closed") + if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID { + t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State) + lastAppStatus = *w.LatestAppStatus + return lastAppStatus + } + } + } + } + + // Mock AI AgentAPI server that supports disconnect/reconnect. + disconnect := make(chan struct{}) + listening := make(chan func(sse codersdk.ServerSentEvent) error) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Create a cancelable context so we can stop the SSE sender + // goroutine on disconnect without waiting for the HTTP + // serve loop to cancel r.Context(). + sseCtx, sseCancel := context.WithCancel(r.Context()) + defer sseCancel() + r = r.WithContext(sseCtx) + + send, closed, err := httpapi.ServerSentEventSender(w, r) + if err != nil { + httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error setting up server-sent events.", + Detail: err.Error(), + }) + return + } + // Send initial message so the watcher knows the agent is active. + send(*makeMessageEvent(0, agentapi.RoleAgent)) + select { + case listening <- send: + case <-r.Context().Done(): + return + } + select { + case <-closed: + case <-disconnect: + sseCancel() + <-closed + } + })) + t.Cleanup(srv.Close) + + inv, _ := clitest.New(t, + "exp", "mcp", "server", + "--agent-url", client.URL.String(), + "--agent-token", r.AgentToken, + "--app-status-slug", "vscode", + "--allowed-tools=coder_report_task", + "--ai-agentapi-url", srv.URL, + ) + inv = inv.WithContext(ctx) + + pty := ptytest.New(t) + inv.Stdin = pty.Input() + inv.Stdout = pty.Output() + stderr := ptytest.New(t) + inv.Stderr = stderr.Output() + + // Run the MCP server. + clitest.Start(t, inv) + + // Initialize. + payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}` + pty.WriteLine(payload) + _ = pty.ReadLine(ctx) // ignore echo + _ = pty.ReadLine(ctx) // ignore init response + + // Get first sender from the initial SSE connection. + sender := testutil.RequireReceive(ctx, t, listening) + + // Self-report a working status via tool call. + toolPayload := `{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"doing work","link":""}}}` + pty.WriteLine(toolPayload) + _ = pty.ReadLine(ctx) // ignore echo + _ = pty.ReadLine(ctx) // ignore response + got := nextUpdate() + require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State) + require.Equal(t, "doing work", got.Message) + + // Watcher sends stable, verify idle is reported. + err = sender(*makeStatusEvent(agentapi.StatusStable)) + require.NoError(t, err) + got = nextUpdate() + require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State) + + // Disconnect the SSE connection by signaling the handler to return. + testutil.RequireSend(ctx, t, disconnect, struct{}{}) + + // Wait for the watcher to reconnect and get the new sender. + sender = testutil.RequireReceive(ctx, t, listening) + + // After reconnect, self-report a working status again. + toolPayload = `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"reconnected","link":""}}}` + pty.WriteLine(toolPayload) + _ = pty.ReadLine(ctx) // ignore echo + _ = pty.ReadLine(ctx) // ignore response + got = nextUpdate() + require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State) + require.Equal(t, "reconnected", got.Message) + + // Verify the watcher still processes events after reconnect. + err = sender(*makeStatusEvent(agentapi.StatusStable)) + require.NoError(t, err) + got = nextUpdate() + require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State) + + cancel() + }) } diff --git a/codersdk/toolsdk/toolsdk.go b/codersdk/toolsdk/toolsdk.go index cf9fd55752..eab0e774f8 100644 --- a/codersdk/toolsdk/toolsdk.go +++ b/codersdk/toolsdk/toolsdk.go @@ -265,7 +265,7 @@ Bad Tasks Use the "state" field to indicate your progress. Periodically report progress with state "working" to keep the user updated. It is not possible to send too many updates! -ONLY report an "idle" or "failure" state if you have FULLY completed the task. +ONLY report a "complete", "idle", or "failure" state if you have FULLY completed the task. `, Schema: aisdk.Schema{ Properties: map[string]any{ @@ -279,9 +279,10 @@ ONLY report an "idle" or "failure" state if you have FULLY completed the task. }, "state": map[string]any{ "type": "string", - "description": "The state of your task. This can be one of the following: working, idle, or failure. Select the state that best represents your current progress.", + "description": "The state of your task. This can be one of the following: working, complete, idle, or failure. Select the state that best represents your current progress.", "enum": []string{ string(codersdk.WorkspaceAppStatusStateWorking), + string(codersdk.WorkspaceAppStatusStateComplete), string(codersdk.WorkspaceAppStatusStateIdle), string(codersdk.WorkspaceAppStatusStateFailure), },