fix(streaming): use iter vec for [DONE] marker
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled

This commit is contained in:
2026-03-03 12:54:23 -05:00
parent 545664f0dd
commit 2508a745c6

View File

@@ -242,7 +242,6 @@ async fn chat_completions(
); );
// Create SSE stream from aggregating stream // Create SSE stream from aggregating stream
// Emit [DONE] immediately when we see finish_reason: "stop"
let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
let stream_created = chrono::Utc::now().timestamp() as u64; let stream_created = chrono::Utc::now().timestamp() as u64;
@@ -250,9 +249,6 @@ async fn chat_completions(
.map(move |chunk_result| { .map(move |chunk_result| {
match chunk_result { match chunk_result {
Ok(chunk) => { Ok(chunk) => {
// Check if this is the final chunk
let is_final = chunk.finish_reason.as_deref() == Some("stop");
// Convert provider chunk to OpenAI-compatible SSE event // Convert provider chunk to OpenAI-compatible SSE event
let response = ChatCompletionStreamResponse { let response = ChatCompletionStreamResponse {
id: stream_id.clone(), id: stream_id.clone(),
@@ -272,16 +268,7 @@ async fn chat_completions(
}; };
match Event::default().json_data(response) { match Event::default().json_data(response) {
Ok(event) => { Ok(event) => Ok(event),
// If this is the final chunk, also create [DONE] event
// We'll return both in a nested stream
if is_final {
// Return the final chunk first
Ok(event)
} else {
Ok(event)
}
}
Err(e) => { Err(e) => {
warn!("Failed to serialize SSE event: {}", e); warn!("Failed to serialize SSE event: {}", e);
Err(AppError::InternalError("SSE serialization failed".to_string())) Err(AppError::InternalError("SSE serialization failed".to_string()))
@@ -295,9 +282,9 @@ async fn chat_completions(
} }
}); });
// Add [DONE] as a separate stream after sse_stream // Append [DONE] using iter (not once) - should ensure it gets polled
let done_event = Event::default().data("[DONE]"); let done_vec = vec![Ok::<Event, AppError>(Event::default().data("[DONE]"))];
let done_stream = futures::stream::once(async move { Ok(done_event) }); let done_stream = futures::stream::iter(done_vec);
let out = sse_stream.chain(done_stream); let out = sse_stream.chain(done_stream);
Ok(Sse::new(out).into_response()) Ok(Sse::new(out).into_response())