fix(streaming): collect chunks then stream with explicit [DONE]
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled

This commit is contained in:
2026-03-03 13:52:18 -05:00
parent 656a6f31ce
commit 07f91ec4bb

View File

@@ -242,51 +242,45 @@ async fn chat_completions(
}, },
); );
// Create SSE stream from aggregating stream // Create streaming response - collect all chunks first, then stream with [DONE]
let stream_id = format!("chatcmpl-{}", Uuid::new_v4()); let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
let stream_created = chrono::Utc::now().timestamp() as u64; let stream_created = chrono::Utc::now().timestamp() as u64;
let stream_id_clone = stream_id.clone();
// Map chunks to SSE events - clone stream_id for the async block
let stream_id_for_sse = stream_id.clone(); // Collect all chunks from the aggregator
let chunks: Vec<Result<crate::providers::ProviderStreamChunk, crate::errors::AppError>> =
// Use async stream macro to ensure proper sequencing aggregating_stream.collect().await;
let final_stream = async_stream::stream! {
// First, process and yield all chunks from aggregator // Create a stream that yields chunks then [DONE]
let mut stream = Box::pin(aggregating_stream let final_stream = futures::stream::iter(chunks)
.map(move |chunk_result| { .map(move |chunk_result| {
match chunk_result { match chunk_result {
Ok(chunk) => { Ok(chunk) => {
let response = ChatCompletionStreamResponse { let response = ChatCompletionStreamResponse {
id: stream_id_for_sse.clone(), id: stream_id_clone.clone(),
object: "chat.completion.chunk".to_string(), object: "chat.completion.chunk".to_string(),
created: stream_created, created: stream_created,
model: chunk.model.clone(), model: chunk.model.clone(),
choices: vec![ChatStreamChoice { choices: vec![ChatStreamChoice {
index: 0, index: 0,
delta: ChatStreamDelta { delta: ChatStreamDelta {
role: None, role: None,
content: Some(chunk.content), content: Some(chunk.content),
reasoning_content: chunk.reasoning_content, reasoning_content: chunk.reasoning_content,
tool_calls: chunk.tool_calls, tool_calls: chunk.tool_calls,
}, },
finish_reason: chunk.finish_reason, finish_reason: chunk.finish_reason,
}], }],
}; };
Event::default().json_data(response) Event::default().json_data(response)
.map_err(|e| AppError::InternalError(format!("SSE error: {}", e))) .map_err(|e| crate::errors::AppError::InternalError(format!("SSE error: {}", e)))
}
Err(e) => Err(e),
} }
})); Err(e) => Err(e),
}
// Yield all chunks })
while let Some(item) = stream.next().await { .chain(futures::stream::once(async {
yield item; Ok::<Event, crate::errors::AppError>(Event::default().data("[DONE]"))
} }));
// Finally yield [DONE]
yield Ok::<Event, AppError>(Event::default().data("[DONE]"));
};
Ok(Sse::new(final_stream).into_response()) Ok(Sse::new(final_stream).into_response())
} }