mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(chat): fix streaming bugs in edit notifications, persist race, and frontend reconnect (#22737)
This commit is contained in:
+20
-3
@@ -1330,10 +1330,14 @@ func (p *Server) Subscribe(
|
||||
}
|
||||
return
|
||||
case notify := <-notifications:
|
||||
if notify.AfterMessageID > 0 {
|
||||
if notify.AfterMessageID > 0 || notify.FullRefresh {
|
||||
afterID := lastMessageID
|
||||
if notify.FullRefresh {
|
||||
afterID = 0
|
||||
}
|
||||
newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: lastMessageID,
|
||||
AfterID: afterID,
|
||||
})
|
||||
if msgErr != nil {
|
||||
p.logger.Warn(mergedCtx, "failed to get chat messages after pubsub notification",
|
||||
@@ -1642,7 +1646,7 @@ func (p *Server) publishEditedMessage(chatID uuid.UUID, message database.ChatMes
|
||||
Message: &sdkMessage,
|
||||
})
|
||||
p.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{
|
||||
AfterMessageID: 0,
|
||||
FullRefresh: true,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2209,6 +2213,19 @@ func (p *Server) runChat(
|
||||
|
||||
var insertedMessages []database.ChatMessage
|
||||
err := p.db.InTx(func(tx database.Store) error {
|
||||
// Verify this worker still owns the chat before
|
||||
// inserting messages. This closes the race where
|
||||
// EditMessage truncates history and clears worker_id
|
||||
// while persistInterruptedStep (which uses an
|
||||
// uncancelable context) is still running.
|
||||
lockedChat, lockErr := tx.GetChatByIDForUpdate(persistCtx, chat.ID)
|
||||
if lockErr != nil {
|
||||
return xerrors.Errorf("lock chat for persist: %w", lockErr)
|
||||
}
|
||||
if !lockedChat.WorkerID.Valid || lockedChat.WorkerID.UUID != p.workerID {
|
||||
return chatloop.ErrInterrupted
|
||||
}
|
||||
|
||||
if len(assistantBlocks) > 0 {
|
||||
assistantContent, marshalErr := chatprompt.MarshalContent(assistantBlocks, nil)
|
||||
if marshalErr != nil {
|
||||
|
||||
@@ -34,4 +34,9 @@ type ChatStreamNotifyMessage struct {
|
||||
|
||||
// QueueUpdate is set when the queued messages change.
|
||||
QueueUpdate bool `json:"queue_update,omitempty"`
|
||||
|
||||
// FullRefresh signals that subscribers should re-fetch all
|
||||
// messages from the beginning (e.g. after an edit that
|
||||
// truncates message history).
|
||||
FullRefresh bool `json:"full_refresh,omitempty"`
|
||||
}
|
||||
|
||||
@@ -670,6 +670,17 @@ export const useChatStore = (
|
||||
continue;
|
||||
}
|
||||
const { changed } = store.upsertDurableMessage(message);
|
||||
// Keep lastMessageIdRef in sync with
|
||||
// stream-delivered messages so reconnections use
|
||||
// the correct after_id and don't re-fetch or
|
||||
// miss events.
|
||||
if (
|
||||
message.id !== undefined &&
|
||||
(lastMessageIdRef.current === undefined ||
|
||||
message.id > lastMessageIdRef.current)
|
||||
) {
|
||||
lastMessageIdRef.current = message.id;
|
||||
}
|
||||
if (changed) {
|
||||
scheduleStreamReset();
|
||||
}
|
||||
@@ -810,6 +821,13 @@ export const useChatStore = (
|
||||
if (reconnectAttempt === 0) {
|
||||
store.setStreamError("Chat stream disconnected. Reconnecting…");
|
||||
}
|
||||
// Clear "running" status on disconnect so the UI
|
||||
// doesn't show a stale spinner. The reconnected
|
||||
// stream will deliver the authoritative status.
|
||||
const currentStatus = store.getSnapshot().chatStatus;
|
||||
if (currentStatus === "running") {
|
||||
store.setChatStatus(null);
|
||||
}
|
||||
scheduleReconnect();
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user