diff --git a/coderd/chatd/chatd.go b/coderd/chatd/chatd.go index eadd7e3569..2cad01f3e3 100644 --- a/coderd/chatd/chatd.go +++ b/coderd/chatd/chatd.go @@ -1330,10 +1330,14 @@ func (p *Server) Subscribe( } return case notify := <-notifications: - if notify.AfterMessageID > 0 { + if notify.AfterMessageID > 0 || notify.FullRefresh { + afterID := lastMessageID + if notify.FullRefresh { + afterID = 0 + } newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{ ChatID: chatID, - AfterID: lastMessageID, + AfterID: afterID, }) if msgErr != nil { p.logger.Warn(mergedCtx, "failed to get chat messages after pubsub notification", @@ -1642,7 +1646,7 @@ func (p *Server) publishEditedMessage(chatID uuid.UUID, message database.ChatMes Message: &sdkMessage, }) p.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{ - AfterMessageID: 0, + FullRefresh: true, }) } @@ -2209,6 +2213,19 @@ func (p *Server) runChat( var insertedMessages []database.ChatMessage err := p.db.InTx(func(tx database.Store) error { + // Verify this worker still owns the chat before + // inserting messages. This closes the race where + // EditMessage truncates history and clears worker_id + // while persistInterruptedStep (which uses an + // uncancelable context) is still running. + lockedChat, lockErr := tx.GetChatByIDForUpdate(persistCtx, chat.ID) + if lockErr != nil { + return xerrors.Errorf("lock chat for persist: %w", lockErr) + } + if !lockedChat.WorkerID.Valid || lockedChat.WorkerID.UUID != p.workerID { + return chatloop.ErrInterrupted + } + if len(assistantBlocks) > 0 { assistantContent, marshalErr := chatprompt.MarshalContent(assistantBlocks, nil) if marshalErr != nil { diff --git a/coderd/pubsub/chatstreamnotify.go b/coderd/pubsub/chatstreamnotify.go index 078691c934..d14a657d66 100644 --- a/coderd/pubsub/chatstreamnotify.go +++ b/coderd/pubsub/chatstreamnotify.go @@ -34,4 +34,9 @@ type ChatStreamNotifyMessage struct { // QueueUpdate is set when the queued messages change. QueueUpdate bool `json:"queue_update,omitempty"` + + // FullRefresh signals that subscribers should re-fetch all + // messages from the beginning (e.g. after an edit that + // truncates message history). + FullRefresh bool `json:"full_refresh,omitempty"` } diff --git a/site/src/pages/AgentsPage/AgentDetail/ChatContext.ts b/site/src/pages/AgentsPage/AgentDetail/ChatContext.ts index 2b1d20d595..6143bf9067 100644 --- a/site/src/pages/AgentsPage/AgentDetail/ChatContext.ts +++ b/site/src/pages/AgentsPage/AgentDetail/ChatContext.ts @@ -670,6 +670,17 @@ export const useChatStore = ( continue; } const { changed } = store.upsertDurableMessage(message); + // Keep lastMessageIdRef in sync with + // stream-delivered messages so reconnections use + // the correct after_id and don't re-fetch or + // miss events. + if ( + message.id !== undefined && + (lastMessageIdRef.current === undefined || + message.id > lastMessageIdRef.current) + ) { + lastMessageIdRef.current = message.id; + } if (changed) { scheduleStreamReset(); } @@ -810,6 +821,13 @@ export const useChatStore = ( if (reconnectAttempt === 0) { store.setStreamError("Chat stream disconnected. Reconnecting…"); } + // Clear "running" status on disconnect so the UI + // doesn't show a stale spinner. The reconnected + // stream will deliver the authoritative status. + const currentStatus = store.getSnapshot().chatStatus; + if (currentStatus === "running") { + store.setChatStatus(null); + } scheduleReconnect(); };