diff --git a/coderd/exp_chats.go b/coderd/exp_chats.go index 01b1ae386f..c2f90cd2d2 100644 --- a/coderd/exp_chats.go +++ b/coderd/exp_chats.go @@ -174,8 +174,61 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { apiKey := httpmw.APIKey(r) logger := api.Logger.Named("chat_watcher") + // Subscribe before accepting the websocket so the subscription + // is active when the client's Dial returns. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + encoder *json.Encoder + encoderReady = make(chan struct{}) + // Capture before WebsocketNetConn reassigns ctx (data race). + ctxDone = ctx.Done() + ) + + cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID), + pubsub.HandleChatWatchEvent( + func(cbCtx context.Context, payload codersdk.ChatWatchEvent, err error) { + if err != nil { + logger.Error(cbCtx, "chat watch event subscription error", slog.Error(err)) + return + } + select { + case <-encoderReady: + case <-ctxDone: + return + case <-cbCtx.Done(): + return + } + + // encoderReady may close with encoder still nil on error paths. + if encoder == nil { + return + } + // The encoder is only written from the pubsub delivery + // goroutine, which processes messages serially. Do not + // add a second write path without synchronization. + if err := encoder.Encode(payload); err != nil { + logger.Debug(cbCtx, "failed to send chat watch event", slog.Error(err)) + cancel() + return + } + }, + )) + if err != nil { + close(encoderReady) + logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err)) + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to subscribe to chat events.", + Detail: err.Error(), + }) + return + } + defer cancelSubscribe() + conn, err := websocket.Accept(rw, r, nil) if err != nil { + close(encoderReady) httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to open chat watch stream.", Detail: err.Error(), @@ -183,9 +236,6 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { return } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - _ = conn.CloseRead(context.Background()) ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText) @@ -193,31 +243,8 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) { go httpapi.HeartbeatClose(ctx, logger, cancel, conn) - // The encoder is only written from the SubscribeWithErr callback, - // which delivers serially per subscription. Do not add a second - // write path without introducing synchronization. - encoder := json.NewEncoder(wsNetConn) - - cancelSubscribe, err := api.Pubsub.SubscribeWithErr(pubsub.ChatWatchEventChannel(apiKey.UserID), - pubsub.HandleChatWatchEvent( - func(ctx context.Context, payload codersdk.ChatWatchEvent, err error) { - if err != nil { - logger.Error(ctx, "chat watch event subscription error", slog.Error(err)) - return - } - if err := encoder.Encode(payload); err != nil { - logger.Debug(ctx, "failed to send chat watch event", slog.Error(err)) - cancel() - return - } - }, - )) - if err != nil { - logger.Error(ctx, "failed to subscribe to chat watch events", slog.Error(err)) - _ = conn.Close(websocket.StatusInternalError, "Failed to subscribe to chat events.") - return - } - defer cancelSubscribe() + encoder = json.NewEncoder(wsNetConn) + close(encoderReady) <-ctx.Done() } diff --git a/coderd/exp_chats_test.go b/coderd/exp_chats_test.go index b9718c996e..458b0dc125 100644 --- a/coderd/exp_chats_test.go +++ b/coderd/exp_chats_test.go @@ -1798,6 +1798,45 @@ func TestWatchChats(t *testing.T) { } } }) + t.Run("AcceptFailureNoPanic", func(t *testing.T) { + t.Parallel() + + // Verify that canceling the request context before the + // websocket upgrade completes does not panic or leak + // goroutines, even when a pubsub event is queued. + client, api := newChatClientWithAPI(t) + user := coderdtest.CreateFirstUser(t, client.Client) + _ = createChatModelConfig(t, client) + + // Use a context we can cancel immediately to abort the + // websocket handshake. + ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong)) + + // Queue a pubsub event so the callback fires during or + // after the canceled Accept. + event := codersdk.ChatWatchEvent{ + Kind: codersdk.ChatWatchEventKindCreated, + Chat: codersdk.Chat{ID: uuid.New()}, + } + payload, err := json.Marshal(event) + require.NoError(t, err) + + // Cancel before Dial so Accept never succeeds. + cancel() + + // Publish after cancel to exercise the callback's defensive + // paths (ctxDone select, nil encoder guard). + err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) + require.NoError(t, err) + + //nolint:usetesting // The canceled ctx is intentional. + _, err = client.Dial(ctx, "/api/experimental/chats/watch", nil) + require.Error(t, err, "Dial should fail with canceled context") + + // No assertion beyond "no panic". Goroutine leak detection + // is handled by TestMain's goleak.VerifyTestMain. + }) + t.Run("CreatedEventIncludesAllChatFields", func(t *testing.T) { t.Parallel() @@ -1926,31 +1965,11 @@ func TestWatchChats(t *testing.T) { payload, err := json.Marshal(event) require.NoError(t, err) - // Publish the event in a goroutine that keeps retrying. - // When the WebSocket Dial returns, the server has completed - // the HTTP upgrade but may not have called SubscribeWithErr - // yet. If we publish only once, the message can arrive - // before the subscription is active and be silently dropped, - // causing the read loop to block until the context deadline. - // Re-publishing on a short ticker guarantees that at least - // one publish lands after the subscription is ready. - publishDone := make(chan struct{}) - go func() { - ticker := time.NewTicker(testutil.IntervalFast) - defer ticker.Stop() - for { - // Publish immediately on the first iteration, - // then again on each tick. - _ = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) - select { - case <-publishDone: - return - case <-ctx.Done(): - return - case <-ticker.C: - } - } - }() + // A single publish is sufficient because the subscription + // is active before websocket.Accept (and thus before Dial + // returns). This serves as a regression test for the fix. + err = api.Pubsub.Publish(coderdpubsub.ChatWatchEventChannel(user.UserID), payload) + require.NoError(t, err) var received codersdk.ChatWatchEvent for { @@ -1962,7 +1981,6 @@ func TestWatchChats(t *testing.T) { break } } - close(publishDone) // Verify the event carries the full DiffStatus. require.NotNil(t, received.Chat.DiffStatus, "diff_status_change event must include DiffStatus")