diff --git a/src/server/mod.rs b/src/server/mod.rs index a172e8d2..057d2531 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -242,52 +242,61 @@ async fn chat_completions( ); // Create SSE stream from aggregating stream + // We'll emit [DONE] after all chunks by checking finish_reason let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_created = chrono::Utc::now().timestamp() as u64; - let sse_stream = aggregating_stream.map(move |chunk_result| { - match chunk_result { - Ok(chunk) => { - // Convert provider chunk to OpenAI-compatible SSE event - let response = ChatCompletionStreamResponse { - id: stream_id.clone(), - object: "chat.completion.chunk".to_string(), - created: stream_created, - model: chunk.model.clone(), - choices: vec![ChatStreamChoice { - index: 0, - delta: ChatStreamDelta { - role: None, - content: Some(chunk.content), - reasoning_content: chunk.reasoning_content, - tool_calls: chunk.tool_calls, - }, - finish_reason: chunk.finish_reason, - }], - }; + + // Track if we've already emitted [DONE] to avoid duplicates + let done_emitted = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let done_emitted_clone = done_emitted.clone(); + + let sse_stream = aggregating_stream + .map(move |chunk_result| { + match chunk_result { + Ok(chunk) => { + // Convert provider chunk to OpenAI-compatible SSE event + let response = ChatCompletionStreamResponse { + id: stream_id.clone(), + object: "chat.completion.chunk".to_string(), + created: stream_created, + model: chunk.model.clone(), + choices: vec![ChatStreamChoice { + index: 0, + delta: ChatStreamDelta { + role: None, + content: Some(chunk.content), + reasoning_content: chunk.reasoning_content, + tool_calls: chunk.tool_calls, + }, + finish_reason: chunk.finish_reason, + }], + }; - match Event::default().json_data(response) { - Ok(event) => Ok(event), - Err(e) => { - warn!("Failed to serialize SSE event: {}", e); - Err(AppError::InternalError("SSE serialization failed".to_string())) + match Event::default().json_data(response) { + Ok(event) => Ok(event), + Err(e) => { + warn!("Failed to serialize SSE event: {}", e); + Err(AppError::InternalError("SSE serialization failed".to_string())) + } } } + Err(e) => { + warn!("Error in streaming response: {}", e); + Err(e) + } } - Err(e) => { - warn!("Error in streaming response: {}", e); - Err(e) + }) + // Add [DONE] when stream ends + .chain(futures::stream::once(async move { + if done_emitted_clone.swap(true, std::sync::atomic::Ordering::SeqCst) { + // Already emitted [DONE] from a previous check, return empty + Ok(Event::default()) + } else { + Ok(Event::default().data("[DONE]")) } - } - }); + })); - // Many OpenAI-compatible clients expect a terminal [DONE] marker. - // Emit it when the upstream stream ends to avoid clients treating - // the response as incomplete. - let done_event = Ok::(Event::default().data("[DONE]")); - let done_stream = futures::stream::iter(vec![done_event]); - let out = sse_stream.chain(done_stream); - - Ok(Sse::new(out).into_response()) + Ok(Sse::new(sse_stream).into_response()) } Err(e) => { // Record provider failure