fix(coderd/x/chatd): deliver out-of-order durable messages on subscribe (#25433)

The subscriber advanced a single delivery cursor on each notify and
trusted it for both lookups. Concurrent publishMessage calls and PG
NOTIFY commit ordering let cache appends and notifies arrive out of
ID order, after which a late notify would scan above its own message
and drop it. The DB fallback was also skipped whenever the cache
delivered anything, hiding cross-replica messages that only the DB
held.

The cursor becomes a high-water mark, not the lookup key. Notifies
trigger a rescan over the gap they describe and dedupe per
subscription, and the DB pass runs every time so cross-replica
messages can't get eaten by a local cache hit.

Closes coder/internal#1525
Closes CODAGT-357
This commit is contained in:
Mathias Fredriksson
2026-05-21 10:35:41 +03:00
committed by GitHub
parent 57ed244de1
commit ec1e861152
3 changed files with 271 additions and 11 deletions
+44 -8
View File
@@ -4915,6 +4915,7 @@ func (p *Server) SubscribeAuthorized(
// is already active so no notifications can be lost during this
// window.
initialSnapshot := make([]codersdk.ChatStreamEvent, 0)
delivered := map[int64]struct{}{}
// Add local same-replica message_parts to the snapshot. Retry comes
// from state.currentRetry, not the event buffer, so late joiners see
// only the latest phase rather than a stale buffered retry event.
@@ -4959,6 +4960,7 @@ func (p *Server) SubscribeAuthorized(
ChatID: chatID,
Message: &sdkMsg,
})
delivered[msg.ID] = struct{}{}
}
}
@@ -5077,35 +5079,59 @@ func (p *Server) SubscribeAuthorized(
if notify.AfterMessageID > 0 || notify.FullRefresh {
if notify.FullRefresh {
lastMessageID = 0
clear(delivered)
}
var (
deliveredCount int
source string
)
cached := p.getCachedDurableMessages(chatID, lastMessageID)
// Notifies can arrive out of order. Rescan from
// min(AfterMessageID, lastMessageID) to cover the gap,
// floored at afterMessageID to respect the subscription
// boundary. The delivered set deduplicates.
lookupAfter := lastMessageID
if !notify.FullRefresh {
lookupAfter = max(afterMessageID, min(notify.AfterMessageID, lastMessageID))
}
cached := p.getCachedDurableMessages(chatID, lookupAfter)
if !notify.FullRefresh && len(cached) > 0 {
source = "cache"
for _, event := range cached {
if event.Message == nil {
continue
}
if _, ok := delivered[event.Message.ID]; ok {
continue
}
select {
case <-mergedCtx.Done():
return
case mergedEvents <- event:
}
delivered[event.Message.ID] = struct{}{}
if event.Message.ID > lastMessageID {
lastMessageID = event.Message.ID
}
deliveredCount = len(cached)
} else if newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
deliveredCount++
source = "cache"
}
}
// DB pass picks up cross-replica messages the local cache
// cannot have. Delivered set dedupes against the cache pass.
newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: lastMessageID,
}); msgErr != nil {
AfterID: lookupAfter,
})
if msgErr != nil {
p.logger.Warn(mergedCtx, "failed to get chat messages after pubsub notification",
slog.F("chat_id", chatID),
slog.Error(msgErr),
)
} else {
source = "db"
for _, msg := range newMessages {
if msg.ID <= lastMessageID {
if msg.ID <= lookupAfter {
continue
}
if _, ok := delivered[msg.ID]; ok {
continue
}
sdkMsg := db2sdk.ChatMessage(msg)
@@ -5118,14 +5144,24 @@ func (p *Server) SubscribeAuthorized(
Message: &sdkMsg,
}:
}
delivered[msg.ID] = struct{}{}
if msg.ID > lastMessageID {
lastMessageID = msg.ID
}
deliveredCount++
switch source {
case "":
source = "db"
case "cache":
source = "cache+db"
}
}
}
// Marker for ENG-2645: subscriber delivered durable messages.
p.logger.Debug(mergedCtx, "stream subscriber delivered messages",
slog.F("chat_id", chatID),
slog.F("after_message_id", notify.AfterMessageID),
slog.F("lookup_after", lookupAfter),
slog.F("source", source),
slog.F("delivered_count", deliveredCount),
slog.F("last_message_id", lastMessageID),
+13 -1
View File
@@ -1999,7 +1999,7 @@ func TestTurnWorkspaceContext_EnsureWorkspaceAgentIgnoresCachedAgentForDifferent
require.Equal(t, updatedChat, currentChat)
}
func TestSubscribeSkipsDatabaseCatchupForLocallyDeliveredMessage(t *testing.T) {
func TestSubscribeDedupesLocallyDeliveredMessageOnNotifyCatchup(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
@@ -2028,6 +2028,12 @@ func TestSubscribeSkipsDatabaseCatchupForLocallyDeliveredMessage(t *testing.T) {
AfterID: 0,
}).Return([]database.ChatMessage{initialMessage}, nil),
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
// DB catchup runs unconditionally on every notify; the delivered
// set dedupes against locally-delivered messages.
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 1,
}).Return(nil, nil),
)
server := newSubscribeTestServer(t, db)
@@ -2071,6 +2077,12 @@ func TestSubscribeUsesDurableCacheWhenLocalMessageWasNotDelivered(t *testing.T)
AfterID: 0,
}).Return([]database.ChatMessage{initialMessage}, nil),
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
// DB catchup runs unconditionally; cached id=2 is deduped via
// the delivered set so this query returning nil is sufficient.
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 1,
}).Return(nil, nil),
)
server := newSubscribeTestServer(t, db)
@@ -0,0 +1,212 @@
package chatd
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
// TestSubscribeDeliversOutOfOrderDurableMessage tests that a
// late-arriving lower-ID durable message is delivered when a
// higher-ID was already cached and sent.
func TestSubscribeDeliversOutOfOrderDurableMessage(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitMedium)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
chatID := uuid.New()
chat := database.Chat{ID: chatID, Status: database.ChatStatusRequiresAction}
initialUser := database.ChatMessage{ID: 3, ChatID: chatID, Role: database.ChatMessageRoleUser}
initialAssistant := database.ChatMessage{ID: 4, ChatID: chatID, Role: database.ChatMessageRoleAssistant}
gomock.InOrder(
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 0,
}).Return([]database.ChatMessage{initialUser, initialAssistant}, nil),
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
)
// Notify-driven catch-up queries return nothing so the test only
// exercises the cache delivery path.
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
server := newSubscribeTestServer(t, db)
toolResult := codersdk.ChatMessage{ID: 5, ChatID: chatID, Role: codersdk.ChatMessageRoleTool}
resumed := codersdk.ChatMessage{ID: 7, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
promoted := codersdk.ChatMessage{ID: 6, ChatID: chatID, Role: codersdk.ChatMessageRoleUser}
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID,
Message: &codersdk.ChatMessage{ID: 4, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant},
})
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 0)
require.True(t, ok)
defer cancel()
// Cache id=5 and id=7, but not id=6, then emit the notify for
// id=5. The merge goroutine drains [5, 7] from the cache.
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &toolResult,
})
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &resumed,
})
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 4})
first := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, first.Type)
require.NotNil(t, first.Message)
require.Equal(t, int64(5), first.Message.ID)
second := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, second.Type)
require.NotNil(t, second.Message)
require.Equal(t, int64(7), second.Message.ID)
// Cache id=6 after the merge goroutine has already advanced
// lastMessageID to 7, then emit the notify for id=6.
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &promoted,
})
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 5})
third := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, third.Type)
require.NotNil(t, third.Message)
require.Equal(t, int64(6), third.Message.ID)
requireNoStreamEvent(t, events, 200*time.Millisecond)
}
// TestSubscribeRespectsAfterMessageIDOnLateNotify tests that
// lookupAfter never drops below afterMessageID, preventing
// re-emission of messages the client already has via REST.
func TestSubscribeRespectsAfterMessageIDOnLateNotify(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitMedium)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
chatID := uuid.New()
chat := database.Chat{ID: chatID, Status: database.ChatStatusRunning}
gomock.InOrder(
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 100,
}).Return(nil, nil),
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
)
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
server := newSubscribeTestServer(t, db)
// Seed the cache with messages the client claims to already have
// (id<=100) plus one new message (id=101).
for _, id := range []int64{96, 97, 98, 99, 100, 101} {
msg := &codersdk.ChatMessage{ID: id, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: msg,
})
}
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 100)
require.True(t, ok)
defer cancel()
// A stale notify with AfterMessageID=95 would naively pull
// id=96..101 back from the cache; only id=101 should reach the
// live stream because the client already has 96-100.
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 95})
ev := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, ev.Type)
require.NotNil(t, ev.Message)
require.Equal(t, int64(101), ev.Message.ID,
"messages at or below afterMessageID must not be re-emitted")
requireNoStreamEvent(t, events, 200*time.Millisecond)
}
// TestSubscribeRunsDBFallbackWhenCacheDeliversUnrelatedMessage tests
// that the DB fallback runs even when the cache delivers, so
// cross-replica messages are not dropped.
func TestSubscribeRunsDBFallbackWhenCacheDeliversUnrelatedMessage(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitMedium)
ctrl := gomock.NewController(t)
db := dbmock.NewMockStore(ctrl)
chatID := uuid.New()
chat := database.Chat{ID: chatID, Status: database.ChatStatusRunning}
crossReplica := database.ChatMessage{ID: 6, ChatID: chatID, Role: database.ChatMessageRoleUser}
gomock.InOrder(
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
// Snapshot: nothing above the client's afterMessageID=5 yet.
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 5,
}).Return(nil, nil),
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
// Notify catchup: the cross-replica message lives only in the
// DB on this replica.
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: 5,
}).Return([]database.ChatMessage{crossReplica}, nil),
)
server := newSubscribeTestServer(t, db)
// Cache a locally-published higher-ID message so the cache pass
// has something to deliver without covering id=6.
localOnly := codersdk.ChatMessage{ID: 8, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &localOnly,
})
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 5)
require.True(t, ok)
defer cancel()
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 5})
// The cache pass delivers id=8; the DB pass must still run and
// deliver id=6. Order between them is set by cache iteration vs
// DB query, so accept either ordering.
first := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, first.Type)
require.NotNil(t, first.Message)
second := testutil.RequireReceive(ctx, t, events)
require.Equal(t, codersdk.ChatStreamEventTypeMessage, second.Type)
require.NotNil(t, second.Message)
got := map[int64]bool{first.Message.ID: true, second.Message.ID: true}
require.True(t, got[6], "cross-replica DB message id=6 must be delivered")
require.True(t, got[8], "locally-cached message id=8 must be delivered")
requireNoStreamEvent(t, events, 200*time.Millisecond)
}