Compare commits

...

2 Commits

Author SHA1 Message Date
79dc8fe409 fix(openai): correctly parse Responses API tool call events
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled
- The Responses API does not use 'response.item.delta' for tool calls.
- It uses 'response.output_item.added' to initialize the function call.
- It uses 'response.function_call_arguments.delta' for the payload stream.
- Updated the streaming parser to catch these events and correctly yield ToolCallDelta objects.
- This restores proper streaming of tool calls back to the client.
2026-03-18 16:13:13 +00:00
24a898c9a7 fix(openai): gracefully handle stream endings
- The Responses API ends streams without a final '[DONE]' message.
- This causes reqwest_eventsource to return Error::StreamEnded.
- Previously, this was treated as a premature termination, triggering an error probe.
- We now explicitly match and break on Err(StreamEnded) for normal completion.
2026-03-18 15:39:18 +00:00

View File

@@ -524,6 +524,7 @@ impl super::Provider for OpenAIProvider {
}
}
Ok(_) => continue,
Err(reqwest_eventsource::Error::StreamEnded) => break,
Err(e) => {
// Attempt to probe for the actual error body
let probe_resp = probe_client
@@ -756,15 +757,11 @@ impl super::Provider for OpenAIProvider {
content_buffer.push_str(delta);
}
}
"response.item.delta" => {
if let Some(delta) = chunk.get("delta") {
let t = delta.get("type").and_then(|v| v.as_str()).unwrap_or("");
if t == "function_call" {
let call_id = delta.get("call_id")
.or_else(|| chunk.get("item_id"))
.and_then(|v| v.as_str());
let name = delta.get("name").and_then(|v| v.as_str());
let arguments = delta.get("arguments").and_then(|v| v.as_str());
"response.output_item.added" => {
if let Some(item) = chunk.get("item") {
if item.get("type").and_then(|v| v.as_str()) == Some("function_call") {
let call_id = item.get("call_id").and_then(|v| v.as_str());
let name = item.get("name").and_then(|v| v.as_str());
tool_calls = Some(vec![crate::models::ToolCallDelta {
index: chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32,
@@ -772,17 +769,26 @@ impl super::Provider for OpenAIProvider {
call_type: Some("function".to_string()),
function: Some(crate::models::FunctionCallDelta {
name: name.map(|s| s.to_string()),
arguments: arguments.map(|s| s.to_string()),
arguments: Some("".to_string()), // Start with empty arguments
}),
}]);
} else if t == "message" {
if let Some(text) = delta.get("text").and_then(|v| v.as_str()) {
content_buffer.push_str(text);
}
}
}
"response.function_call_arguments.delta" => {
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
tool_calls = Some(vec![crate::models::ToolCallDelta {
index: chunk.get("output_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32,
id: None,
call_type: None,
function: Some(crate::models::FunctionCallDelta {
name: None,
arguments: Some(delta.to_string()),
}),
}]);
}
"response.output_text.done" | "response.item.done" | "response.done" => {
}
"response.output_text.done" | "response.item.done" | "response.completed" => {
finish_reason = Some("stop".to_string());
}
_ => {}
@@ -887,13 +893,20 @@ impl super::Provider for OpenAIProvider {
}
}
Ok(_) => continue,
Err(reqwest_eventsource::Error::StreamEnded) => break,
Err(e) => {
tracing::error!("Responses stream encountered an error: {}", e);
// Attempt to probe for the actual error body
let mut probe_body_no_stream = probe_body.clone();
if let Some(obj) = probe_body_no_stream.as_object_mut() {
obj.remove("stream");
}
let probe_resp = probe_client
.post(&url)
.header("Authorization", format!("Bearer {}", api_key))
.header("Accept", "application/json")
.json(&probe_body)
.json(&probe_body_no_stream)
.send()
.await;