fix(coderd): subscribe to pubsub before accepting websocket in watchChats

The watchChats handler called SubscribeWithErr after websocket.Accept,
creating a window where clients could trigger events before the
subscription was active. Move the subscription before the accept so
events accumulate in the pubsub internal queue and drain naturally
once the encoder is ready.

Fixes: https://linear.app/codercom/issue/CODAGT-480
This commit is contained in:
Mathias Fredriksson
2026-06-01 20:20:52 +00:00
parent 2ad2f7869d
commit 89d470c112
2 changed files with 99 additions and 54 deletions
+55 -28
View File
@@ -174,8 +174,61 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) {
apiKey := httpmw.APIKey(r)
logger := api.Logger.Named("chat_watcher")
// Subscribe before accepting the websocket so the subscription
// is active when the client's Dial returns.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
encoder *json.Encoder
encoderReady = make(chan struct{})
// Capture before WebsocketNetConn reassigns ctx (data race).
ctxDone = ctx.Done()
)
cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID),
pubsub.HandleChatWatchEvent(
func(cbCtx context.Context, payload codersdk.ChatWatchEvent, err error) {
if err != nil {
logger.Error(cbCtx, "chat watch event subscription error", slog.Error(err))
return
}
select {
case <-encoderReady:
case <-ctxDone:
return
case <-cbCtx.Done():
return
}
// encoderReady may close with encoder still nil on error paths.
if encoder == nil {
return
}
// The encoder is only written from the pubsub delivery
// goroutine, which processes messages serially. Do not
// add a second write path without synchronization.
if err := encoder.Encode(payload); err != nil {
logger.Debug(cbCtx, "failed to send chat watch event", slog.Error(err))
cancel()
return
}
},
))
if err != nil {
close(encoderReady)
logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err))
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to subscribe to chat events.",
Detail: err.Error(),
})
return
}
defer cancelSubscribe()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
close(encoderReady)
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to open chat watch stream.",
Detail: err.Error(),
@@ -183,9 +236,6 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) {
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
_ = conn.CloseRead(context.Background())
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText)
@@ -193,31 +243,8 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) {
go httpapi.HeartbeatClose(ctx, logger, cancel, conn)
// The encoder is only written from the SubscribeWithErr callback,
// which delivers serially per subscription. Do not add a second
// write path without introducing synchronization.
encoder := json.NewEncoder(wsNetConn)
cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID),
pubsub.HandleChatWatchEvent(
func(ctx context.Context, payload codersdk.ChatWatchEvent, err error) {
if err != nil {
logger.Error(ctx, "chat watch event subscription error", slog.Error(err))
return
}
if err := encoder.Encode(payload); err != nil {
logger.Debug(ctx, "failed to send chat watch event", slog.Error(err))
cancel()
return
}
},
))
if err != nil {
logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err))
_ = conn.Close(websocket.StatusInternalError, "Failed to subscribe to chat events.")
return
}
defer cancelSubscribe()
encoder = json.NewEncoder(wsNetConn)
close(encoderReady)
<-ctx.Done()
}
+44 -26
View File
@@ -1798,6 +1798,45 @@ func TestWatchChats(t *testing.T) {
}
}
})
t.Run("AcceptFailureNoPanic", func(t *testing.T) {
t.Parallel()
// Verify that canceling the request context before the
// websocket upgrade completes does not panic or leak
// goroutines, even when a pubsub event is queued.
client, api := newChatClientWithAPI(t)
user := coderdtest.CreateFirstUser(t, client.Client)
_ = createChatModelConfig(t, client)
// Use a context we can cancel immediately to abort the
// websocket handshake.
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
// Queue a pubsub event so the callback fires during or
// after the canceled Accept.
event := codersdk.ChatWatchEvent{
Kind: codersdk.ChatWatchEventKindCreated,
Chat: codersdk.Chat{ID: uuid.New()},
}
payload, err := json.Marshal(event)
require.NoError(t, err)
// Cancel before Dial so Accept never succeeds.
cancel()
// Publish after cancel to exercise the callback's defensive
// paths (ctxDone select, nil encoder guard).
err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload)
require.NoError(t, err)
//nolint:usetesting // The canceled ctx is intentional.
_, err = client.Dial(ctx, "/api/experimental/chats/watch", nil)
require.Error(t, err, "Dial should fail with canceled context")
// No assertion beyond "no panic". Goroutine leak detection
// is handled by TestMain's goleak.VerifyTestMain.
})
t.Run("CreatedEventIncludesAllChatFields", func(t *testing.T) {
t.Parallel()
@@ -1926,31 +1965,11 @@ func TestWatchChats(t *testing.T) {
payload, err := json.Marshal(event)
require.NoError(t, err)
// Publish the event in a goroutine that keeps retrying.
// When the WebSocket Dial returns, the server has completed
// the HTTP upgrade but may not have called SubscribeWithErr
// yet. If we publish only once, the message can arrive
// before the subscription is active and be silently dropped,
// causing the read loop to block until the context deadline.
// Re-publishing on a short ticker guarantees that at least
// one publish lands after the subscription is ready.
publishDone := make(chan struct{})
go func() {
ticker := time.NewTicker(testutil.IntervalFast)
defer ticker.Stop()
for {
// Publish immediately on the first iteration,
// then again on each tick.
_ = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload)
select {
case <-publishDone:
return
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()
// A single publish is sufficient because the subscription
// is active before websocket.Accept (and thus before Dial
// returns). This serves as a regression test for the fix.
err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload)
require.NoError(t, err)
var received codersdk.ChatWatchEvent
for {
@@ -1962,7 +1981,6 @@ func TestWatchChats(t *testing.T) {
break
}
}
close(publishDone)
// Verify the event carries the full DiffStatus.
require.NotNil(t, received.Chat.DiffStatus, "diff_status_change event must include DiffStatus")