From 649371154fcd4097ad8da927ed329fdec26595d3 Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Tue, 17 Mar 2026 19:00:40 +0000 Subject: [PATCH] fix(openai): implement parsing for real-time Responses API streaming format --- src/providers/openai.rs | 55 +++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 305f0114..7bc5d86c 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -474,31 +474,38 @@ impl super::Provider for OpenAIProvider { } else { // Responses API specific parsing for streaming let mut content = String::new(); + let mut finish_reason = None; - // Check for output[0].content[0].text (similar to non-stream) - if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) { - for out in output { - if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { - for item in contents { - if let Some(text) = item.get("text").and_then(|t| t.as_str()) { - content.push_str(text); - } else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) { - content.push_str(delta); - } - } + let event_type = chunk.get("type").and_then(|v| v.as_str()).unwrap_or(""); + + match event_type { + "response.output_text.delta" => { + if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) { + content.push_str(delta); } } - } - - // Check for candidates[0].content.parts[0].text - if content.is_empty() { - if let Some(cands) = chunk.get("candidates").and_then(|c| c.as_array()) { - for c in cands { - if let Some(content_obj) = c.get("content") { - if let Some(parts) = content_obj.get("parts").and_then(|p| p.as_array()) { - for p in parts { - if let Some(t) = p.get("text").and_then(|v| v.as_str()) { - content.push_str(t); + "response.output_text.done" => { + if let Some(text) = chunk.get("text").and_then(|v| v.as_str()) { + // Some implementations send the full text at the end + // We usually prefer deltas, but if we haven't seen them, this is the fallback. + // However, if we're already yielding deltas, we might not want this. + // For now, let's just use it as a signal that we're done. + finish_reason = Some("stop".to_string()); + } + } + "response.done" => { + finish_reason = Some("stop".to_string()); + } + _ => { + // Fallback to older nested structure if present + if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) { + for out in output { + if let Some(contents) = out.get("content").and_then(|c| c.as_array()) { + for item in contents { + if let Some(text) = item.get("text").and_then(|t| t.as_str()) { + content.push_str(text); + } else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) { + content.push_str(delta); } } } @@ -507,11 +514,11 @@ impl super::Provider for OpenAIProvider { } } - if !content.is_empty() { + if !content.is_empty() || finish_reason.is_some() { yield ProviderStreamChunk { content, reasoning_content: None, - finish_reason: None, + finish_reason, tool_calls: None, model: model.clone(), usage: None,