From 53250683051fcf73098260b96c02c90255879487 Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Tue, 3 Mar 2026 13:24:25 -0500 Subject: [PATCH] fix(streaming): restore SSE with [DONE] chain --- src/server/mod.rs | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index f1a835af..23722dd4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,7 +5,7 @@ use axum::{ response::sse::{Event, Sse}, routing::{get, post}, }; -use futures::stream::{StreamExt, self}; +use futures::stream::StreamExt; use std::time::Duration; use sqlx; use std::sync::Arc; @@ -242,23 +242,17 @@ async fn chat_completions( }, ); - // Create SSE stream with explicit [DONE] termination + // Create SSE stream from aggregating stream let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_created = chrono::Utc::now().timestamp() as u64; - let stream_id_clone = stream_id.clone(); - // Convert aggregator to a Vec first, then stream with [DONE] - let chunks: Vec> = aggregating_stream.collect().await; - - // Create stream that yields SSE events then [DONE] - let final_stream = stream::iter(chunks) - .then(move |chunk_result| { - let sid = stream_id_clone.clone(); - async move { + // Create the SSE event stream + let sse_stream = aggregating_stream + .map(move |chunk_result| { match chunk_result { Ok(chunk) => { let response = ChatCompletionStreamResponse { - id: sid, + id: stream_id.clone(), object: "chat.completion.chunk".to_string(), created: stream_created, model: chunk.model.clone(), @@ -274,17 +268,19 @@ async fn chat_completions( }], }; Event::default().json_data(response) - .map_err(|e| AppError::InternalError(format!("SSE serialization failed: {}", e))) + .map_err(|e| AppError::InternalError(format!("SSE error: {}", e))) } Err(e) => Err(e), } - } - }) - .chain(stream::once(async { - Ok::(Event::default().data("[DONE]")) - })); + }); - Ok(Sse::new(final_stream).into_response()) + // Chain [DONE] event - this should work now with the corrected stream type + let done_stream = futures::stream::once(async { + Ok::(Event::default().data("[DONE]")) + }); + let out = sse_stream.chain(done_stream); + + Ok(Sse::new(out).into_response()) } Err(e) => { // Record provider failure