chore(coderd/x/chatd): instrument PromoteQueued + stream subscriber for ENG-2645 (#25085)

TestPromoteQueuedWhileRequiresActionMixedTools has flaked three times across
Windows and Ubuntu CI runners since 2026-05-06; local repro on the dev
workspace has not surfaced it. The May 8 Ubuntu log shows all four
PromoteQueued post-TX pubsub publishes reaching pg_notify, yet the test still
times out 25s later, so the failure is downstream between the subscriber's
listener and the test's events channel. Adds three Debug-level markers in
chatd.go (no logic change) plus two t.Logf markers in the test's reader so
the next CI occurrence pins down exactly which step failed.

Closes ENG-2645
Closes coder/internal#1523
This commit is contained in:
Mathias Fredriksson
2026-05-11 11:33:46 +03:00
committed by GitHub
parent 063c06ca5f
commit fb60bb0c08
2 changed files with 33 additions and 0 deletions
+31
View File
@@ -2350,6 +2350,13 @@ func (p *Server) PromoteQueued(
p.publishMessage(opts.ChatID, promoted)
p.publishStatus(opts.ChatID, updatedChat.Status, updatedChat.WorkerID)
p.publishChatPubsubEvent(updatedChat, codersdk.ChatWatchEventKindStatusChange, nil)
// Marker for ENG-2645: confirms post-TX publishes ran.
p.logger.Debug(ctx, "promote queued completed",
slog.F("chat_id", opts.ChatID),
slog.F("promoted_id", promoted.ID),
slog.F("synthetic_count", len(syntheticResults)),
slog.F("status", updatedChat.Status),
)
p.signalWake()
return result, nil
@@ -4777,12 +4784,25 @@ func (p *Server) SubscribeAuthorized(
}
return
case notify := <-notifications:
// Marker for ENG-2645: subscriber received pubsub notify.
p.logger.Debug(mergedCtx, "stream subscriber received notify",
slog.F("chat_id", chatID),
slog.F("after_message_id", notify.AfterMessageID),
slog.F("status", notify.Status),
slog.F("queue_update", notify.QueueUpdate),
slog.F("last_message_id", lastMessageID),
)
if notify.AfterMessageID > 0 || notify.FullRefresh {
if notify.FullRefresh {
lastMessageID = 0
}
var (
deliveredCount int
source string
)
cached := p.getCachedDurableMessages(chatID, lastMessageID)
if !notify.FullRefresh && len(cached) > 0 {
source = "cache"
for _, event := range cached {
select {
case <-mergedCtx.Done():
@@ -4791,6 +4811,7 @@ func (p *Server) SubscribeAuthorized(
}
lastMessageID = event.Message.ID
}
deliveredCount = len(cached)
} else if newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: lastMessageID,
@@ -4800,6 +4821,7 @@ func (p *Server) SubscribeAuthorized(
slog.Error(msgErr),
)
} else {
source = "db"
for _, msg := range newMessages {
if msg.ID <= lastMessageID {
continue
@@ -4815,8 +4837,17 @@ func (p *Server) SubscribeAuthorized(
}:
}
lastMessageID = msg.ID
deliveredCount++
}
}
// Marker for ENG-2645: subscriber delivered durable messages.
p.logger.Debug(mergedCtx, "stream subscriber delivered messages",
slog.F("chat_id", chatID),
slog.F("after_message_id", notify.AfterMessageID),
slog.F("source", source),
slog.F("delivered_count", deliveredCount),
slog.F("last_message_id", lastMessageID),
)
}
if notify.Status != "" {
status := database.ChatStatus(notify.Status)
+2
View File
@@ -9130,8 +9130,10 @@ func TestPromoteQueuedWhileRequiresActionMixedTools(t *testing.T) {
select {
case ev := <-events:
if ev.Type != codersdk.ChatStreamEventTypeMessage || ev.Message == nil {
t.Logf("subscriber consumed non-message event type=%s", ev.Type)
return false
}
t.Logf("subscriber consumed message id=%d role=%s match_promoted=%t", ev.Message.ID, ev.Message.Role, ev.Message.ID == promoteResult.PromotedMessage.ID)
switch ev.Message.Role {
case codersdk.ChatMessageRoleTool:
syntheticPublishCount++