From 9b1e0ff669dde3ade90ed3fdf254ff1721adb3f7 Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Tue, 3 Mar 2026 11:06:58 -0500 Subject: [PATCH] fix(gemini): tolerate non-candidate SSE chunks --- src/providers/gemini.rs | 82 ++++++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/src/providers/gemini.rs b/src/providers/gemini.rs index 357a7866..9b70cf5d 100644 --- a/src/providers/gemini.rs +++ b/src/providers/gemini.rs @@ -130,6 +130,26 @@ struct GeminiResponse { usage_metadata: Option<GeminiUsageMetadata>, } +// Streaming responses from Gemini may include messages without `candidates` (e.g. promptFeedback). +// Use a more permissive struct for streaming to avoid aborting the SSE prematurely. +#[derive(Debug, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +struct GeminiStreamResponse { + #[serde(default)] + candidates: Vec<GeminiStreamCandidate>, + #[serde(default)] + usage_metadata: Option<GeminiUsageMetadata>, +} + +#[derive(Debug, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +struct GeminiStreamCandidate { + #[serde(default)] + content: Option<GeminiContent>, + #[serde(default)] + finish_reason: Option<String>, +} + // ========== Provider Implementation ========== pub struct GeminiProvider { @@ -547,7 +567,7 @@ impl super::Provider for GeminiProvider { while let Some(event) = es.next().await { match event { Ok(Event::Message(msg)) => { - let gemini_response: GeminiResponse = serde_json::from_str(&msg.data) + let gemini_response: GeminiStreamResponse = serde_json::from_str(&msg.data) .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?; // Extract usage from usageMetadata if present (reported on every/last chunk) @@ -561,29 +581,55 @@ } }); + // Some streaming events may not contain candidates (e.g. promptFeedback). + // Only emit chunks when we have candidate content or tool calls. 
if let Some(candidate) = gemini_response.candidates.first() { - let content = candidate - .content - .parts - .iter() - .find_map(|p| p.text.clone()) - .unwrap_or_default(); + if let Some(content_obj) = &candidate.content { + let content = content_obj + .parts + .iter() + .find_map(|p| p.text.clone()) + .unwrap_or_default(); - let tool_calls = Self::extract_tool_call_deltas(&candidate.content.parts); + let tool_calls = Self::extract_tool_call_deltas(&content_obj.parts); - // Determine finish_reason - let finish_reason = candidate.finish_reason.as_ref().map(|fr| { - match fr.as_str() { - "STOP" => "stop".to_string(), - _ => fr.to_lowercase(), + // Determine finish_reason + let finish_reason = candidate.finish_reason.as_ref().map(|fr| { + match fr.as_str() { + "STOP" => "stop".to_string(), + _ => fr.to_lowercase(), + } + }); + + // Avoid emitting completely empty chunks unless they carry usage. + if !content.is_empty() || tool_calls.is_some() || stream_usage.is_some() { + yield ProviderStreamChunk { + content, + reasoning_content: None, + finish_reason, + tool_calls, + model: model.clone(), + usage: stream_usage, + }; } - }); - + } else if stream_usage.is_some() { + // Usage-only update + yield ProviderStreamChunk { + content: String::new(), + reasoning_content: None, + finish_reason: None, + tool_calls: None, + model: model.clone(), + usage: stream_usage, + }; + } + } else if stream_usage.is_some() { + // No candidates but usage present yield ProviderStreamChunk { - content, + content: String::new(), reasoning_content: None, - finish_reason, - tool_calls, + finish_reason: None, + tool_calls: None, model: model.clone(), usage: stream_usage, };