mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix: allow agent-reported final states, add SSE reconnection (#22286)
When AgentAPI is configured, `WithTaskReporter` unconditionally overrides all self-reported states to `working`. The intent was to distrust the agent's `idle` and rely on the screen watcher, but the override also blocks `failure` and `complete`, which only the agent can produce (the screen watcher only knows `running`/`stable`). Tasks get stuck as `working` or `null` forever. Now only `idle` is overridden to `working`; `failure`, `complete`, and `working` pass through as-is. Also: - Remove misplaced unconditional `"Failed to watch screen events"` log that fired on every startup - Add SSE reconnection with exponential backoff (1s-30s) in `startWatcher` so it recovers from dropped connections instead of dying silently - Add `complete` to the `coder_report_task` tool enum, which the `coder/claude-code` registry module already instructs agents to use but was missing from the schema Refs coder/internal#1350
This commit is contained in:
committed by
GitHub
parent
6336fee3a7
commit
947b390c5a
+50
-45
@@ -10,6 +10,7 @@ import (
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
"github.com/mark3labs/mcp-go/server"
|
||||
@@ -23,6 +24,7 @@ import (
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
"github.com/coder/coder/v2/codersdk/toolsdk"
|
||||
"github.com/coder/retry"
|
||||
"github.com/coder/serpent"
|
||||
)
|
||||
|
||||
@@ -539,7 +541,6 @@ func (r *RootCmd) mcpServer() *serpent.Command {
|
||||
defer cancel()
|
||||
defer srv.queue.Close()
|
||||
|
||||
cliui.Infof(inv.Stderr, "Failed to watch screen events")
|
||||
// Start the reporter, watcher, and server. These are all tied to the
|
||||
// lifetime of the MCP server, which is itself tied to the lifetime of the
|
||||
// AI agent.
|
||||
@@ -613,48 +614,51 @@ func (s *mcpServer) startReporter(ctx context.Context, inv *serpent.Invocation)
|
||||
}
|
||||
|
||||
func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
|
||||
eventsCh, errCh, err := s.aiAgentAPIClient.SubscribeEvents(ctx)
|
||||
if err != nil {
|
||||
cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err)
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case event := <-eventsCh:
|
||||
switch ev := event.(type) {
|
||||
case agentapi.EventStatusChange:
|
||||
// If the screen is stable, report idle.
|
||||
state := codersdk.WorkspaceAppStatusStateWorking
|
||||
if ev.Status == agentapi.StatusStable {
|
||||
state = codersdk.WorkspaceAppStatusStateIdle
|
||||
}
|
||||
err := s.queue.Push(taskReport{
|
||||
state: state,
|
||||
})
|
||||
if err != nil {
|
||||
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
|
||||
for retrier := retry.New(time.Second, 30*time.Second); retrier.Wait(ctx); {
|
||||
eventsCh, errCh, err := s.aiAgentAPIClient.SubscribeEvents(ctx)
|
||||
if err == nil {
|
||||
retrier.Reset()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
case agentapi.EventMessageUpdate:
|
||||
if ev.Role == agentapi.RoleUser {
|
||||
err := s.queue.Push(taskReport{
|
||||
messageID: &ev.Id,
|
||||
state: codersdk.WorkspaceAppStatusStateWorking,
|
||||
})
|
||||
if err != nil {
|
||||
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
|
||||
return
|
||||
case event := <-eventsCh:
|
||||
switch ev := event.(type) {
|
||||
case agentapi.EventStatusChange:
|
||||
state := codersdk.WorkspaceAppStatusStateWorking
|
||||
if ev.Status == agentapi.StatusStable {
|
||||
state = codersdk.WorkspaceAppStatusStateIdle
|
||||
}
|
||||
err := s.queue.Push(taskReport{
|
||||
state: state,
|
||||
})
|
||||
if err != nil {
|
||||
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
|
||||
return
|
||||
}
|
||||
case agentapi.EventMessageUpdate:
|
||||
if ev.Role == agentapi.RoleUser {
|
||||
err := s.queue.Push(taskReport{
|
||||
messageID: &ev.Id,
|
||||
state: codersdk.WorkspaceAppStatusStateWorking,
|
||||
})
|
||||
if err != nil {
|
||||
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
case err := <-errCh:
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
case err := <-errCh:
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err)
|
||||
}
|
||||
return
|
||||
} else {
|
||||
cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -692,13 +696,14 @@ func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, in
|
||||
// Add tool dependencies.
|
||||
toolOpts := []func(*toolsdk.Deps){
|
||||
toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs) error {
|
||||
// The agent does not reliably report its status correctly. If AgentAPI
|
||||
// is enabled, we will always set the status to "working" when we get an
|
||||
// MCP message, and rely on the screen watcher to eventually catch the
|
||||
// idle state.
|
||||
state := codersdk.WorkspaceAppStatusStateWorking
|
||||
if s.aiAgentAPIClient == nil {
|
||||
state = codersdk.WorkspaceAppStatusState(args.State)
|
||||
state := codersdk.WorkspaceAppStatusState(args.State)
|
||||
// The agent does not reliably report idle, so when AgentAPI is
|
||||
// enabled we override idle to working and let the screen watcher
|
||||
// detect the real idle via StatusStable. Final states (failure,
|
||||
// complete) are trusted from the agent since the screen watcher
|
||||
// cannot produce them.
|
||||
if s.aiAgentAPIClient != nil && state == codersdk.WorkspaceAppStatusStateIdle {
|
||||
state = codersdk.WorkspaceAppStatusStateWorking
|
||||
}
|
||||
return s.queue.Push(taskReport{
|
||||
link: args.Link,
|
||||
|
||||
+185
-1
@@ -921,7 +921,7 @@ func TestExpMcpReporter(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
// We ignore the state from the agent and assume "working".
|
||||
// We override idle from the agent to working, but trust final states.
|
||||
{
|
||||
name: "IgnoreAgentState",
|
||||
// AI agent reports that it is finished but the summary says it is doing
|
||||
@@ -953,6 +953,46 @@ func TestExpMcpReporter(t *testing.T) {
|
||||
Message: "finished",
|
||||
},
|
||||
},
|
||||
// Agent reports failure; trusted even with AgentAPI enabled.
|
||||
{
|
||||
state: codersdk.WorkspaceAppStatusStateFailure,
|
||||
summary: "something broke",
|
||||
expected: &codersdk.WorkspaceAppStatus{
|
||||
State: codersdk.WorkspaceAppStatusStateFailure,
|
||||
Message: "something broke",
|
||||
},
|
||||
},
|
||||
// After failure, watcher reports stable -> idle.
|
||||
{
|
||||
event: makeStatusEvent(agentapi.StatusStable),
|
||||
expected: &codersdk.WorkspaceAppStatus{
|
||||
State: codersdk.WorkspaceAppStatusStateIdle,
|
||||
Message: "something broke",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Final states pass through with AgentAPI enabled.
|
||||
{
|
||||
name: "AllowFinalStates",
|
||||
tests: []test{
|
||||
{
|
||||
state: codersdk.WorkspaceAppStatusStateWorking,
|
||||
summary: "doing work",
|
||||
expected: &codersdk.WorkspaceAppStatus{
|
||||
State: codersdk.WorkspaceAppStatusStateWorking,
|
||||
Message: "doing work",
|
||||
},
|
||||
},
|
||||
// Agent reports complete; not overridden.
|
||||
{
|
||||
state: codersdk.WorkspaceAppStatusStateComplete,
|
||||
summary: "all done",
|
||||
expected: &codersdk.WorkspaceAppStatus{
|
||||
State: codersdk.WorkspaceAppStatusStateComplete,
|
||||
Message: "all done",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// When AgentAPI is not being used, we accept agent state updates as-is.
|
||||
@@ -1110,4 +1150,148 @@ func TestExpMcpReporter(t *testing.T) {
|
||||
<-cmdDone
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Reconnect", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create a test deployment and workspace.
|
||||
client, db := coderdtest.NewWithDatabase(t, nil)
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
|
||||
|
||||
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
OrganizationID: user.OrganizationID,
|
||||
OwnerID: user2.ID,
|
||||
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
|
||||
a[0].Apps = []*proto.App{
|
||||
{
|
||||
Slug: "vscode",
|
||||
},
|
||||
}
|
||||
return a
|
||||
}).Do()
|
||||
|
||||
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
|
||||
|
||||
// Watch the workspace for changes.
|
||||
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
|
||||
require.NoError(t, err)
|
||||
var lastAppStatus codersdk.WorkspaceAppStatus
|
||||
nextUpdate := func() codersdk.WorkspaceAppStatus {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.FailNow(t, "timed out waiting for status update")
|
||||
case w, ok := <-watcher:
|
||||
require.True(t, ok, "watch channel closed")
|
||||
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID {
|
||||
t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State)
|
||||
lastAppStatus = *w.LatestAppStatus
|
||||
return lastAppStatus
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mock AI AgentAPI server that supports disconnect/reconnect.
|
||||
disconnect := make(chan struct{})
|
||||
listening := make(chan func(sse codersdk.ServerSentEvent) error)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Create a cancelable context so we can stop the SSE sender
|
||||
// goroutine on disconnect without waiting for the HTTP
|
||||
// serve loop to cancel r.Context().
|
||||
sseCtx, sseCancel := context.WithCancel(r.Context())
|
||||
defer sseCancel()
|
||||
r = r.WithContext(sseCtx)
|
||||
|
||||
send, closed, err := httpapi.ServerSentEventSender(w, r)
|
||||
if err != nil {
|
||||
httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Internal error setting up server-sent events.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
// Send initial message so the watcher knows the agent is active.
|
||||
send(*makeMessageEvent(0, agentapi.RoleAgent))
|
||||
select {
|
||||
case listening <- send:
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-closed:
|
||||
case <-disconnect:
|
||||
sseCancel()
|
||||
<-closed
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
inv, _ := clitest.New(t,
|
||||
"exp", "mcp", "server",
|
||||
"--agent-url", client.URL.String(),
|
||||
"--agent-token", r.AgentToken,
|
||||
"--app-status-slug", "vscode",
|
||||
"--allowed-tools=coder_report_task",
|
||||
"--ai-agentapi-url", srv.URL,
|
||||
)
|
||||
inv = inv.WithContext(ctx)
|
||||
|
||||
pty := ptytest.New(t)
|
||||
inv.Stdin = pty.Input()
|
||||
inv.Stdout = pty.Output()
|
||||
stderr := ptytest.New(t)
|
||||
inv.Stderr = stderr.Output()
|
||||
|
||||
// Run the MCP server.
|
||||
clitest.Start(t, inv)
|
||||
|
||||
// Initialize.
|
||||
payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}`
|
||||
pty.WriteLine(payload)
|
||||
_ = pty.ReadLine(ctx) // ignore echo
|
||||
_ = pty.ReadLine(ctx) // ignore init response
|
||||
|
||||
// Get first sender from the initial SSE connection.
|
||||
sender := testutil.RequireReceive(ctx, t, listening)
|
||||
|
||||
// Self-report a working status via tool call.
|
||||
toolPayload := `{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"doing work","link":""}}}`
|
||||
pty.WriteLine(toolPayload)
|
||||
_ = pty.ReadLine(ctx) // ignore echo
|
||||
_ = pty.ReadLine(ctx) // ignore response
|
||||
got := nextUpdate()
|
||||
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
||||
require.Equal(t, "doing work", got.Message)
|
||||
|
||||
// Watcher sends stable, verify idle is reported.
|
||||
err = sender(*makeStatusEvent(agentapi.StatusStable))
|
||||
require.NoError(t, err)
|
||||
got = nextUpdate()
|
||||
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
||||
|
||||
// Disconnect the SSE connection by signaling the handler to return.
|
||||
testutil.RequireSend(ctx, t, disconnect, struct{}{})
|
||||
|
||||
// Wait for the watcher to reconnect and get the new sender.
|
||||
sender = testutil.RequireReceive(ctx, t, listening)
|
||||
|
||||
// After reconnect, self-report a working status again.
|
||||
toolPayload = `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"reconnected","link":""}}}`
|
||||
pty.WriteLine(toolPayload)
|
||||
_ = pty.ReadLine(ctx) // ignore echo
|
||||
_ = pty.ReadLine(ctx) // ignore response
|
||||
got = nextUpdate()
|
||||
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
||||
require.Equal(t, "reconnected", got.Message)
|
||||
|
||||
// Verify the watcher still processes events after reconnect.
|
||||
err = sender(*makeStatusEvent(agentapi.StatusStable))
|
||||
require.NoError(t, err)
|
||||
got = nextUpdate()
|
||||
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
||||
|
||||
cancel()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ Bad Tasks
|
||||
Use the "state" field to indicate your progress. Periodically report
|
||||
progress with state "working" to keep the user updated. It is not possible to send too many updates!
|
||||
|
||||
ONLY report an "idle" or "failure" state if you have FULLY completed the task.
|
||||
ONLY report a "complete", "idle", or "failure" state if you have FULLY completed the task.
|
||||
`,
|
||||
Schema: aisdk.Schema{
|
||||
Properties: map[string]any{
|
||||
@@ -279,9 +279,10 @@ ONLY report an "idle" or "failure" state if you have FULLY completed the task.
|
||||
},
|
||||
"state": map[string]any{
|
||||
"type": "string",
|
||||
"description": "The state of your task. This can be one of the following: working, idle, or failure. Select the state that best represents your current progress.",
|
||||
"description": "The state of your task. This can be one of the following: working, complete, idle, or failure. Select the state that best represents your current progress.",
|
||||
"enum": []string{
|
||||
string(codersdk.WorkspaceAppStatusStateWorking),
|
||||
string(codersdk.WorkspaceAppStatusStateComplete),
|
||||
string(codersdk.WorkspaceAppStatusStateIdle),
|
||||
string(codersdk.WorkspaceAppStatusStateFailure),
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user