mirror of
https://github.com/coder/coder.git
synced 2026-06-04 13:38:21 +00:00
a1e912a763
> **PR Stack** > 1. #23351 ← `#23282` > 2. #23282 ← `#23275` > 3. #23275 ← `#23349` > 4. **#23349** ← `main` *(you are here)* --- Retry events were published only to the local in-process stream via `publishEvent()`. When pubsub is active, `Subscribe()`'s merge loop only forwarded durable events (messages, status, errors) from pubsub notifications, so retry events were silently dropped for cross-replica subscribers. This adds a `publishRetry()` helper that publishes both locally and via pubsub, and extends the `Subscribe()` notification handler to forward retry events. **Changes:** - `coderd/pubsub/chatstreamnotify.go`: add `Retry` field to notify message - `coderd/chatd/chatd.go`: add `publishRetry()`, update `OnRetry` callback, extend `Subscribe()` to forward `notify.Retry` - `coderd/chatd/chatd_internal_test.go`: focused pubsub delivery test - `enterprise/coderd/chatd/chatd_test.go`: cross-replica end-to-end test
51 lines
1.7 KiB
Go
51 lines
1.7 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/coder/coder/v2/codersdk"
|
|
)
|
|
|
|
// ChatStreamNotifyChannel returns the pubsub channel for per-chat
|
|
// stream notifications. Subscribers receive lightweight notifications
|
|
// and read actual content from the database.
|
|
func ChatStreamNotifyChannel(chatID uuid.UUID) string {
|
|
return fmt.Sprintf("chat:stream:%s", chatID)
|
|
}
|
|
|
|
// ChatStreamNotifyMessage is the payload published on the per-chat
|
|
// stream notification channel. Durable message content is still read
|
|
// from the database, while transient control events can be carried
|
|
// inline for cross-replica delivery.
|
|
type ChatStreamNotifyMessage struct {
|
|
// AfterMessageID tells subscribers to query messages after this
|
|
// ID. Set when a new message is persisted.
|
|
AfterMessageID int64 `json:"after_message_id,omitempty"`
|
|
|
|
// Status is set when the chat status changes. Subscribers use
|
|
// this to update clients and to manage relay lifecycle.
|
|
Status string `json:"status,omitempty"`
|
|
|
|
// WorkerID identifies which replica is running the chat. Used
|
|
// by enterprise relay to know where to connect.
|
|
WorkerID string `json:"worker_id,omitempty"`
|
|
|
|
// Retry carries a structured retry event for cross-replica live
|
|
// delivery. This is transient stream state and is not read back
|
|
// from the database.
|
|
Retry *codersdk.ChatStreamRetry `json:"retry,omitempty"`
|
|
|
|
// Error is set when a processing error occurs.
|
|
Error string `json:"error,omitempty"`
|
|
|
|
// QueueUpdate is set when the queued messages change.
|
|
QueueUpdate bool `json:"queue_update,omitempty"`
|
|
|
|
// FullRefresh signals that subscribers should re-fetch all
|
|
// messages from the beginning (e.g. after an edit that
|
|
// truncates message history).
|
|
FullRefresh bool `json:"full_refresh,omitempty"`
|
|
}
|