Compare commits

...

10 Commits

Author SHA1 Message Date
79dc8fe409 fix(openai): correctly parse Responses API tool call events
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- The Responses API does not use 'response.item.delta' for tool calls.
- It uses 'response.output_item.added' to initialize the function call.
- It uses 'response.function_call_arguments.delta' for the payload stream.
- Updated the streaming parser to catch these events and correctly yield ToolCallDelta objects.
- This restores proper streaming of tool calls back to the client.
2026-03-18 16:13:13 +00:00
24a898c9a7 fix(openai): gracefully handle stream endings
- The Responses API ends streams without a final '[DONE]' message.
- This causes reqwest_eventsource to return Error::StreamEnded.
- Previously, this was treated as a premature termination, triggering an error probe.
- We now explicitly match and break on Err(StreamEnded) for normal completion.
2026-03-18 15:39:18 +00:00
7c2a317c01 fix(openai): add missing stream parameter for Responses API
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- The OpenAI Responses API actually requires the 'stream: true'
parameter in the JSON body, contrary to some documentation summaries.
- Omitting it caused the API to return a full application/json
response instead of SSE text/event-stream, leading to stream failures
and probe warnings in the proxy logs.
2026-03-18 15:32:08 +00:00
cb619f9286 fix(openai): improve Responses API stream robustness and diagnostics
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Implement final buffer flush in streaming path to prevent data loss
- Increase probe response body logging to 500 characters
- Ensure internal metadata is stripped even on final flush
- Fix potential hang when stream ends without explicit [DONE] event
2026-03-18 15:17:56 +00:00
441270317c fix(openai): strip internal metadata from gpt-5.4 responses
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Add strip_internal_metadata helper to remove prefixes like 'to=multi_tool_use.parallel'
- Clean up Thai text preambles reported in the journal
- Apply metadata stripping to both synchronous and streaming response paths
- Improve visual quality of proxied model responses
2026-03-18 15:07:17 +00:00
2e4318d84b fix(openai): improve gpt-5.4 parallel tool call intercepting
- Implement cross-delta content buffering in streaming Responses API
- Wait for full 'tool_uses' JSON block before yielding to client
- Handle 'to=multi_tool_use.parallel' preamble by buffering
- Fix stream error probe to not request a new stream
- Remove raw JSON leakage from streaming content
2026-03-18 15:04:15 +00:00
d0be16d8e3 fix(openai): parse embedded 'tool_uses' JSON for gpt-5.4 parallel calls
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Add static parse_tool_uses_json helper to extract embedded tool calls
- Update synchronous and streaming Responses API parsers to detect tool_uses blocks
- Strip tool_uses JSON from content to prevent raw JSON leakage to client
- Resolve lifetime issues by avoiding &self capture in streaming closure
2026-03-18 14:28:38 +00:00
83e0ad0240 fix(openai): flatten tools and tool_choice for Responses API
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Map nested 'function' object to top-level fields
- Support string and object-based 'tool_choice' formats
- Fix 400 Bad Request 'Missing required parameter: tools[0].name'
2026-03-18 14:00:49 +00:00
275ce34d05 fix(openai): fix missing tools and instructions in Responses API
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Add 'tools' and 'tool_choice' parameters to streaming Responses API
- Include 'name' field in message items for Responses API input
- Use string content for text-only messages to improve instruction following
- Fix subagents not triggering and files not being created
2026-03-18 13:51:36 +00:00
cb5b921550 feat(openai): implement tool support for gpt-5.4 via Responses API
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- Implement polymorphic 'input' structure for /responses endpoint
- Map 'tool' role to 'function_call_output' items
- Handle assistant 'tool_calls' as separate 'function_call' items
- Add synchronous and streaming parsers for function_call items
- Fix 400 Bad Request 'Invalid value: tool' error
2026-03-18 13:14:51 +00:00

View File

