mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
perf(coderd): reduce chat streaming latency with event-driven acquisition (#23745)
Previously, when a user sent a message, there was a 0–1000ms (avg ~500ms) polling delay before processing began. `SendMessage`/`CreateChat`/`EditMessage` set `status='pending'` in the DB and returned, but nothing woke the processing loop — it was a blind 1-second ticker.

## Changes

**Event-driven acquisition (main change):** Adds a `wakeCh` channel to the chatd `Server`. `CreateChat`, `SendMessage`, `EditMessage`, and `PromoteQueued` call `signalWake()` after committing their transactions, which wakes the run loop to call `processOnce` immediately. The 1-second ticker remains as a fallback safety net for edge cases (stale recovery, missed signals).

**Buffer WebSocket write channel:** Changes the `OneWayWebSocketEventSender` event channel from unbuffered to buffered (64), decoupling the event producer from WebSocket write speed. The existing 10s write timeout guards against stuck connections.

<details><summary>Implementation plan & analysis</summary>

The full latency analysis identified these sources of delay in the streaming pipeline:

1. **Chat acquisition polling** — 0–1000ms (avg 500ms) dead time per message. Fixed by wake channel.
2. **Unbuffered WebSocket write channel** — each token blocked on the previous WS write completing. Fixed by buffering.
3. **PersistStep DB transaction per step** — `FOR UPDATE` lock + batch insert. Not addressed in this PR (medium risk, would overlap DB write with next provider TTFB).
4. **Multi-hop channel pipeline** — 4 channel hops per token. Not addressed (medium complexity).

</details>

<details><summary>Test stabilization notes</summary>

`signalWake()` causes the chatd daemon to process chats immediately after creation/send/edit, which exposed timing assumptions in several tests that expected chats to remain in `pending` status long enough to assert on. These tests were updated with `require.Eventually` + `WaitUntilIdleForTest` patterns to wait for processing to settle before asserting.

The race detector (`test-go-race-pg`) shows failures in `TestCreateWorkspaceTool_EndToEnd` and `TestAwaitSubagentCompletion` — these appear to be pre-existing races in the end-to-end chat flow that are now exercised more aggressively because processing starts immediately instead of after a 1s delay. Main branch CI (race detector) passes without these changes. </details>
This commit is contained in:
@@ -539,7 +539,16 @@ func TestListChats(t *testing.T) {
|
||||
|
||||
require.Equal(t, firstUser.UserID, chat.OwnerID)
|
||||
require.Equal(t, modelConfig.ID, chat.LastModelConfigID)
|
||||
require.Equal(t, codersdk.ChatStatusPending, chat.Status)
|
||||
// The chat may have been picked up by the background
|
||||
// processor (via signalWake) before we list, so
|
||||
// accept any active status.
|
||||
require.Contains(t, []codersdk.ChatStatus{
|
||||
codersdk.ChatStatusPending,
|
||||
codersdk.ChatStatusRunning,
|
||||
codersdk.ChatStatusError,
|
||||
codersdk.ChatStatusWaiting,
|
||||
codersdk.ChatStatusCompleted,
|
||||
}, chat.Status, "unexpected chat status: %s", chat.Status)
|
||||
require.NotZero(t, chat.CreatedAt)
|
||||
require.NotZero(t, chat.UpdatedAt)
|
||||
require.Nil(t, chat.ParentChatID)
|
||||
@@ -549,7 +558,6 @@ func TestListChats(t *testing.T) {
|
||||
require.NotNil(t, chat.DiffStatus)
|
||||
require.Equal(t, chat.ID, chat.DiffStatus.ChatID)
|
||||
}
|
||||
|
||||
require.Contains(t, chatsByID, firstChatA.ID)
|
||||
require.Contains(t, chatsByID, firstChatB.ID)
|
||||
require.NotContains(t, chatsByID, memberDBChat.ID)
|
||||
@@ -559,12 +567,12 @@ func TestListChats(t *testing.T) {
|
||||
for i := 1; i < len(chats); i++ {
|
||||
require.False(t, chats[i-1].UpdatedAt.Before(chats[i].UpdatedAt))
|
||||
}
|
||||
if firstChatA.UpdatedAt.After(firstChatB.UpdatedAt) {
|
||||
require.Less(t, chatIndexes[firstChatA.ID], chatIndexes[firstChatB.ID])
|
||||
}
|
||||
if firstChatB.UpdatedAt.After(firstChatA.UpdatedAt) {
|
||||
require.Less(t, chatIndexes[firstChatB.ID], chatIndexes[firstChatA.ID])
|
||||
}
|
||||
// The list is already verified as sorted by UpdatedAt
|
||||
// descending (loop above). We intentionally do NOT
|
||||
// compare positions using the creation-time UpdatedAt
|
||||
// values because signalWake() may trigger background
|
||||
// processing that mutates UpdatedAt between CreateChat
|
||||
// and ListChats.
|
||||
|
||||
memberChats, err := memberClient.ListChats(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -613,6 +621,23 @@ func TestListChats(t *testing.T) {
|
||||
createdChats = append(createdChats, chat)
|
||||
}
|
||||
|
||||
// Wait for all chats to reach a terminal status so
|
||||
// updated_at is stable before paginating.
|
||||
for _, c := range createdChats {
|
||||
require.Eventually(t, func() bool {
|
||||
all, listErr := client.ListChats(ctx, nil)
|
||||
if listErr != nil {
|
||||
return false
|
||||
}
|
||||
for _, ch := range all {
|
||||
if ch.ID == c.ID {
|
||||
return ch.Status != codersdk.ChatStatusPending && ch.Status != codersdk.ChatStatusRunning
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
}
|
||||
|
||||
// Fetch first page with limit=2.
|
||||
page1, err := client.ListChats(ctx, &codersdk.ListChatsOptions{
|
||||
Pagination: codersdk.Pagination{Limit: 2},
|
||||
@@ -3683,6 +3708,17 @@ func TestRegenerateChatTitle(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for background processing triggered by signalWake
|
||||
// to finish before setting the status, otherwise the
|
||||
// processor may update updated_at concurrently.
|
||||
require.Eventually(t, func() bool {
|
||||
c, getErr := db.GetChatByID(dbauthz.AsSystemRestricted(ctx), chat.ID)
|
||||
if getErr != nil {
|
||||
return false
|
||||
}
|
||||
return c.Status != database.ChatStatusPending && c.Status != database.ChatStatusRunning
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
_, err = db.UpdateChatStatus(dbauthz.AsSystemRestricted(ctx), database.UpdateChatStatusParams{
|
||||
ID: chat.ID,
|
||||
Status: database.ChatStatusCompleted,
|
||||
|
||||
@@ -438,7 +438,7 @@ func OneWayWebSocketEventSender(log slog.Logger) func(rw http.ResponseWriter, r
|
||||
}
|
||||
go HeartbeatClose(ctx, log, cancel, socket)
|
||||
|
||||
eventC := make(chan codersdk.ServerSentEvent)
|
||||
eventC := make(chan codersdk.ServerSentEvent, 64)
|
||||
socketErrC := make(chan websocket.CloseError, 1)
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
|
||||
@@ -137,6 +137,11 @@ type Server struct {
|
||||
maxChatsPerAcquire int32
|
||||
inFlightChatStaleAfter time.Duration
|
||||
chatHeartbeatInterval time.Duration
|
||||
|
||||
// wakeCh is signaled by SendMessage, EditMessage, CreateChat,
|
||||
// and PromoteQueued so the run loop calls processOnce
|
||||
// immediately instead of waiting for the next ticker.
|
||||
wakeCh chan struct{}
|
||||
}
|
||||
|
||||
// chatTemplateAllowlist returns the deployment-wide template
|
||||
@@ -973,6 +978,7 @@ func (p *Server) CreateChat(ctx context.Context, opts CreateOptions) (database.C
|
||||
}
|
||||
|
||||
p.publishChatPubsubEvent(chat, coderdpubsub.ChatEventKindCreated, nil)
|
||||
p.signalWake()
|
||||
return chat, nil
|
||||
}
|
||||
|
||||
@@ -1134,6 +1140,7 @@ func (p *Server) SendMessage(
|
||||
p.publishMessage(opts.ChatID, result.Message)
|
||||
p.publishStatus(opts.ChatID, result.Chat.Status, result.Chat.WorkerID)
|
||||
p.publishChatPubsubEvent(result.Chat, coderdpubsub.ChatEventKindStatusChange, nil)
|
||||
p.signalWake()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -1276,6 +1283,7 @@ func (p *Server) EditMessage(
|
||||
})
|
||||
p.publishStatus(opts.ChatID, result.Chat.Status, result.Chat.WorkerID)
|
||||
p.publishChatPubsubEvent(result.Chat, coderdpubsub.ChatEventKindStatusChange, nil)
|
||||
p.signalWake()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -1461,6 +1469,7 @@ func (p *Server) PromoteQueued(
|
||||
p.publishMessage(opts.ChatID, promoted)
|
||||
p.publishStatus(opts.ChatID, updatedChat.Status, updatedChat.WorkerID)
|
||||
p.publishChatPubsubEvent(updatedChat, coderdpubsub.ChatEventKindStatusChange, nil)
|
||||
p.signalWake()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -2373,6 +2382,7 @@ func New(cfg Config) *Server {
|
||||
chatHeartbeatInterval: chatHeartbeatInterval,
|
||||
usageTracker: cfg.UsageTracker,
|
||||
clock: clk,
|
||||
wakeCh: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
//nolint:gocritic // The chat processor uses a scoped chatd context.
|
||||
@@ -2435,12 +2445,23 @@ func (p *Server) start(ctx context.Context) {
|
||||
return
|
||||
case <-acquireTicker.C:
|
||||
p.processOnce(ctx)
|
||||
case <-p.wakeCh:
|
||||
p.processOnce(ctx)
|
||||
case <-staleTicker.C:
|
||||
p.recoverStaleChats(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// signalWake wakes the run loop so it calls processOnce immediately.
|
||||
// Non-blocking: if a signal is already pending it is a no-op.
|
||||
func (p *Server) signalWake() {
|
||||
select {
|
||||
case p.wakeCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Server) processOnce(ctx context.Context) {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
|
||||
@@ -603,9 +603,21 @@ func TestEditMessageUpdatesAndTruncatesAndClearsQueue(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queued, 0)
|
||||
|
||||
chatFromDB, err := db.GetChatByID(ctx, chat.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, database.ChatStatusPending, chatFromDB.Status)
|
||||
// The wake channel may trigger immediate processing after EditMessage,
|
||||
// transitioning the chat from pending to running then error before we
|
||||
// read the DB. Wait for any in-flight processing to settle.
|
||||
// Note: WaitUntilIdleForTest must be called from the test goroutine
|
||||
// (not inside require.Eventually) to avoid a WaitGroup Add/Wait race.
|
||||
chatd.WaitUntilIdleForTest(replica)
|
||||
var chatFromDB database.Chat
|
||||
require.Eventually(t, func() bool {
|
||||
c, e := db.GetChatByID(ctx, chat.ID)
|
||||
if e != nil {
|
||||
return false
|
||||
}
|
||||
chatFromDB = c
|
||||
return chatFromDB.Status != database.ChatStatusRunning
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
require.False(t, chatFromDB.WorkerID.Valid)
|
||||
}
|
||||
|
||||
@@ -1490,10 +1502,12 @@ func TestSubscribeSnapshotIncludesStatusEvent(t *testing.T) {
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// The first event in the snapshot must be a status event.
|
||||
// The exact status depends on timing: CreateChat sets
|
||||
// pending, but the wake signal may trigger processing
|
||||
// before Subscribe is called.
|
||||
require.NotEmpty(t, snapshot)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeStatus, snapshot[0].Type)
|
||||
require.NotNil(t, snapshot[0].Status)
|
||||
require.Equal(t, codersdk.ChatStatusPending, snapshot[0].Status.Status)
|
||||
}
|
||||
|
||||
func TestPersistToolResultWithBinaryData(t *testing.T) {
|
||||
@@ -1691,6 +1705,18 @@ func TestSubscribeNoPubsubNoDuplicateMessageParts(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for any wake-triggered processing to settle before
|
||||
// subscribing, so the snapshot captures the final state.
|
||||
// The wake signal may trigger processOnce which will fail
|
||||
// (no LLM configured) and set the chat to error status.
|
||||
// Poll until the chat leaves pending status, then wait for
|
||||
// the goroutine to finish.
|
||||
require.Eventually(t, func() bool {
|
||||
c, err := db.GetChatByID(ctx, chat.ID)
|
||||
return err == nil && c.Status != database.ChatStatusPending
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
chatd.WaitUntilIdleForTest(replica)
|
||||
|
||||
snapshot, events, cancel, ok := replica.Subscribe(ctx, chat.ID, nil, 0)
|
||||
require.True(t, ok)
|
||||
t.Cleanup(cancel)
|
||||
@@ -2204,6 +2230,20 @@ func TestStoppedWorkspaceWithPersistedAgentBindingDoesNotBlockChat(t *testing.T)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Close the inactive server so its wake-triggered processing
|
||||
// stops and releases the chat. Then reset to pending so the
|
||||
// active server (created below) can acquire it cleanly.
|
||||
require.NoError(t, inactive.Close())
|
||||
_, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
|
||||
ID: chat.ID,
|
||||
Status: database.ChatStatusPending,
|
||||
WorkerID: uuid.NullUUID{},
|
||||
StartedAt: sql.NullTime{},
|
||||
HeartbeatAt: sql.NullTime{},
|
||||
LastError: sql.NullString{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
build, err := db.GetLatestWorkspaceBuildByWorkspaceID(ctx, ws.ID)
|
||||
require.NoError(t, err)
|
||||
chat, err = db.UpdateChatBuildAgentBinding(ctx, database.UpdateChatBuildAgentBindingParams{
|
||||
@@ -4283,3 +4323,133 @@ func TestChatTemplateAllowlistEnforcement(t *testing.T) {
|
||||
require.NotContains(t, toolResult, tplBlocked.ID.String(),
|
||||
"blocked template should NOT appear in list_templates result")
|
||||
}
|
||||
|
||||
// TestSignalWakeImmediateAcquisition verifies that CreateChat triggers
|
||||
// immediate processing via signalWake without waiting for the polling
|
||||
// ticker to fire. The ticker interval is set to an hour so it never
|
||||
// fires during the test — any processing must come from the wake
|
||||
// channel.
|
||||
func TestSignalWakeImmediateAcquisition(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, ps := dbtestutil.NewDB(t)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
|
||||
processed := make(chan struct{})
|
||||
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
|
||||
if !req.Stream {
|
||||
return chattest.OpenAINonStreamingResponse("title")
|
||||
}
|
||||
// Signal that the LLM was reached — this proves the chat
|
||||
// was acquired and processing started.
|
||||
select {
|
||||
case <-processed:
|
||||
default:
|
||||
close(processed)
|
||||
}
|
||||
return chattest.OpenAIStreamingResponse(
|
||||
chattest.OpenAITextChunks("hello from the model")...,
|
||||
)
|
||||
})
|
||||
|
||||
// Use a 1-hour acquire interval so the ticker never fires.
|
||||
server := newActiveTestServer(t, db, ps, func(cfg *chatd.Config) {
|
||||
cfg.PendingChatAcquireInterval = time.Hour
|
||||
cfg.InFlightChatStaleAfter = testutil.WaitSuperLong
|
||||
})
|
||||
|
||||
user, model := seedChatDependencies(ctx, t, db)
|
||||
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
|
||||
|
||||
// CreateChat sets status=pending and calls signalWake().
|
||||
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
||||
OwnerID: user.ID,
|
||||
Title: "wake-test",
|
||||
ModelConfigID: model.ID,
|
||||
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// The chat should be processed immediately — the LLM handler
|
||||
// closes the `processed` channel when it receives a streaming
|
||||
// request. Without signalWake this would hang forever because
|
||||
// the 1-hour ticker never fires.
|
||||
testutil.TryReceive(ctx, t, processed)
|
||||
|
||||
chatd.WaitUntilIdleForTest(server)
|
||||
|
||||
// Verify the chat was fully processed.
|
||||
fromDB, err := db.GetChatByID(ctx, chat.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, database.ChatStatusWaiting, fromDB.Status,
|
||||
"chat should be in waiting status after processing completes")
|
||||
}
|
||||
|
||||
// TestSignalWakeSendMessage verifies that SendMessage on an idle chat
|
||||
// triggers immediate processing via signalWake.
|
||||
func TestSignalWakeSendMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, ps := dbtestutil.NewDB(t)
|
||||
ctx := testutil.Context(t, testutil.WaitSuperLong)
|
||||
|
||||
firstProcessed := make(chan struct{})
|
||||
var requestCount atomic.Int32
|
||||
secondProcessed := make(chan struct{})
|
||||
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
|
||||
if !req.Stream {
|
||||
return chattest.OpenAINonStreamingResponse("title")
|
||||
}
|
||||
switch requestCount.Add(1) {
|
||||
case 1:
|
||||
select {
|
||||
case <-firstProcessed:
|
||||
default:
|
||||
close(firstProcessed)
|
||||
}
|
||||
case 2:
|
||||
close(secondProcessed)
|
||||
}
|
||||
return chattest.OpenAIStreamingResponse(
|
||||
chattest.OpenAITextChunks("response")...,
|
||||
)
|
||||
})
|
||||
|
||||
server := newActiveTestServer(t, db, ps, func(cfg *chatd.Config) {
|
||||
cfg.PendingChatAcquireInterval = time.Hour
|
||||
cfg.InFlightChatStaleAfter = testutil.WaitSuperLong
|
||||
})
|
||||
|
||||
user, model := seedChatDependencies(ctx, t, db)
|
||||
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
|
||||
|
||||
// CreateChat triggers wake -> processes first turn.
|
||||
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
||||
OwnerID: user.ID,
|
||||
Title: "wake-send-test",
|
||||
ModelConfigID: model.ID,
|
||||
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("first")},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the first turn to actually reach the LLM, then
|
||||
// wait for the processing goroutine to finish so the chat
|
||||
// transitions to "waiting" status.
|
||||
testutil.TryReceive(ctx, t, firstProcessed)
|
||||
chatd.WaitUntilIdleForTest(server)
|
||||
|
||||
// Now send a follow-up message — this should also be
|
||||
// processed immediately via signalWake.
|
||||
_, err = server.SendMessage(ctx, chatd.SendMessageOptions{
|
||||
ChatID: chat.ID,
|
||||
Content: []codersdk.ChatMessagePart{codersdk.ChatMessageText("second")},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
testutil.TryReceive(ctx, t, secondProcessed)
|
||||
chatd.WaitUntilIdleForTest(server)
|
||||
|
||||
// Both turns processed — verify second request reached the LLM.
|
||||
require.GreaterOrEqual(t, requestCount.Load(), int32(2),
|
||||
"LLM should have received at least 2 streaming requests")
|
||||
}
|
||||
|
||||
@@ -1005,6 +1005,12 @@ func TestAwaitSubagentCompletion(t *testing.T) {
|
||||
|
||||
parent, child := createParentChildChats(ctx, t, server, user, model)
|
||||
|
||||
// signalWake from CreateChat may trigger immediate processing.
|
||||
// Wait for it to settle, then reset chats to the state we need.
|
||||
server.inflight.Wait()
|
||||
setChatStatus(ctx, t, db, parent.ID, database.ChatStatusRunning, "")
|
||||
setChatStatus(ctx, t, db, child.ID, database.ChatStatusRunning, "")
|
||||
|
||||
// Trap the fallback poll ticker to know when the
|
||||
// function has subscribed to pubsub and entered
|
||||
// its select loop.
|
||||
@@ -1088,6 +1094,15 @@ func TestAwaitSubagentCompletion(t *testing.T) {
|
||||
|
||||
parent, child := createParentChildChats(ctx, t, server, user, model)
|
||||
|
||||
// signalWake from CreateChat may have triggered background
|
||||
// processing that transitions the child to "error". Wait
|
||||
// for that to finish, then reset to "running" so the test
|
||||
// exercises the context-cancellation path. Using "running"
|
||||
// (not "pending") prevents re-acquisition by the shared
|
||||
// server's background loop.
|
||||
server.inflight.Wait()
|
||||
setChatStatus(ctx, t, db, child.ID, database.ChatStatusRunning, "")
|
||||
|
||||
// Use a short-lived context instead of goroutine + sleep.
|
||||
shortCtx, cancel := context.WithTimeout(ctx, testutil.IntervalMedium)
|
||||
defer cancel()
|
||||
|
||||
Reference in New Issue
Block a user