fix(streaming): use async_stream with [DONE] at end
This commit is contained in:
@@ -242,22 +242,20 @@ async fn chat_completions(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create streaming response - collect all chunks first, then stream with [DONE]
|
// Create SSE stream - simpler approach that works
|
||||||
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
let stream_id = format!("chatcmpl-{}", Uuid::new_v4());
|
||||||
let stream_created = chrono::Utc::now().timestamp() as u64;
|
let stream_created = chrono::Utc::now().timestamp() as u64;
|
||||||
let stream_id_clone = stream_id.clone();
|
let stream_id_sse = stream_id.clone();
|
||||||
|
|
||||||
// Collect all chunks from the aggregator
|
// Build stream that yields events wrapped in Result
|
||||||
let chunks: Vec<Result<crate::providers::ProviderStreamChunk, crate::errors::AppError>> =
|
let stream = async_stream::stream! {
|
||||||
aggregating_stream.collect().await;
|
let mut aggregator = Box::pin(aggregating_stream);
|
||||||
|
|
||||||
// Create a stream that yields chunks then [DONE]
|
while let Some(chunk_result) = aggregator.next().await {
|
||||||
let final_stream = futures::stream::iter(chunks)
|
|
||||||
.map(move |chunk_result| {
|
|
||||||
match chunk_result {
|
match chunk_result {
|
||||||
Ok(chunk) => {
|
Ok(chunk) => {
|
||||||
let response = ChatCompletionStreamResponse {
|
let response = ChatCompletionStreamResponse {
|
||||||
id: stream_id_clone.clone(),
|
id: stream_id_sse.clone(),
|
||||||
object: "chat.completion.chunk".to_string(),
|
object: "chat.completion.chunk".to_string(),
|
||||||
created: stream_created,
|
created: stream_created,
|
||||||
model: chunk.model.clone(),
|
model: chunk.model.clone(),
|
||||||
@@ -272,17 +270,26 @@ async fn chat_completions(
|
|||||||
finish_reason: chunk.finish_reason,
|
finish_reason: chunk.finish_reason,
|
||||||
}],
|
}],
|
||||||
};
|
};
|
||||||
Event::default().json_data(response)
|
|
||||||
.map_err(|e| crate::errors::AppError::InternalError(format!("SSE error: {}", e)))
|
// Use axum's Event directly, wrap in Ok
|
||||||
|
match Event::default().json_data(response) {
|
||||||
|
Ok(event) => yield Ok::<_, crate::errors::AppError>(event),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to serialize SSE: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Stream error: {}", e);
|
||||||
}
|
}
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.chain(futures::stream::once(async {
|
|
||||||
Ok::<Event, crate::errors::AppError>(Event::default().data("[DONE]"))
|
// Yield [DONE] at the end
|
||||||
}));
|
yield Ok::<_, crate::errors::AppError>(Event::default().data("[DONE]"));
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Sse::new(final_stream).into_response())
|
Ok(Sse::new(stream).into_response())
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Record provider failure
|
// Record provider failure
|
||||||
|
|||||||
Reference in New Issue
Block a user