fix: stop treating pending chat notification as cancel signal

Stale "pending" pubsub notifications from SendMessage can arrive at the
worker control subscriber after close(controlArmed), causing a spurious
ErrInterrupted that aborts the LLM call before the assistant message is
committed.

Remove ChatStatusPending from the cancel-trigger set in
shouldCancelChatFromControlNotification. A "pending" notification
reaching an active worker is always a stale echo: SendMessage (trigger),
EditMessage (handled by persistStep ownership guard), or auto-promote
(runs after processing). None warrant canceling the active run.

Unskip TestSubscribeRelayEstablishedMidStream (CODAGT-353). The only
stale notification that could cancel this test was "pending" from
SendMessage, which the fix now ignores. Verified with a deterministic
red/green/revert cycle using delayedPendingPubsub: 5/5 FAIL without
fix (chat interrupted / Condition never satisfied), 5/5 PASS with fix,
5/5 FAIL on revert.

Fixes CODAGT-383.
Refs CODAGT-179, CODAGT-353.
This commit is contained in:
Mathias Fredriksson
2026-05-25 17:44:04 +00:00
parent 2af037ce02
commit 2a48c8721e
3 changed files with 253 additions and 7 deletions
+10 -1
View File
@@ -5782,8 +5782,17 @@ func shouldCancelChatFromControlNotification(
) bool {
status := database.ChatStatus(strings.TrimSpace(notify.Status))
switch status {
case database.ChatStatusWaiting, database.ChatStatusPending, database.ChatStatusError:
case database.ChatStatusWaiting, database.ChatStatusError:
return true
case database.ChatStatusPending:
// Pending is not a cancel signal. SendMessage and auto-promote
// produce stale echoes (the notification that triggered this
// run, or one published after processing finishes). EditMessage
// produces a live signal, but cancellation for that case is
// handled by the persistStep ownership guard at the next step
// boundary, not by the control subscriber.
// See CODAGT-383.
return false
case database.ChatStatusRunning:
worker := strings.TrimSpace(notify.WorkerID)
if worker == "" {
+26
View File
@@ -6690,3 +6690,29 @@ func TestPersistChatContextSummarySetsAPIKeyID(t *testing.T) {
}
require.True(t, foundUserSummary, "expected to find compressed user summary message")
}
func TestShouldCancelChatFromControlNotification(t *testing.T) {
t.Parallel()
myWorker := uuid.New()
otherWorker := uuid.New()
tests := []struct {
name string
notify coderdpubsub.ChatStreamNotifyMessage
want bool
}{
{"waiting cancels", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusWaiting)}, true},
{"error cancels", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusError)}, true},
{"pending does not cancel", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusPending)}, false},
{"running different worker cancels", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusRunning), WorkerID: otherWorker.String()}, true},
{"running same worker does not cancel", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusRunning), WorkerID: myWorker.String()}, false},
{"running empty worker does not cancel", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusRunning)}, false},
{"running malformed worker does not cancel", coderdpubsub.ChatStreamNotifyMessage{Status: string(database.ChatStatusRunning), WorkerID: "not-a-uuid"}, false},
{"unknown status does not cancel", coderdpubsub.ChatStreamNotifyMessage{Status: "completed"}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, tt.want, shouldCancelChatFromControlNotification(tt.notify, myWorker))
})
}
}
+217 -6
View File
@@ -8,6 +8,7 @@ import (
"math"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
@@ -1435,12 +1436,6 @@ func TestSubscribeRelayDialCanceledOnFastCompletion(t *testing.T) {
// condition where the relay is too slow.
func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
t.Parallel()
// TODO(CODAGT-353): Re-enable this test after the chatd notification flow
// refactor gives workers enough causal information to distinguish stale
// control NOTIFY messages from real interrupts. The current design reuses
// the same status notification shape for wake-only and interrupt intents,
// so a stale NOTIFY can cancel a new processChat run.
t.Skip("skipped until chatd notification flow refactor handles stale control notifications")
db, ps := dbtestutil.NewDB(t)
workerID := uuid.New()
@@ -1621,3 +1616,219 @@ waitForStream:
require.NotEmpty(t, messageParts,
"streaming parts should be received when relay establishes while worker is still streaming")
}
// delayedPendingPubsub wraps a real Pubsub and holds back any
// Publish call whose payload contains a "pending" status on a chat
// stream notify channel. Held messages are published when
// releaseHeld() is called. This is a test-only wrapper (no production
// code changes) that widens the race window in CODAGT-383 so the
// stale "pending" notification arrives at the worker's control
// subscriber AFTER close(controlArmed).
type delayedPendingPubsub struct {
dbpubsub.Pubsub
held chan heldMsg
}
type heldMsg struct {
event string
message []byte
}
func newDelayedPendingPubsub(inner dbpubsub.Pubsub) *delayedPendingPubsub {
return &delayedPendingPubsub{
Pubsub: inner,
held: make(chan heldMsg, 16),
}
}
func (d *delayedPendingPubsub) Publish(event string, message []byte) error {
// Intercept chat stream notify messages with status=pending.
if strings.HasPrefix(event, "chat:stream:") {
var notify coderdpubsub.ChatStreamNotifyMessage
if err := json.Unmarshal(message, &notify); err == nil && notify.Status == string(database.ChatStatusPending) {
// Hold the message; it will be replayed on releaseHeld().
d.held <- heldMsg{event: event, message: message}
return nil
}
}
return d.Pubsub.Publish(event, message)
}
// releaseHeld publishes all held pending messages on the inner Pubsub.
func (d *delayedPendingPubsub) releaseHeld() {
for {
select {
case msg := <-d.held:
_ = d.Pubsub.Publish(msg.event, msg.message)
default:
return
}
}
}
// TestSubscribeRelayDialCanceledOnFastCompletion_StalePendingDoesNotInterrupt
// is a deterministic reproduction of CODAGT-383. It uses a pubsub
// wrapper to delay the "pending" notification from SendMessage until
// the worker's control subscriber is armed (close(controlArmed) has
// executed), then replays it while the LLM is still streaming.
// With the fix applied, the stale pending notification is ignored
// and the assistant message commits successfully.
func TestSubscribeRelayDialCanceledOnFastCompletion_StalePendingDoesNotInterrupt(t *testing.T) {
t.Parallel()
db, realPS := dbtestutil.NewDB(t)
// Wrap pubsub to hold back "pending" notifications.
ps := newDelayedPendingPubsub(realPS)
workerID := uuid.New()
subscriberID := uuid.New()
workerDone := make(chan struct{})
// Gate: the LLM blocks until we release it. This keeps the
// worker in active processing while we inject the stale
// pending notification.
llmContinue := make(chan struct{})
openAIURL := chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
if !req.Stream {
return chattest.OpenAINonStreamingResponse("stale-pending-repro")
}
// Block until the test releases us. This keeps the worker
// in active processing so the stale notification can arrive
// while processChat is running.
<-llmContinue
return chattest.OpenAIStreamingResponse(
chattest.OpenAITextChunks("hello ", "world")...,
)
})
workerLogger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
worker := osschatd.New(osschatd.Config{
Logger: workerLogger,
Database: db,
ReplicaID: workerID,
Pubsub: ps,
PendingChatAcquireInterval: time.Hour,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
worker.Start()
t.Cleanup(func() {
require.NoError(t, worker.Close())
})
subscriber := newTestServer(t, db, ps, subscriberID, func(
ctx context.Context,
chatID uuid.UUID,
_ uuid.UUID,
requestHeader http.Header,
) (
[]codersdk.ChatStreamEvent,
<-chan codersdk.ChatStreamEvent,
func(),
error,
) {
select {
case <-workerDone:
case <-ctx.Done():
return nil, nil, nil, ctx.Err()
}
snapshot, relayEvents, cancel, ok := worker.Subscribe(ctx, chatID, requestHeader, math.MaxInt64)
if !ok {
return nil, nil, nil, xerrors.New("worker subscribe failed")
}
return snapshot, relayEvents, cancel, nil
}, nil)
ctx := testutil.Context(t, testutil.WaitLong)
user, org, model := seedChatDependencies(t, db)
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
chat := seedWaitingChat(t, db, org.ID, user, model, "stale-pending-repro")
_, events, subCancel, ok := subscriber.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
defer subCancel()
// SendMessage publishes {Status: "pending"} which our wrapper
// holds back instead of delivering immediately.
_, err := worker.SendMessage(ctx, osschatd.SendMessageOptions{
ChatID: chat.ID,
CreatedBy: user.ID,
Content: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Wait for the worker to reach "running" state, which means
// close(controlArmed) has already executed.
require.Eventually(t, func() bool {
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
if dbErr != nil {
return false
}
return fromDB.Status == database.ChatStatusRunning
}, testutil.WaitMedium, testutil.IntervalFast)
// NOW release the held "pending" notification while the LLM is
// still blocked. The notification propagates through pubsub to
// the control subscriber while the worker is actively processing.
// With the bug, shouldCancelChatFromControlNotification returns
// true for "pending" and the chat is interrupted.
ps.releaseHeld()
// Allow the stale notification to propagate through real pubsub.
// A deterministic signal (e.g. delivery callback) would require
// instrumenting the production subscriber, which is disproportionate
// for a race reproduction test. The sleep widens the window so the
// notification arrives while the LLM is still blocked.
time.Sleep(200 * time.Millisecond)
// Release the LLM to complete (or, with the bug, the worker is
// already interrupted so this unblocks the OpenAI handler which
// then returns to a canceled context).
close(llmContinue)
// Give the worker time to finish.
require.Eventually(t, func() bool {
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
if dbErr != nil {
return false
}
return fromDB.Status == database.ChatStatusWaiting ||
fromDB.Status == database.ChatStatusError
}, testutil.WaitMedium, testutil.IntervalFast)
close(workerDone)
// Collect events. With the bug, the chat was interrupted and
// no assistant message was committed, so this times out.
var committedAssistantMsgs int
timedOut := false
func() {
shortCtx, shortCancel := context.WithTimeout(ctx, testutil.WaitShort)
defer shortCancel()
for {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeMessage &&
event.Message != nil &&
event.Message.Role == codersdk.ChatMessageRoleAssistant {
committedAssistantMsgs++
return
}
case <-shortCtx.Done():
timedOut = true
return
}
}
}()
if timedOut {
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
require.NoError(t, dbErr)
t.Logf("chat status: %s (interrupted=%v)", fromDB.Status, fromDB.Status == database.ChatStatusWaiting)
}
require.Equal(t, 1, committedAssistantMsgs,
"committed assistant message should arrive; stale pending notification must not interrupt processing")
}