use anyhow::Result;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;

use super::helpers;
use super::{ProviderResponse, ProviderStreamChunk};
use crate::{config::AppConfig, errors::AppError, models::UnifiedRequest};

/// Provider implementation for the OpenAI API (chat completions + Responses API).
pub struct OpenAIProvider {
    client: reqwest::Client,
    config: crate::config::OpenAIConfig,
    api_key: String,
    // Per-model pricing table, cloned from `app_config.pricing.openai`.
    // NOTE(review): the element type was lost in the source; reconstructed as
    // `ModelPricing` — confirm against `crate::config`.
    pricing: Vec<crate::config::ModelPricing>,
}

impl OpenAIProvider {
    /// Builds a provider using the API key registered under "openai" in `app_config`.
    ///
    /// # Errors
    /// Returns an error if the key lookup or HTTP client construction fails.
    pub fn new(config: &crate::config::OpenAIConfig, app_config: &AppConfig) -> Result<Self> {
        let api_key = app_config.get_api_key("openai")?;
        Self::new_with_key(config, app_config, api_key)
    }

    /// Builds a provider with an explicitly supplied API key.
    ///
    /// The HTTP client uses a short connect timeout, a generous 300 s request
    /// timeout (long generations), and keep-alive pooling to reuse connections.
    pub fn new_with_key(
        config: &crate::config::OpenAIConfig,
        app_config: &AppConfig,
        api_key: String,
    ) -> Result<Self> {
        let client = reqwest::Client::builder()
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(300))
            .pool_idle_timeout(std::time::Duration::from_secs(90))
            .pool_max_idle_per_host(4)
            .tcp_keepalive(std::time::Duration::from_secs(15))
            .build()?;
        Ok(Self {
            client,
            config: config.clone(),
            api_key,
            pricing: app_config.pricing.openai.clone(),
        })
    }

    /// GPT-5.x models sometimes emit parallel tool calls as a JSON block starting
    /// with `{"tool_uses":` inside a text message instead of discrete
    /// function_call items. Extracts and parses such embedded tool calls.
    ///
    /// Returns an empty `Vec` when no complete, parseable block is present.
    pub fn parse_tool_uses_json(text: &str) -> Vec<crate::models::ToolCall> {
        let mut calls = Vec::new();
        if let Some(start) = text.find("{\"tool_uses\":") {
            // Find the end of the JSON block by matching braces.
            // NOTE(review): a brace inside a JSON string literal would fool this
            // scan; acceptable for the observed model output, but worth knowing.
            let sub = &text[start..];
            let mut brace_count = 0;
            let mut end_idx = 0;
            let mut found = false;
            for (i, c) in sub.char_indices() {
                if c == '{' {
                    brace_count += 1;
                } else if c == '}' {
                    brace_count -= 1;
                    if brace_count == 0 {
                        end_idx = i + 1;
                        found = true;
                        break;
                    }
                }
            }
            if found {
                let json_str = &sub[..end_idx];
                if let Ok(val) = serde_json::from_str::<serde_json::Value>(json_str) {
                    if let Some(uses) = val.get("tool_uses").and_then(|u| u.as_array()) {
                        for (idx, u) in uses.iter().enumerate() {
                            let raw_name = u
                                .get("recipient_name")
                                .and_then(|v| v.as_str())
                                .unwrap_or("unknown");
                            // Strip the "functions." namespace prefix if present
                            // (fixed: previously `replace` removed the substring
                            // anywhere in the name, not only as a prefix).
                            let name = raw_name
                                .strip_prefix("functions.")
                                .unwrap_or(raw_name)
                                .to_string();
                            let arguments = u
                                .get("parameters")
                                .map(|v| v.to_string())
                                .unwrap_or_else(|| "{}".to_string());
                            calls.push(crate::models::ToolCall {
                                // Synthetic id: 8 hex chars of a fresh UUID plus the
                                // position of the call inside the block.
                                id: format!(
                                    "call_tu_{}_{}",
                                    &uuid::Uuid::new_v4().to_string()[..8],
                                    idx
                                ),
                                call_type: "function".to_string(),
                                function: crate::models::FunctionCall { name, arguments },
                            });
                        }
                    }
                }
            }
        }
        calls
    }

    /// Strips internal metadata prefixes like 'to=multi_tool_use.parallel' from model responses.
