From c2bad90a8f6cf1dc82a7b11be878ce10e0f713a4 Mon Sep 17 00:00:00 2001
From: hobokenchicken
Date: Thu, 5 Mar 2026 18:08:20 +0000
Subject: [PATCH] fix(deepseek): inline SSE stream with error probing; add
 missing StreamExt import

---
 src/providers/deepseek.rs | 55 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/providers/deepseek.rs b/src/providers/deepseek.rs
index 3cfb3225..213628a5 100644
--- a/src/providers/deepseek.rs
+++ b/src/providers/deepseek.rs
@@ -1,6 +1,7 @@
 use anyhow::Result;
 use async_trait::async_trait;
 use futures::stream::BoxStream;
+use futures::StreamExt;
 
 use super::helpers;
 use super::{ProviderResponse, ProviderStreamChunk};
@@ -130,7 +131,7 @@ impl super::Provider for DeepSeekProvider {
         let messages_json = helpers::messages_to_openai_json_text_only(&request.messages).await?;
         let mut body = helpers::build_openai_body(&request, messages_json, true);
 
-        // DeepSeek reasoning model doesn't support stream_options
+        // Sanitize for deepseek-reasoner or general deepseek-chat
         if request.model == "deepseek-reasoner" {
             if let Some(obj) = body.as_object_mut() {
                 obj.remove("stream_options");
@@ -152,14 +153,62 @@
             }
         }
 
+        let url = format!("{}/chat/completions", self.config.base_url);
+        let api_key = self.api_key.clone();
+        let probe_client = self.client.clone();
+        let probe_body = body.clone();
+        let model = request.model.clone();
+
         let es = reqwest_eventsource::EventSource::new(
             self.client
-                .post(format!("{}/chat/completions", self.config.base_url))
+                .post(&url)
                 .header("Authorization", format!("Bearer {}", self.api_key))
                 .json(&body),
         )
         .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource: {}", e)))?;
 
-        Ok(helpers::create_openai_stream(es, request.model, None))
+        let stream = async_stream::try_stream! {
+            let mut es = es;
+            while let Some(event) = es.next().await {
+                match event {
+                    Ok(reqwest_eventsource::Event::Message(msg)) => {
+                        if msg.data == "[DONE]" {
+                            break;
+                        }
+
+                        let chunk: serde_json::Value = serde_json::from_str(&msg.data)
+                            .map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
+
+                        if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) {
+                            yield p_chunk?;
+                        }
+                    }
+                    Ok(_) => continue,
+                    Err(e) => {
+                        // Attempt to probe for the actual error body
+                        let probe_resp = probe_client
+                            .post(&url)
+                            .header("Authorization", format!("Bearer {}", api_key))
+                            .json(&probe_body)
+                            .send()
+                            .await;
+
+                        match probe_resp {
+                            Ok(r) if !r.status().is_success() => {
+                                let status = r.status();
+                                let body = r.text().await.unwrap_or_default();
+                                tracing::error!("DeepSeek Stream Error Probe ({}): {}", status, body);
+                                Err(AppError::ProviderError(format!("DeepSeek API error ({}): {}", status, body)))?;
+                            }
+                            _ => {
+                                Err(AppError::ProviderError(format!("Stream error: {}", e)))?;
+                            }
+                        }
+                    }
+                }
+            }
+        };
+
+        Ok(Box::pin(stream))
     }
 }