fix(streaming): use repeat_with + take for [DONE] marker
This commit is contained in:
@@ -246,7 +246,7 @@ async fn chat_completions(
|
|||||||
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
||||||
let stream_created = chrono::Utc::now().timestamp() as u64;
|
let stream_created = chrono::Utc::now().timestamp() as u64;
|
||||||
|
|
||||||
// Create the SSE event stream
|
// Map chunks to SSE events
|
||||||
let sse_stream = aggregating_stream
|
let sse_stream = aggregating_stream
|
||||||
.map(move |chunk_result| {
|
.map(move |chunk_result| {
|
||||||
match chunk_result {
|
match chunk_result {
|
||||||
@@ -274,10 +274,9 @@ async fn chat_completions(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Chain [DONE] event - this should work now with the corrected stream type
|
// Chain [DONE] - using repeat_with to ensure it gets polled
|
||||||
let done_stream = futures::stream::once(async {
|
let done_stream = futures::stream::repeat_with(|| Ok::<Event, AppError>(Event::default().data("[DONE]")))
|
||||||
Ok::<Event, AppError>(Event::default().data("[DONE]"))
|
.take(1);
|
||||||
});
|
|
||||||
let out = sse_stream.chain(done_stream);
|
let out = sse_stream.chain(done_stream);
|
||||||
|
|
||||||
Ok(Sse::new(out).into_response())
|
Ok(Sse::new(out).into_response())
|
||||||
|
|||||||
Reference in New Issue
Block a user