From dd54c14ff8312bd07048b63ab974a7b5f268e1e7 Mon Sep 17 00:00:00 2001
From: hobokenchicken
Date: Fri, 6 Mar 2026 20:16:43 +0000
Subject: [PATCH] feat(openai): implement Responses API streaming and proactive routing

This commit adds support for the OpenAI Responses API in both streaming
and non-streaming modes. It also implements proactive routing for gpt-5
and codex models and cleans up unused 'session' variable warnings across
the dashboard source files.
---
 src/dashboard/clients.rs   |  10 +--
 src/dashboard/models.rs    |   2 +-
 src/dashboard/providers.rs |   2 +-
 src/dashboard/system.rs    |   4 +-
 src/dashboard/users.rs     |   6 +-
 src/providers/mod.rs       |  10 +++
 src/providers/openai.rs    | 125 ++++++++++++++++++++++++++++++++++++-
 src/server/mod.rs          |  10 ++-
 8 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/src/dashboard/clients.rs b/src/dashboard/clients.rs
index 3f5e2476..7cd08d18 100644
--- a/src/dashboard/clients.rs
+++ b/src/dashboard/clients.rs
@@ -88,7 +88,7 @@ pub(super) async fn handle_create_client(
     headers: axum::http::HeaderMap,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -199,7 +199,7 @@ pub(super) async fn handle_update_client(
     Path(id): Path,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -296,7 +296,7 @@ pub(super) async fn handle_delete_client(
     headers: axum::http::HeaderMap,
     Path(id): Path,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -440,7 +440,7 @@ pub(super) async fn handle_create_client_token(
     Path(id): Path,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -489,7 +489,7 @@ pub(super) async fn handle_delete_client_token(
     headers: axum::http::HeaderMap,
     Path((client_id, token_id)): Path<(String, i64)>,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
diff --git a/src/dashboard/models.rs b/src/dashboard/models.rs
index 618d1a41..ed321cb9 100644
--- a/src/dashboard/models.rs
+++ b/src/dashboard/models.rs
@@ -156,7 +156,7 @@ pub(super) async fn handle_update_model(
     Path(id): Path,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
diff --git a/src/dashboard/providers.rs b/src/dashboard/providers.rs
index 744e35d7..86d9db7b 100644
--- a/src/dashboard/providers.rs
+++ b/src/dashboard/providers.rs
@@ -266,7 +266,7 @@ pub(super) async fn handle_update_provider(
     Path(name): Path,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
diff --git a/src/dashboard/system.rs b/src/dashboard/system.rs
index 9985001b..540aface 100644
--- a/src/dashboard/system.rs
+++ b/src/dashboard/system.rs
@@ -279,7 +279,7 @@ pub(super) async fn handle_system_backup(
     State(state): State,
     headers: axum::http::HeaderMap,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -342,7 +342,7 @@ pub(super) async fn handle_update_settings(
     State(state): State,
     headers: axum::http::HeaderMap,
 ) -> Json> {
-    let (session, _) = match super::auth::require_admin(&state, &headers).await {
+    let (_session, _) = match super::auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
diff --git a/src/dashboard/users.rs b/src/dashboard/users.rs
index a790199a..d71a7899 100644
--- a/src/dashboard/users.rs
+++ b/src/dashboard/users.rs
@@ -14,7 +14,7 @@ pub(super) async fn handle_get_users(
     State(state): State,
     headers: axum::http::HeaderMap,
 ) -> Json> {
-    let (session, _) = match auth::require_admin(&state, &headers).await {
+    let (_session, _) = match auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -67,7 +67,7 @@ pub(super) async fn handle_create_user(
     headers: axum::http::HeaderMap,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match auth::require_admin(&state, &headers).await {
+    let (_session, _) = match auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
@@ -149,7 +149,7 @@ pub(super) async fn handle_update_user(
     Path(id): Path,
     Json(payload): Json,
 ) -> Json> {
-    let (session, _) = match auth::require_admin(&state, &headers).await {
+    let (_session, _) = match auth::require_admin(&state, &headers).await {
         Ok((session, new_token)) => (session, new_token),
         Err(e) => return e,
     };
diff --git a/src/providers/mod.rs b/src/providers/mod.rs
index 03938c0d..2a68d41e 100644
--- a/src/providers/mod.rs
+++ b/src/providers/mod.rs
@@ -42,6 +42,16 @@ pub trait Provider: Send + Sync {
         request: UnifiedRequest,
     ) -> Result>, AppError>;

+    /// Process a streaming chat request via a provider-specific "responses"-style endpoint.
+    /// The default implementation falls back to `chat_completion_stream` for providers
+    /// that do not implement a dedicated responses endpoint.
+    async fn chat_responses_stream(
+        &self,
+        request: UnifiedRequest,
+    ) -> Result>, AppError> {
+        self.chat_completion_stream(request).await
+    }
+
     /// Estimate token count for a request (for cost calculation)
     fn estimate_tokens(&self, request: &UnifiedRequest) -> Result;

diff --git a/src/providers/openai.rs b/src/providers/openai.rs
index 7a63d31e..d3c5031a 100644
--- a/src/providers/openai.rs
+++ b/src/providers/openai.rs
@@ -45,7 +45,13 @@ impl super::Provider for OpenAIProvider {
     }

     fn supports_model(&self, model: &str) -> bool {
-        model.starts_with("gpt-") || model.starts_with("o1-") || model.starts_with("o3-") || model.starts_with("o4-") || model == "gpt-5-nano"
+        model.starts_with("gpt-") ||
+            model.starts_with("o1-") ||
+            model.starts_with("o2-") ||
+            model.starts_with("o3-") ||
+            model.starts_with("o4-") ||
+            model.starts_with("o5-") ||
+            model.contains("gpt-5")
     }

     fn supports_multimodal(&self) -> bool {
@@ -53,6 +59,12 @@
     }

     async fn chat_completion(&self, request: UnifiedRequest) -> Result {
+        // Proactively route gpt-5 and codex models to the Responses API.
+        let model_lc = request.model.to_lowercase();
+        if model_lc.contains("gpt-5") || model_lc.contains("codex") {
+            return self.chat_responses(request).await;
+        }
+
         let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
         let mut body = helpers::build_openai_body(&request, messages_json, false);

@@ -383,4 +395,127 @@
         Ok(Box::pin(stream))
     }
+
+    async fn chat_responses_stream(
+        &self,
+        request: UnifiedRequest,
+    ) -> Result>, AppError> {
+        // Build a simple `input` string by concatenating message parts.
+        let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
+        let mut inputs: Vec<String> = Vec::new();
+        for m in &messages_json {
+            let role = m["role"].as_str().unwrap_or("");
+            let parts = m.get("content").and_then(|c| c.as_array()).cloned().unwrap_or_default();
+            let mut text_parts = Vec::new();
+            for p in parts {
+                if let Some(t) = p.get("text").and_then(|v| v.as_str()) {
+                    text_parts.push(t.to_string());
+                }
+            }
+            inputs.push(format!("{}: {}", role, text_parts.join("")));
+        }
+        let input_text = inputs.join("\n");
+
+        let body = serde_json::json!({
+            "model": request.model,
+            "input": input_text,
+            "stream": true
+        });
+
+        let url = format!("{}/responses", self.config.base_url);
+        let api_key = self.api_key.clone();
+        let model = request.model.clone();
+
+        let es = reqwest_eventsource::EventSource::new(
+            self.client
+                .post(&url)
+                .header("Authorization", format!("Bearer {}", api_key))
+                .json(&body),
+        )
+        .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource for Responses API: {}", e)))?;
+
+        let stream = async_stream::try_stream!
{
+            let mut es = es;
+            while let Some(event) = es.next().await {
+                match event {
+                    Ok(reqwest_eventsource::Event::Message(msg)) => {
+                        if msg.data == "[DONE]" {
+                            break;
+                        }
+
+                        let chunk: serde_json::Value = serde_json::from_str(&msg.data)
+                            .map_err(|e| AppError::ProviderError(format!("Failed to parse Responses stream chunk: {}", e)))?;
+
+                        // Try standard OpenAI parsing first.
+                        if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) {
+                            yield p_chunk?;
+                        } else {
+                            // Responses API-specific parsing: streamed chunks often
+                            // mirror the shape of the non-streaming response.
+                            let mut content = String::new();
+
+                            // Check for output[0].content[0].text (same shape as the non-streaming response).
+                            if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) {
+                                if let Some(first) = output.get(0) {
+                                    if let Some(contents) = first.get("content").and_then(|c| c.as_array()) {
+                                        for item in contents {
+                                            if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
+                                                content.push_str(text);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+
+                            // Fall back to candidates[0].content.parts[0].text, a
+                            // Gemini-style shape some OpenAI-compatible backends emit from v1/responses.
+                            if content.is_empty() {
+                                if let Some(cands) = chunk.get("candidates").and_then(|c| c.as_array()) {
+                                    if let Some(c0) = cands.get(0) {
+                                        if let Some(content_obj) = c0.get("content") {
+                                            if let Some(parts) = content_obj.get("parts").and_then(|p| p.as_array()) {
+                                                for p in parts {
+                                                    if let Some(t) = p.get("text").and_then(|v| v.as_str()) {
+                                                        content.push_str(t);
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+
+                            // Responses API text deltas (response.output_text.delta
+                            // events) carry incremental text in a top-level `delta` field.
+                            if content.is_empty() {
+                                if let Some(d) = chunk.get("delta").and_then(|v| v.as_str()) {
+                                    content.push_str(d);
+                                }
+                            }
+
+                            if !content.is_empty() {
+                                yield ProviderStreamChunk {
+                                    content,
+                                    reasoning_content: None,
+                                    finish_reason: None,
+                                    tool_calls: None,
+                                    model: model.clone(),
+                                    usage: None,
+                                };
+                            }
+                        }
+                    }
+                    Ok(_) => continue,
+                    // reqwest-eventsource surfaces a clean end-of-stream as an
+                    // error (Error::StreamEnded); treat it as normal completion.
+                    Err(reqwest_eventsource::Error::StreamEnded) => break,
+                    Err(e) => {
+                        Err(AppError::ProviderError(format!("Responses stream error: {}", e)))?;
+                    }
+                }
+            }
+        };
+
+        Ok(Box::pin(stream))
+    }
 }
diff --git a/src/server/mod.rs b/src/server/mod.rs
index c7faf9e5..1726e890 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -226,7 +226,15 @@ async fn chat_completions(
     let prompt_tokens = crate::utils::tokens::estimate_request_tokens(&model, &unified_request);

     // Handle streaming response
-    let stream_result = provider.chat_completion_stream(unified_request).await;
+    // Allow provider-specific routing for streaming too.
+    let use_responses = provider.name() == "openai"
+        && crate::utils::registry::model_prefers_responses(&state.model_registry, &unified_request.model);
+
+    let stream_result = if use_responses {
+        provider.chat_responses_stream(unified_request).await
+    } else {
+        provider.chat_completion_stream(unified_request).await
+    };

     match stream_result {
         Ok(stream) => {
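
Note (not part of this patch): `crate::utils::registry::model_prefers_responses`
is called in src/server/mod.rs above but is not defined anywhere in this diff.
A minimal sketch of what it might look like, assuming it applies the same
gpt-5/codex name heuristic that `chat_completion` uses for proactive routing;
the `ModelRegistry` parameter and its role here are assumptions, not something
the patch confirms:

    /// Hypothetical sketch -- not part of this patch.
    /// True when a model should be served via the Responses API.
    pub fn model_prefers_responses(_registry: &ModelRegistry, model: &str) -> bool {
        // Mirror the proactive-routing heuristic in openai.rs: gpt-5 family
        // and codex models prefer the /responses endpoint.
        let m = model.to_lowercase();
        m.contains("gpt-5") || m.contains("codex")
    }

A registry-backed override (for example, a per-model flag consulted before the
name heuristic) would let operators opt individual models in or out of
Responses routing without a code change.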