diff --git a/agent/agentproc/api.go b/agent/agentproc/api.go index f784d0e898..bf974b6307 100644 --- a/agent/agentproc/api.go +++ b/agent/agentproc/api.go @@ -1,11 +1,13 @@ package agentproc import ( + "context" "encoding/json" "errors" "fmt" "net/http" "sort" + "time" "github.com/go-chi/chi/v5" "github.com/google/uuid" @@ -18,6 +20,13 @@ import ( "github.com/coder/coder/v2/codersdk/workspacesdk" ) +const ( + // maxWaitDuration is the maximum time a blocking + // process output request can wait, regardless of + // what the client requests. + maxWaitDuration = 5 * time.Minute +) + // API exposes process-related operations through the agent. type API struct { logger slog.Logger @@ -163,6 +172,30 @@ func (api *API) handleProcessOutput(rw http.ResponseWriter, r *http.Request) { } } + // Check for blocking mode via query params. + waitStr := r.URL.Query().Get("wait") + wantWait := waitStr == "true" + + if wantWait { + // Extend the write deadline so the HTTP server's + // WriteTimeout does not kill the connection while + // we block. + rc := http.NewResponseController(rw) + if err := rc.SetWriteDeadline(time.Now().Add(maxWaitDuration)); err != nil { + api.logger.Error(ctx, "extend write deadline for blocking process output", + slog.Error(err), + ) + } + + // Cap the wait at maxWaitDuration regardless of + // client-supplied timeout. + waitCtx, waitCancel := context.WithTimeout(ctx, maxWaitDuration) + defer waitCancel() + + _ = proc.waitForOutput(waitCtx) + // Fall through to read snapshot below. + } + output, truncated := proc.output() info := proc.info() diff --git a/agent/agentproc/api_test.go b/agent/agentproc/api_test.go index c7157c1730..eddbe2d6f9 100644 --- a/agent/agentproc/api_test.go +++ b/agent/agentproc/api_test.go @@ -10,6 +10,7 @@ import ( "os" "runtime" "strings" + "sync" "testing" "time" @@ -783,6 +784,133 @@ func TestProcessOutput(t *testing.T) { w2 := getOutput(t, handler, id) require.Equal(t, http.StatusOK, w2.Code) }) + + t.Run("WaitForExit", func(t *testing.T) { + t.Parallel() + + handler := newTestAPI(t) + + id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{ + Command: "echo hello-wait && sleep 0.1", + }) + + w := getOutputWithWait(t, handler, id) + require.Equal(t, http.StatusOK, w.Code) + + var resp workspacesdk.ProcessOutputResponse + err := json.NewDecoder(w.Body).Decode(&resp) + require.NoError(t, err) + require.False(t, resp.Running) + require.NotNil(t, resp.ExitCode) + require.Equal(t, 0, *resp.ExitCode) + require.Contains(t, resp.Output, "hello-wait") + }) + + t.Run("WaitAlreadyExited", func(t *testing.T) { + t.Parallel() + + handler := newTestAPI(t) + + id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{ + Command: "echo done", + }) + + waitForExit(t, handler, id) + + w := getOutputWithWait(t, handler, id) + require.Equal(t, http.StatusOK, w.Code) + + var resp workspacesdk.ProcessOutputResponse + err := json.NewDecoder(w.Body).Decode(&resp) + require.NoError(t, err) + require.False(t, resp.Running) + require.Contains(t, resp.Output, "done") + }) + + t.Run("WaitTimeout", func(t *testing.T) { + t.Parallel() + + handler := newTestAPI(t) + + id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{ + Command: "sleep 300", + Background: true, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.IntervalMedium) + defer cancel() + + w := getOutputWithWaitCtx(ctx, t, handler, id) + require.Equal(t, http.StatusOK, w.Code) + + var resp workspacesdk.ProcessOutputResponse + err := json.NewDecoder(w.Body).Decode(&resp) + require.NoError(t, err) + require.True(t, resp.Running) + + // Kill and wait for the process so cleanup does + // not hang. + postSignal( + t, handler, id, + workspacesdk.SignalProcessRequest{Signal: "kill"}, + ) + waitForExit(t, handler, id) + }) + + t.Run("ConcurrentWaiters", func(t *testing.T) { + t.Parallel() + + handler := newTestAPI(t) + + id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{ + Command: "sleep 300", + Background: true, + }) + + var ( + wg sync.WaitGroup + resps [2]workspacesdk.ProcessOutputResponse + codes [2]int + ) + for i := range 2 { + wg.Add(1) + go func() { + defer wg.Done() + w := getOutputWithWait(t, handler, id) + codes[i] = w.Code + _ = json.NewDecoder(w.Body).Decode(&resps[i]) + }() + } + + // Signal the process to exit so both waiters unblock. + postSignal( + t, handler, id, + workspacesdk.SignalProcessRequest{Signal: "kill"}, + ) + + wg.Wait() + + for i := range 2 { + require.Equal(t, http.StatusOK, codes[i], "waiter %d", i) + require.False(t, resps[i].Running, "waiter %d", i) + } + }) +} + +func getOutputWithWait(t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + return getOutputWithWaitCtx(ctx, t, handler, id) +} + +func getOutputWithWaitCtx(ctx context.Context, t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder { + t.Helper() + path := fmt.Sprintf("/%s/output?wait=true", id) + req := httptest.NewRequestWithContext(ctx, http.MethodGet, path, nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + return w } func TestSignalProcess(t *testing.T) { diff --git a/agent/agentproc/headtail.go b/agent/agentproc/headtail.go index 34c07101ae..b1e65e369b 100644 --- a/agent/agentproc/headtail.go +++ b/agent/agentproc/headtail.go @@ -39,11 +39,13 @@ const ( // how much output is written. type HeadTailBuffer struct { mu sync.Mutex + cond *sync.Cond head []byte tail []byte tailPos int tailFull bool headFull bool + closed bool totalBytes int maxHead int maxTail int @@ -52,20 +54,24 @@ type HeadTailBuffer struct { // NewHeadTailBuffer creates a new HeadTailBuffer with the // default head and tail sizes. func NewHeadTailBuffer() *HeadTailBuffer { - return &HeadTailBuffer{ + b := &HeadTailBuffer{ maxHead: MaxHeadBytes, maxTail: MaxTailBytes, } + b.cond = sync.NewCond(&b.mu) + return b } // NewHeadTailBufferSized creates a HeadTailBuffer with custom // head and tail sizes. This is useful for testing truncation // logic with smaller buffers. func NewHeadTailBufferSized(maxHead, maxTail int) *HeadTailBuffer { - return &HeadTailBuffer{ + b := &HeadTailBuffer{ maxHead: maxHead, maxTail: maxTail, } + b.cond = sync.NewCond(&b.mu) + return b } // Write implements io.Writer. It is safe for concurrent use. @@ -296,6 +302,15 @@ func truncateLines(s string) string { return b.String() } +// Close marks the buffer as closed and wakes any waiters. +// This is called when the process exits. +func (b *HeadTailBuffer) Close() { + b.mu.Lock() + defer b.mu.Unlock() + b.closed = true + b.cond.Broadcast() +} + // Reset clears the buffer, discarding all data. func (b *HeadTailBuffer) Reset() { b.mu.Lock() @@ -305,5 +320,7 @@ func (b *HeadTailBuffer) Reset() { b.tailPos = 0 b.tailFull = false b.headFull = false + b.closed = false b.totalBytes = 0 + b.cond.Broadcast() } diff --git a/agent/agentproc/process.go b/agent/agentproc/process.go index ed1279409c..3a457387dc 100644 --- a/agent/agentproc/process.go +++ b/agent/agentproc/process.go @@ -208,6 +208,9 @@ func (m *manager) start(req workspacesdk.StartProcessRequest, chatID string) (*p proc.exitCode = &code proc.mu.Unlock() + // Wake any waiters blocked on new output or + // process exit before closing the done channel. + proc.buf.Close() close(proc.done) }() @@ -320,6 +323,36 @@ func (m *manager) Close() error { return nil } +// waitForOutput blocks until the buffer is closed (process +// exited) or the context is canceled. Returns nil when the +// buffer closed, ctx.Err() when the context expired. +func (p *process) waitForOutput(ctx context.Context) error { + p.buf.cond.L.Lock() + defer p.buf.cond.L.Unlock() + + nevermind := make(chan struct{}) + defer close(nevermind) + go func() { + select { + case <-ctx.Done(): + // Acquire the lock before broadcasting to + // guarantee the waiter has entered cond.Wait() + // (which atomically releases the lock). + // Without this, a Broadcast between the loop + // predicate check and cond.Wait() is lost. + p.buf.cond.L.Lock() + defer p.buf.cond.L.Unlock() + p.buf.cond.Broadcast() + case <-nevermind: + } + }() + + for ctx.Err() == nil && !p.buf.closed { + p.buf.cond.Wait() + } + return ctx.Err() +} + // resolveWorkDir returns the directory a process should start in. // Priority: explicit request dir > agent configured dir > $HOME. // Falls through when a candidate is empty or does not exist on diff --git a/coderd/chatd/chatd_test.go b/coderd/chatd/chatd_test.go index 02f6b4b408..9fd13005fb 100644 --- a/coderd/chatd/chatd_test.go +++ b/coderd/chatd/chatd_test.go @@ -1590,7 +1590,7 @@ func TestPersistToolResultWithBinaryData(t *testing.T) { }). Times(1) mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-binary"). + ProcessOutput(gomock.Any(), "proc-binary", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Output: string(binaryOutput), Running: false, diff --git a/coderd/chatd/chattool/execute.go b/coderd/chatd/chattool/execute.go index d7b2203ff4..4b64df1e89 100644 --- a/coderd/chatd/chattool/execute.go +++ b/coderd/chatd/chattool/execute.go @@ -3,14 +3,12 @@ package chattool import ( "context" "encoding/json" - "errors" "fmt" "regexp" "strings" "time" "charm.land/fantasy" - "golang.org/x/xerrors" "github.com/coder/coder/v2/codersdk/workspacesdk" ) @@ -23,9 +21,10 @@ const ( // maxOutputToModel is the maximum output sent to the LLM. maxOutputToModel = 32 << 10 // 32KB - // pollInterval is how often we check for process completion - // in foreground mode. - pollInterval = 200 * time.Millisecond + // snapshotTimeout is how long a non-blocking fallback + // request is allowed to take when retrieving a process + // output snapshot after a blocking wait times out. + snapshotTimeout = 30 * time.Second ) // nonInteractiveEnvVars are set on every process to prevent @@ -89,7 +88,7 @@ type ExecuteArgs struct { func Execute(options ExecuteOptions) fantasy.AgentTool { return fantasy.NewAgentTool( "execute", - "Execute a shell command in the workspace. Use run_in_background=true for long-running processes (dev servers, file watchers, builds). Never use shell '&' for backgrounding.", + "Execute a shell command in the workspace. Use run_in_background=true for long-running processes (dev servers, file watchers, builds). Never use shell '&' for backgrounding. If the command times out, the response includes a background_process_id so you can retrieve output later with process_output.", func(ctx context.Context, args ExecuteArgs, _ fantasy.ToolCall) (fantasy.ToolResponse, error) { if options.GetWorkspaceConn == nil { return fantasy.NewTextErrorResponse("workspace connection resolver is not configured"), nil @@ -173,7 +172,7 @@ func executeBackground( return fantasy.NewTextResponse(string(data)) } -// executeForeground starts a process and polls for its +// executeForeground starts a process and waits for its // completion, enforcing the configured timeout. func executeForeground( ctx context.Context, @@ -212,7 +211,7 @@ func executeForeground( return errorResult(fmt.Sprintf("start process: %v", err)) } - result := pollProcess(cmdCtx, conn, resp.ID, timeout) + result := waitForProcess(cmdCtx, conn, resp.ID, timeout) result.WallDurationMs = time.Since(start).Milliseconds() // Add an advisory note for file-dump commands. @@ -237,62 +236,84 @@ func truncateOutput(output string) string { return output } -// pollProcess polls for process output until the process exits -// or the context times out. -func pollProcess( +// waitForProcess waits for process completion using the +// blocking process output API instead of polling. +func waitForProcess( ctx context.Context, conn workspacesdk.AgentConn, processID string, timeout time.Duration, ) ExecuteResult { - ticker := time.NewTicker(pollInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - // Timeout — get whatever output we have. Use a - // fresh context since cmdCtx is already canceled. + // Block until the process exits or the context is + // canceled. + resp, err := conn.ProcessOutput(ctx, processID, &workspacesdk.ProcessOutputOptions{ + Wait: true, + }) + if err != nil { + if ctx.Err() != nil { + // Timeout: fetch final snapshot with a fresh + // context. The blocking request was canceled + // so the response body was lost. bgCtx, bgCancel := context.WithTimeout( context.Background(), - 5*time.Second, + snapshotTimeout, ) - outputResp, outputErr := conn.ProcessOutput(bgCtx, processID) - bgCancel() - output := truncateOutput(outputResp.Output) - timeoutErr := xerrors.Errorf("command timed out after %s", timeout) - if outputErr != nil { - timeoutErr = errors.Join(timeoutErr, xerrors.Errorf("failed to get output: %w", outputErr)) - } - return ExecuteResult{ - Success: false, - Output: output, - ExitCode: -1, - Error: timeoutErr.Error(), - Truncated: outputResp.Truncated, - } - case <-ticker.C: - outputResp, err := conn.ProcessOutput(ctx, processID) + defer bgCancel() + resp, err = conn.ProcessOutput(bgCtx, processID, nil) if err != nil { return ExecuteResult{ - Success: false, - Error: fmt.Sprintf("get process output: %v", err), + Success: false, + ExitCode: -1, + Error: fmt.Sprintf("command timed out after %s; failed to get output: %v", timeout, err), + BackgroundProcessID: processID, } } - if !outputResp.Running { - exitCode := 0 - if outputResp.ExitCode != nil { - exitCode = *outputResp.ExitCode - } - output := truncateOutput(outputResp.Output) - return ExecuteResult{ - Success: exitCode == 0, - Output: output, - ExitCode: exitCode, - Truncated: outputResp.Truncated, - } + output := truncateOutput(resp.Output) + return ExecuteResult{ + Success: false, + Output: output, + ExitCode: -1, + Error: fmt.Sprintf("command timed out after %s", timeout), + Truncated: resp.Truncated, + BackgroundProcessID: processID, } } + return ExecuteResult{ + Success: false, + Error: fmt.Sprintf("get process output: %v", err), + } + } + + // The server-side wait may return before the + // process exits if maxWaitDuration is shorter than + // the client's timeout. Retry if our context still + // has time left. + if resp.Running { + if ctx.Err() == nil { + // Still within the caller's timeout, retry. + return waitForProcess(ctx, conn, processID, timeout) + } + output := truncateOutput(resp.Output) + return ExecuteResult{ + Success: false, + Output: output, + ExitCode: -1, + Error: fmt.Sprintf("command timed out after %s", timeout), + Truncated: resp.Truncated, + BackgroundProcessID: processID, + } + } + + exitCode := 0 + if resp.ExitCode != nil { + exitCode = *resp.ExitCode + } + output := truncateOutput(resp.Output) + return ExecuteResult{ + Success: exitCode == 0, + Output: output, + ExitCode: exitCode, + Truncated: resp.Truncated, } } @@ -322,10 +343,19 @@ func detectFileDump(command string) string { return "" } +const ( + // defaultProcessOutputTimeout is the default time the + // process_output tool blocks waiting for new output or + // process exit before returning. This avoids polling + // loops that waste tokens and HTTP round-trips. + defaultProcessOutputTimeout = 10 * time.Second +) + // ProcessOutputArgs are the parameters accepted by the // process_output tool. type ProcessOutputArgs struct { - ProcessID string `json:"process_id"` + ProcessID string `json:"process_id"` + WaitTimeout *string `json:"wait_timeout,omitempty" description:"Override the default 10s block duration. The call blocks until the process exits or this timeout is reached. Set to '0s' for an immediate snapshot without waiting."` } // ProcessOutput returns an AgentTool that retrieves the output @@ -335,9 +365,13 @@ func ProcessOutput(options ProcessToolOptions) fantasy.AgentTool { "process_output", "Retrieve output from a background process. "+ "Use the process_id returned by execute with "+ - "run_in_background=true. Returns the current output, "+ - "whether the process is still running, and the exit "+ - "code if it has finished.", + "run_in_background=true or from a timed-out "+ + "execute's background_process_id. Blocks up to "+ + "10s for the process to exit, then returns the "+ + "output and exit_code. If still running after "+ + "the timeout, returns the output so far. Use "+ + "wait_timeout to override the default 10s wait "+ + "(e.g. '30s', or '0s' for an immediate snapshot).", func(ctx context.Context, args ProcessOutputArgs, _ fantasy.ToolCall) (fantasy.ToolResponse, error) { if options.GetWorkspaceConn == nil { return fantasy.NewTextErrorResponse("workspace connection resolver is not configured"), nil @@ -349,9 +383,42 @@ func ProcessOutput(options ProcessToolOptions) fantasy.AgentTool { if err != nil { return fantasy.NewTextErrorResponse(err.Error()), nil } - resp, err := conn.ProcessOutput(ctx, args.ProcessID) + + timeout := defaultProcessOutputTimeout + if args.WaitTimeout != nil { + parsed, err := time.ParseDuration(*args.WaitTimeout) + if err != nil { + return fantasy.NewTextErrorResponse( + fmt.Sprintf("invalid wait_timeout %q: %v", *args.WaitTimeout, err), + ), nil + } + timeout = parsed + } + var opts *workspacesdk.ProcessOutputOptions + // Save parent context before applying timeout. + parentCtx := ctx + if timeout > 0 { + opts = &workspacesdk.ProcessOutputOptions{ + Wait: true, + } + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + resp, err := conn.ProcessOutput(ctx, args.ProcessID, opts) if err != nil { - return errorResult(fmt.Sprintf("get process output: %v", err)), nil + // If our wait timed out but the parent is still alive, + // fetch a non-blocking snapshot. + if ctx.Err() == nil || parentCtx.Err() != nil { + return errorResult(fmt.Sprintf("get process output: %v", err)), nil + } + bgCtx, bgCancel := context.WithTimeout(parentCtx, snapshotTimeout) + defer bgCancel() + resp, err = conn.ProcessOutput(bgCtx, args.ProcessID, nil) + if err != nil { + return errorResult(fmt.Sprintf("get process output: %v", err)), nil + } + // Fall through to normal response handling below. } output := truncateOutput(resp.Output) exitCode := 0 @@ -365,7 +432,7 @@ func ProcessOutput(options ProcessToolOptions) fantasy.AgentTool { Truncated: resp.Truncated, } if resp.Running { - // Process is still running — success is not + // Process is still running, success is not // yet determined. result.Success = true result.Note = "process is still running" diff --git a/coderd/chatd/chattool/execute_internal_test.go b/coderd/chatd/chattool/execute_internal_test.go index e9100f3051..dd3ee84940 100644 --- a/coderd/chatd/chattool/execute_internal_test.go +++ b/coderd/chatd/chattool/execute_internal_test.go @@ -74,7 +74,7 @@ func runForegroundWithOutput(t *testing.T, output string) ExecuteResult { Return(workspacesdk.StartProcessResponse{ID: "proc-1"}, nil) exitCode := 0 mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: false, ExitCode: &exitCode, diff --git a/coderd/chatd/chattool/execute_test.go b/coderd/chatd/chattool/execute_test.go index c89086c219..3d22cee96f 100644 --- a/coderd/chatd/chattool/execute_test.go +++ b/coderd/chatd/chattool/execute_test.go @@ -121,7 +121,7 @@ func TestExecuteTool(t *testing.T) { // For foreground cases, ProcessOutput is polled. exitCode := 0 mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: false, ExitCode: &exitCode, @@ -177,7 +177,7 @@ func TestExecuteTool(t *testing.T) { }) exitCode := 0 mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: false, ExitCode: &exitCode, @@ -213,7 +213,7 @@ func TestExecuteTool(t *testing.T) { Return(workspacesdk.StartProcessResponse{ID: "proc-1"}, nil) exitCode := 42 mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: false, ExitCode: &exitCode, @@ -274,23 +274,27 @@ func TestExecuteTool(t *testing.T) { StartProcess(gomock.Any(), gomock.Any()). Return(workspacesdk.StartProcessResponse{ID: "proc-1"}, nil) - // ProcessOutput always returns running. The poll loop - // and the timeout-branch recovery call both hit this. + // First call (blocking wait) returns context error + // because the 50ms timeout expires. mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). + DoAndReturn(func(ctx context.Context, _ string, _ *workspacesdk.ProcessOutputOptions) (workspacesdk.ProcessOutputResponse, error) { + <-ctx.Done() + return workspacesdk.ProcessOutputResponse{}, ctx.Err() + }) + // Second call (snapshot fallback) returns partial output. + mockConn.EXPECT(). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: true, Output: "partial output", - }, nil). - AnyTimes() - + }, nil) tool := newExecuteTool(t, mockConn) ctx := testutil.Context(t, testutil.WaitMedium) resp, err := tool.Run(ctx, fantasy.ToolCall{ ID: "call-1", Name: "execute", - // 50ms timeout expires before the 200ms poll interval, - // so the context-done branch fires first. + // 50ms timeout expires during the blocking wait. Input: `{"command":"sleep 999","timeout":"50ms"}`, }) require.NoError(t, err) @@ -340,7 +344,7 @@ func TestExecuteTool(t *testing.T) { StartProcess(gomock.Any(), gomock.Any()). Return(workspacesdk.StartProcessResponse{ID: "proc-1"}, nil) mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{}, xerrors.New("agent disconnected")) tool := newExecuteTool(t, mockConn) @@ -440,7 +444,7 @@ func TestDetectFileDump(t *testing.T) { Return(workspacesdk.StartProcessResponse{ID: "proc-1"}, nil) exitCode := 0 mockConn.EXPECT(). - ProcessOutput(gomock.Any(), "proc-1"). + ProcessOutput(gomock.Any(), "proc-1", gomock.Any()). Return(workspacesdk.ProcessOutputResponse{ Running: false, ExitCode: &exitCode, diff --git a/codersdk/workspacesdk/agentconn.go b/codersdk/workspacesdk/agentconn.go index c12d528b12..1cf8ca7a99 100644 --- a/codersdk/workspacesdk/agentconn.go +++ b/codersdk/workspacesdk/agentconn.go @@ -70,7 +70,7 @@ type AgentConn interface { ListeningPorts(ctx context.Context) (codersdk.WorkspaceAgentListeningPortsResponse, error) Netcheck(ctx context.Context) (healthsdk.AgentNetcheckReport, error) Ping(ctx context.Context) (time.Duration, bool, *ipnstate.PingResult, error) - ProcessOutput(ctx context.Context, id string) (ProcessOutputResponse, error) + ProcessOutput(ctx context.Context, id string, opts *ProcessOutputOptions) (ProcessOutputResponse, error) PrometheusMetrics(ctx context.Context) ([]byte, error) ReconnectingPTY(ctx context.Context, id uuid.UUID, height uint16, width uint16, command string, initOpts ...AgentReconnectingPTYInitOption) (net.Conn, error) DeleteDevcontainer(ctx context.Context, devcontainerID string) error @@ -715,6 +715,14 @@ type ProcessOutputResponse struct { ExitCode *int `json:"exit_code,omitempty"` } +// ProcessOutputOptions configures blocking behavior for +// process output retrieval. +type ProcessOutputOptions struct { + // Wait enables blocking mode. When true, the request + // blocks until the process exits or the context expires. + Wait bool +} + // ProcessTruncation describes how process output was truncated. type ProcessTruncation struct { OriginalBytes int `json:"original_bytes"` @@ -946,10 +954,14 @@ func (c *agentConn) ListProcesses(ctx context.Context) (ListProcessesResponse, e } // ProcessOutput returns the output of a tracked process on the agent. -func (c *agentConn) ProcessOutput(ctx context.Context, id string) (ProcessOutputResponse, error) { +func (c *agentConn) ProcessOutput(ctx context.Context, id string, opts *ProcessOutputOptions) (ProcessOutputResponse, error) { ctx, span := tracing.StartSpan(ctx) defer span.End() - res, err := c.apiRequest(ctx, http.MethodGet, "/api/v0/processes/"+id+"/output", nil) + path := "/api/v0/processes/" + id + "/output" + if opts != nil && opts.Wait { + path += "?wait=true" + } + res, err := c.apiRequest(ctx, http.MethodGet, path, nil) if err != nil { return ProcessOutputResponse{}, xerrors.Errorf("do request: %w", err) } diff --git a/codersdk/workspacesdk/agentconnmock/agentconnmock.go b/codersdk/workspacesdk/agentconnmock/agentconnmock.go index 3204e5947d..6a83a8bd8d 100644 --- a/codersdk/workspacesdk/agentconnmock/agentconnmock.go +++ b/codersdk/workspacesdk/agentconnmock/agentconnmock.go @@ -308,18 +308,18 @@ func (mr *MockAgentConnMockRecorder) Ping(ctx any) *gomock.Call { } // ProcessOutput mocks base method. -func (m *MockAgentConn) ProcessOutput(ctx context.Context, id string) (workspacesdk.ProcessOutputResponse, error) { +func (m *MockAgentConn) ProcessOutput(ctx context.Context, id string, opts *workspacesdk.ProcessOutputOptions) (workspacesdk.ProcessOutputResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessOutput", ctx, id) + ret := m.ctrl.Call(m, "ProcessOutput", ctx, id, opts) ret0, _ := ret[0].(workspacesdk.ProcessOutputResponse) ret1, _ := ret[1].(error) return ret0, ret1 } // ProcessOutput indicates an expected call of ProcessOutput. -func (mr *MockAgentConnMockRecorder) ProcessOutput(ctx, id any) *gomock.Call { +func (mr *MockAgentConnMockRecorder) ProcessOutput(ctx, id, opts any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessOutput", reflect.TypeOf((*MockAgentConn)(nil).ProcessOutput), ctx, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessOutput", reflect.TypeOf((*MockAgentConn)(nil).ProcessOutput), ctx, id, opts) } // PrometheusMetrics mocks base method.