pub fn strip_internal_metadata(text: &str) -> String { let mut result = text.to_string(); // Patterns to strip let patterns = [ "to=multi_tool_use.parallel", "to=functions.multi_tool_use", "ส่งเงินบาทไทยjson", // User reported Thai text preamble ]; for p in patterns { if let Some(start) = result.find(p) { // Remove the pattern result.replace_range(start..start + p.len(), ""); } } result } } #[async_trait] impl super::Provider for OpenAIProvider { fn name(&self) -> &str { "openai" } fn supports_model(&self, model: &str) -> bool { model.starts_with("gpt-") || model.starts_with("o1-") || model.starts_with("o2-") || model.starts_with("o3-") || model.starts_with("o4-") || model.starts_with("o5-") || model.contains("gpt-5") } fn supports_multimodal(&self) -> bool { true } async fn chat_completion(&self, request: UnifiedRequest) -> Result { // Allow proactive routing to Responses API based on heuristic let model_lc = request.model.to_lowercase(); if model_lc.contains("gpt-5") || model_lc.contains("codex") { return self.chat_responses(request).await; } let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let mut body = helpers::build_openai_body(&request, messages_json, false); // Transition: Newer OpenAI models (o1, o3, gpt-5) require max_completion_tokens // instead of the legacy max_tokens parameter. if request.model.starts_with("o1-") || request.model.starts_with("o3-") || request.model.contains("gpt-5") { if let Some(max_tokens) = body.as_object_mut().and_then(|obj| obj.remove("max_tokens")) { body["max_completion_tokens"] = max_tokens; } } let response = self .client .post(format!("{}/chat/completions", self.config.base_url)) .header("Authorization", format!("Bearer {}", self.api_key)) .json(&body) .send() .await .map_err(|e| AppError::ProviderError(e.to_string()))?; if !response.status().is_success() { let status = response.status(); let error_text = response.text().await.unwrap_or_default(); // Read error body to diagnose. 
If the model requires the Responses // API (v1/responses), retry against that endpoint. if error_text.to_lowercase().contains("v1/responses") || error_text.to_lowercase().contains("only supported in v1/responses") { return self.chat_responses(request).await; } tracing::error!("OpenAI API error ({}): {}", status, error_text); return Err(AppError::ProviderError(format!("OpenAI API error ({}): {}", status, error_text))); } let resp_json: serde_json::Value = response .json() .await .map_err(|e| AppError::ProviderError(e.to_string()))?; helpers::parse_openai_response(&resp_json, request.model) } async fn chat_responses(&self, request: UnifiedRequest) -> Result { // Build a structured input for the Responses API. let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let mut input_parts = Vec::new(); for m in &messages_json { let role = m["role"].as_str().unwrap_or("user"); if role == "tool" { input_parts.push(serde_json::json!({ "type": "function_call_output", "call_id": m.get("tool_call_id").and_then(|v| v.as_str()).unwrap_or(""), "output": m.get("content").and_then(|v| v.as_str()).unwrap_or("") })); continue; } if role == "assistant" && m.get("tool_calls").is_some() { // Push message part if it exists let content_val = m.get("content").cloned().unwrap_or(serde_json::json!("")); if !content_val.is_null() && (content_val.is_array() && !content_val.as_array().unwrap().is_empty() || content_val.is_string() && !content_val.as_str().unwrap().is_empty()) { let mut content = content_val.clone(); if let Some(text) = content.as_str() { content = serde_json::json!([{ "type": "output_text", "text": text }]); } else if let Some(arr) = content.as_array_mut() { for part in arr { if let Some(obj) = part.as_object_mut() { if obj.get("type").and_then(|v| v.as_str()) == Some("text") { obj.insert("type".to_string(), serde_json::json!("output_text")); } } } } input_parts.push(serde_json::json!({ "type": "message", "role": "assistant", "content": content })); } // 
Push tool calls as separate items if let Some(tcs) = m.get("tool_calls").and_then(|v| v.as_array()) { for tc in tcs { input_parts.push(serde_json::json!({ "type": "function_call", "call_id": tc["id"], "name": tc["function"]["name"], "arguments": tc["function"]["arguments"] })); } } continue; } let mut mapped_role = role.to_string(); if mapped_role == "system" { mapped_role = "developer".to_string(); } let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([])); // Map content types based on role for Responses API if let Some(content_array) = content.as_array_mut() { for part in content_array { if let Some(part_obj) = part.as_object_mut() { if let Some(t) = part_obj.get("type").and_then(|v| v.as_str()) { match t { "text" => { let new_type = if mapped_role == "assistant" { "output_text" } else { "input_text" }; part_obj.insert("type".to_string(), serde_json::json!(new_type)); } "image_url" => { let new_type = if mapped_role == "assistant" { "output_image" } else { "input_image" }; part_obj.insert("type".to_string(), serde_json::json!(new_type)); if let Some(img_url) = part_obj.remove("image_url") { part_obj.insert("image".to_string(), img_url); } } _ => {} } } } } } else if let Some(text) = content.as_str() { // If it's just a string, send it as a string instead of an array of objects // as it's safer for standard conversational messages. 
content = serde_json::json!(text); } let mut msg_item = serde_json::json!({ "type": "message", "role": mapped_role, "content": content }); if let Some(name) = m.get("name") { msg_item["name"] = name.clone(); } input_parts.push(msg_item); } let mut body = serde_json::json!({ "model": request.model, "input": input_parts, }); // Add standard parameters if let Some(temp) = request.temperature { body["temperature"] = serde_json::json!(temp); } // Newer models (gpt-5, o1) in Responses API use max_output_tokens if let Some(max_tokens) = request.max_tokens { if request.model.contains("gpt-5") || request.model.starts_with("o1-") || request.model.starts_with("o3-") { body["max_output_tokens"] = serde_json::json!(max_tokens); } else { body["max_tokens"] = serde_json::json!(max_tokens); } } if let Some(tools) = &request.tools { let flattened: Vec = tools.iter().map(|t| { let mut obj = serde_json::json!({ "type": t.tool_type, "name": t.function.name, }); if let Some(desc) = &t.function.description { obj["description"] = serde_json::json!(desc); } if let Some(params) = &t.function.parameters { obj["parameters"] = params.clone(); } obj }).collect(); body["tools"] = serde_json::json!(flattened); } if let Some(tool_choice) = &request.tool_choice { match tool_choice { crate::models::ToolChoice::Mode(mode) => { body["tool_choice"] = serde_json::json!(mode); } crate::models::ToolChoice::Specific(specific) => { body["tool_choice"] = serde_json::json!({ "type": specific.choice_type, "name": specific.function.name, }); } } } let resp = self .client .post(format!("{}/responses", self.config.base_url)) .header("Authorization", format!("Bearer {}", self.api_key)) .json(&body) .send() .await .map_err(|e| AppError::ProviderError(e.to_string()))?; if !resp.status().is_success() { let err = resp.text().await.unwrap_or_default(); return Err(AppError::ProviderError(format!("OpenAI Responses API error: {}", err))); } let resp_json: serde_json::Value = resp.json().await.map_err(|e| 
AppError::ProviderError(e.to_string()))?; // Try to normalize: if it's chat-style, use existing parser if resp_json.get("choices").is_some() { return helpers::parse_openai_response(&resp_json, request.model); } // Normalize Responses API output into ProviderResponse let mut content_text = String::new(); let mut tool_calls = Vec::new(); if let Some(output) = resp_json.get("output").and_then(|o| o.as_array()) { for out in output { let item_type = out.get("type").and_then(|v| v.as_str()).unwrap_or(""); match item_type { "message" => { if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { for item in contents { if let Some(text) = item.get("text").and_then(|t| t.as_str()) { if !content_text.is_empty() { content_text.push_str("\n"); } content_text.push_str(text); } } } } "function_call" => { let id = out.get("call_id") .or_else(|| out.get("item_id")) .or_else(|| out.get("id")) .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); let name = out.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string(); let arguments = out.get("arguments").and_then(|v| v.as_str()).unwrap_or("").to_string(); tool_calls.push(crate::models::ToolCall { id, call_type: "function".to_string(), function: crate::models::FunctionCall { name, arguments }, }); } _ => { // Fallback for older/nested structure if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { for item in contents { if let Some(text) = item.get("text").and_then(|t| t.as_str()) { if !content_text.is_empty() { content_text.push_str("\n"); } content_text.push_str(text); } } } } } } } if content_text.is_empty() { if let Some(cands) = resp_json.get("candidates").and_then(|c| c.as_array()) { if let Some(c0) = cands.get(0) { if let Some(content) = c0.get("content") { if let Some(parts) = content.get("parts").and_then(|p| p.as_array()) { for p in parts { if let Some(t) = p.get("text").and_then(|v| v.as_str()) { if !content_text.is_empty() { content_text.push_str("\n"); } content_text.push_str(t); } 
} } } } } } let prompt_tokens = resp_json.get("usage").and_then(|u| u.get("prompt_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let completion_tokens = resp_json.get("usage").and_then(|u| u.get("completion_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let total_tokens = resp_json.get("usage").and_then(|u| u.get("total_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; // GPT-5.4 parallel tool calls might be embedded in content_text as a JSON block let embedded_calls = Self::parse_tool_uses_json(&content_text); if !embedded_calls.is_empty() { // Strip the JSON part from content_text to keep it clean if let Some(start) = content_text.find("{\"tool_uses\":") { content_text = content_text[..start].to_string(); } tool_calls.extend(embedded_calls); } content_text = Self::strip_internal_metadata(&content_text); Ok(ProviderResponse { content: content_text, reasoning_content: None, tool_calls: if tool_calls.is_empty() { None } else { Some(tool_calls) }, prompt_tokens, completion_tokens, reasoning_tokens: 0, total_tokens, cache_read_tokens: 0, cache_write_tokens: 0, model: request.model, }) } fn estimate_tokens(&self, request: &UnifiedRequest) -> Result { Ok(crate::utils::tokens::estimate_request_tokens(&request.model, request)) } fn calculate_cost( &self, model: &str, prompt_tokens: u32, completion_tokens: u32, cache_read_tokens: u32, cache_write_tokens: u32, registry: &crate::models::registry::ModelRegistry, ) -> f64 { helpers::calculate_cost_with_registry( model, prompt_tokens, completion_tokens, cache_read_tokens, cache_write_tokens, registry, &self.pricing, 0.15, 0.60, ) } async fn chat_completion_stream( &self, request: UnifiedRequest, ) -> Result>, AppError> { // Allow proactive routing to Responses API based on heuristic let model_lc = request.model.to_lowercase(); if model_lc.contains("gpt-5") || model_lc.contains("codex") { return self.chat_responses_stream(request).await; } let messages_json = 
helpers::messages_to_openai_json(&request.messages).await?; let mut body = helpers::build_openai_body(&request, messages_json, true); // Standard OpenAI cleanup if let Some(obj) = body.as_object_mut() { // stream_options.include_usage is supported by OpenAI for token usage in streaming // Transition: Newer OpenAI models (o1, o3, gpt-5) require max_completion_tokens if request.model.starts_with("o1-") || request.model.starts_with("o3-") || request.model.contains("gpt-5") { if let Some(max_tokens) = obj.remove("max_tokens") { obj.insert("max_completion_tokens".to_string(), max_tokens); } } } let url = format!("{}/chat/completions", self.config.base_url); let api_key = self.api_key.clone(); let probe_client = self.client.clone(); let probe_body = body.clone(); let model = request.model.clone(); let es = reqwest_eventsource::EventSource::new( self.client .post(&url) .header("Authorization", format!("Bearer {}", self.api_key)) .json(&body), ) .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?; let stream = async_stream::try_stream! 
{ let mut es = es; while let Some(event) = es.next().await { match event { Ok(reqwest_eventsource::Event::Message(msg)) => { if msg.data == "[DONE]" { break; } let chunk: serde_json::Value = serde_json::from_str(&msg.data) .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?; if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) { yield p_chunk?; } } Ok(_) => continue, Err(reqwest_eventsource::Error::StreamEnded) => break, Err(e) => { // Attempt to probe for the actual error body let probe_resp = probe_client .post(&url) .header("Authorization", format!("Bearer {}", api_key)) .json(&probe_body) .send() .await; match probe_resp { Ok(r) if !r.status().is_success() => { let status = r.status(); let error_body = r.text().await.unwrap_or_default(); tracing::error!("OpenAI Stream Error Probe ({}): {}", status, error_body); tracing::debug!("Offending OpenAI Request Body: {}", serde_json::to_string(&probe_body).unwrap_or_default()); Err(AppError::ProviderError(format!("OpenAI API error ({}): {}", status, error_body)))?; } Ok(_) => { // Probe returned success? This is unexpected if the original stream failed. Err(AppError::ProviderError(format!("Stream error (probe returned 200): {}", e)))?; } Err(probe_err) => { // Probe itself failed tracing::error!("OpenAI Stream Error Probe failed: {}", probe_err); Err(AppError::ProviderError(format!("Stream error (probe failed: {}): {}", probe_err, e)))?; } } } } } }; Ok(Box::pin(stream)) } async fn chat_responses_stream( &self, request: UnifiedRequest, ) -> Result>, AppError> { // Build a structured input for the Responses API. 
let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let mut input_parts = Vec::new(); for m in &messages_json { let role = m["role"].as_str().unwrap_or("user"); if role == "tool" { input_parts.push(serde_json::json!({ "type": "function_call_output", "call_id": m.get("tool_call_id").and_then(|v| v.as_str()).unwrap_or(""), "output": m.get("content").and_then(|v| v.as_str()).unwrap_or("") })); continue; } if role == "assistant" && m.get("tool_calls").is_some() { // Push message part if it exists let content_val = m.get("content").cloned().unwrap_or(serde_json::json!("")); if !content_val.is_null() && (content_val.is_array() && !content_val.as_array().unwrap().is_empty() || content_val.is_string() && !content_val.as_str().unwrap().is_empty()) { let mut content = content_val.clone(); if let Some(text) = content.as_str() { content = serde_json::json!([{ "type": "output_text", "text": text }]); } else if let Some(arr) = content.as_array_mut() { for part in arr { if let Some(obj) = part.as_object_mut() { if obj.get("type").and_then(|v| v.as_str()) == Some("text") { obj.insert("type".to_string(), serde_json::json!("output_text")); } } } } input_parts.push(serde_json::json!({ "type": "message", "role": "assistant", "content": content })); } // Push tool calls as separate items if let Some(tcs) = m.get("tool_calls").and_then(|v| v.as_array()) { for tc in tcs { input_parts.push(serde_json::json!({ "type": "function_call", "call_id": tc["id"], "name": tc["function"]["name"], "arguments": tc["function"]["arguments"] })); } } continue; } let mut mapped_role = role.to_string(); if mapped_role == "system" { mapped_role = "developer".to_string(); } let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([])); // Map content types based on role for Responses API if let Some(content_array) = content.as_array_mut() { for part in content_array { if let Some(part_obj) = part.as_object_mut() { if let Some(t) = part_obj.get("type").and_then(|v| 
v.as_str()) { match t { "text" => { let new_type = if mapped_role == "assistant" { "output_text" } else { "input_text" }; part_obj.insert("type".to_string(), serde_json::json!(new_type)); } "image_url" => { let new_type = if mapped_role == "assistant" { "output_image" } else { "input_image" }; part_obj.insert("type".to_string(), serde_json::json!(new_type)); if let Some(img_url) = part_obj.remove("image_url") { part_obj.insert("image".to_string(), img_url); } } _ => {} } } } } } else if let Some(text) = content.as_str() { // If it's just a string, send it as a string instead of an array of objects // as it's safer for standard conversational messages. content = serde_json::json!(text); } let mut msg_item = serde_json::json!({ "type": "message", "role": mapped_role, "content": content }); if let Some(name) = m.get("name") { msg_item["name"] = name.clone(); } input_parts.push(msg_item); } let mut body = serde_json::json!({ "model": request.model, "input": input_parts, }); // Add standard parameters if let Some(temp) = request.temperature { body["temperature"] = serde_json::json!(temp); } // Newer models (gpt-5, o1) in Responses API use max_output_tokens if let Some(max_tokens) = request.max_tokens { if request.model.contains("gpt-5") || request.model.starts_with("o1-") || request.model.starts_with("o3-") { body["max_output_tokens"] = serde_json::json!(max_tokens); } else { body["max_tokens"] = serde_json::json!(max_tokens); } } if let Some(tools) = &request.tools { let flattened: Vec = tools.iter().map(|t| { let mut obj = serde_json::json!({ "type": t.tool_type, "name": t.function.name, }); if let Some(desc) = &t.function.description { obj["description"] = serde_json::json!(desc); } if let Some(params) = &t.function.parameters { obj["parameters"] = params.clone(); } obj }).collect(); body["tools"] = serde_json::json!(flattened); } if let Some(tool_choice) = &request.tool_choice { match tool_choice { crate::models::ToolChoice::Mode(mode) => { body["tool_choice"] = 
serde_json::json!(mode); } crate::models::ToolChoice::Specific(specific) => { body["tool_choice"] = serde_json::json!({ "type": specific.choice_type, "name": specific.function.name, }); } } } body["stream"] = serde_json::json!(true); let url = format!("{}/responses", self.config.base_url); let api_key = self.api_key.clone(); let model = request.model.clone(); let probe_client = self.client.clone(); let probe_body = body.clone(); let es = reqwest_eventsource::EventSource::new( self.client .post(&url) .header("Authorization", format!("Bearer {}", api_key)) .header("Accept", "text/event-stream") .json(&body), ) .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource for Responses API: {}", e)))?; let stream = async_stream::try_stream! { let mut es = es; let mut content_buffer = String::new(); let mut has_tool_calls = false; let mut tool_index_map = std::collections::HashMap::::new(); let mut next_tool_index = 0u32; while let Some(event) = es.next().await { match event { Ok(reqwest_eventsource::Event::Message(msg)) => { if msg.data == "[DONE]" { break; } let chunk: serde_json::Value = serde_json::from_str(&msg.data) .map_err(|e| AppError::ProviderError(format!("Failed to parse Responses stream chunk: {}", e)))?; // Try standard OpenAI parsing first (choices/usage) if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) { yield p_chunk?; } else { // Responses API specific parsing for streaming let mut finish_reason = None; let mut tool_calls = None; let event_type = chunk.get("type").and_then(|v| v.as_str()).unwrap_or(""); match event_type { "response.output_text.delta" => { if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { content_buffer.push_str(delta); } } "response.output_item.added" => { if let Some(item) = chunk.get("item") { if item.get("type").and_then(|v| v.as_str()) == Some("function_call") { has_tool_calls = true; let call_id = item.get("call_id").and_then(|v| v.as_str()); let name = 
item.get("name").and_then(|v| v.as_str()); let out_idx = chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32; let tc_idx = *tool_index_map.entry(out_idx).or_insert_with(|| { let i = next_tool_index; next_tool_index += 1; i }); tool_calls = Some(vec![crate::models::ToolCallDelta { index: tc_idx, id: call_id.map(|s| s.to_string()), call_type: Some("function".to_string()), function: Some(crate::models::FunctionCallDelta { name: name.map(|s| s.to_string()), arguments: Some("".to_string()), // Start with empty arguments }), }]); } } } "response.function_call_arguments.delta" => { if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { has_tool_calls = true; let out_idx = chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32; let tc_idx = *tool_index_map.entry(out_idx).or_insert_with(|| { let i = next_tool_index; next_tool_index += 1; i }); tool_calls = Some(vec![crate::models::ToolCallDelta { index: tc_idx, id: None, call_type: None, function: Some(crate::models::FunctionCallDelta { name: None, arguments: Some(delta.to_string()), }), }]); } } "response.completed" => { finish_reason = Some(if has_tool_calls { "tool_calls".to_string() } else { "stop".to_string() }); } _ => {} } // Process content_buffer to extract embedded tool calls or yield text if !content_buffer.is_empty() { // If we see the start of a tool call block, we wait for the full block if content_buffer.contains("{\"tool_uses\":") { let embedded_calls = Self::parse_tool_uses_json(&content_buffer); if !embedded_calls.is_empty() { has_tool_calls = true; if let Some(start) = content_buffer.find("{\"tool_uses\":") { // Yield text before the JSON block let preamble = content_buffer[..start].to_string(); let stripped_preamble = Self::strip_internal_metadata(&preamble); if !stripped_preamble.is_empty() { yield ProviderStreamChunk { content: stripped_preamble, reasoning_content: None, finish_reason: None, tool_calls: None, model: model.clone(), usage: None, }; } // 
Yield the tool calls in two chunks to mimic standard streaming behavior // Chunk 1: Initialization (id, name) let init_deltas: Vec = embedded_calls.iter().map(|tc| { let tc_idx = next_tool_index; next_tool_index += 1; crate::models::ToolCallDelta { index: tc_idx, id: Some(tc.id.clone()), call_type: Some("function".to_string()), function: Some(crate::models::FunctionCallDelta { name: Some(tc.function.name.clone()), arguments: Some("".to_string()), }), } }).collect(); yield ProviderStreamChunk { content: String::new(), reasoning_content: None, finish_reason: None, tool_calls: Some(init_deltas), model: model.clone(), usage: None, }; // Chunk 2: Payload (arguments) // Reset temp index for payload chunk let mut payload_idx = next_tool_index - embedded_calls.len() as u32; let arg_deltas: Vec = embedded_calls.into_iter().map(|tc| { let tc_idx = payload_idx; payload_idx += 1; crate::models::ToolCallDelta { index: tc_idx, id: None, call_type: None, function: Some(crate::models::FunctionCallDelta { name: None, arguments: Some(tc.function.arguments), }), } }).collect(); yield ProviderStreamChunk { content: String::new(), reasoning_content: None, finish_reason: None, tool_calls: Some(arg_deltas), model: model.clone(), usage: None, }; // Remove the processed part from buffer // We need to find the end index correctly let sub = &content_buffer[start..]; let mut brace_count = 0; let mut end_idx = 0; for (i, c) in sub.char_indices() { if c == '{' { brace_count += 1; } else if c == '}' { brace_count -= 1; if brace_count == 0 { end_idx = start + i + 1; break; } } } if end_idx > 0 { content_buffer = content_buffer[end_idx..].to_string(); } else { content_buffer.clear(); } } } // If we have "{"tool_uses":" but no full block yet, we just wait (don't yield) } else if content_buffer.contains("to=multi_tool_use.parallel") { // Wait for the JSON block that usually follows } else { // Standard text, yield and clear buffer let content = std::mem::take(&mut content_buffer); let 
stripped_content = Self::strip_internal_metadata(&content); if !stripped_content.is_empty() { yield ProviderStreamChunk { content: stripped_content, reasoning_content: None, finish_reason: None, tool_calls: None, model: model.clone(), usage: None, }; } } } if finish_reason.is_some() || tool_calls.is_some() { yield ProviderStreamChunk { content: String::new(), reasoning_content: None, finish_reason, tool_calls, model: model.clone(), usage: None, }; } } } Ok(_) => continue, Err(reqwest_eventsource::Error::StreamEnded) => break, Err(e) => { tracing::error!("Responses stream encountered an error: {}", e); // Attempt to probe for the actual error body let mut probe_body_no_stream = probe_body.clone(); if let Some(obj) = probe_body_no_stream.as_object_mut() { obj.remove("stream"); } let probe_resp = probe_client .post(&url) .header("Authorization", format!("Bearer {}", api_key)) .header("Accept", "application/json") .json(&probe_body_no_stream) .send() .await; match probe_resp { Ok(r) => { let status = r.status(); let body = r.text().await.unwrap_or_default(); if status.is_success() { let preview = if body.len() > 500 { format!("{}...", &body[..500]) } else { body.clone() }; tracing::warn!("Responses stream ended prematurely but probe returned 200 OK. 
Body: {}", preview); Err(AppError::ProviderError(format!("Responses stream ended (server sent 200 OK with body: {})", preview)))?; } else { tracing::error!("OpenAI Responses Stream Error Probe ({}): {}", status, body); Err(AppError::ProviderError(format!("OpenAI Responses API error ({}): {}", status, body)))?; } } Err(probe_err) => { tracing::error!("OpenAI Responses Stream Error Probe failed: {}", probe_err); Err(AppError::ProviderError(format!("Responses stream error (probe failed: {}): {}", probe_err, e)))?; } } } } } // Final flush of content_buffer if not empty if !content_buffer.is_empty() { let stripped = Self::strip_internal_metadata(&content_buffer); if !stripped.is_empty() { yield ProviderStreamChunk { content: stripped, reasoning_content: None, finish_reason: None, tool_calls: None, model: model.clone(), usage: None, }; } } }; Ok(Box::pin(stream)) } }