refactor: extract stream parsing helper and enable deepseek error probing
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled

This commit is contained in:
2026-03-05 18:16:56 +00:00
parent c2bad90a8f
commit e3c1b9fa20
2 changed files with 82 additions and 98 deletions

View File

@@ -58,22 +58,7 @@ impl super::Provider for DeepSeekProvider {
async fn chat_completion(&self, request: UnifiedRequest) -> Result<ProviderResponse, AppError> { async fn chat_completion(&self, request: UnifiedRequest) -> Result<ProviderResponse, AppError> {
let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
let mut body = helpers::build_openai_body(&request, messages_json, false); let body = helpers::build_openai_body(&request, messages_json, false);
// Sanitize for deepseek-reasoner
if request.model == "deepseek-reasoner" {
if let Some(obj) = body.as_object_mut() {
obj.remove("tools");
obj.remove("tool_choice");
obj.remove("temperature");
obj.remove("top_p");
obj.remove("presence_penalty");
obj.remove("frequency_penalty");
obj.remove("logit_bias");
obj.remove("logprobs");
obj.remove("top_logprobs");
}
}
let response = self let response = self
.client .client
@@ -85,8 +70,10 @@ impl super::Provider for DeepSeekProvider {
.map_err(|e| AppError::ProviderError(e.to_string()))?; .map_err(|e| AppError::ProviderError(e.to_string()))?;
if !response.status().is_success() { if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default(); let error_text = response.text().await.unwrap_or_default();
return Err(AppError::ProviderError(format!("DeepSeek API error: {}", error_text))); tracing::error!("DeepSeek API error ({}): {}", status, error_text);
return Err(AppError::ProviderError(format!("DeepSeek API error ({}): {}", status, error_text)));
} }
let resp_json: serde_json::Value = response let resp_json: serde_json::Value = response
@@ -131,26 +118,9 @@ impl super::Provider for DeepSeekProvider {
let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?; let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?;
let mut body = helpers::build_openai_body(&request, messages_json, true); let mut body = helpers::build_openai_body(&request, messages_json, true);
// Sanitize for deepseek-reasoner or general deepseek-chat // Standard OpenAI cleanup
if request.model == "deepseek-reasoner" {
if let Some(obj) = body.as_object_mut() { if let Some(obj) = body.as_object_mut() {
obj.remove("stream_options"); obj.remove("stream_options");
// Also does not support these parameters
obj.remove("tools");
obj.remove("tool_choice");
obj.remove("temperature");
obj.remove("top_p");
obj.remove("presence_penalty");
obj.remove("frequency_penalty");
obj.remove("logit_bias");
obj.remove("logprobs");
obj.remove("top_logprobs");
}
} else {
// For standard deepseek-chat, keep it clean
if let Some(obj) = body.as_object_mut() {
obj.remove("stream_options");
}
} }
let url = format!("{}/chat/completions", self.config.base_url); let url = format!("{}/chat/completions", self.config.base_url);

View File

@@ -237,33 +237,13 @@ pub fn parse_openai_response(resp_json: &Value, model: String) -> Result<Provide
}) })
} }
/// Create an SSE stream that parses OpenAI-compatible streaming chunks. /// Parse a single OpenAI-compatible stream chunk into a ProviderStreamChunk.
/// /// Returns None if the chunk should be skipped (e.g. promptFeedback).
/// The optional `reasoning_field` allows overriding the field name for pub fn parse_openai_stream_chunk(
/// reasoning content (e.g., "thought" for Ollama). chunk: &Value,
/// Parses tool_calls deltas from streaming chunks when present. model: &str,
/// When `stream_options.include_usage: true` was sent, the provider sends a
/// final chunk with `usage` data — this is parsed into `StreamUsage` and
/// attached to the yielded `ProviderStreamChunk`.
pub fn create_openai_stream(
es: reqwest_eventsource::EventSource,
model: String,
reasoning_field: Option<&'static str>, reasoning_field: Option<&'static str>,
) -> BoxStream<'static, Result<ProviderStreamChunk, AppError>> { ) -> Option<Result<ProviderStreamChunk, AppError>> {
use reqwest_eventsource::Event;
let stream = async_stream::try_stream! {
let mut es = es;
while let Some(event) = es.next().await {
match event {
Ok(Event::Message(msg)) => {
if msg.data == "[DONE]" {
break;
}
let chunk: Value = serde_json::from_str(&msg.data)
.map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
// Parse usage from the final chunk (sent when stream_options.include_usage is true). // Parse usage from the final chunk (sent when stream_options.include_usage is true).
// This chunk may have an empty `choices` array. // This chunk may have an empty `choices` array.
let stream_usage = chunk.get("usage").and_then(|u| { let stream_usage = chunk.get("usage").and_then(|u| {
@@ -306,25 +286,59 @@ pub fn create_openai_stream(
.get("tool_calls") .get("tool_calls")
.and_then(|tc| serde_json::from_value(tc.clone()).ok()); .and_then(|tc| serde_json::from_value(tc.clone()).ok());
yield ProviderStreamChunk { Some(Ok(ProviderStreamChunk {
content, content,
reasoning_content, reasoning_content,
finish_reason, finish_reason,
tool_calls, tool_calls,
model: model.clone(), model: model.to_string(),
usage: stream_usage, usage: stream_usage,
}; }))
} else if stream_usage.is_some() { } else if stream_usage.is_some() {
// Final usage-only chunk (empty choices array) — yield it so // Final usage-only chunk (empty choices array) — yield it so
// AggregatingStream can capture the real token counts. // AggregatingStream can capture the real token counts.
yield ProviderStreamChunk { Some(Ok(ProviderStreamChunk {
content: String::new(), content: String::new(),
reasoning_content: None, reasoning_content: None,
finish_reason: None, finish_reason: None,
tool_calls: None, tool_calls: None,
model: model.clone(), model: model.to_string(),
usage: stream_usage, usage: stream_usage,
}; }))
} else {
None
}
}
/// Create an SSE stream that parses OpenAI-compatible streaming chunks.
///
/// The optional `reasoning_field` allows overriding the field name for
/// reasoning content (e.g., "thought" for Ollama).
/// Parses tool_calls deltas from streaming chunks when present.
/// When `stream_options.include_usage: true` was sent, the provider sends a
/// final chunk with `usage` data — this is parsed into `StreamUsage` and
/// attached to the yielded `ProviderStreamChunk`.
pub fn create_openai_stream(
es: reqwest_eventsource::EventSource,
model: String,
reasoning_field: Option<&'static str>,
) -> BoxStream<'static, Result<ProviderStreamChunk, AppError>> {
use reqwest_eventsource::Event;
let stream = async_stream::try_stream! {
let mut es = es;
while let Some(event) = es.next().await {
match event {
Ok(Event::Message(msg)) => {
if msg.data == "[DONE]" {
break;
}
let chunk: Value = serde_json::from_str(&msg.data)
.map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
if let Some(p_chunk) = parse_openai_stream_chunk(&chunk, &model, reasoning_field) {
yield p_chunk?;
} }
} }
Ok(_) => continue, Ok(_) => continue,