From 07f91ec4bb13547bf970e9fc3d0c867cd4e09f4b Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Tue, 3 Mar 2026 13:52:18 -0500 Subject: [PATCH] fix(streaming): collect chunks then stream with explicit [DONE] --- src/server/mod.rs | 78 ++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index b2d4904f..5fb1a6cc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -242,51 +242,45 @@ async fn chat_completions( }, ); - // Create SSE stream from aggregating stream + // Create streaming response - collect all chunks first, then stream with [DONE] let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_created = chrono::Utc::now().timestamp() as u64; - - // Map chunks to SSE events - clone stream_id for the async block - let stream_id_for_sse = stream_id.clone(); - - // Use async stream macro to ensure proper sequencing - let final_stream = async_stream::stream! { - // First, process and yield all chunks from aggregator - let mut stream = Box::pin(aggregating_stream - .map(move |chunk_result| { - match chunk_result { - Ok(chunk) => { - let response = ChatCompletionStreamResponse { - id: stream_id_for_sse.clone(), - object: "chat.completion.chunk".to_string(), - created: stream_created, - model: chunk.model.clone(), - choices: vec![ChatStreamChoice { - index: 0, - delta: ChatStreamDelta { - role: None, - content: Some(chunk.content), - reasoning_content: chunk.reasoning_content, - tool_calls: chunk.tool_calls, - }, - finish_reason: chunk.finish_reason, - }], - }; - Event::default().json_data(response) - .map_err(|e| AppError::InternalError(format!("SSE error: {}", e))) - } - Err(e) => Err(e), + let stream_id_clone = stream_id.clone(); + + // Collect all chunks from the aggregator + let chunks: Vec> = + aggregating_stream.collect().await; + + // Create a stream that yields chunks then [DONE] + let final_stream = futures::stream::iter(chunks) + .map(move |chunk_result| { + match chunk_result { + Ok(chunk) => { + let response = ChatCompletionStreamResponse { + id: stream_id_clone.clone(), + object: "chat.completion.chunk".to_string(), + created: stream_created, + model: chunk.model.clone(), + choices: vec![ChatStreamChoice { + index: 0, + delta: ChatStreamDelta { + role: None, + content: Some(chunk.content), + reasoning_content: chunk.reasoning_content, + tool_calls: chunk.tool_calls, + }, + finish_reason: chunk.finish_reason, + }], + }; + Event::default().json_data(response) + .map_err(|e| crate::errors::AppError::InternalError(format!("SSE error: {}", e))) } - })); - - // Yield all chunks - while let Some(item) = stream.next().await { - yield item; - } - - // Finally yield [DONE] - yield Ok::(Event::default().data("[DONE]")); - }; + Err(e) => Err(e), + } + }) + .chain(futures::stream::once(async { + Ok::(Event::default().data("[DONE]")) + })); Ok(Sse::new(final_stream).into_response()) }