feat: make process output blocking-capable (#23312)

Replace the 200ms polling loop in chatd's execute and
process_output tools with server-side blocking via sync.Cond
on HeadTailBuffer.

The agent's GET /{id}/output endpoint accepts ?wait=true to
block until the process exits or a 5-minute server cap expires.
The process_output tool blocks by default for 10s (overridable
via wait_timeout), and falls back to a non-blocking snapshot on
timeout. The execute tool's foreground path makes a single
blocking call instead of polling.

Related #23316
This commit is contained in:
Mathias Fredriksson
2026-03-20 14:33:55 +02:00
committed by GitHub
parent c8e58575e0
commit 41e15ae440
10 changed files with 375 additions and 81 deletions
+33
View File
@@ -1,11 +1,13 @@
package agentproc
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sort"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
@@ -18,6 +20,13 @@ import (
"github.com/coder/coder/v2/codersdk/workspacesdk"
)
const (
// maxWaitDuration is the maximum time a blocking
// process output request can wait, regardless of
// what the client requests.
maxWaitDuration = 5 * time.Minute
)
// API exposes process-related operations through the agent.
type API struct {
logger slog.Logger
@@ -163,6 +172,30 @@ func (api *API) handleProcessOutput(rw http.ResponseWriter, r *http.Request) {
}
}
// Check for blocking mode via query params.
waitStr := r.URL.Query().Get("wait")
wantWait := waitStr == "true"
if wantWait {
// Extend the write deadline so the HTTP server's
// WriteTimeout does not kill the connection while
// we block.
rc := http.NewResponseController(rw)
if err := rc.SetWriteDeadline(time.Now().Add(maxWaitDuration)); err != nil {
api.logger.Error(ctx, "extend write deadline for blocking process output",
slog.Error(err),
)
}
// Cap the wait at maxWaitDuration regardless of
// client-supplied timeout.
waitCtx, waitCancel := context.WithTimeout(ctx, maxWaitDuration)
defer waitCancel()
_ = proc.waitForOutput(waitCtx)
// Fall through to read snapshot below.
}
output, truncated := proc.output()
info := proc.info()
+128
View File
@@ -10,6 +10,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
@@ -783,6 +784,133 @@ func TestProcessOutput(t *testing.T) {
w2 := getOutput(t, handler, id)
require.Equal(t, http.StatusOK, w2.Code)
})
t.Run("WaitForExit", func(t *testing.T) {
t.Parallel()
handler := newTestAPI(t)
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
Command: "echo hello-wait && sleep 0.1",
})
w := getOutputWithWait(t, handler, id)
require.Equal(t, http.StatusOK, w.Code)
var resp workspacesdk.ProcessOutputResponse
err := json.NewDecoder(w.Body).Decode(&resp)
require.NoError(t, err)
require.False(t, resp.Running)
require.NotNil(t, resp.ExitCode)
require.Equal(t, 0, *resp.ExitCode)
require.Contains(t, resp.Output, "hello-wait")
})
t.Run("WaitAlreadyExited", func(t *testing.T) {
t.Parallel()
handler := newTestAPI(t)
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
Command: "echo done",
})
waitForExit(t, handler, id)
w := getOutputWithWait(t, handler, id)
require.Equal(t, http.StatusOK, w.Code)
var resp workspacesdk.ProcessOutputResponse
err := json.NewDecoder(w.Body).Decode(&resp)
require.NoError(t, err)
require.False(t, resp.Running)
require.Contains(t, resp.Output, "done")
})
t.Run("WaitTimeout", func(t *testing.T) {
t.Parallel()
handler := newTestAPI(t)
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
Command: "sleep 300",
Background: true,
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.IntervalMedium)
defer cancel()
w := getOutputWithWaitCtx(ctx, t, handler, id)
require.Equal(t, http.StatusOK, w.Code)
var resp workspacesdk.ProcessOutputResponse
err := json.NewDecoder(w.Body).Decode(&resp)
require.NoError(t, err)
require.True(t, resp.Running)
// Kill and wait for the process so cleanup does
// not hang.
postSignal(
t, handler, id,
workspacesdk.SignalProcessRequest{Signal: "kill"},
)
waitForExit(t, handler, id)
})
t.Run("ConcurrentWaiters", func(t *testing.T) {
t.Parallel()
handler := newTestAPI(t)
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
Command: "sleep 300",
Background: true,
})
var (
wg sync.WaitGroup
resps [2]workspacesdk.ProcessOutputResponse
codes [2]int
)
for i := range 2 {
wg.Add(1)
go func() {
defer wg.Done()
w := getOutputWithWait(t, handler, id)
codes[i] = w.Code
_ = json.NewDecoder(w.Body).Decode(&resps[i])
}()
}
// Signal the process to exit so both waiters unblock.
postSignal(
t, handler, id,
workspacesdk.SignalProcessRequest{Signal: "kill"},
)
wg.Wait()
for i := range 2 {
require.Equal(t, http.StatusOK, codes[i], "waiter %d", i)
require.False(t, resps[i].Running, "waiter %d", i)
}
})
}
func getOutputWithWait(t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
return getOutputWithWaitCtx(ctx, t, handler, id)
}
func getOutputWithWaitCtx(ctx context.Context, t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder {
t.Helper()
path := fmt.Sprintf("/%s/output?wait=true", id)
req := httptest.NewRequestWithContext(ctx, http.MethodGet, path, nil)
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)
return w
}
func TestSignalProcess(t *testing.T) {
+19 -2
View File
@@ -39,11 +39,13 @@ const (
// how much output is written.
type HeadTailBuffer struct {
mu sync.Mutex
cond *sync.Cond
head []byte
tail []byte
tailPos int
tailFull bool
headFull bool
closed bool
totalBytes int
maxHead int
maxTail int
@@ -52,20 +54,24 @@ type HeadTailBuffer struct {
// NewHeadTailBuffer creates a new HeadTailBuffer with the
// default head and tail sizes.
func NewHeadTailBuffer() *HeadTailBuffer {
return &HeadTailBuffer{
b := &HeadTailBuffer{
maxHead: MaxHeadBytes,
maxTail: MaxTailBytes,
}
b.cond = sync.NewCond(&b.mu)
return b
}
// NewHeadTailBufferSized creates a HeadTailBuffer with custom
// head and tail sizes. This is useful for testing truncation
// logic with smaller buffers.
func NewHeadTailBufferSized(maxHead, maxTail int) *HeadTailBuffer {
return &HeadTailBuffer{
b := &HeadTailBuffer{
maxHead: maxHead,
maxTail: maxTail,
}
b.cond = sync.NewCond(&b.mu)
return b
}
// Write implements io.Writer. It is safe for concurrent use.
@@ -296,6 +302,15 @@ func truncateLines(s string) string {
return b.String()
}
// Close marks the buffer as closed and wakes any waiters.
// This is called when the process exits.
func (b *HeadTailBuffer) Close() {
b.mu.Lock()
defer b.mu.Unlock()
b.closed = true
b.cond.Broadcast()
}
// Reset clears the buffer, discarding all data.
func (b *HeadTailBuffer) Reset() {
b.mu.Lock()
@@ -305,5 +320,7 @@ func (b *HeadTailBuffer) Reset() {
b.tailPos = 0
b.tailFull = false
b.headFull = false
b.closed = false
b.totalBytes = 0
b.cond.Broadcast()
}
+33
View File
@@ -208,6 +208,9 @@ func (m *manager) start(req workspacesdk.StartProcessRequest, chatID string) (*p
proc.exitCode = &code
proc.mu.Unlock()
// Wake any waiters blocked on new output or
// process exit before closing the done channel.
proc.buf.Close()
close(proc.done)
}()
@@ -320,6 +323,36 @@ func (m *manager) Close() error {
return nil
}
// waitForOutput blocks until the buffer is closed (process
// exited) or the context is canceled. Returns nil when the
// buffer closed, ctx.Err() when the context expired.
func (p *process) waitForOutput(ctx context.Context) error {
p.buf.cond.L.Lock()
defer p.buf.cond.L.Unlock()
nevermind := make(chan struct{})
defer close(nevermind)
go func() {
select {
case <-ctx.Done():
// Acquire the lock before broadcasting to
// guarantee the waiter has entered cond.Wait()
// (which atomically releases the lock).
// Without this, a Broadcast between the loop
// predicate check and cond.Wait() is lost.
p.buf.cond.L.Lock()
defer p.buf.cond.L.Unlock()
p.buf.cond.Broadcast()
case <-nevermind:
}
}()
for ctx.Err() == nil && !p.buf.closed {
p.buf.cond.Wait()
}
return ctx.Err()
}
// resolveWorkDir returns the directory a process should start in.
// Priority: explicit request dir > agent configured dir > $HOME.
// Falls through when a candidate is empty or does not exist on