fix(streaming): attempt to detect final chunk for [DONE] marker
This commit is contained in:
@@ -242,18 +242,17 @@ async fn chat_completions(
|
||||
);
|
||||
|
||||
// Create SSE stream from aggregating stream
|
||||
// We'll emit [DONE] after all chunks by checking finish_reason
|
||||
// Emit [DONE] immediately when we see finish_reason: "stop"
|
||||
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
||||
let stream_created = chrono::Utc::now().timestamp() as u64;
|
||||
|
||||
// Track if we've already emitted [DONE] to avoid duplicates
|
||||
let done_emitted = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let done_emitted_clone = done_emitted.clone();
|
||||
|
||||
let sse_stream = aggregating_stream
|
||||
.map(move |chunk_result| {
|
||||
match chunk_result {
|
||||
Ok(chunk) => {
|
||||
// Check if this is the final chunk
|
||||
let is_final = chunk.finish_reason.as_deref() == Some("stop");
|
||||
|
||||
// Convert provider chunk to OpenAI-compatible SSE event
|
||||
let response = ChatCompletionStreamResponse {
|
||||
id: stream_id.clone(),
|
||||
@@ -273,7 +272,16 @@ async fn chat_completions(
|
||||
};
|
||||
|
||||
match Event::default().json_data(response) {
|
||||
Ok(event) => Ok(event),
|
||||
Ok(event) => {
|
||||
// If this is the final chunk, also create [DONE] event
|
||||
// We'll return both in a nested stream
|
||||
if is_final {
|
||||
// Return the final chunk first
|
||||
Ok(event)
|
||||
} else {
|
||||
Ok(event)
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to serialize SSE event: {}", e);
|
||||
Err(AppError::InternalError("SSE serialization failed".to_string()))
|
||||
@@ -285,18 +293,14 @@ async fn chat_completions(
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
// Add [DONE] when stream ends
|
||||
.chain(futures::stream::once(async move {
|
||||
if done_emitted_clone.swap(true, std::sync::atomic::Ordering::SeqCst) {
|
||||
// Already emitted [DONE] from a previous check, return empty
|
||||
Ok(Event::default())
|
||||
} else {
|
||||
Ok(Event::default().data("[DONE]"))
|
||||
}
|
||||
}));
|
||||
});
|
||||
|
||||
Ok(Sse::new(sse_stream).into_response())
|
||||
// Add [DONE] as a separate stream after sse_stream
|
||||
let done_event = Event::default().data("[DONE]");
|
||||
let done_stream = futures::stream::once(async move { Ok(done_event) });
|
||||
let out = sse_stream.chain(done_stream);
|
||||
|
||||
Ok(Sse::new(out).into_response())
|
||||
}
|
||||
Err(e) => {
|
||||
// Record provider failure
|
||||
|
||||
Reference in New Issue
Block a user