From f7aa46c4ba45ad75d74939531a5bbf24d9a27d4f Mon Sep 17 00:00:00 2001 From: Ethan <39577870+ethanndickson@users.noreply.github.com> Date: Mon, 30 Mar 2026 12:21:53 +1100 Subject: [PATCH] fix(scaletest/llmmock): emit Anthropic SSE event lines (#23587) The llmmock Anthropic stream wrote each chunk as `data:` only, so Anthropic clients never saw the named SSE events they dispatch on and Claude responses arrived empty even though the stream completed with HTTP 200. Update `sendAnthropicStream()` to emit `event: ` and `data: ` for each Anthropic chunk while leaving the OpenAI-style streams unchanged. --- scaletest/llmmock/server.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/scaletest/llmmock/server.go b/scaletest/llmmock/server.go index 24c0701b0a..8c9bdfe3c9 100644 --- a/scaletest/llmmock/server.go +++ b/scaletest/llmmock/server.go @@ -583,8 +583,8 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, return } - writeChunk := func(data string) bool { - if _, err := fmt.Fprintf(w, "%s", data); err != nil { + writeChunk := func(eventType string, data []byte) bool { + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data); err != nil { s.logger.Error(ctx, "failed to write Anthropic stream chunk", slog.F("response_id", resp.ID), slog.Error(err), @@ -597,8 +597,9 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, return true } + startEventType := "message_start" startEvent := map[string]interface{}{ - "type": "message_start", + "type": startEventType, "message": map[string]interface{}{ "id": resp.ID, "type": resp.Type, @@ -607,13 +608,14 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, }, } startBytes, _ := json.Marshal(startEvent) - if !writeChunk(fmt.Sprintf("data: %s\n\n", startBytes)) { + if !writeChunk(startEventType, startBytes) { return } // Send content_block_start event + contentStartEventType := "content_block_start" contentStartEvent := map[string]interface{}{ - "type": "content_block_start", + "type": contentStartEventType, "index": 0, "content_block": map[string]interface{}{ "type": "text", @@ -621,13 +623,14 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, }, } contentStartBytes, _ := json.Marshal(contentStartEvent) - if !writeChunk(fmt.Sprintf("data: %s\n\n", contentStartBytes)) { + if !writeChunk(contentStartEventType, contentStartBytes) { return } // Send content_block_delta event + deltaEventType := "content_block_delta" deltaEvent := map[string]interface{}{ - "type": "content_block_delta", + "type": deltaEventType, "index": 0, "delta": map[string]interface{}{ "type": "text_delta", @@ -635,23 +638,25 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, }, } deltaBytes, _ := json.Marshal(deltaEvent) - if !writeChunk(fmt.Sprintf("data: %s\n\n", deltaBytes)) { + if !writeChunk(deltaEventType, deltaBytes) { return } // Send content_block_stop event + contentStopEventType := "content_block_stop" contentStopEvent := map[string]interface{}{ - "type": "content_block_stop", + "type": contentStopEventType, "index": 0, } contentStopBytes, _ := json.Marshal(contentStopEvent) - if !writeChunk(fmt.Sprintf("data: %s\n\n", contentStopBytes)) { + if !writeChunk(contentStopEventType, contentStopBytes) { return } // Send message_delta event + deltaMsgEventType := "message_delta" deltaMsgEvent := map[string]interface{}{ - "type": "message_delta", + "type": deltaMsgEventType, "delta": map[string]interface{}{ "stop_reason": resp.StopReason, "stop_sequence": resp.StopSequence, @@ -659,16 +664,17 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, "usage": resp.Usage, } deltaMsgBytes, _ := json.Marshal(deltaMsgEvent) - if !writeChunk(fmt.Sprintf("data: %s\n\n", deltaMsgBytes)) { + if !writeChunk(deltaMsgEventType, deltaMsgBytes) { return } // Send message_stop event + stopEventType := "message_stop" stopEvent := map[string]interface{}{ - "type": "message_stop", + "type": stopEventType, } stopBytes, _ := json.Marshal(stopEvent) - writeChunk(fmt.Sprintf("data: %s\n\n", stopBytes)) + writeChunk(stopEventType, stopBytes) } func (s *Server) tracingMiddleware(next http.Handler) http.Handler {