mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(coderd/x/chatd): deliver out-of-order durable messages on subscribe (#25433)
The subscriber advanced a single delivery cursor on each notify and trusted it for both lookups. Concurrent publishMessage calls and PG NOTIFY commit ordering let cache appends and notifies arrive out of ID order, after which a late notify would scan above its own message and drop it. The DB fallback was also skipped whenever the cache delivered anything, hiding cross-replica messages that only the DB held. The cursor becomes a high-water mark, not the lookup key. Notifies trigger a rescan over the gap they describe and dedupe per subscription, and the DB pass runs every time so cross-replica messages can't get eaten by a local cache hit. Closes coder/internal#1525 Closes CODAGT-357
This commit is contained in:
committed by
GitHub
parent
57ed244de1
commit
ec1e861152
+44
-8
@@ -4915,6 +4915,7 @@ func (p *Server) SubscribeAuthorized(
|
||||
// is already active so no notifications can be lost during this
|
||||
// window.
|
||||
initialSnapshot := make([]codersdk.ChatStreamEvent, 0)
|
||||
delivered := map[int64]struct{}{}
|
||||
// Add local same-replica message_parts to the snapshot. Retry comes
|
||||
// from state.currentRetry, not the event buffer, so late joiners see
|
||||
// only the latest phase rather than a stale buffered retry event.
|
||||
@@ -4959,6 +4960,7 @@ func (p *Server) SubscribeAuthorized(
|
||||
ChatID: chatID,
|
||||
Message: &sdkMsg,
|
||||
})
|
||||
delivered[msg.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5077,35 +5079,59 @@ func (p *Server) SubscribeAuthorized(
|
||||
if notify.AfterMessageID > 0 || notify.FullRefresh {
|
||||
if notify.FullRefresh {
|
||||
lastMessageID = 0
|
||||
clear(delivered)
|
||||
}
|
||||
var (
|
||||
deliveredCount int
|
||||
source string
|
||||
)
|
||||
cached := p.getCachedDurableMessages(chatID, lastMessageID)
|
||||
// Notifies can arrive out of order. Rescan from
|
||||
// min(AfterMessageID, lastMessageID) to cover the gap,
|
||||
// floored at afterMessageID to respect the subscription
|
||||
// boundary. The delivered set deduplicates.
|
||||
lookupAfter := lastMessageID
|
||||
if !notify.FullRefresh {
|
||||
lookupAfter = max(afterMessageID, min(notify.AfterMessageID, lastMessageID))
|
||||
}
|
||||
cached := p.getCachedDurableMessages(chatID, lookupAfter)
|
||||
if !notify.FullRefresh && len(cached) > 0 {
|
||||
source = "cache"
|
||||
for _, event := range cached {
|
||||
if event.Message == nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := delivered[event.Message.ID]; ok {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case <-mergedCtx.Done():
|
||||
return
|
||||
case mergedEvents <- event:
|
||||
}
|
||||
delivered[event.Message.ID] = struct{}{}
|
||||
if event.Message.ID > lastMessageID {
|
||||
lastMessageID = event.Message.ID
|
||||
}
|
||||
deliveredCount = len(cached)
|
||||
} else if newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
|
||||
deliveredCount++
|
||||
source = "cache"
|
||||
}
|
||||
}
|
||||
// DB pass picks up cross-replica messages the local cache
|
||||
// cannot have. Delivered set dedupes against the cache pass.
|
||||
newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: lastMessageID,
|
||||
}); msgErr != nil {
|
||||
AfterID: lookupAfter,
|
||||
})
|
||||
if msgErr != nil {
|
||||
p.logger.Warn(mergedCtx, "failed to get chat messages after pubsub notification",
|
||||
slog.F("chat_id", chatID),
|
||||
slog.Error(msgErr),
|
||||
)
|
||||
} else {
|
||||
source = "db"
|
||||
for _, msg := range newMessages {
|
||||
if msg.ID <= lastMessageID {
|
||||
if msg.ID <= lookupAfter {
|
||||
continue
|
||||
}
|
||||
if _, ok := delivered[msg.ID]; ok {
|
||||
continue
|
||||
}
|
||||
sdkMsg := db2sdk.ChatMessage(msg)
|
||||
@@ -5118,14 +5144,24 @@ func (p *Server) SubscribeAuthorized(
|
||||
Message: &sdkMsg,
|
||||
}:
|
||||
}
|
||||
delivered[msg.ID] = struct{}{}
|
||||
if msg.ID > lastMessageID {
|
||||
lastMessageID = msg.ID
|
||||
}
|
||||
deliveredCount++
|
||||
switch source {
|
||||
case "":
|
||||
source = "db"
|
||||
case "cache":
|
||||
source = "cache+db"
|
||||
}
|
||||
}
|
||||
}
|
||||
// Marker for ENG-2645: subscriber delivered durable messages.
|
||||
p.logger.Debug(mergedCtx, "stream subscriber delivered messages",
|
||||
slog.F("chat_id", chatID),
|
||||
slog.F("after_message_id", notify.AfterMessageID),
|
||||
slog.F("lookup_after", lookupAfter),
|
||||
slog.F("source", source),
|
||||
slog.F("delivered_count", deliveredCount),
|
||||
slog.F("last_message_id", lastMessageID),
|
||||
|
||||
@@ -1999,7 +1999,7 @@ func TestTurnWorkspaceContext_EnsureWorkspaceAgentIgnoresCachedAgentForDifferent
|
||||
require.Equal(t, updatedChat, currentChat)
|
||||
}
|
||||
|
||||
func TestSubscribeSkipsDatabaseCatchupForLocallyDeliveredMessage(t *testing.T) {
|
||||
func TestSubscribeDedupesLocallyDeliveredMessageOnNotifyCatchup(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
@@ -2028,6 +2028,12 @@ func TestSubscribeSkipsDatabaseCatchupForLocallyDeliveredMessage(t *testing.T) {
|
||||
AfterID: 0,
|
||||
}).Return([]database.ChatMessage{initialMessage}, nil),
|
||||
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
|
||||
// DB catchup runs unconditionally on every notify; the delivered
|
||||
// set dedupes against locally-delivered messages.
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 1,
|
||||
}).Return(nil, nil),
|
||||
)
|
||||
|
||||
server := newSubscribeTestServer(t, db)
|
||||
@@ -2071,6 +2077,12 @@ func TestSubscribeUsesDurableCacheWhenLocalMessageWasNotDelivered(t *testing.T)
|
||||
AfterID: 0,
|
||||
}).Return([]database.ChatMessage{initialMessage}, nil),
|
||||
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
|
||||
// DB catchup runs unconditionally; cached id=2 is deduped via
|
||||
// the delivered set so this query returning nil is sufficient.
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 1,
|
||||
}).Return(nil, nil),
|
||||
)
|
||||
|
||||
server := newSubscribeTestServer(t, db)
|
||||
|
||||
@@ -0,0 +1,212 @@
|
||||
package chatd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmock"
|
||||
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// TestSubscribeDeliversOutOfOrderDurableMessage tests that a
|
||||
// late-arriving lower-ID durable message is delivered when a
|
||||
// higher-ID was already cached and sent.
|
||||
func TestSubscribeDeliversOutOfOrderDurableMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
db := dbmock.NewMockStore(ctrl)
|
||||
|
||||
chatID := uuid.New()
|
||||
chat := database.Chat{ID: chatID, Status: database.ChatStatusRequiresAction}
|
||||
initialUser := database.ChatMessage{ID: 3, ChatID: chatID, Role: database.ChatMessageRoleUser}
|
||||
initialAssistant := database.ChatMessage{ID: 4, ChatID: chatID, Role: database.ChatMessageRoleAssistant}
|
||||
|
||||
gomock.InOrder(
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 0,
|
||||
}).Return([]database.ChatMessage{initialUser, initialAssistant}, nil),
|
||||
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
|
||||
)
|
||||
// Notify-driven catch-up queries return nothing so the test only
|
||||
// exercises the cache delivery path.
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||
|
||||
server := newSubscribeTestServer(t, db)
|
||||
|
||||
toolResult := codersdk.ChatMessage{ID: 5, ChatID: chatID, Role: codersdk.ChatMessageRoleTool}
|
||||
resumed := codersdk.ChatMessage{ID: 7, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
|
||||
promoted := codersdk.ChatMessage{ID: 6, ChatID: chatID, Role: codersdk.ChatMessageRoleUser}
|
||||
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID,
|
||||
Message: &codersdk.ChatMessage{ID: 4, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant},
|
||||
})
|
||||
|
||||
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 0)
|
||||
require.True(t, ok)
|
||||
defer cancel()
|
||||
|
||||
// Cache id=5 and id=7, but not id=6, then emit the notify for
|
||||
// id=5. The merge goroutine drains [5, 7] from the cache.
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &toolResult,
|
||||
})
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &resumed,
|
||||
})
|
||||
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 4})
|
||||
|
||||
first := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, first.Type)
|
||||
require.NotNil(t, first.Message)
|
||||
require.Equal(t, int64(5), first.Message.ID)
|
||||
second := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, second.Type)
|
||||
require.NotNil(t, second.Message)
|
||||
require.Equal(t, int64(7), second.Message.ID)
|
||||
|
||||
// Cache id=6 after the merge goroutine has already advanced
|
||||
// lastMessageID to 7, then emit the notify for id=6.
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &promoted,
|
||||
})
|
||||
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 5})
|
||||
|
||||
third := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, third.Type)
|
||||
require.NotNil(t, third.Message)
|
||||
require.Equal(t, int64(6), third.Message.ID)
|
||||
|
||||
requireNoStreamEvent(t, events, 200*time.Millisecond)
|
||||
}
|
||||
|
||||
// TestSubscribeRespectsAfterMessageIDOnLateNotify tests that
|
||||
// lookupAfter never drops below afterMessageID, preventing
|
||||
// re-emission of messages the client already has via REST.
|
||||
func TestSubscribeRespectsAfterMessageIDOnLateNotify(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
db := dbmock.NewMockStore(ctrl)
|
||||
|
||||
chatID := uuid.New()
|
||||
chat := database.Chat{ID: chatID, Status: database.ChatStatusRunning}
|
||||
|
||||
gomock.InOrder(
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 100,
|
||||
}).Return(nil, nil),
|
||||
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
|
||||
)
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||
|
||||
server := newSubscribeTestServer(t, db)
|
||||
|
||||
// Seed the cache with messages the client claims to already have
|
||||
// (id<=100) plus one new message (id=101).
|
||||
for _, id := range []int64{96, 97, 98, 99, 100, 101} {
|
||||
msg := &codersdk.ChatMessage{ID: id, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: msg,
|
||||
})
|
||||
}
|
||||
|
||||
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 100)
|
||||
require.True(t, ok)
|
||||
defer cancel()
|
||||
|
||||
// A stale notify with AfterMessageID=95 would naively pull
|
||||
// id=96..101 back from the cache; only id=101 should reach the
|
||||
// live stream because the client already has 96-100.
|
||||
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 95})
|
||||
|
||||
ev := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, ev.Type)
|
||||
require.NotNil(t, ev.Message)
|
||||
require.Equal(t, int64(101), ev.Message.ID,
|
||||
"messages at or below afterMessageID must not be re-emitted")
|
||||
|
||||
requireNoStreamEvent(t, events, 200*time.Millisecond)
|
||||
}
|
||||
|
||||
// TestSubscribeRunsDBFallbackWhenCacheDeliversUnrelatedMessage tests
|
||||
// that the DB fallback runs even when the cache delivers, so
|
||||
// cross-replica messages are not dropped.
|
||||
func TestSubscribeRunsDBFallbackWhenCacheDeliversUnrelatedMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
db := dbmock.NewMockStore(ctrl)
|
||||
|
||||
chatID := uuid.New()
|
||||
chat := database.Chat{ID: chatID, Status: database.ChatStatusRunning}
|
||||
crossReplica := database.ChatMessage{ID: 6, ChatID: chatID, Role: database.ChatMessageRoleUser}
|
||||
|
||||
gomock.InOrder(
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
db.EXPECT().GetChatByID(gomock.Any(), chatID).Return(chat, nil),
|
||||
// Snapshot: nothing above the client's afterMessageID=5 yet.
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 5,
|
||||
}).Return(nil, nil),
|
||||
db.EXPECT().GetChatQueuedMessages(gomock.Any(), chatID).Return(nil, nil),
|
||||
// Notify catchup: the cross-replica message lives only in the
|
||||
// DB on this replica.
|
||||
db.EXPECT().GetChatMessagesByChatID(gomock.Any(), database.GetChatMessagesByChatIDParams{
|
||||
ChatID: chatID,
|
||||
AfterID: 5,
|
||||
}).Return([]database.ChatMessage{crossReplica}, nil),
|
||||
)
|
||||
|
||||
server := newSubscribeTestServer(t, db)
|
||||
|
||||
// Cache a locally-published higher-ID message so the cache pass
|
||||
// has something to deliver without covering id=6.
|
||||
localOnly := codersdk.ChatMessage{ID: 8, ChatID: chatID, Role: codersdk.ChatMessageRoleAssistant}
|
||||
server.cacheDurableMessage(chatID, codersdk.ChatStreamEvent{
|
||||
Type: codersdk.ChatStreamEventTypeMessage, ChatID: chatID, Message: &localOnly,
|
||||
})
|
||||
|
||||
_, events, cancel, ok := server.Subscribe(ctx, chatID, nil, 5)
|
||||
require.True(t, ok)
|
||||
defer cancel()
|
||||
|
||||
server.publishChatStreamNotify(chatID, coderdpubsub.ChatStreamNotifyMessage{AfterMessageID: 5})
|
||||
|
||||
// The cache pass delivers id=8; the DB pass must still run and
|
||||
// deliver id=6. Order between them is set by cache iteration vs
|
||||
// DB query, so accept either ordering.
|
||||
first := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, first.Type)
|
||||
require.NotNil(t, first.Message)
|
||||
second := testutil.RequireReceive(ctx, t, events)
|
||||
require.Equal(t, codersdk.ChatStreamEventTypeMessage, second.Type)
|
||||
require.NotNil(t, second.Message)
|
||||
|
||||
got := map[int64]bool{first.Message.ID: true, second.Message.ID: true}
|
||||
require.True(t, got[6], "cross-replica DB message id=6 must be delivered")
|
||||
require.True(t, got[8], "locally-cached message id=8 must be delivered")
|
||||
|
||||
requireNoStreamEvent(t, events, 200*time.Millisecond)
|
||||
}
|
||||
Reference in New Issue
Block a user