@@ -36,6 +36,79 @@ impl OpenAIProvider {
pricing: app_config.pricing.openai.clone(), pricing: app_config.pricing.openai.clone(),
}) })
} }
/// GPT-5.4 models sometimes emit parallel tool calls as a JSON block starting with
/// '{"tool_uses":' inside a text message instead of discrete function_call items.
/// This method attempts to extract and parse such tool calls.
///
/// Returns an empty `Vec` when no complete, well-formed block is present.
pub fn parse_tool_uses_json(text: &str) -> Vec<crate::models::ToolCall> {
    let mut calls = Vec::new();
    if let Some(start) = text.find("{\"tool_uses\":") {
        // Find the end of the JSON block by matching braces. Unlike the previous
        // version, we track string literals and escape sequences so that braces
        // appearing inside argument strings do not corrupt the depth count.
        let sub = &text[start..];
        let mut brace_count: i32 = 0;
        let mut in_string = false;
        let mut escaped = false;
        let mut end_idx = 0;
        let mut found = false;
        for (i, c) in sub.char_indices() {
            if escaped {
                escaped = false;
                continue;
            }
            match c {
                '\\' if in_string => escaped = true,
                '"' => in_string = !in_string,
                '{' if !in_string => brace_count += 1,
                '}' if !in_string => {
                    brace_count -= 1;
                    if brace_count == 0 {
                        end_idx = i + 1;
                        found = true;
                        break;
                    }
                }
                _ => {}
            }
        }
        if found {
            let json_str = &sub[..end_idx];
            if let Ok(val) = serde_json::from_str::<serde_json::Value>(json_str) {
                if let Some(uses) = val.get("tool_uses").and_then(|u| u.as_array()) {
                    for (idx, u) in uses.iter().enumerate() {
                        let raw_name = u
                            .get("recipient_name")
                            .and_then(|v| v.as_str())
                            .unwrap_or("unknown");
                        // Strip only a leading "functions." namespace prefix.
                        // (The old `replace` call also mangled names that merely
                        // contained the substring somewhere in the middle.)
                        let name = raw_name
                            .strip_prefix("functions.")
                            .unwrap_or(raw_name)
                            .to_string();
                        // The model puts arguments under "parameters"; re-serialize
                        // to a JSON string, defaulting to an empty object.
                        let arguments = u
                            .get("parameters")
                            .map(|v| v.to_string())
                            .unwrap_or_else(|| "{}".to_string());
                        calls.push(crate::models::ToolCall {
                            // Short random suffix keeps synthetic ids unique and log-friendly.
                            id: format!(
                                "call_tu_{}_{}",
                                &uuid::Uuid::new_v4().to_string()[..8],
                                idx
                            ),
                            call_type: "function".to_string(),
                            function: crate::models::FunctionCall { name, arguments },
                        });
                    }
                }
            }
        }
    }
    calls
}
/// Strips internal metadata prefixes like 'to=multi_tool_use.parallel' from model responses.
///
/// Removes ALL occurrences of each known pattern (the previous implementation used
/// `find` + `replace_range`, which only removed the first match per pattern, letting
/// repeated artifacts leak through to the client) and trims surrounding whitespace.
pub fn strip_internal_metadata(text: &str) -> String {
    // Known internal artifacts that must never reach the client.
    let patterns = [
        "to=multi_tool_use.parallel",
        "to=functions.multi_tool_use",
        "ส่งเงินบาทไทยjson", // User reported Thai text preamble
    ];
    let mut result = text.to_string();
    for p in patterns {
        // `contains` check avoids a needless allocation on the common clean path.
        if result.contains(p) {
            result = result.replace(p, "");
        }
    }
    result.trim().to_string()
}
} }
#[async_trait] #[async_trait]
@@ -112,10 +185,57 @@ impl super::Provider for OpenAIProvider {
let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
let mut input_parts = Vec::new(); let mut input_parts = Vec::new();
for m in &messages_json { for m in &messages_json {
let mut role = m["role"].as_str().unwrap_or("user").to_string(); let role = m["role"].as_str().unwrap_or("user");
// Newer models (gpt-5, o1) prefer "developer" over "system"
if role == "system" { if role == "tool" {
role = "developer".to_string(); input_parts.push(serde_json::json!({
"type": "function_call_output",
"call_id": m.get("tool_call_id").and_then(|v| v.as_str()).unwrap_or(""),
"output": m.get("content").and_then(|v| v.as_str()).unwrap_or("")
}));
continue;
}
if role == "assistant" && m.get("tool_calls").is_some() {
// Push message part if it exists
let content_val = m.get("content").cloned().unwrap_or(serde_json::json!(""));
if !content_val.is_null() && (content_val.is_array() && !content_val.as_array().unwrap().is_empty() || content_val.is_string() && !content_val.as_str().unwrap().is_empty()) {
let mut content = content_val.clone();
if let Some(text) = content.as_str() {
content = serde_json::json!([{ "type": "output_text", "text": text }]);
} else if let Some(arr) = content.as_array_mut() {
for part in arr {
if let Some(obj) = part.as_object_mut() {
if obj.get("type").and_then(|v| v.as_str()) == Some("text") {
obj.insert("type".to_string(), serde_json::json!("output_text"));
}
}
}
}
input_parts.push(serde_json::json!({
"type": "message",
"role": "assistant",
"content": content
}));
}
// Push tool calls as separate items
if let Some(tcs) = m.get("tool_calls").and_then(|v| v.as_array()) {
for tc in tcs {
input_parts.push(serde_json::json!({
"type": "function_call",
"call_id": tc["id"],
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}));
}
}
continue;
}
let mut mapped_role = role.to_string();
if mapped_role == "system" {
mapped_role = "developer".to_string();
} }
let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([])); let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([]));
@@ -127,12 +247,11 @@ impl super::Provider for OpenAIProvider {
if let Some(t) = part_obj.get("type").and_then(|v| v.as_str()) { if let Some(t) = part_obj.get("type").and_then(|v| v.as_str()) {
match t { match t {
"text" => { "text" => {
let new_type = if role == "assistant" { "output_text" } else { "input_text" }; let new_type = if mapped_role == "assistant" { "output_text" } else { "input_text" };
part_obj.insert("type".to_string(), serde_json::json!(new_type)); part_obj.insert("type".to_string(), serde_json::json!(new_type));
} }
"image_url" => { "image_url" => {
// Assistant typically doesn't have image_url in history this way, but for safety: let new_type = if mapped_role == "assistant" { "output_image" } else { "input_image" };
let new_type = if role == "assistant" { "output_image" } else { "input_image" };
part_obj.insert("type".to_string(), serde_json::json!(new_type)); part_obj.insert("type".to_string(), serde_json::json!(new_type));
if let Some(img_url) = part_obj.remove("image_url") { if let Some(img_url) = part_obj.remove("image_url") {
part_obj.insert("image".to_string(), img_url); part_obj.insert("image".to_string(), img_url);
@@ -144,14 +263,20 @@ impl super::Provider for OpenAIProvider {
} }
} }
} else if let Some(text) = content.as_str() { } else if let Some(text) = content.as_str() {
let new_type = if role == "assistant" { "output_text" } else { "input_text" }; // If it's just a string, send it as a string instead of an array of objects
content = serde_json::json!([{ "type": new_type, "text": text }]); // as it's safer for standard conversational messages.
content = serde_json::json!(text);
} }
input_parts.push(serde_json::json!({ let mut msg_item = serde_json::json!({
"role": role, "type": "message",
"role": mapped_role,
"content": content "content": content
})); });
if let Some(name) = m.get("name") {
msg_item["name"] = name.clone();
}
input_parts.push(msg_item);
} }
let mut body = serde_json::json!({ let mut body = serde_json::json!({
@@ -174,7 +299,33 @@ impl super::Provider for OpenAIProvider {
} }
if let Some(tools) = &request.tools { if let Some(tools) = &request.tools {
body["tools"] = serde_json::json!(tools); let flattened: Vec<serde_json::Value> = tools.iter().map(|t| {
let mut obj = serde_json::json!({
"type": t.tool_type,
"name": t.function.name,
});
if let Some(desc) = &t.function.description {
obj["description"] = serde_json::json!(desc);
}
if let Some(params) = &t.function.parameters {
obj["parameters"] = params.clone();
}
obj
}).collect();
body["tools"] = serde_json::json!(flattened);
}
if let Some(tool_choice) = &request.tool_choice {
match tool_choice {
crate::models::ToolChoice::Mode(mode) => {
body["tool_choice"] = serde_json::json!(mode);
}
crate::models::ToolChoice::Specific(specific) => {
body["tool_choice"] = serde_json::json!({
"type": specific.choice_type,
"name": specific.function.name,
});
}
}
} }
let resp = self let resp = self
@@ -200,18 +351,43 @@ impl super::Provider for OpenAIProvider {
// Normalize Responses API output into ProviderResponse // Normalize Responses API output into ProviderResponse
let mut content_text = String::new(); let mut content_text = String::new();
let mut tool_calls = Vec::new();
if let Some(output) = resp_json.get("output").and_then(|o| o.as_array()) { if let Some(output) = resp_json.get("output").and_then(|o| o.as_array()) {
for out in output { for out in output {
if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { let item_type = out.get("type").and_then(|v| v.as_str()).unwrap_or("");
for item in contents { match item_type {
if let Some(text) = item.get("text").and_then(|t| t.as_str()) { "message" => {
if !content_text.is_empty() { content_text.push_str("\n"); } if let Some(contents) = out.get("content").and_then(|c| c.as_array()) {
content_text.push_str(text); for item in contents {
} else if let Some(parts) = item.get("parts").and_then(|p| p.as_array()) { if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
for p in parts {
if let Some(t) = p.as_str() {
if !content_text.is_empty() { content_text.push_str("\n"); } if !content_text.is_empty() { content_text.push_str("\n"); }
content_text.push_str(t); content_text.push_str(text);
}
}
}
}
"function_call" => {
let id = out.get("call_id")
.or_else(|| out.get("item_id"))
.or_else(|| out.get("id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let name = out.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string();
let arguments = out.get("arguments").and_then(|v| v.as_str()).unwrap_or("").to_string();
tool_calls.push(crate::models::ToolCall {
id,
call_type: "function".to_string(),
function: crate::models::FunctionCall { name, arguments },
});
}
_ => {
// Fallback for older/nested structure
if let Some(contents) = out.get("content").and_then(|c| c.as_array()) {
for item in contents {
if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
if !content_text.is_empty() { content_text.push_str("\n"); }
content_text.push_str(text);
} }
} }
} }
@@ -241,10 +417,22 @@ impl super::Provider for OpenAIProvider {
let completion_tokens = resp_json.get("usage").and_then(|u| u.get("completion_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let completion_tokens = resp_json.get("usage").and_then(|u| u.get("completion_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let total_tokens = resp_json.get("usage").and_then(|u| u.get("total_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; let total_tokens = resp_json.get("usage").and_then(|u| u.get("total_tokens")).and_then(|v| v.as_u64()).unwrap_or(0) as u32;
// GPT-5.4 parallel tool calls might be embedded in content_text as a JSON block
let embedded_calls = Self::parse_tool_uses_json(&content_text);
if !embedded_calls.is_empty() {
// Strip the JSON part from content_text to keep it clean
if let Some(start) = content_text.find("{\"tool_uses\":") {
content_text = content_text[..start].to_string();
}
tool_calls.extend(embedded_calls);
}
content_text = Self::strip_internal_metadata(&content_text);
Ok(ProviderResponse { Ok(ProviderResponse {
content: content_text, content: content_text,
reasoning_content: None, reasoning_content: None,
tool_calls: None, tool_calls: if tool_calls.is_empty() { None } else { Some(tool_calls) },
prompt_tokens, prompt_tokens,
completion_tokens, completion_tokens,
reasoning_tokens: 0, reasoning_tokens: 0,
@@ -336,6 +524,7 @@ impl super::Provider for OpenAIProvider {
} }
} }
Ok(_) => continue, Ok(_) => continue,
Err(reqwest_eventsource::Error::StreamEnded) => break,
Err(e) => { Err(e) => {
// Attempt to probe for the actual error body // Attempt to probe for the actual error body
let probe_resp = probe_client let probe_resp = probe_client
@@ -379,10 +568,57 @@ impl super::Provider for OpenAIProvider {
let messages_json = helpers::messages_to_openai_json(&request.messages).await?; let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
let mut input_parts = Vec::new(); let mut input_parts = Vec::new();
for m in &messages_json { for m in &messages_json {
let mut role = m["role"].as_str().unwrap_or("user").to_string(); let role = m["role"].as_str().unwrap_or("user");
// Newer models (gpt-5, o1) prefer "developer" over "system"
if role == "system" { if role == "tool" {
role = "developer".to_string(); input_parts.push(serde_json::json!({
"type": "function_call_output",
"call_id": m.get("tool_call_id").and_then(|v| v.as_str()).unwrap_or(""),
"output": m.get("content").and_then(|v| v.as_str()).unwrap_or("")
}));
continue;
}
if role == "assistant" && m.get("tool_calls").is_some() {
// Push message part if it exists
let content_val = m.get("content").cloned().unwrap_or(serde_json::json!(""));
if !content_val.is_null() && (content_val.is_array() && !content_val.as_array().unwrap().is_empty() || content_val.is_string() && !content_val.as_str().unwrap().is_empty()) {
let mut content = content_val.clone();
if let Some(text) = content.as_str() {
content = serde_json::json!([{ "type": "output_text", "text": text }]);
} else if let Some(arr) = content.as_array_mut() {
for part in arr {
if let Some(obj) = part.as_object_mut() {
if obj.get("type").and_then(|v| v.as_str()) == Some("text") {
obj.insert("type".to_string(), serde_json::json!("output_text"));
}
}
}
}
input_parts.push(serde_json::json!({
"type": "message",
"role": "assistant",
"content": content
}));
}
// Push tool calls as separate items
if let Some(tcs) = m.get("tool_calls").and_then(|v| v.as_array()) {
for tc in tcs {
input_parts.push(serde_json::json!({
"type": "function_call",
"call_id": tc["id"],
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}));
}
}
continue;
}
let mut mapped_role = role.to_string();
if mapped_role == "system" {
mapped_role = "developer".to_string();
} }
let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([])); let mut content = m.get("content").cloned().unwrap_or(serde_json::json!([]));
@@ -394,12 +630,11 @@ impl super::Provider for OpenAIProvider {
if let Some(t) = part_obj.get("type").and_then(|v| v.as_str()) { if let Some(t) = part_obj.get("type").and_then(|v| v.as_str()) {
match t { match t {
"text" => { "text" => {
let new_type = if role == "assistant" { "output_text" } else { "input_text" }; let new_type = if mapped_role == "assistant" { "output_text" } else { "input_text" };
part_obj.insert("type".to_string(), serde_json::json!(new_type)); part_obj.insert("type".to_string(), serde_json::json!(new_type));
} }
"image_url" => { "image_url" => {
// Assistant typically doesn't have image_url in history this way, but for safety: let new_type = if mapped_role == "assistant" { "output_image" } else { "input_image" };
let new_type = if role == "assistant" { "output_image" } else { "input_image" };
part_obj.insert("type".to_string(), serde_json::json!(new_type)); part_obj.insert("type".to_string(), serde_json::json!(new_type));
if let Some(img_url) = part_obj.remove("image_url") { if let Some(img_url) = part_obj.remove("image_url") {
part_obj.insert("image".to_string(), img_url); part_obj.insert("image".to_string(), img_url);
@@ -411,20 +646,25 @@ impl super::Provider for OpenAIProvider {
} }
} }
} else if let Some(text) = content.as_str() { } else if let Some(text) = content.as_str() {
let new_type = if role == "assistant" { "output_text" } else { "input_text" }; // If it's just a string, send it as a string instead of an array of objects
content = serde_json::json!([{ "type": new_type, "text": text }]); // as it's safer for standard conversational messages.
content = serde_json::json!(text);
} }
input_parts.push(serde_json::json!({ let mut msg_item = serde_json::json!({
"role": role, "type": "message",
"role": mapped_role,
"content": content "content": content
})); });
if let Some(name) = m.get("name") {
msg_item["name"] = name.clone();
}
input_parts.push(msg_item);
} }
let mut body = serde_json::json!({ let mut body = serde_json::json!({
"model": request.model, "model": request.model,
"input": input_parts, "input": input_parts,
"stream": true,
}); });
// Add standard parameters // Add standard parameters
@@ -441,6 +681,37 @@ impl super::Provider for OpenAIProvider {
} }
} }
if let Some(tools) = &request.tools {
let flattened: Vec<serde_json::Value> = tools.iter().map(|t| {
let mut obj = serde_json::json!({
"type": t.tool_type,
"name": t.function.name,
});
if let Some(desc) = &t.function.description {
obj["description"] = serde_json::json!(desc);
}
if let Some(params) = &t.function.parameters {
obj["parameters"] = params.clone();
}
obj
}).collect();
body["tools"] = serde_json::json!(flattened);
}
if let Some(tool_choice) = &request.tool_choice {
match tool_choice {
crate::models::ToolChoice::Mode(mode) => {
body["tool_choice"] = serde_json::json!(mode);
}
crate::models::ToolChoice::Specific(specific) => {
body["tool_choice"] = serde_json::json!({
"type": specific.choice_type,
"name": specific.function.name,
});
}
}
}
body["stream"] = serde_json::json!(true);
let url = format!("{}/responses", self.config.base_url); let url = format!("{}/responses", self.config.base_url);
let api_key = self.api_key.clone(); let api_key = self.api_key.clone();
let model = request.model.clone(); let model = request.model.clone();
@@ -458,6 +729,8 @@ impl super::Provider for OpenAIProvider {
let stream = async_stream::try_stream! { let stream = async_stream::try_stream! {
let mut es = es; let mut es = es;
let mut content_buffer = String::new();
while let Some(event) = es.next().await { while let Some(event) = es.next().await {
match event { match event {
Ok(reqwest_eventsource::Event::Message(msg)) => { Ok(reqwest_eventsource::Event::Message(msg)) => {
@@ -473,53 +746,146 @@ impl super::Provider for OpenAIProvider {
yield p_chunk?; yield p_chunk?;
} else { } else {
// Responses API specific parsing for streaming // Responses API specific parsing for streaming
let mut content = String::new();
let mut finish_reason = None; let mut finish_reason = None;
let mut tool_calls = None;
let event_type = chunk.get("type").and_then(|v| v.as_str()).unwrap_or(""); let event_type = chunk.get("type").and_then(|v| v.as_str()).unwrap_or("");
match event_type { match event_type {
"response.output_text.delta" => { "response.output_text.delta" => {
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
content.push_str(delta); content_buffer.push_str(delta);
} }
} }
"response.output_text.done" => { "response.output_item.added" => {
if let Some(text) = chunk.get("text").and_then(|v| v.as_str()) { if let Some(item) = chunk.get("item") {
// Some implementations send the full text at the end if item.get("type").and_then(|v| v.as_str()) == Some("function_call") {
// We usually prefer deltas, but if we haven't seen them, this is the fallback. let call_id = item.get("call_id").and_then(|v| v.as_str());
// However, if we're already yielding deltas, we might not want this. let name = item.get("name").and_then(|v| v.as_str());
// For now, let's just use it as a signal that we're done.
finish_reason = Some("stop".to_string()); tool_calls = Some(vec![crate::models::ToolCallDelta {
index: chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32,
id: call_id.map(|s| s.to_string()),
call_type: Some("function".to_string()),
function: Some(crate::models::FunctionCallDelta {
name: name.map(|s| s.to_string()),
arguments: Some("".to_string()), // Start with empty arguments
}),
}]);
}
} }
} }
"response.done" => { "response.function_call_arguments.delta" => {
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
tool_calls = Some(vec![crate::models::ToolCallDelta {
index: chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32,
id: None,
call_type: None,
function: Some(crate::models::FunctionCallDelta {
name: None,
arguments: Some(delta.to_string()),
}),
}]);
}
}
"response.output_text.done" | "response.item.done" | "response.completed" => {
finish_reason = Some("stop".to_string()); finish_reason = Some("stop".to_string());
} }
_ => { _ => {}
// Fallback to older nested structure if present }
if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) {
for out in output { // Process content_buffer to extract embedded tool calls or yield text
if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { if !content_buffer.is_empty() {
for item in contents { // If we see the start of a tool call block, we wait for the full block
if let Some(text) = item.get("text").and_then(|t| t.as_str()) { if content_buffer.contains("{\"tool_uses\":") {
content.push_str(text); let embedded_calls = Self::parse_tool_uses_json(&content_buffer);
} else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) { if !embedded_calls.is_empty() {
content.push_str(delta); if let Some(start) = content_buffer.find("{\"tool_uses\":") {
// Yield text before the JSON block
let preamble = content_buffer[..start].to_string();
let stripped_preamble = Self::strip_internal_metadata(&preamble);
if !stripped_preamble.is_empty() {
yield ProviderStreamChunk {
content: stripped_preamble,
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: None,
};
}
// Yield the tool calls
// ... (rest of tool call yielding unchanged)
let deltas: Vec<crate::models::ToolCallDelta> = embedded_calls.into_iter().enumerate().map(|(idx, tc)| {
crate::models::ToolCallDelta {
index: idx as u32,
id: Some(tc.id),
call_type: Some("function".to_string()),
function: Some(crate::models::FunctionCallDelta {
name: Some(tc.function.name),
arguments: Some(tc.function.arguments),
}),
}
}).collect();
yield ProviderStreamChunk {
content: String::new(),
reasoning_content: None,
finish_reason: None,
tool_calls: Some(deltas),
model: model.clone(),
usage: None,
};
// Remove the processed part from buffer
// We need to find the end index correctly
let sub = &content_buffer[start..];
let mut brace_count = 0;
let mut end_idx = 0;
for (i, c) in sub.char_indices() {
if c == '{' { brace_count += 1; }
else if c == '}' {
brace_count -= 1;
if brace_count == 0 {
end_idx = start + i + 1;
break;
} }
} }
} }
if end_idx > 0 {
content_buffer = content_buffer[end_idx..].to_string();
} else {
content_buffer.clear();
}
} }
} }
// If we have "{"tool_uses":" but no full block yet, we just wait (don't yield)
} else if content_buffer.contains("to=multi_tool_use.parallel") {
// Wait for the JSON block that usually follows
} else {
// Standard text, yield and clear buffer
let content = std::mem::take(&mut content_buffer);
let stripped_content = Self::strip_internal_metadata(&content);
if !stripped_content.is_empty() {
yield ProviderStreamChunk {
content: stripped_content,
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: None,
};
}
} }
} }
if !content.is_empty() || finish_reason.is_some() { if finish_reason.is_some() || tool_calls.is_some() {
yield ProviderStreamChunk { yield ProviderStreamChunk {
content, content: String::new(),
reasoning_content: None, reasoning_content: None,
finish_reason, finish_reason,
tool_calls: None, tool_calls,
model: model.clone(), model: model.clone(),
usage: None, usage: None,
}; };
@@ -527,13 +893,20 @@ impl super::Provider for OpenAIProvider {
} }
} }
Ok(_) => continue, Ok(_) => continue,
Err(reqwest_eventsource::Error::StreamEnded) => break,
Err(e) => { Err(e) => {
tracing::error!("Responses stream encountered an error: {}", e);
// Attempt to probe for the actual error body // Attempt to probe for the actual error body
let mut probe_body_no_stream = probe_body.clone();
if let Some(obj) = probe_body_no_stream.as_object_mut() {
obj.remove("stream");
}
let probe_resp = probe_client let probe_resp = probe_client
.post(&url) .post(&url)
.header("Authorization", format!("Bearer {}", api_key)) .header("Authorization", format!("Bearer {}", api_key))
.header("Accept", "application/json") // Ask for JSON during probe .header("Accept", "application/json")
.json(&probe_body) .json(&probe_body_no_stream)
.send() .send()
.await; .await;
@@ -542,8 +915,9 @@ impl super::Provider for OpenAIProvider {
let status = r.status(); let status = r.status();
let body = r.text().await.unwrap_or_default(); let body = r.text().await.unwrap_or_default();
if status.is_success() { if status.is_success() {
tracing::warn!("Responses stream ended prematurely but probe returned 200 OK. Body: {}", body); let preview = if body.len() > 500 { format!("{}...", &body[..500]) } else { body.clone() };
Err(AppError::ProviderError(format!("Responses stream ended (server sent 200 OK with body: {})", body)))?; tracing::warn!("Responses stream ended prematurely but probe returned 200 OK. Body: {}", preview);
Err(AppError::ProviderError(format!("Responses stream ended (server sent 200 OK with body: {})", preview)))?;
} else { } else {
tracing::error!("OpenAI Responses Stream Error Probe ({}): {}", status, body); tracing::error!("OpenAI Responses Stream Error Probe ({}): {}", status, body);
Err(AppError::ProviderError(format!("OpenAI Responses API error ({}): {}", status, body)))?; Err(AppError::ProviderError(format!("OpenAI Responses API error ({}): {}", status, body)))?;
@@ -557,6 +931,21 @@ impl super::Provider for OpenAIProvider {
} }
} }
} }
// Final flush of content_buffer if not empty
if !content_buffer.is_empty() {
let stripped = Self::strip_internal_metadata(&content_buffer);
if !stripped.is_empty() {
yield ProviderStreamChunk {
content: stripped,
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: None,
};
}
}
}; };
Ok(Box::pin(stream)) Ok(Box::pin(stream))