fix(deepseek): add missing StreamExt import
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
use super::helpers;
|
use super::helpers;
|
||||||
use super::{ProviderResponse, ProviderStreamChunk};
|
use super::{ProviderResponse, ProviderStreamChunk};
|
||||||
@@ -130,7 +131,7 @@ impl super::Provider for DeepSeekProvider {
|
|||||||
let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?;
|
let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?;
|
||||||
let mut body = helpers::build_openai_body(&request, messages_json, true);
|
let mut body = helpers::build_openai_body(&request, messages_json, true);
|
||||||
|
|
||||||
// DeepSeek reasoning model doesn't support stream_options
|
// Sanitize for deepseek-reasoner or general deepseek-chat
|
||||||
if request.model == "deepseek-reasoner" {
|
if request.model == "deepseek-reasoner" {
|
||||||
if let Some(obj) = body.as_object_mut() {
|
if let Some(obj) = body.as_object_mut() {
|
||||||
obj.remove("stream_options");
|
obj.remove("stream_options");
|
||||||
@@ -152,14 +153,62 @@ impl super::Provider for DeepSeekProvider {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let url = format!("{}/chat/completions", self.config.base_url);
|
||||||
|
let api_key = self.api_key.clone();
|
||||||
|
let probe_client = self.client.clone();
|
||||||
|
let probe_body = body.clone();
|
||||||
|
let model = request.model.clone();
|
||||||
|
|
||||||
let es = reqwest_eventsource::EventSource::new(
|
let es = reqwest_eventsource::EventSource::new(
|
||||||
self.client
|
self.client
|
||||||
.post(format!("{}/chat/completions", self.config.base_url))
|
.post(&url)
|
||||||
.header("Authorization", format!("Bearer {}", self.api_key))
|
.header("Authorization", format!("Bearer {}", self.api_key))
|
||||||
.json(&body),
|
.json(&body),
|
||||||
)
|
)
|
||||||
.map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?;
|
.map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?;
|
||||||
|
|
||||||
Ok(helpers::create_openai_stream(es, request.model, None))
|
let stream = async_stream::try_stream! {
|
||||||
|
let mut es = es;
|
||||||
|
while let Some(event) = es.next().await {
|
||||||
|
match event {
|
||||||
|
Ok(reqwest_eventsource::Event::Message(msg)) => {
|
||||||
|
if msg.data == "[DONE]" {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunk: serde_json::Value = serde_json::from_str(&msg.data)
|
||||||
|
.map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
|
||||||
|
|
||||||
|
if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) {
|
||||||
|
yield p_chunk?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(_) => continue,
|
||||||
|
Err(e) => {
|
||||||
|
// Attempt to probe for the actual error body
|
||||||
|
let probe_resp = probe_client
|
||||||
|
.post(&url)
|
||||||
|
.header("Authorization", format!("Bearer {}", api_key))
|
||||||
|
.json(&probe_body)
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match probe_resp {
|
||||||
|
Ok(r) if !r.status().is_success() => {
|
||||||
|
let status = r.status();
|
||||||
|
let body = r.text().await.unwrap_or_default();
|
||||||
|
tracing::error!("DeepSeek Stream Error Probe ({}): {}", status, body);
|
||||||
|
Err(AppError::ProviderError(format!("DeepSeek API error ({}): {}", status, body)))?;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
Err(AppError::ProviderError(format!("Stream error: {}", e)))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::pin(stream))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user