fix(scaletest/llmmock): emit Anthropic SSE event lines (#23587)

The llmmock Anthropic stream wrote each chunk as `data:` only, so
Anthropic clients never saw the named SSE events they dispatch on and
Claude responses arrived empty even though the stream completed with
HTTP 200.

Update `sendAnthropicStream()` to emit `event: <type>` and `data:
<json>` for each Anthropic chunk while leaving the OpenAI-style streams
unchanged.
This commit is contained in:
Ethan
2026-03-30 12:21:53 +11:00
committed by GitHub
parent 4bf46c4435
commit f7aa46c4ba
+20 -14
View File
@@ -583,8 +583,8 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
return
}
writeChunk := func(data string) bool {
if _, err := fmt.Fprintf(w, "%s", data); err != nil {
writeChunk := func(eventType string, data []byte) bool {
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data); err != nil {
s.logger.Error(ctx, "failed to write Anthropic stream chunk",
slog.F("response_id", resp.ID),
slog.Error(err),
@@ -597,8 +597,9 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
return true
}
startEventType := "message_start"
startEvent := map[string]interface{}{
"type": "message_start",
"type": startEventType,
"message": map[string]interface{}{
"id": resp.ID,
"type": resp.Type,
@@ -607,13 +608,14 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
},
}
startBytes, _ := json.Marshal(startEvent)
if !writeChunk(fmt.Sprintf("data: %s\n\n", startBytes)) {
if !writeChunk(startEventType, startBytes) {
return
}
// Send content_block_start event
contentStartEventType := "content_block_start"
contentStartEvent := map[string]interface{}{
"type": "content_block_start",
"type": contentStartEventType,
"index": 0,
"content_block": map[string]interface{}{
"type": "text",
@@ -621,13 +623,14 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
},
}
contentStartBytes, _ := json.Marshal(contentStartEvent)
if !writeChunk(fmt.Sprintf("data: %s\n\n", contentStartBytes)) {
if !writeChunk(contentStartEventType, contentStartBytes) {
return
}
// Send content_block_delta event
deltaEventType := "content_block_delta"
deltaEvent := map[string]interface{}{
"type": "content_block_delta",
"type": deltaEventType,
"index": 0,
"delta": map[string]interface{}{
"type": "text_delta",
@@ -635,23 +638,25 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
},
}
deltaBytes, _ := json.Marshal(deltaEvent)
if !writeChunk(fmt.Sprintf("data: %s\n\n", deltaBytes)) {
if !writeChunk(deltaEventType, deltaBytes) {
return
}
// Send content_block_stop event
contentStopEventType := "content_block_stop"
contentStopEvent := map[string]interface{}{
"type": "content_block_stop",
"type": contentStopEventType,
"index": 0,
}
contentStopBytes, _ := json.Marshal(contentStopEvent)
if !writeChunk(fmt.Sprintf("data: %s\n\n", contentStopBytes)) {
if !writeChunk(contentStopEventType, contentStopBytes) {
return
}
// Send message_delta event
deltaMsgEventType := "message_delta"
deltaMsgEvent := map[string]interface{}{
"type": "message_delta",
"type": deltaMsgEventType,
"delta": map[string]interface{}{
"stop_reason": resp.StopReason,
"stop_sequence": resp.StopSequence,
@@ -659,16 +664,17 @@ func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter,
"usage": resp.Usage,
}
deltaMsgBytes, _ := json.Marshal(deltaMsgEvent)
if !writeChunk(fmt.Sprintf("data: %s\n\n", deltaMsgBytes)) {
if !writeChunk(deltaMsgEventType, deltaMsgBytes) {
return
}
// Send message_stop event
stopEventType := "message_stop"
stopEvent := map[string]interface{}{
"type": "message_stop",
"type": stopEventType,
}
stopBytes, _ := json.Marshal(stopEvent)
writeChunk(fmt.Sprintf("data: %s\n\n", stopBytes))
writeChunk(stopEventType, stopBytes)
}
func (s *Server) tracingMiddleware(next http.Handler) http.Handler {