diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 19297888..9bbff7cc 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -1,6 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use futures::stream::BoxStream; +use futures::StreamExt; use super::helpers; use super::{ProviderResponse, ProviderStreamChunk}; @@ -44,7 +45,7 @@ impl super::Provider for OpenAIProvider { } fn supports_model(&self, model: &str) -> bool { - model.starts_with("gpt-") || model.starts_with("o1-") || model.starts_with("o3-") || model.starts_with("o4-") + model.starts_with("gpt-") || model.starts_with("o1-") || model.starts_with("o3-") || model.starts_with("o4-") || model == "gpt-5-nano" } fn supports_multimodal(&self) -> bool { @@ -65,9 +66,11 @@ impl super::Provider for OpenAIProvider { .map_err(|e| AppError::ProviderError(e.to_string()))?; if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await.unwrap_or_default(); + // Read error body to diagnose. If the model requires the Responses // API (v1/responses), retry against that endpoint. - let error_text = response.text().await.unwrap_or_default(); if error_text.to_lowercase().contains("v1/responses") || error_text.to_lowercase().contains("only supported in v1/responses") { // Build a simple `input` string by concatenating message parts. let messages_json = helpers::messages_to_openai_json(&request.messages).await?; @@ -106,16 +109,13 @@ impl super::Provider for OpenAIProvider { } // Responses API: try to extract text from `output` or `candidates` - // output -> [{"content": [{"type":..., "text": "..."}, ...]}] let mut content_text = String::new(); if let Some(output) = resp_json.get("output").and_then(|o| o.as_array()) { if let Some(first) = output.get(0) { if let Some(contents) = first.get("content").and_then(|c| c.as_array()) { for item in contents { if let Some(text) = item.get("text").and_then(|t| t.as_str()) { - if !content_text.is_empty() { - content_text.push_str("\n"); - } + if !content_text.is_empty() { content_text.push_str("\n"); } content_text.push_str(text); } else if let Some(parts) = item.get("parts").and_then(|p| p.as_array()) { for p in parts { @@ -130,7 +130,6 @@ impl super::Provider for OpenAIProvider { } } - // Fallback: check `candidates` -> candidate.content.parts.text if content_text.is_empty() { if let Some(cands) = resp_json.get("candidates").and_then(|c| c.as_array()) { if let Some(c0) = cands.get(0) { @@ -148,7 +147,6 @@ impl super::Provider for OpenAIProvider { } } - // Extract simple usage if present let prompt_tokens = resp_json.get("usage").and_then(|u| u.get("prompt_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let completion_tokens = resp_json.get("usage").and_then(|u| u.get("completion_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let total_tokens = resp_json.get("usage").and_then(|u| u.get("total_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; @@ -166,7 +164,8 @@ impl super::Provider for OpenAIProvider { }); } - return Err(AppError::ProviderError(format!("OpenAI API error: {}", error_text))); + tracing::error!("OpenAI API error ({}): {}", status, error_text); + return Err(AppError::ProviderError(format!("OpenAI API error ({}): {}", status, error_text))); } let resp_json: serde_json::Value = response @@ -297,46 +296,70 @@ impl super::Provider for OpenAIProvider { request: UnifiedRequest, ) -> Result>, AppError> { let messages_json = helpers::messages_to_openai_json(&request.messages).await?; - let body = helpers::build_openai_body(&request, messages_json, true); + let mut body = helpers::build_openai_body(&request, messages_json, true); - // Try to create an EventSource for streaming; if creation fails or - // the stream errors, fall back to a single synchronous request and - // emit its result as a single chunk. - let es_result = reqwest_eventsource::EventSource::new( - self.client - .post(format!("{}/chat/completions", self.config.base_url)) - .header("Authorization", format!("Bearer {}", self.api_key)) - .json(&body), - ); - - if es_result.is_err() { - // Fallback to non-streaming request which itself may retry to - // Responses API if necessary (handled in chat_completion). - let resp = self.chat_completion(request.clone()).await?; - let single_stream = async_stream::try_stream! { - let chunk = ProviderStreamChunk { - content: resp.content, - reasoning_content: resp.reasoning_content, - finish_reason: Some("stop".to_string()), - tool_calls: None, - model: resp.model.clone(), - usage: Some(super::StreamUsage { - prompt_tokens: resp.prompt_tokens, - completion_tokens: resp.completion_tokens, - total_tokens: resp.total_tokens, - cache_read_tokens: resp.cache_read_tokens, - cache_write_tokens: resp.cache_write_tokens, - }), - }; - - yield chunk; - }; - - return Ok(Box::pin(single_stream)); + // Standard OpenAI cleanup + if let Some(obj) = body.as_object_mut() { + obj.remove("stream_options"); } - let es = es_result.map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?; + let url = format!("{}/chat/completions", self.config.base_url); + let api_key = self.api_key.clone(); + let probe_client = self.client.clone(); + let probe_body = body.clone(); + let model = request.model.clone(); - Ok(helpers::create_openai_stream(es, request.model, None)) + let es = reqwest_eventsource::EventSource::new( + self.client + .post(&url) + .header("Authorization", format!("Bearer {}", self.api_key)) + .json(&body), + ) + .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?; + + let stream = async_stream::try_stream! { + let mut es = es; + while let Some(event) = es.next().await { + match event { + Ok(reqwest_eventsource::Event::Message(msg)) => { + if msg.data == "[DONE]" { + break; + } + + let chunk: serde_json::Value = serde_json::from_str(&msg.data) + .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?; + + if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) { + yield p_chunk?; + } + } + Ok(_) => continue, + Err(e) => { + // Attempt to probe for the actual error body + let probe_resp = probe_client + .post(&url) + .header("Authorization", format!("Bearer {}", api_key)) + .json(&probe_body) + .send() + .await; + + match probe_resp { + Ok(r) if !r.status().is_success() => { + let status = r.status(); + let error_body = r.text().await.unwrap_or_default(); + tracing::error!("OpenAI Stream Error Probe ({}): {}", status, error_body); + tracing::debug!("Offending OpenAI Request Body: {}", serde_json::to_string(&probe_body).unwrap_or_default()); + Err(AppError::ProviderError(format!("OpenAI API error ({}): {}", status, error_body)))?; + } + _ => { + Err(AppError::ProviderError(format!("Stream error: {}", e)))?; + } + } + } + } + } + }; + + Ok(Box::pin(stream)) } }