From fc7700a62f6472709a193a842fa31a4e925ecf20 Mon Sep 17 00:00:00 2001 From: Asher Date: Mon, 30 Jun 2025 12:12:20 -0800 Subject: [PATCH] fix: improve reliability of app statuses (#18622) We were discarding all "working" updates from the screen watcher because we cannot tell the difference between the agent or user changing the screen, but it makes sense to accept it as the very first update, because the agent could be working but neglected to report that fact, so you would never get an initial "working" update (it would just eventually go straight to "idle"). Also messages can start at zero, so I made a fix for that as well, although the first message will be from the LLM and we ignore those anyway, so this probably has no actual effect, but seems more technically correct. And it seems I forgot to actually update the last message ID, which also does not actually matter for user messages (since I think the SSE endpoint will not re-emit a user message it has already emitted), but seems more technically correct to check. Lastly, if we have the screen watcher, ignore the agent's self-reported state and always use "working" since it is unreliable. The idle state will eventually be caught by the watcher. --- cli/exp_mcp.go | 86 ++++--- cli/exp_mcp_test.go | 533 +++++++++++++++++++++++++++----------------- 2 files changed, 392 insertions(+), 227 deletions(-) diff --git a/cli/exp_mcp.go b/cli/exp_mcp.go index 0a1c9fcbea..5cfd902513 100644 --- a/cli/exp_mcp.go +++ b/cli/exp_mcp.go @@ -362,11 +362,19 @@ func (*RootCmd) mcpConfigureCursor() *serpent.Command { } type taskReport struct { - link string - messageID int64 + // link is optional. + link string + // messageID must be set if this update is from a *user* message. A user + // message only happens when interacting via the AI AgentAPI (as opposed to + // interacting with the terminal directly). + messageID *int64 + // selfReported must be set if the update is directly from the AI agent + // (as opposed to the screen watcher). selfReported bool - state codersdk.WorkspaceAppStatusState - summary string + // state must always be set. + state codersdk.WorkspaceAppStatusState + // summary is optional. + summary string } type mcpServer struct { @@ -388,31 +396,48 @@ func (r *RootCmd) mcpServer() *serpent.Command { return &serpent.Command{ Use: "server", Handler: func(inv *serpent.Invocation) error { - // lastUserMessageID is the ID of the last *user* message that we saw. A - // user message only happens when interacting via the AI AgentAPI (as - // opposed to interacting with the terminal directly). - var lastUserMessageID int64 var lastReport taskReport // Create a queue that skips duplicates and preserves summaries. queue := cliutil.NewQueue[taskReport](512).WithPredicate(func(report taskReport) (taskReport, bool) { - // Use "working" status if this is a new user message. If this is not a - // new user message, and the status is "working" and not self-reported - // (meaning it came from the screen watcher), then it means one of two - // things: - // 1. The AI agent is still working, so there is nothing to update. - // 2. The AI agent stopped working, then the user has interacted with - // the terminal directly. For now, we are ignoring these updates. - // This risks missing cases where the user manually submits a new - // prompt and the AI agent becomes active and does not update itself, - // but it avoids spamming useless status updates as the user is - // typing, so the tradeoff is worth it. In the future, if we can - // reliably distinguish between user and AI agent activity, we can - // change this. - if report.messageID > lastUserMessageID { - report.state = codersdk.WorkspaceAppStatusStateWorking - } else if report.state == codersdk.WorkspaceAppStatusStateWorking && !report.selfReported { + // Avoid queuing empty statuses (this would probably indicate a + // developer error) + if report.state == "" { return report, false } + // If this is a user message, discard if it is not new. + if report.messageID != nil && lastReport.messageID != nil && + *lastReport.messageID >= *report.messageID { + return report, false + } + // If this is not a user message, and the status is "working" and not + // self-reported (meaning it came from the screen watcher), then it + // means one of two things: + // + // 1. The AI agent is not working; the user is interacting with the + // terminal directly. + // 2. The AI agent is working. + // + // At the moment, we have no way to tell the difference between these + // two states. In the future, if we can reliably distinguish between + // user and AI agent activity, we can change this. + // + // If this is our first update, we assume it is the AI agent working and + // accept the update. + // + // Otherwise we discard the update. This risks missing cases where the + // user manually submits a new prompt and the AI agent becomes active + // (and does not update itself), but it avoids spamming useless status + // updates as the user is typing, so the tradeoff is worth it. + if report.messageID == nil && + report.state == codersdk.WorkspaceAppStatusStateWorking && + !report.selfReported && lastReport.state != "" { + return report, false + } + // Keep track of the last message ID so we can tell when a message is + // new or if it has been re-emitted. + if report.messageID == nil { + report.messageID = lastReport.messageID + } // Preserve previous message and URI if there was no message. if report.summary == "" { report.summary = lastReport.summary @@ -600,7 +625,8 @@ func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) { case agentapi.EventMessageUpdate: if ev.Role == agentapi.RoleUser { err := s.queue.Push(taskReport{ - messageID: ev.Id, + messageID: &ev.Id, + state: codersdk.WorkspaceAppStatusStateWorking, }) if err != nil { cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err) @@ -650,10 +676,18 @@ func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, in // Add tool dependencies. toolOpts := []func(*toolsdk.Deps){ toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs) error { + // The agent does not reliably report its status correctly. If AgentAPI + // is enabled, we will always set the status to "working" when we get an + // MCP message, and rely on the screen watcher to eventually catch the + // idle state. + state := codersdk.WorkspaceAppStatusStateWorking + if s.aiAgentAPIClient == nil { + state = codersdk.WorkspaceAppStatusState(args.State) + } return s.queue.Push(taskReport{ link: args.Link, selfReported: true, - state: codersdk.WorkspaceAppStatusState(args.State), + state: state, summary: args.Summary, }) }), diff --git a/cli/exp_mcp_test.go b/cli/exp_mcp_test.go index bcfafb0204..0a50a41e99 100644 --- a/cli/exp_mcp_test.go +++ b/cli/exp_mcp_test.go @@ -763,220 +763,351 @@ func TestExpMcpReporter(t *testing.T) { <-cmdDone }) - t.Run("OK", func(t *testing.T) { - t.Parallel() + makeStatusEvent := func(status agentapi.AgentStatus) *codersdk.ServerSentEvent { + return &codersdk.ServerSentEvent{ + Type: ServerSentEventTypeStatusChange, + Data: agentapi.EventStatusChange{ + Status: status, + }, + } + } - // Create a test deployment and workspace. - client, db := coderdtest.NewWithDatabase(t, nil) - user := coderdtest.CreateFirstUser(t, client) - client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID) + makeMessageEvent := func(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent { + return &codersdk.ServerSentEvent{ + Type: ServerSentEventTypeMessageUpdate, + Data: agentapi.EventMessageUpdate{ + Id: id, + Role: role, + }, + } + } - r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ - OrganizationID: user.OrganizationID, - OwnerID: user2.ID, - }).WithAgent(func(a []*proto.Agent) []*proto.Agent { - a[0].Apps = []*proto.App{ + type test struct { + // event simulates an event from the screen watcher. + event *codersdk.ServerSentEvent + // state, summary, and uri simulate a tool call from the AI agent. + state codersdk.WorkspaceAppStatusState + summary string + uri string + expected *codersdk.WorkspaceAppStatus + } + + runs := []struct { + name string + tests []test + disableAgentAPI bool + }{ + // In this run the AI agent starts with a state change but forgets to update + // that it finished. + { + name: "Active", + tests: []test{ + // First the AI agent updates with a state change. { - Slug: "vscode", + state: codersdk.WorkspaceAppStatusStateWorking, + summary: "doing work", + uri: "https://dev.coder.com", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "doing work", + URI: "https://dev.coder.com", + }, }, - } - return a - }).Do() - - makeStatusEvent := func(status agentapi.AgentStatus) *codersdk.ServerSentEvent { - return &codersdk.ServerSentEvent{ - Type: ServerSentEventTypeStatusChange, - Data: agentapi.EventStatusChange{ - Status: status, + // Terminal goes quiet but the AI agent forgot the update, and it is + // caught by the screen watcher. Message and URI are preserved. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "doing work", + URI: "https://dev.coder.com", + }, }, - } - } - - makeMessageEvent := func(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent { - return &codersdk.ServerSentEvent{ - Type: ServerSentEventTypeMessageUpdate, - Data: agentapi.EventMessageUpdate{ - Id: id, - Role: role, + // A stable update now from the watcher should be discarded, as it is a + // duplicate. + { + event: makeStatusEvent(agentapi.StatusStable), }, - } - } + // Terminal becomes active again according to the screen watcher, but no + // new user message. This could be the AI agent being active again, but + // it could also be the user messing around. We will prefer not updating + // the status so the "working" update here should be skipped. + // + // TODO: How do we test the no-op updates? This update is skipped + // because of the logic mentioned above, but how do we prove this update + // was skipped because of that and not that the next update was skipped + // because it is a duplicate state? We could mock the queue? + { + event: makeStatusEvent(agentapi.StatusRunning), + }, + // Agent messages are ignored. + { + event: makeMessageEvent(0, agentapi.RoleAgent), + }, + // The watcher reports the screen is active again... + { + event: makeStatusEvent(agentapi.StatusRunning), + }, + // ... but this time we have a new user message so we know there is AI + // agent activity. This time the "working" update will not be skipped. + { + event: makeMessageEvent(1, agentapi.RoleUser), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "doing work", + URI: "https://dev.coder.com", + }, + }, + // Watcher reports stable again. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "doing work", + URI: "https://dev.coder.com", + }, + }, + }, + }, + // In this run the AI agent never sends any state changes. + { + name: "Inactive", + tests: []test{ + // The "working" status from the watcher should be accepted, even though + // there is no new user message, because it is the first update. + { + event: makeStatusEvent(agentapi.StatusRunning), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "", + URI: "", + }, + }, + // Stable update should be accepted. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "", + URI: "", + }, + }, + // Zero ID should be accepted. + { + event: makeMessageEvent(0, agentapi.RoleUser), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "", + URI: "", + }, + }, + // Stable again. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "", + URI: "", + }, + }, + // Next ID. + { + event: makeMessageEvent(1, agentapi.RoleUser), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "", + URI: "", + }, + }, + }, + }, + // We ignore the state from the agent and assume "working". + { + name: "IgnoreAgentState", + // AI agent reports that it is finished but the summary says it is doing + // work. + tests: []test{ + { + state: codersdk.WorkspaceAppStatusStateIdle, + summary: "doing work", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "doing work", + }, + }, + // AI agent reports finished again, with a matching summary. We still + // assume it is working. + { + state: codersdk.WorkspaceAppStatusStateIdle, + summary: "finished", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "finished", + }, + }, + // Once the watcher reports stable, then we record idle. + { + event: makeStatusEvent(agentapi.StatusStable), + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "finished", + }, + }, + }, + }, + // When AgentAPI is not being used, we accept agent state updates as-is. + { + name: "KeepAgentState", + tests: []test{ + { + state: codersdk.WorkspaceAppStatusStateWorking, + summary: "doing work", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateWorking, + Message: "doing work", + }, + }, + { + state: codersdk.WorkspaceAppStatusStateIdle, + summary: "finished", + expected: &codersdk.WorkspaceAppStatus{ + State: codersdk.WorkspaceAppStatusStateIdle, + Message: "finished", + }, + }, + }, + disableAgentAPI: true, + }, + } - ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitShort)) + for _, run := range runs { + run := run + t.Run(run.name, func(t *testing.T) { + t.Parallel() - // Mock the AI AgentAPI server. - listening := make(chan func(sse codersdk.ServerSentEvent) error) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - send, closed, err := httpapi.ServerSentEventSender(w, r) - if err != nil { - httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{ - Message: "Internal error setting up server-sent events.", - Detail: err.Error(), - }) - return - } - // Send initial message. - send(*makeMessageEvent(0, agentapi.RoleAgent)) - listening <- send - <-closed - })) - t.Cleanup(srv.Close) - aiAgentAPIURL := srv.URL + ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitShort)) - // Watch the workspace for changes. - watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID) - require.NoError(t, err) - var lastAppStatus codersdk.WorkspaceAppStatus - nextUpdate := func() codersdk.WorkspaceAppStatus { - for { - select { - case <-ctx.Done(): - require.FailNow(t, "timed out waiting for status update") - case w, ok := <-watcher: - require.True(t, ok, "watch channel closed") - if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID { - lastAppStatus = *w.LatestAppStatus - return lastAppStatus + // Create a test deployment and workspace. + client, db := coderdtest.NewWithDatabase(t, nil) + user := coderdtest.CreateFirstUser(t, client) + client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID) + + r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + OrganizationID: user.OrganizationID, + OwnerID: user2.ID, + }).WithAgent(func(a []*proto.Agent) []*proto.Agent { + a[0].Apps = []*proto.App{ + { + Slug: "vscode", + }, + } + return a + }).Do() + + // Watch the workspace for changes. + watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID) + require.NoError(t, err) + var lastAppStatus codersdk.WorkspaceAppStatus + nextUpdate := func() codersdk.WorkspaceAppStatus { + for { + select { + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for status update") + case w, ok := <-watcher: + require.True(t, ok, "watch channel closed") + if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID { + t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State) + lastAppStatus = *w.LatestAppStatus + return lastAppStatus + } } } } - } - inv, _ := clitest.New(t, - "exp", "mcp", "server", - // We need the agent credentials, AI AgentAPI url, and a slug for reporting. - "--agent-url", client.URL.String(), - "--agent-token", r.AgentToken, - "--app-status-slug", "vscode", - "--ai-agentapi-url", aiAgentAPIURL, - "--allowed-tools=coder_report_task", - ) - inv = inv.WithContext(ctx) - - pty := ptytest.New(t) - inv.Stdin = pty.Input() - inv.Stdout = pty.Output() - stderr := ptytest.New(t) - inv.Stderr = stderr.Output() - - // Run the MCP server. - cmdDone := make(chan struct{}) - go func() { - defer close(cmdDone) - err := inv.Run() - assert.NoError(t, err) - }() - - // Initialize. - payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}` - pty.WriteLine(payload) - _ = pty.ReadLine(ctx) // ignore echo - _ = pty.ReadLine(ctx) // ignore init response - - sender := <-listening - - tests := []struct { - // event simulates an event from the screen watcher. - event *codersdk.ServerSentEvent - // state, summary, and uri simulate a tool call from the AI agent. - state codersdk.WorkspaceAppStatusState - summary string - uri string - expected *codersdk.WorkspaceAppStatus - }{ - // First the AI agent updates with a state change. - { - state: codersdk.WorkspaceAppStatusStateWorking, - summary: "doing work", - uri: "https://dev.coder.com", - expected: &codersdk.WorkspaceAppStatus{ - State: codersdk.WorkspaceAppStatusStateWorking, - Message: "doing work", - URI: "https://dev.coder.com", - }, - }, - // Terminal goes quiet but the AI agent forgot the update, and it is - // caught by the screen watcher. Message and URI are preserved. - { - event: makeStatusEvent(agentapi.StatusStable), - expected: &codersdk.WorkspaceAppStatus{ - State: codersdk.WorkspaceAppStatusStateIdle, - Message: "doing work", - URI: "https://dev.coder.com", - }, - }, - // A completed update at this point from the watcher should be discarded. - { - event: makeStatusEvent(agentapi.StatusStable), - }, - // Terminal becomes active again according to the screen watcher, but no - // new user message. This could be the AI agent being active again, but - // it could also be the user messing around. We will prefer not updating - // the status so the "working" update here should be skipped. - { - event: makeStatusEvent(agentapi.StatusRunning), - }, - // Agent messages are ignored. - { - event: makeMessageEvent(1, agentapi.RoleAgent), - }, - // AI agent reports that it failed and URI is blank. - { - state: codersdk.WorkspaceAppStatusStateFailure, - summary: "oops", - expected: &codersdk.WorkspaceAppStatus{ - State: codersdk.WorkspaceAppStatusStateFailure, - Message: "oops", - URI: "", - }, - }, - // The watcher reports the screen is active again... - { - event: makeStatusEvent(agentapi.StatusRunning), - }, - // ... but this time we have a new user message so we know there is AI - // agent activity. This time the "working" update will not be skipped. - { - event: makeMessageEvent(2, agentapi.RoleUser), - expected: &codersdk.WorkspaceAppStatus{ - State: codersdk.WorkspaceAppStatusStateWorking, - Message: "oops", - URI: "", - }, - }, - // Watcher reports stable again. - { - event: makeStatusEvent(agentapi.StatusStable), - expected: &codersdk.WorkspaceAppStatus{ - State: codersdk.WorkspaceAppStatusStateIdle, - Message: "oops", - URI: "", - }, - }, - } - for _, test := range tests { - if test.event != nil { - err := sender(*test.event) - require.NoError(t, err) - } else { - // Call the tool and ensure it works. - payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call", "params": {"name": "coder_report_task", "arguments": {"state": %q, "summary": %q, "link": %q}}}`, test.state, test.summary, test.uri) - pty.WriteLine(payload) - _ = pty.ReadLine(ctx) // ignore echo - output := pty.ReadLine(ctx) - require.NotEmpty(t, output, "did not receive a response from coder_report_task") - // Ensure it is valid JSON. - _, err = json.Marshal(output) - require.NoError(t, err, "did not receive valid JSON from coder_report_task") + args := []string{ + "exp", "mcp", "server", + // We need the agent credentials, AI AgentAPI url (if not + // disabled), and a slug for reporting. + "--agent-url", client.URL.String(), + "--agent-token", r.AgentToken, + "--app-status-slug", "vscode", + "--allowed-tools=coder_report_task", } - if test.expected != nil { - got := nextUpdate() - require.Equal(t, got.State, test.expected.State) - require.Equal(t, got.Message, test.expected.Message) - require.Equal(t, got.URI, test.expected.URI) + + // Mock the AI AgentAPI server. + listening := make(chan func(sse codersdk.ServerSentEvent) error) + if !run.disableAgentAPI { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + send, closed, err := httpapi.ServerSentEventSender(w, r) + if err != nil { + httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error setting up server-sent events.", + Detail: err.Error(), + }) + return + } + // Send initial message. + send(*makeMessageEvent(0, agentapi.RoleAgent)) + listening <- send + <-closed + })) + t.Cleanup(srv.Close) + aiAgentAPIURL := srv.URL + args = append(args, "--ai-agentapi-url", aiAgentAPIURL) } - } - cancel() - <-cmdDone - }) + + inv, _ := clitest.New(t, args...) + inv = inv.WithContext(ctx) + + pty := ptytest.New(t) + inv.Stdin = pty.Input() + inv.Stdout = pty.Output() + stderr := ptytest.New(t) + inv.Stderr = stderr.Output() + + // Run the MCP server. + cmdDone := make(chan struct{}) + go func() { + defer close(cmdDone) + err := inv.Run() + assert.NoError(t, err) + }() + + // Initialize. + payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}` + pty.WriteLine(payload) + _ = pty.ReadLine(ctx) // ignore echo + _ = pty.ReadLine(ctx) // ignore init response + + var sender func(sse codersdk.ServerSentEvent) error + if !run.disableAgentAPI { + sender = <-listening + } + + for _, test := range run.tests { + if test.event != nil { + err := sender(*test.event) + require.NoError(t, err) + } else { + // Call the tool and ensure it works. + payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call", "params": {"name": "coder_report_task", "arguments": {"state": %q, "summary": %q, "link": %q}}}`, test.state, test.summary, test.uri) + pty.WriteLine(payload) + _ = pty.ReadLine(ctx) // ignore echo + output := pty.ReadLine(ctx) + require.NotEmpty(t, output, "did not receive a response from coder_report_task") + // Ensure it is valid JSON. + _, err = json.Marshal(output) + require.NoError(t, err, "did not receive valid JSON from coder_report_task") + } + if test.expected != nil { + got := nextUpdate() + require.Equal(t, got.State, test.expected.State) + require.Equal(t, got.Message, test.expected.Message) + require.Equal(t, got.URI, test.expected.URI) + } + } + cancel() + <-cmdDone + }) + } }