fix(streaming): emit [DONE] using chain on aggregator stream
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled

This commit is contained in:
2026-03-03 12:41:16 -05:00
parent 2040b068e6
commit 99716d97ef

View File

@@ -242,52 +242,61 @@ async fn chat_completions(
);
// Create SSE stream from aggregating stream
// We'll emit [DONE] after all chunks by checking finish_reason
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
let stream_created = chrono::Utc::now().timestamp() as u64;
let sse_stream = aggregating_stream.map(move |chunk_result| {
match chunk_result {
Ok(chunk) => {
// Convert provider chunk to OpenAI-compatible SSE event
let response = ChatCompletionStreamResponse {
id: stream_id.clone(),
object: "chat.completion.chunk".to_string(),
created: stream_created,
model: chunk.model.clone(),
choices: vec![ChatStreamChoice {
index: 0,
delta: ChatStreamDelta {
role: None,
content: Some(chunk.content),
reasoning_content: chunk.reasoning_content,
tool_calls: chunk.tool_calls,
},
finish_reason: chunk.finish_reason,
}],
};
// Track if we've already emitted [DONE] to avoid duplicates
let done_emitted = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let done_emitted_clone = done_emitted.clone();
let sse_stream = aggregating_stream
.map(move |chunk_result| {
match chunk_result {
Ok(chunk) => {
// Convert provider chunk to OpenAI-compatible SSE event
let response = ChatCompletionStreamResponse {
id: stream_id.clone(),
object: "chat.completion.chunk".to_string(),
created: stream_created,
model: chunk.model.clone(),
choices: vec![ChatStreamChoice {
index: 0,
delta: ChatStreamDelta {
role: None,
content: Some(chunk.content),
reasoning_content: chunk.reasoning_content,
tool_calls: chunk.tool_calls,
},
finish_reason: chunk.finish_reason,
}],
};
match Event::default().json_data(response) {
Ok(event) => Ok(event),
Err(e) => {
warn!("Failed to serialize SSE event: {}", e);
Err(AppError::InternalError("SSE serialization failed".to_string()))
match Event::default().json_data(response) {
Ok(event) => Ok(event),
Err(e) => {
warn!("Failed to serialize SSE event: {}", e);
Err(AppError::InternalError("SSE serialization failed".to_string()))
}
}
}
Err(e) => {
warn!("Error in streaming response: {}", e);
Err(e)
}
}
Err(e) => {
warn!("Error in streaming response: {}", e);
Err(e)
})
// Add [DONE] when stream ends
.chain(futures::stream::once(async move {
if done_emitted_clone.swap(true, std::sync::atomic::Ordering::SeqCst) {
// Already emitted [DONE] from a previous check, return empty
Ok(Event::default())
} else {
Ok(Event::default().data("[DONE]"))
}
}
});
}));
// Many OpenAI-compatible clients expect a terminal [DONE] marker.
// Emit it when the upstream stream ends to avoid clients treating
// the response as incomplete.
let done_event = Ok::<Event, AppError>(Event::default().data("[DONE]"));
let done_stream = futures::stream::iter(vec![done_event]);
let out = sse_stream.chain(done_stream);
Ok(Sse::new(out).into_response())
Ok(Sse::new(sse_stream).into_response())
}
Err(e) => {
// Record provider failure