fix(openai): improve gpt-5.4 parallel tool call interception

- Implement cross-delta content buffering in streaming Responses API
- Wait for full 'tool_uses' JSON block before yielding to client
- Handle 'to=multi_tool_use.parallel' preamble by buffering
- Fix stream error probe to not request a new stream
- Remove raw JSON leakage from streaming content
This commit is contained in:
2026-03-18 15:04:15 +00:00
parent d0be16d8e3
commit 2e4318d84b

View File

@@ -704,6 +704,8 @@ impl super::Provider for OpenAIProvider {
let stream = async_stream::try_stream! { let stream = async_stream::try_stream! {
let mut es = es; let mut es = es;
let mut content_buffer = String::new();
while let Some(event) = es.next().await { while let Some(event) = es.next().await {
match event { match event {
Ok(reqwest_eventsource::Event::Message(msg)) => { Ok(reqwest_eventsource::Event::Message(msg)) => {
@@ -719,7 +721,6 @@ impl super::Provider for OpenAIProvider {
yield p_chunk?; yield p_chunk?;
} else { } else {
// Responses API specific parsing for streaming // Responses API specific parsing for streaming
let mut content = String::new();
let mut finish_reason = None; let mut finish_reason = None;
let mut tool_calls = None; let mut tool_calls = None;
@@ -728,7 +729,7 @@ impl super::Provider for OpenAIProvider {
match event_type { match event_type {
"response.output_text.delta" => { "response.output_text.delta" => {
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
content.push_str(delta); content_buffer.push_str(delta);
} }
} }
"response.item.delta" => { "response.item.delta" => {
@@ -752,45 +753,38 @@ impl super::Provider for OpenAIProvider {
}]); }]);
} else if t == "message" { } else if t == "message" {
if let Some(text) = delta.get("text").and_then(|v| v.as_str()) { if let Some(text) = delta.get("text").and_then(|v| v.as_str()) {
content.push_str(text); content_buffer.push_str(text);
} }
} }
} }
} }
"response.output_text.done" | "response.item.done" => { "response.output_text.done" | "response.item.done" | "response.done" => {
finish_reason = Some("stop".to_string()); finish_reason = Some("stop".to_string());
} }
"response.done" => { _ => {}
finish_reason = Some("stop".to_string());
}
_ => {
// Fallback to older nested structure if present
if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) {
for out in output {
if let Some(contents) = out.get("content").and_then(|c| c.as_array()) {
for item in contents {
if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
content.push_str(text);
} else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) {
content.push_str(delta);
}
}
}
}
}
}
} }
// GPT-5.4 parallel tool calls might be embedded in content as a JSON block // Process content_buffer to extract embedded tool calls or yield text
let embedded_calls = Self::parse_tool_uses_json(&content); if !content_buffer.is_empty() {
// If we see the start of a tool call block, we wait for the full block
if content_buffer.contains("{\"tool_uses\":") {
let embedded_calls = Self::parse_tool_uses_json(&content_buffer);
if !embedded_calls.is_empty() { if !embedded_calls.is_empty() {
// Strip the JSON part from content to keep it clean if let Some(start) = content_buffer.find("{\"tool_uses\":") {
if let Some(start) = content.find("{\"tool_uses\":") { // Yield text before the JSON block
content = content[..start].to_string(); let preamble = content_buffer[..start].to_string();
if !preamble.is_empty() {
yield ProviderStreamChunk {
content: preamble,
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: None,
};
} }
// Convert ToolCall to ToolCallDelta for streaming // Yield the tool calls
let deltas: Vec<crate::models::ToolCallDelta> = embedded_calls.into_iter().enumerate().map(|(idx, tc)| { let deltas: Vec<crate::models::ToolCallDelta> = embedded_calls.into_iter().enumerate().map(|(idx, tc)| {
crate::models::ToolCallDelta { crate::models::ToolCallDelta {
index: idx as u32, index: idx as u32,
@@ -803,16 +797,57 @@ impl super::Provider for OpenAIProvider {
} }
}).collect(); }).collect();
if let Some(ref mut existing) = tool_calls { yield ProviderStreamChunk {
existing.extend(deltas); content: String::new(),
reasoning_content: None,
finish_reason: None,
tool_calls: Some(deltas),
model: model.clone(),
usage: None,
};
// Remove the processed part from buffer
// We need to find the end index correctly
let sub = &content_buffer[start..];
let mut brace_count = 0;
let mut end_idx = 0;
for (i, c) in sub.char_indices() {
if c == '{' { brace_count += 1; }
else if c == '}' {
brace_count -= 1;
if brace_count == 0 {
end_idx = start + i + 1;
break;
}
}
}
if end_idx > 0 {
content_buffer = content_buffer[end_idx..].to_string();
} else { } else {
tool_calls = Some(deltas); content_buffer.clear();
}
}
}
// If we have "{"tool_uses":" but no full block yet, we just wait (don't yield)
} else if content_buffer.contains("to=multi_tool_use.parallel") {
// Wait for the JSON block that usually follows
} else {
// Standard text, yield and clear buffer
let content = std::mem::take(&mut content_buffer);
yield ProviderStreamChunk {
content,
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: None,
};
} }
} }
if !content.is_empty() || finish_reason.is_some() || tool_calls.is_some() { if finish_reason.is_some() || tool_calls.is_some() {
yield ProviderStreamChunk { yield ProviderStreamChunk {
content, content: String::new(),
reasoning_content: None, reasoning_content: None,
finish_reason, finish_reason,
tool_calls, tool_calls,
@@ -825,11 +860,16 @@ impl super::Provider for OpenAIProvider {
Ok(_) => continue, Ok(_) => continue,
Err(e) => { Err(e) => {
// Attempt to probe for the actual error body // Attempt to probe for the actual error body
let mut probe_body_no_stream = probe_body.clone();
if let Some(obj) = probe_body_no_stream.as_object_mut() {
obj.remove("stream");
}
let probe_resp = probe_client let probe_resp = probe_client
.post(&url) .post(&url)
.header("Authorization", format!("Bearer {}", api_key)) .header("Authorization", format!("Bearer {}", api_key))
.header("Accept", "application/json") // Ask for JSON during probe .header("Accept", "application/json")
.json(&probe_body) .json(&probe_body_no_stream)
.send() .send()
.await; .await;