diff --git a/aibridge/intercept/eventstream/eventstream.go b/aibridge/intercept/eventstream/eventstream.go index 939525012e..7a1af49f92 100644 --- a/aibridge/intercept/eventstream/eventstream.go +++ b/aibridge/intercept/eventstream/eventstream.go @@ -121,6 +121,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) { return } + time.Sleep(100 * time.Millisecond) // Initiate the stream on first event (if not already initiated). s.InitiateStream(w) case <-s.tick.C: diff --git a/aibridge/intercept/messages/streaming.go b/aibridge/intercept/messages/streaming.go index 47c49528a9..c4997bb8b8 100644 --- a/aibridge/intercept/messages/streaming.go +++ b/aibridge/intercept/messages/streaming.go @@ -483,6 +483,14 @@ newStream: // Causes a new stream to be run with updated messages. isFirst = false + // Commit to SSE format before the next iteration: if + // it fails (e.g. all keys exhausted), the error must + // be relayed as an SSE event since we already streamed + // the first iteration's events. Setting initiated here + // closes the race window in EventStream.IsStreaming() + // where Start may not yet have processed buffered + // events from this iteration. + events.InitiateStream(w) continue newStream }