From 2e4318d84b2e5ed574a767241f25f6568f1e78fa Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Wed, 18 Mar 2026 15:04:15 +0000 Subject: [PATCH] fix(openai): improve gpt-5.4 parallel tool call intercepting - Implement cross-delta content buffering in streaming Responses API - Wait for full 'tool_uses' JSON block before yielding to client - Handle 'to=multi_tool_use.parallel' preamble by buffering - Fix stream error probe to not request a new stream - Remove raw JSON leakage from streaming content --- src/providers/openai.rs | 138 ++++++++++++++++++++++++++-------------- 1 file changed, 89 insertions(+), 49 deletions(-) diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 9791f989..19c8c81c 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -704,6 +704,8 @@ impl super::Provider for OpenAIProvider { let stream = async_stream::try_stream! { let mut es = es; + let mut content_buffer = String::new(); + while let Some(event) = es.next().await { match event { Ok(reqwest_eventsource::Event::Message(msg)) => { @@ -719,7 +721,6 @@ impl super::Provider for OpenAIProvider { yield p_chunk?; } else { // Responses API specific parsing for streaming - let mut content = String::new(); let mut finish_reason = None; let mut tool_calls = None; @@ -728,7 +729,7 @@ impl super::Provider for OpenAIProvider { match event_type { "response.output_text.delta" => { if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { - content.push_str(delta); + content_buffer.push_str(delta); } } "response.item.delta" => { @@ -752,67 +753,101 @@ impl super::Provider for OpenAIProvider { }]); } else if t == "message" { if let Some(text) = delta.get("text").and_then(|v| v.as_str()) { - content.push_str(text); + content_buffer.push_str(text); } } } } - "response.output_text.done" | "response.item.done" => { + "response.output_text.done" | "response.item.done" | "response.done" => { finish_reason = Some("stop".to_string()); } - "response.done" => { - finish_reason = Some("stop".to_string()); - } - _ => { - // Fallback to older nested structure if present - if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) { - for out in output { - if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { - for item in contents { - if let Some(text) = item.get("text").and_then(|t| t.as_str()) { - content.push_str(text); - } else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) { - content.push_str(delta); + _ => {} + } + + // Process content_buffer to extract embedded tool calls or yield text + if !content_buffer.is_empty() { + // If we see the start of a tool call block, we wait for the full block + if content_buffer.contains("{\"tool_uses\":") { + let embedded_calls = Self::parse_tool_uses_json(&content_buffer); + if !embedded_calls.is_empty() { + if let Some(start) = content_buffer.find("{\"tool_uses\":") { + // Yield text before the JSON block + let preamble = content_buffer[..start].to_string(); + if !preamble.is_empty() { + yield ProviderStreamChunk { + content: preamble, + reasoning_content: None, + finish_reason: None, + tool_calls: None, + model: model.clone(), + usage: None, + }; + } + + // Yield the tool calls + let deltas: Vec = embedded_calls.into_iter().enumerate().map(|(idx, tc)| { + crate::models::ToolCallDelta { + index: idx as u32, + id: Some(tc.id), + call_type: Some("function".to_string()), + function: Some(crate::models::FunctionCallDelta { + name: Some(tc.function.name), + arguments: Some(tc.function.arguments), + }), + } + }).collect(); + + yield ProviderStreamChunk { + content: String::new(), + reasoning_content: None, + finish_reason: None, + tool_calls: Some(deltas), + model: model.clone(), + usage: None, + }; + + // Remove the processed part from buffer + // We need to find the end index correctly + let sub = &content_buffer[start..]; + let mut brace_count = 0; + let mut end_idx = 0; + for (i, c) in sub.char_indices() { + if c == '{' { brace_count += 1; } + else if c == '}' { + brace_count -= 1; + if brace_count == 0 { + end_idx = start + i + 1; + break; } } } + if end_idx > 0 { + content_buffer = content_buffer[end_idx..].to_string(); + } else { + content_buffer.clear(); + } } } - } - } - - // GPT-5.4 parallel tool calls might be embedded in content as a JSON block - let embedded_calls = Self::parse_tool_uses_json(&content); - - if !embedded_calls.is_empty() { - // Strip the JSON part from content to keep it clean - if let Some(start) = content.find("{\"tool_uses\":") { - content = content[..start].to_string(); - } - - // Convert ToolCall to ToolCallDelta for streaming - let deltas: Vec = embedded_calls.into_iter().enumerate().map(|(idx, tc)| { - crate::models::ToolCallDelta { - index: idx as u32, - id: Some(tc.id), - call_type: Some("function".to_string()), - function: Some(crate::models::FunctionCallDelta { - name: Some(tc.function.name), - arguments: Some(tc.function.arguments), - }), - } - }).collect(); - - if let Some(ref mut existing) = tool_calls { - existing.extend(deltas); + // If we have "{"tool_uses":" but no full block yet, we just wait (don't yield) + } else if content_buffer.contains("to=multi_tool_use.parallel") { + // Wait for the JSON block that usually follows } else { - tool_calls = Some(deltas); + // Standard text, yield and clear buffer + let content = std::mem::take(&mut content_buffer); + yield ProviderStreamChunk { + content, + reasoning_content: None, + finish_reason: None, + tool_calls: None, + model: model.clone(), + usage: None, + }; } } - if !content.is_empty() || finish_reason.is_some() || tool_calls.is_some() { + if finish_reason.is_some() || tool_calls.is_some() { yield ProviderStreamChunk { - content, + content: String::new(), reasoning_content: None, finish_reason, tool_calls, @@ -825,11 +860,16 @@ impl super::Provider for OpenAIProvider { Ok(_) => continue, Err(e) => { // Attempt to probe for the actual error body + let mut probe_body_no_stream = probe_body.clone(); + if let Some(obj) = probe_body_no_stream.as_object_mut() { + obj.remove("stream"); + } + let probe_resp = probe_client .post(&url) .header("Authorization", format!("Bearer {}", api_key)) - .header("Accept", "application/json") // Ask for JSON during probe - .json(&probe_body) + .header("Accept", "application/json") + .json(&probe_body_no_stream) .send() .await;