perf(coderd): reduce chat streaming latency with event-driven acquisition (#23745)

Previously, when a user sent a message, there was a 0–1000ms (avg
~500ms) polling delay before processing began.
`SendMessage`/`CreateChat`/`EditMessage` set `status='pending'` in the
DB and returned, but nothing woke the processing loop — it was a blind
1-second ticker.

## Changes

**Event-driven acquisition (main change):** Adds a `wakeCh` channel to
the chatd `Server`. `CreateChat`, `SendMessage`, `EditMessage`, and
`PromoteQueued` call `signalWake()` after committing their transactions,
which wakes the run loop to call `processOnce` immediately. The 1-second
ticker remains as a fallback safety net for edge cases (stale recovery,
missed signals).

**Buffer WebSocket write channel:** Changes the
`OneWayWebSocketEventSender` event channel from unbuffered to buffered
(64), decoupling the event producer from WebSocket write speed. The
existing 10s write timeout guards against stuck connections.

<details><summary>Implementation plan & analysis</summary>

The full latency analysis identified these sources of delay in the
streaming pipeline:

1. **Chat acquisition polling** — 0–1000ms (avg 500ms) dead time per
message. Fixed by wake channel.
2. **Unbuffered WebSocket write channel** — each token blocked on the
previous WS write completing. Fixed by buffering.
3. **PersistStep DB transaction per step** — `FOR UPDATE` lock + batch
insert. Not addressed in this PR (medium risk, would overlap DB write
with next provider TTFB).
4. **Multi-hop channel pipeline** — 4 channel hops per token. Not
addressed (medium complexity).

</details>

<details><summary>Test stabilization notes</summary>

`signalWake()` causes the chatd daemon to process chats immediately
after creation/send/edit, which exposed timing assumptions in several
tests that expected chats to remain in `pending` status long enough to
assert on. These tests were updated with `require.Eventually` +
`WaitUntilIdleForTest` patterns to wait for processing to settle before
asserting.

The race detector (`test-go-race-pg`) shows failures in
`TestCreateWorkspaceTool_EndToEnd` and `TestAwaitSubagentCompletion` —
these appear to be pre-existing races in the end-to-end chat flow that
are now exercised more aggressively because processing starts
immediately instead of after a 1s delay. Main branch CI (race detector)
passes without these changes.

</details>
This commit is contained in:
Kyle Carberry
2026-03-28 15:26:42 -04:00
committed by GitHub
parent 565cf846de
commit 386b449273
5 changed files with 255 additions and 13 deletions
+44 -8
View File
@@ -539,7 +539,16 @@ func TestListChats(t *testing.T) {
require.Equal(t, firstUser.UserID, chat.OwnerID)
require.Equal(t, modelConfig.ID, chat.LastModelConfigID)
require.Equal(t, codersdk.ChatStatusPending, chat.Status)
// The chat may have been picked up by the background
// processor (via signalWake) before we list, so
// accept any active status.
require.Contains(t, []codersdk.ChatStatus{
codersdk.ChatStatusPending,
codersdk.ChatStatusRunning,
codersdk.ChatStatusError,
codersdk.ChatStatusWaiting,
codersdk.ChatStatusCompleted,
}, chat.Status, "unexpected chat status: %s", chat.Status)
require.NotZero(t, chat.CreatedAt)
require.NotZero(t, chat.UpdatedAt)
require.Nil(t, chat.ParentChatID)
@@ -549,7 +558,6 @@ func TestListChats(t *testing.T) {
require.NotNil(t, chat.DiffStatus)
require.Equal(t, chat.ID, chat.DiffStatus.ChatID)
}
require.Contains(t, chatsByID, firstChatA.ID)
require.Contains(t, chatsByID, firstChatB.ID)
require.NotContains(t, chatsByID, memberDBChat.ID)
@@ -559,12 +567,12 @@ func TestListChats(t *testing.T) {
for i := 1; i < len(chats); i++ {
require.False(t, chats[i-1].UpdatedAt.Before(chats[i].UpdatedAt))
}
if firstChatA.UpdatedAt.After(firstChatB.UpdatedAt) {
require.Less(t, chatIndexes[firstChatA.ID], chatIndexes[firstChatB.ID])
}
if firstChatB.UpdatedAt.After(firstChatA.UpdatedAt) {
require.Less(t, chatIndexes[firstChatB.ID], chatIndexes[firstChatA.ID])
}
// The list is already verified as sorted by UpdatedAt
// descending (loop above). We intentionally do NOT
// compare positions using the creation-time UpdatedAt
// values because signalWake() may trigger background
// processing that mutates UpdatedAt between CreateChat
// and ListChats.
memberChats, err := memberClient.ListChats(ctx, nil)
require.NoError(t, err)
@@ -613,6 +621,23 @@ func TestListChats(t *testing.T) {
createdChats = append(createdChats, chat)
}
// Wait for all chats to reach a terminal status so
// updated_at is stable before paginating.
for _, c := range createdChats {
require.Eventually(t, func() bool {
all, listErr := client.ListChats(ctx, nil)
if listErr != nil {
return false
}
for _, ch := range all {
if ch.ID == c.ID {
return ch.Status != codersdk.ChatStatusPending && ch.Status != codersdk.ChatStatusRunning
}
}
return false
}, testutil.WaitShort, testutil.IntervalFast)
}
// Fetch first page with limit=2.
page1, err := client.ListChats(ctx, &codersdk.ListChatsOptions{
Pagination: codersdk.Pagination{Limit: 2},
@@ -3683,6 +3708,17 @@ func TestRegenerateChatTitle(t *testing.T) {
})
require.NoError(t, err)
// Wait for background processing triggered by signalWake
// to finish before setting the status, otherwise the
// processor may update updated_at concurrently.
require.Eventually(t, func() bool {
c, getErr := db.GetChatByID(dbauthz.AsSystemRestricted(ctx), chat.ID)
if getErr != nil {
return false
}
return c.Status != database.ChatStatusPending && c.Status != database.ChatStatusRunning
}, testutil.WaitShort, testutil.IntervalFast)
_, err = db.UpdateChatStatus(dbauthz.AsSystemRestricted(ctx), database.UpdateChatStatusParams{
ID: chat.ID,
Status: database.ChatStatusCompleted,
+1 -1
View File
@@ -438,7 +438,7 @@ func OneWayWebSocketEventSender(log slog.Logger) func(rw http.ResponseWriter, r
}
go HeartbeatClose(ctx, log, cancel, socket)
eventC := make(chan codersdk.ServerSentEvent)
eventC := make(chan codersdk.ServerSentEvent, 64)
socketErrC := make(chan websocket.CloseError, 1)
closed := make(chan struct{})
go func() {
+21
View File
@@ -137,6 +137,11 @@ type Server struct {
maxChatsPerAcquire int32
inFlightChatStaleAfter time.Duration
chatHeartbeatInterval time.Duration
// wakeCh is signaled by SendMessage, EditMessage, CreateChat,
// and PromoteQueued so the run loop calls processOnce
// immediately instead of waiting for the next ticker.
wakeCh chan struct{}
}
// chatTemplateAllowlist returns the deployment-wide template
@@ -973,6 +978,7 @@ func (p *Server) CreateChat(ctx context.Context, opts CreateOptions) (database.C
}
p.publishChatPubsubEvent(chat, coderdpubsub.ChatEventKindCreated, nil)
p.signalWake()
return chat, nil
}
@@ -1134,6 +1140,7 @@ func (p *Server) SendMessage(
p.publishMessage(opts.ChatID, result.Message)
p.publishStatus(opts.ChatID, result.Chat.Status, result.Chat.WorkerID)
p.publishChatPubsubEvent(result.Chat, coderdpubsub.ChatEventKindStatusChange, nil)
p.signalWake()
return result, nil
}
@@ -1276,6 +1283,7 @@ func (p *Server) EditMessage(
})
p.publishStatus(opts.ChatID, result.Chat.Status, result.Chat.WorkerID)
p.publishChatPubsubEvent(result.Chat, coderdpubsub.ChatEventKindStatusChange, nil)
p.signalWake()
return result, nil
}
@@ -1461,6 +1469,7 @@ func (p *Server) PromoteQueued(
p.publishMessage(opts.ChatID, promoted)
p.publishStatus(opts.ChatID, updatedChat.Status, updatedChat.WorkerID)
p.publishChatPubsubEvent(updatedChat, coderdpubsub.ChatEventKindStatusChange, nil)
p.signalWake()
return result, nil
}
@@ -2373,6 +2382,7 @@ func New(cfg Config) *Server {
chatHeartbeatInterval: chatHeartbeatInterval,
usageTracker: cfg.UsageTracker,
clock: clk,
wakeCh: make(chan struct{}, 1),
}
//nolint:gocritic // The chat processor uses a scoped chatd context.
@@ -2435,12 +2445,23 @@ func (p *Server) start(ctx context.Context) {
return
case <-acquireTicker.C:
p.processOnce(ctx)
case <-p.wakeCh:
p.processOnce(ctx)
case <-staleTicker.C:
p.recoverStaleChats(ctx)
}
}
}
// signalWake wakes the run loop so it calls processOnce immediately.
// Non-blocking: if a signal is already pending it is a no-op.
func (p *Server) signalWake() {
select {
case p.wakeCh <- struct{}{}:
default:
}
}
func (p *Server) processOnce(ctx context.Context) {
if ctx.Err() != nil {
return
+174 -4
View File
@@ -603,9 +603,21 @@ func TestEditMessageUpdatesAndTruncatesAndClearsQueue(t *testing.T) {
require.NoError(t, err)
require.Len(t, queued, 0)
chatFromDB, err := db.GetChatByID(ctx, chat.ID)
require.NoError(t, err)
require.Equal(t, database.ChatStatusPending, chatFromDB.Status)
// The wake channel may trigger immediate processing after EditMessage,
// transitioning the chat from pending to running then error before we
// read the DB. Wait for any in-flight processing to settle.
// Note: WaitUntilIdleForTest must be called from the test goroutine
// (not inside require.Eventually) to avoid a WaitGroup Add/Wait race.
chatd.WaitUntilIdleForTest(replica)
var chatFromDB database.Chat
require.Eventually(t, func() bool {
c, e := db.GetChatByID(ctx, chat.ID)
if e != nil {
return false
}
chatFromDB = c
return chatFromDB.Status != database.ChatStatusRunning
}, testutil.WaitShort, testutil.IntervalFast)
require.False(t, chatFromDB.WorkerID.Valid)
}
@@ -1490,10 +1502,12 @@ func TestSubscribeSnapshotIncludesStatusEvent(t *testing.T) {
t.Cleanup(cancel)
// The first event in the snapshot must be a status event.
// The exact status depends on timing: CreateChat sets
// pending, but the wake signal may trigger processing
// before Subscribe is called.
require.NotEmpty(t, snapshot)
require.Equal(t, codersdk.ChatStreamEventTypeStatus, snapshot[0].Type)
require.NotNil(t, snapshot[0].Status)
require.Equal(t, codersdk.ChatStatusPending, snapshot[0].Status.Status)
}
func TestPersistToolResultWithBinaryData(t *testing.T) {
@@ -1691,6 +1705,18 @@ func TestSubscribeNoPubsubNoDuplicateMessageParts(t *testing.T) {
})
require.NoError(t, err)
// Wait for any wake-triggered processing to settle before
// subscribing, so the snapshot captures the final state.
// The wake signal may trigger processOnce which will fail
// (no LLM configured) and set the chat to error status.
// Poll until the chat leaves pending status, then wait for
// the goroutine to finish.
require.Eventually(t, func() bool {
c, err := db.GetChatByID(ctx, chat.ID)
return err == nil && c.Status != database.ChatStatusPending
}, testutil.WaitShort, testutil.IntervalFast)
chatd.WaitUntilIdleForTest(replica)
snapshot, events, cancel, ok := replica.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
@@ -2204,6 +2230,20 @@ func TestStoppedWorkspaceWithPersistedAgentBindingDoesNotBlockChat(t *testing.T)
})
require.NoError(t, err)
// Close the inactive server so its wake-triggered processing
// stops and releases the chat. Then reset to pending so the
// active server (created below) can acquire it cleanly.
require.NoError(t, inactive.Close())
_, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
ID: chat.ID,
Status: database.ChatStatusPending,
WorkerID: uuid.NullUUID{},
StartedAt: sql.NullTime{},
HeartbeatAt: sql.NullTime{},
LastError: sql.NullString{},
})
require.NoError(t, err)
build, err := db.GetLatestWorkspaceBuildByWorkspaceID(ctx, ws.ID)
require.NoError(t, err)
chat, err = db.UpdateChatBuildAgentBinding(ctx, database.UpdateChatBuildAgentBindingParams{
@@ -4283,3 +4323,133 @@ func TestChatTemplateAllowlistEnforcement(t *testing.T) {
require.NotContains(t, toolResult, tplBlocked.ID.String(),
"blocked template should NOT appear in list_templates result")
}
// TestSignalWakeImmediateAcquisition verifies that CreateChat triggers
// immediate processing via signalWake without waiting for the polling
// ticker to fire. The ticker interval is set to an hour so it never
// fires during the test — any processing must come from the wake
// channel.
func TestSignalWakeImmediateAcquisition(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitLong)
processed := make(chan struct{})
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
if !req.Stream {
return chattest.OpenAINonStreamingResponse("title")
}
// Signal that the LLM was reached — this proves the chat
// was acquired and processing started.
select {
case <-processed:
default:
close(processed)
}
return chattest.OpenAIStreamingResponse(
chattest.OpenAITextChunks("hello from the model")...,
)
})
// Use a 1-hour acquire interval so the ticker never fires.
server := newActiveTestServer(t, db, ps, func(cfg *chatd.Config) {
cfg.PendingChatAcquireInterval = time.Hour
cfg.InFlightChatStaleAfter = testutil.WaitSuperLong
})
user, model := seedChatDependencies(ctx, t, db)
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
// CreateChat sets status=pending and calls signalWake().
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "wake-test",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// The chat should be processed immediately — the LLM handler
// closes the `processed` channel when it receives a streaming
// request. Without signalWake this would hang forever because
// the 1-hour ticker never fires.
testutil.TryReceive(ctx, t, processed)
chatd.WaitUntilIdleForTest(server)
// Verify the chat was fully processed.
fromDB, err := db.GetChatByID(ctx, chat.ID)
require.NoError(t, err)
require.Equal(t, database.ChatStatusWaiting, fromDB.Status,
"chat should be in waiting status after processing completes")
}
// TestSignalWakeSendMessage verifies that SendMessage on an idle chat
// triggers immediate processing via signalWake.
func TestSignalWakeSendMessage(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitSuperLong)
firstProcessed := make(chan struct{})
var requestCount atomic.Int32
secondProcessed := make(chan struct{})
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
if !req.Stream {
return chattest.OpenAINonStreamingResponse("title")
}
switch requestCount.Add(1) {
case 1:
select {
case <-firstProcessed:
default:
close(firstProcessed)
}
case 2:
close(secondProcessed)
}
return chattest.OpenAIStreamingResponse(
chattest.OpenAITextChunks("response")...,
)
})
server := newActiveTestServer(t, db, ps, func(cfg *chatd.Config) {
cfg.PendingChatAcquireInterval = time.Hour
cfg.InFlightChatStaleAfter = testutil.WaitSuperLong
})
user, model := seedChatDependencies(ctx, t, db)
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
// CreateChat triggers wake -> processes first turn.
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "wake-send-test",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("first")},
})
require.NoError(t, err)
// Wait for the first turn to actually reach the LLM, then
// wait for the processing goroutine to finish so the chat
// transitions to "waiting" status.
testutil.TryReceive(ctx, t, firstProcessed)
chatd.WaitUntilIdleForTest(server)
// Now send a follow-up message — this should also be
// processed immediately via signalWake.
_, err = server.SendMessage(ctx, chatd.SendMessageOptions{
ChatID: chat.ID,
Content: []codersdk.ChatMessagePart{codersdk.ChatMessageText("second")},
})
require.NoError(t, err)
testutil.TryReceive(ctx, t, secondProcessed)
chatd.WaitUntilIdleForTest(server)
// Both turns processed — verify second request reached the LLM.
require.GreaterOrEqual(t, requestCount.Load(), int32(2),
"LLM should have received at least 2 streaming requests")
}
+15
View File
@@ -1005,6 +1005,12 @@ func TestAwaitSubagentCompletion(t *testing.T) {
parent, child := createParentChildChats(ctx, t, server, user, model)
// signalWake from CreateChat may trigger immediate processing.
// Wait for it to settle, then reset chats to the state we need.
server.inflight.Wait()
setChatStatus(ctx, t, db, parent.ID, database.ChatStatusRunning, "")
setChatStatus(ctx, t, db, child.ID, database.ChatStatusRunning, "")
// Trap the fallback poll ticker to know when the
// function has subscribed to pubsub and entered
// its select loop.
@@ -1088,6 +1094,15 @@ func TestAwaitSubagentCompletion(t *testing.T) {
parent, child := createParentChildChats(ctx, t, server, user, model)
// signalWake from CreateChat may have triggered background
// processing that transitions the child to "error". Wait
// for that to finish, then reset to "running" so the test
// exercises the context-cancellation path. Using "running"
// (not "pending") prevents re-acquisition by the shared
// server's background loop.
server.inflight.Wait()
setChatStatus(ctx, t, db, child.ID, database.ChatStatusRunning, "")
// Use a short-lived context instead of goroutine + sleep.
shortCtx, cancel := context.WithTimeout(ctx, testutil.IntervalMedium)
defer cancel()