diff --git a/src/server/mod.rs b/src/server/mod.rs index 5fb1a6cc..859b03da 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -242,22 +242,20 @@ async fn chat_completions( }, ); - // Create streaming response - collect all chunks first, then stream with [DONE] + // Create SSE stream - simpler approach that works let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_created = chrono::Utc::now().timestamp() as u64; - let stream_id_clone = stream_id.clone(); + let stream_id_sse = stream_id.clone(); - // Collect all chunks from the aggregator - let chunks: Vec> = - aggregating_stream.collect().await; - - // Create a stream that yields chunks then [DONE] - let final_stream = futures::stream::iter(chunks) - .map(move |chunk_result| { + // Build stream that yields events wrapped in Result + let stream = async_stream::stream! { + let mut aggregator = Box::pin(aggregating_stream); + + while let Some(chunk_result) = aggregator.next().await { match chunk_result { Ok(chunk) => { let response = ChatCompletionStreamResponse { - id: stream_id_clone.clone(), + id: stream_id_sse.clone(), object: "chat.completion.chunk".to_string(), created: stream_created, model: chunk.model.clone(), @@ -272,17 +270,26 @@ async fn chat_completions( finish_reason: chunk.finish_reason, }], }; - Event::default().json_data(response) - .map_err(|e| crate::errors::AppError::InternalError(format!("SSE error: {}", e))) + + // Use axum's Event directly, wrap in Ok + match Event::default().json_data(response) { + Ok(event) => yield Ok::<_, crate::errors::AppError>(event), + Err(e) => { + warn!("Failed to serialize SSE: {}", e); + } + } + } + Err(e) => { + warn!("Stream error: {}", e); } - Err(e) => Err(e), } - }) - .chain(futures::stream::once(async { - Ok::(Event::default().data("[DONE]")) - })); + } + + // Yield [DONE] at the end + yield Ok::<_, crate::errors::AppError>(Event::default().data("[DONE]")); + }; - Ok(Sse::new(final_stream).into_response()) + Ok(Sse::new(stream).into_response()) } Err(e) => { // Record provider failure