diff --git a/src/providers/deepseek.rs b/src/providers/deepseek.rs index 213628a5..c964ee55 100644 --- a/src/providers/deepseek.rs +++ b/src/providers/deepseek.rs @@ -58,22 +58,7 @@ impl super::Provider for DeepSeekProvider { async fn chat_completion(&self, request: UnifiedRequest) -> Result { let messages_json = helpers::messages_to_openai_json(&request.messages).await?; - let mut body = helpers::build_openai_body(&request, messages_json, false); - - // Sanitize for deepseek-reasoner - if request.model == "deepseek-reasoner" { - if let Some(obj) = body.as_object_mut() { - obj.remove("tools"); - obj.remove("tool_choice"); - obj.remove("temperature"); - obj.remove("top_p"); - obj.remove("presence_penalty"); - obj.remove("frequency_penalty"); - obj.remove("logit_bias"); - obj.remove("logprobs"); - obj.remove("top_logprobs"); - } - } + let body = helpers::build_openai_body(&request, messages_json, false); let response = self .client @@ -85,8 +70,10 @@ impl super::Provider for DeepSeekProvider { .map_err(|e| AppError::ProviderError(e.to_string()))?; if !response.status().is_success() { + let status = response.status(); let error_text = response.text().await.unwrap_or_default(); - return Err(AppError::ProviderError(format!("DeepSeek API error: {}", error_text))); + tracing::error!("DeepSeek API error ({}): {}", status, error_text); + return Err(AppError::ProviderError(format!("DeepSeek API error ({}): {}", status, error_text))); } let resp_json: serde_json::Value = response @@ -131,26 +118,9 @@ impl super::Provider for DeepSeekProvider { let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?; let mut body = helpers::build_openai_body(&request, messages_json, true); - // Sanitize for deepseek-reasoner or general deepseek-chat - if request.model == "deepseek-reasoner" { - if let Some(obj) = body.as_object_mut() { - obj.remove("stream_options"); - // Also does not support these parameters - obj.remove("tools"); - obj.remove("tool_choice"); - obj.remove("temperature"); - obj.remove("top_p"); - obj.remove("presence_penalty"); - obj.remove("frequency_penalty"); - obj.remove("logit_bias"); - obj.remove("logprobs"); - obj.remove("top_logprobs"); - } - } else { - // For standard deepseek-chat, keep it clean - if let Some(obj) = body.as_object_mut() { - obj.remove("stream_options"); - } + // Standard OpenAI cleanup + if let Some(obj) = body.as_object_mut() { + obj.remove("stream_options"); } let url = format!("{}/chat/completions", self.config.base_url); diff --git a/src/providers/helpers.rs b/src/providers/helpers.rs index 4a048323..f4464a6d 100644 --- a/src/providers/helpers.rs +++ b/src/providers/helpers.rs @@ -237,6 +237,79 @@ pub fn parse_openai_response(resp_json: &Value, model: String) -> Result, +) -> Option> { + // Parse usage from the final chunk (sent when stream_options.include_usage is true). + // This chunk may have an empty `choices` array. + let stream_usage = chunk.get("usage").and_then(|u| { + if u.is_null() { + return None; + } + let prompt_tokens = u["prompt_tokens"].as_u64().unwrap_or(0) as u32; + let completion_tokens = u["completion_tokens"].as_u64().unwrap_or(0) as u32; + let total_tokens = u["total_tokens"].as_u64().unwrap_or(0) as u32; + + let cache_read_tokens = u["prompt_tokens_details"]["cached_tokens"] + .as_u64() + .or_else(|| u["prompt_cache_hit_tokens"].as_u64()) + .unwrap_or(0) as u32; + + let cache_write_tokens = u["prompt_cache_miss_tokens"] + .as_u64() + .unwrap_or(0) as u32; + + Some(StreamUsage { + prompt_tokens, + completion_tokens, + total_tokens, + cache_read_tokens, + cache_write_tokens, + }) + }); + + if let Some(choice) = chunk["choices"].get(0) { + let delta = &choice["delta"]; + let content = delta["content"].as_str().unwrap_or_default().to_string(); + let reasoning_content = delta["reasoning_content"] + .as_str() + .or_else(|| reasoning_field.and_then(|f| delta[f].as_str())) + .map(|s| s.to_string()); + let finish_reason = choice["finish_reason"].as_str().map(|s| s.to_string()); + + // Parse tool_calls deltas from the stream chunk + let tool_calls: Option> = delta + .get("tool_calls") + .and_then(|tc| serde_json::from_value(tc.clone()).ok()); + + Some(Ok(ProviderStreamChunk { + content, + reasoning_content, + finish_reason, + tool_calls, + model: model.to_string(), + usage: stream_usage, + })) + } else if stream_usage.is_some() { + // Final usage-only chunk (empty choices array) — yield it so + // AggregatingStream can capture the real token counts. + Some(Ok(ProviderStreamChunk { + content: String::new(), + reasoning_content: None, + finish_reason: None, + tool_calls: None, + model: model.to_string(), + usage: stream_usage, + })) + } else { + None + } +} + /// Create an SSE stream that parses OpenAI-compatible streaming chunks. /// /// The optional `reasoning_field` allows overriding the field name for @@ -264,67 +337,8 @@ pub fn create_openai_stream( let chunk: Value = serde_json::from_str(&msg.data) .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?; - // Parse usage from the final chunk (sent when stream_options.include_usage is true). - // This chunk may have an empty `choices` array. - let stream_usage = chunk.get("usage").and_then(|u| { - if u.is_null() { - return None; - } - let prompt_tokens = u["prompt_tokens"].as_u64().unwrap_or(0) as u32; - let completion_tokens = u["completion_tokens"].as_u64().unwrap_or(0) as u32; - let total_tokens = u["total_tokens"].as_u64().unwrap_or(0) as u32; - - let cache_read_tokens = u["prompt_tokens_details"]["cached_tokens"] - .as_u64() - .or_else(|| u["prompt_cache_hit_tokens"].as_u64()) - .unwrap_or(0) as u32; - - let cache_write_tokens = u["prompt_cache_miss_tokens"] - .as_u64() - .unwrap_or(0) as u32; - - Some(StreamUsage { - prompt_tokens, - completion_tokens, - total_tokens, - cache_read_tokens, - cache_write_tokens, - }) - }); - - if let Some(choice) = chunk["choices"].get(0) { - let delta = &choice["delta"]; - let content = delta["content"].as_str().unwrap_or_default().to_string(); - let reasoning_content = delta["reasoning_content"] - .as_str() - .or_else(|| reasoning_field.and_then(|f| delta[f].as_str())) - .map(|s| s.to_string()); - let finish_reason = choice["finish_reason"].as_str().map(|s| s.to_string()); - - // Parse tool_calls deltas from the stream chunk - let tool_calls: Option> = delta - .get("tool_calls") - .and_then(|tc| serde_json::from_value(tc.clone()).ok()); - - yield ProviderStreamChunk { - content, - reasoning_content, - finish_reason, - tool_calls, - model: model.clone(), - usage: stream_usage, - }; - } else if stream_usage.is_some() { - // Final usage-only chunk (empty choices array) — yield it so - // AggregatingStream can capture the real token counts. - yield ProviderStreamChunk { - content: String::new(), - reasoning_content: None, - finish_reason: None, - tool_calls: None, - model: model.clone(), - usage: stream_usage, - }; + if let Some(p_chunk) = parse_openai_stream_chunk(&chunk, &model, reasoning_field) { + yield p_chunk?; } } Ok(_) => continue,