fix(gemini): tolerate non-candidate SSE chunks
Some checks failed
CI / Check (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Formatting (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release Build (push) Has been cancelled

This commit is contained in:
2026-03-03 11:06:58 -05:00
parent a7c692dae8
commit 9b1e0ff669

View File

@@ -130,6 +130,26 @@ struct GeminiResponse {
usage_metadata: Option<GeminiUsageMetadata>, usage_metadata: Option<GeminiUsageMetadata>,
} }
// Streaming responses from Gemini may include messages without `candidates` (e.g. promptFeedback).
// Use a more permissive struct for streaming to avoid aborting the SSE prematurely.
/// Permissive deserialization target for a single Gemini SSE stream chunk.
/// Every field is optional so chunks that carry only `promptFeedback` or
/// `usageMetadata` still parse instead of aborting the stream with a parse error.
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct GeminiStreamResponse {
/// Candidate outputs; left empty when the chunk has no `candidates` array.
#[serde(default)]
candidates: Vec<GeminiStreamCandidate>,
/// Token-usage metadata (`usageMetadata`); per the surrounding code it may be
/// reported on any chunk, so it is kept optional here.
#[serde(default)]
usage_metadata: Option<GeminiUsageMetadata>,
}
/// One entry of `candidates` in a streaming chunk. Both fields are optional:
/// an intermediate chunk may carry content without a finish reason, and a
/// final chunk may carry a bare `finishReason` with no content.
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct GeminiStreamCandidate {
/// Generated content parts for this delta; `None` when the candidate
/// carries only metadata.
#[serde(default)]
content: Option<GeminiContent>,
/// Raw finish-reason string from the API (e.g. "STOP"); downstream code
/// maps it to a lower-case value.
#[serde(default)]
finish_reason: Option<String>,
}
// ========== Provider Implementation ========== // ========== Provider Implementation ==========
pub struct GeminiProvider { pub struct GeminiProvider {
@@ -547,7 +567,7 @@ impl super::Provider for GeminiProvider {
while let Some(event) = es.next().await { while let Some(event) = es.next().await {
match event { match event {
Ok(Event::Message(msg)) => { Ok(Event::Message(msg)) => {
let gemini_response: GeminiResponse = serde_json::from_str(&msg.data) let gemini_response: GeminiStreamResponse = serde_json::from_str(&msg.data)
.map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?; .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
// Extract usage from usageMetadata if present (reported on every/last chunk) // Extract usage from usageMetadata if present (reported on every/last chunk)
@@ -561,29 +581,55 @@ impl super::Provider for GeminiProvider {
} }
}); });
// Some streaming events may not contain candidates (e.g. promptFeedback).
// Only emit chunks when we have candidate content or tool calls.
if let Some(candidate) = gemini_response.candidates.first() { if let Some(candidate) = gemini_response.candidates.first() {
let content = candidate if let Some(content_obj) = &candidate.content {
.content let content = content_obj
.parts .parts
.iter() .iter()
.find_map(|p| p.text.clone()) .find_map(|p| p.text.clone())
.unwrap_or_default(); .unwrap_or_default();
let tool_calls = Self::extract_tool_call_deltas(&candidate.content.parts); let tool_calls = Self::extract_tool_call_deltas(&content_obj.parts);
// Determine finish_reason // Determine finish_reason
let finish_reason = candidate.finish_reason.as_ref().map(|fr| { let finish_reason = candidate.finish_reason.as_ref().map(|fr| {
match fr.as_str() { match fr.as_str() {
"STOP" => "stop".to_string(), "STOP" => "stop".to_string(),
_ => fr.to_lowercase(), _ => fr.to_lowercase(),
}
});
// Avoid emitting completely empty chunks unless they carry usage.
if !content.is_empty() || tool_calls.is_some() || stream_usage.is_some() {
yield ProviderStreamChunk {
content,
reasoning_content: None,
finish_reason,
tool_calls,
model: model.clone(),
usage: stream_usage,
};
} }
}); } else if stream_usage.is_some() {
// Usage-only update
yield ProviderStreamChunk {
content: String::new(),
reasoning_content: None,
finish_reason: None,
tool_calls: None,
model: model.clone(),
usage: stream_usage,
};
}
} else if stream_usage.is_some() {
// No candidates but usage present
yield ProviderStreamChunk { yield ProviderStreamChunk {
content, content: String::new(),
reasoning_content: None, reasoning_content: None,
finish_reason, finish_reason: None,
tool_calls, tool_calls: None,
model: model.clone(), model: model.clone(),
usage: stream_usage, usage: stream_usage,
}; };