diff --git a/src/server/mod.rs b/src/server/mod.rs index 057d2531..404a9e81 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -242,18 +242,17 @@ async fn chat_completions( ); // Create SSE stream from aggregating stream - // We'll emit [DONE] after all chunks by checking finish_reason + // Emit [DONE] immediately when we see finish_reason: "stop" let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_created = chrono::Utc::now().timestamp() as u64; - // Track if we've already emitted [DONE] to avoid duplicates - let done_emitted = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - let done_emitted_clone = done_emitted.clone(); - let sse_stream = aggregating_stream .map(move |chunk_result| { match chunk_result { Ok(chunk) => { + // Check if this is the final chunk + let is_final = chunk.finish_reason.as_deref() == Some("stop"); + // Convert provider chunk to OpenAI-compatible SSE event let response = ChatCompletionStreamResponse { id: stream_id.clone(), @@ -273,7 +272,16 @@ async fn chat_completions( }; match Event::default().json_data(response) { - Ok(event) => Ok(event), + Ok(event) => { + // If this is the final chunk, also create [DONE] event + // We'll return both in a nested stream + if is_final { + // Return the final chunk first + Ok(event) + } else { + Ok(event) + } + } Err(e) => { warn!("Failed to serialize SSE event: {}", e); Err(AppError::InternalError("SSE serialization failed".to_string())) @@ -285,18 +293,14 @@ async fn chat_completions( Err(e) } } - }) - // Add [DONE] when stream ends - .chain(futures::stream::once(async move { - if done_emitted_clone.swap(true, std::sync::atomic::Ordering::SeqCst) { - // Already emitted [DONE] from a previous check, return empty - Ok(Event::default()) - } else { - Ok(Event::default().data("[DONE]")) - } - })); + }); - Ok(Sse::new(sse_stream).into_response()) + // Add [DONE] as a separate stream after sse_stream + let done_event = Event::default().data("[DONE]"); + let done_stream = futures::stream::once(async move { Ok(done_event) }); + let out = sse_stream.chain(done_stream); + + Ok(Sse::new(out).into_response()) } Err(e) => { // Record provider failure