fix(streaming): restore SSE with [DONE] chain
This commit is contained in:
@@ -5,7 +5,7 @@ use axum::{
|
||||
response::sse::{Event, Sse},
|
||||
routing::{get, post},
|
||||
};
|
||||
use futures::stream::{StreamExt, self};
|
||||
use futures::stream::StreamExt;
|
||||
use std::time::Duration;
|
||||
use sqlx;
|
||||
use std::sync::Arc;
|
||||
@@ -242,23 +242,17 @@ async fn chat_completions(
|
||||
},
|
||||
);
|
||||
|
||||
// Create SSE stream with explicit [DONE] termination
|
||||
// Create SSE stream from aggregating stream
|
||||
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
||||
let stream_created = chrono::Utc::now().timestamp() as u64;
|
||||
let stream_id_clone = stream_id.clone();
|
||||
|
||||
// Convert aggregator to a Vec first, then stream with [DONE]
|
||||
let chunks: Vec<Result<ProviderStreamChunk, AppError>> = aggregating_stream.collect().await;
|
||||
|
||||
// Create stream that yields SSE events then [DONE]
|
||||
let final_stream = stream::iter(chunks)
|
||||
.then(move |chunk_result| {
|
||||
let sid = stream_id_clone.clone();
|
||||
async move {
|
||||
// Create the SSE event stream
|
||||
let sse_stream = aggregating_stream
|
||||
.map(move |chunk_result| {
|
||||
match chunk_result {
|
||||
Ok(chunk) => {
|
||||
let response = ChatCompletionStreamResponse {
|
||||
id: sid,
|
||||
id: stream_id.clone(),
|
||||
object: "chat.completion.chunk".to_string(),
|
||||
created: stream_created,
|
||||
model: chunk.model.clone(),
|
||||
@@ -274,17 +268,19 @@ async fn chat_completions(
|
||||
}],
|
||||
};
|
||||
Event::default().json_data(response)
|
||||
.map_err(|e| AppError::InternalError(format!("SSE serialization failed: {}", e)))
|
||||
.map_err(|e| AppError::InternalError(format!("SSE error: {}", e)))
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
})
|
||||
.chain(stream::once(async {
|
||||
Ok::<Event, AppError>(Event::default().data("[DONE]"))
|
||||
}));
|
||||
});
|
||||
|
||||
Ok(Sse::new(final_stream).into_response())
|
||||
// Chain [DONE] event - this should work now with the corrected stream type
|
||||
let done_stream = futures::stream::once(async {
|
||||
Ok::<Event, AppError>(Event::default().data("[DONE]"))
|
||||
});
|
||||
let out = sse_stream.chain(done_stream);
|
||||
|
||||
Ok(Sse::new(out).into_response())
|
||||
}
|
||||
Err(e) => {
|
||||
// Record provider failure
|
||||
|
||||
Reference in New Issue
Block a user