From 89d470c1127248f4b3dad0072bd9ea449021c650 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 1 Jun 2026 20:20:52 +0000 Subject: [PATCH] fix(coderd): subscribe to pubsub before accepting websocket in watchChats The watchChats handler called SubscribeWithErr after websocket.Accept, creating a window where clients could trigger events before the subscription was active. Move the subscription before the accept so events accumulate in the pubsub internal queue and drain naturally once the encoder is ready. Fixes: https://linear.app/codercom/issue/CODAGT-480 --- coderd/exp_chats.go | 83 ++++++++++++++++++++++++++-------------- coderd/exp_chats_test.go | 70 ++++++++++++++++++++------------- 2 files changed, 99 insertions(+), 54 deletions(-) diff --git a/coderd/exp_chats.go b/coderd/exp_chats.go index 01b1ae386f..c2f90cd2d2 100644 --- a/coderd/exp_chats.go +++ b/coderd/exp_chats.go @@ -174,8 +174,61 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { apiKey := httpmw.APIKey(r) logger := api.Logger.Named("chat_watcher") + // Subscribe before accepting the websocket so the subscription + // is active when the client's Dial returns. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + encoder *json.Encoder + encoderReady = make(chan struct{}) + // Capture before WebsocketNetConn reassigns ctx (data race). + ctxDone = ctx.Done() + ) + + cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID), + pubsub.HandleChatWatchEvent( + func(cbCtx context.Context, payload codersdk.ChatWatchEvent, err error) { + if err != nil { + logger.Error(cbCtx, "chat watch event subscription error", slog.Error(err)) + return + } + select { + case <-encoderReady: + case <-ctxDone: + return + case <-cbCtx.Done(): + return + } + + // encoderReady may close with encoder still nil on error paths. 
+ if encoder == nil { + return + } + // The encoder is only written from the pubsub delivery + // goroutine, which processes messages serially. Do not + // add a second write path without synchronization. + if err := encoder.Encode(payload); err != nil { + logger.Debug(cbCtx, "failed to send chat watch event", slog.Error(err)) + cancel() + return + } + }, + )) + if err != nil { + close(encoderReady) + logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err)) + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to subscribe to chat events.", + Detail: err.Error(), + }) + return + } + defer cancelSubscribe() + conn, err := websocket.Accept(rw, r, nil) if err != nil { + close(encoderReady) httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to open chat watch stream.", Detail: err.Error(), @@ -183,9 +236,6 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { return } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - _ = conn.CloseRead(context.Background()) ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText) @@ -193,31 +243,8 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { go httpapi.HeartbeatClose(ctx, logger, cancel, conn) - // The encoder is only written from the SubscribeWithErr callback, - // which delivers serially per subscription. Do not add a second - // write path without introducing synchronization. 
- encoder := json.NewEncoder(wsNetConn) - - cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID), - pubsub.HandleChatWatchEvent( - func(ctx context.Context, payload codersdk.ChatWatchEvent, err error) { - if err != nil { - logger.Error(ctx, "chat watch event subscription error", slog.Error(err)) - return - } - if err := encoder.Encode(payload); err != nil { - logger.Debug(ctx, "failed to send chat watch event", slog.Error(err)) - cancel() - return - } - }, - )) - if err != nil { - logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err)) - _ = conn.Close(websocket.StatusInternalError, "Failed to subscribe to chat events.") - return - } - defer cancelSubscribe() + encoder = json.NewEncoder(wsNetConn) + close(encoderReady) <-ctx.Done() } diff --git a/coderd/exp_chats_test.go b/coderd/exp_chats_test.go index b9718c996e..458b0dc125 100644 --- a/coderd/exp_chats_test.go +++ b/coderd/exp_chats_test.go @@ -1798,6 +1798,45 @@ func TestWatchChats(t *testing.T) { } } }) + t.Run("AcceptFailureNoPanic", func(t *testing.T) { + t.Parallel() + + // Verify that canceling the request context before the + // websocket upgrade completes does not panic or leak + // goroutines, even when a pubsub event is queued. + client, api := newChatClientWithAPI(t) + user := coderdtest.CreateFirstUser(t, client.Client) + _ = createChatModelConfig(t, client) + + // Use a context we can cancel immediately to abort the + // websocket handshake. + ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong)) + + // Queue a pubsub event so the callback fires during or + // after the canceled Accept. + event := codersdk.ChatWatchEvent{ + Kind: codersdk.ChatWatchEventKindCreated, + Chat: codersdk.Chat{ID: uuid.New()}, + } + payload, err := json.Marshal(event) + require.NoError(t, err) + + // Cancel before Dial so Accept never succeeds. 
+ cancel() + + // Publish after cancel, intended to exercise the callback's defensive paths (ctxDone select, nil encoder guard). + // NOTE(review): this publish precedes Dial, so the watchChats handler has not subscribed yet; confirm the callback actually runs. + err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) + require.NoError(t, err) + + //nolint:usetesting // The canceled ctx is intentional. + _, err = client.Dial(ctx, "/api/experimental/chats/watch", nil) + require.Error(t, err, "Dial should fail with canceled context") + + // No assertion beyond "no panic". Goroutine leak detection + // is handled by TestMain's goleak.VerifyTestMain. + }) + t.Run("CreatedEventIncludesAllChatFields", func(t *testing.T) { t.Parallel() @@ -1926,31 +1965,11 @@ func TestWatchChats(t *testing.T) { payload, err := json.Marshal(event) require.NoError(t, err) - // Publish the event in a goroutine that keeps retrying. - // When the WebSocket Dial returns, the server has completed - // the HTTP upgrade but may not have called SubscribeWithErr - // yet. If we publish only once, the message can arrive - // before the subscription is active and be silently dropped, - // causing the read loop to block until the context deadline. - // Re-publishing on a short ticker guarantees that at least - // one publish lands after the subscription is ready. - publishDone := make(chan struct{}) - go func() { - ticker := time.NewTicker(testutil.IntervalFast) - defer ticker.Stop() - for { - // Publish immediately on the first iteration, - // then again on each tick. - _ = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) - select { - case <-publishDone: - return - case <-ctx.Done(): - return - case <-ticker.C: - } - } - }() + // A single publish is sufficient because the subscription + // is active before websocket.Accept (and thus before Dial + // returns). This serves as a regression test for the fix. 
+ err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) + require.NoError(t, err) var received codersdk.ChatWatchEvent for { @@ -1962,7 +1981,6 @@ func TestWatchChats(t *testing.T) { break } } - close(publishDone) // Verify the event carries the full DiffStatus. require.NotNil(t, received.Chat.DiffStatus, "diff_status_change event must include DiffStatus")