feat(openai): implement Responses API streaming and proactive routing

This commit adds support for the OpenAI Responses API in both streaming and non-streaming modes, implements proactive routing of gpt-5 and codex models to that API, and cleans up unused 'session' variable warnings across the dashboard source files.
2026-03-06 20:16:43 +00:00
parent 633b69a07b
commit dd54c14ff8
8 changed files with 155 additions and 14 deletions


@@ -42,6 +42,16 @@ pub trait Provider: Send + Sync {
         request: UnifiedRequest,
     ) -> Result<BoxStream<'static, Result<ProviderStreamChunk, AppError>>, AppError>;
 
+    /// Process a streaming chat request using provider-specific "responses" style endpoint
+    /// Default implementation falls back to `chat_completion_stream` for providers
+    /// that do not implement a dedicated responses endpoint.
+    async fn chat_responses_stream(
+        &self,
+        request: UnifiedRequest,
+    ) -> Result<BoxStream<'static, Result<ProviderStreamChunk, AppError>>, AppError> {
+        self.chat_completion_stream(request).await
+    }
+
     /// Estimate token count for a request (for cost calculation)
     fn estimate_tokens(&self, request: &UnifiedRequest) -> Result<u32>;
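Because the new trait method ships with a default body, existing Provider implementations keep compiling and transparently fall back to `chat_completion_stream`. A minimal caller-side sketch of the intended dispatch; the `open_stream` helper and the `wants_responses_api` flag are illustrative, not part of this commit:

```rust
// Sketch only: a router-side helper showing how the two trait methods can be
// selected at runtime. Every Provider now exposes chat_responses_stream, even
// if it only delegates to chat_completion_stream internally.
async fn open_stream(
    provider: &dyn Provider,
    request: UnifiedRequest,
    wants_responses_api: bool, // hypothetical flag, not part of this commit
) -> Result<BoxStream<'static, Result<ProviderStreamChunk, AppError>>, AppError> {
    if wants_responses_api {
        provider.chat_responses_stream(request).await
    } else {
        provider.chat_completion_stream(request).await
    }
}
```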


@@ -45,7 +45,13 @@ impl super::Provider for OpenAIProvider {
     }
 
     fn supports_model(&self, model: &str) -> bool {
-        model.starts_with("gpt-") || model.starts_with("o1-") || model.starts_with("o3-") || model.starts_with("o4-") || model == "gpt-5-nano"
+        model.starts_with("gpt-") ||
+            model.starts_with("o1-") ||
+            model.starts_with("o2-") ||
+            model.starts_with("o3-") ||
+            model.starts_with("o4-") ||
+            model.starts_with("o5-") ||
+            model.contains("gpt-5")
     }
 
     fn supports_multimodal(&self) -> bool {
@@ -53,6 +59,12 @@ impl super::Provider for OpenAIProvider {
     }
 
     async fn chat_completion(&self, request: UnifiedRequest) -> Result<ProviderResponse, AppError> {
+        // Allow proactive routing to Responses API based on heuristic
+        let model_lc = request.model.to_lowercase();
+        if model_lc.contains("gpt-5") || model_lc.contains("codex") {
+            return self.chat_responses(request).await;
+        }
+
         let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
         let mut body = helpers::build_openai_body(&request, messages_json, false);
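The proactive routing and the widened supports_model check are both plain substring tests on the lowercased model name: anything containing "gpt-5" or "codex" is sent down the Responses API path, everything else stays on chat completions. A small test sketch of the expected behaviour; the helper below mirrors the heuristic above and is illustrative, not code from this commit:

```rust
#[cfg(test)]
mod routing_heuristic_tests {
    // Sketch only: mirrors the string checks used in chat_completion, so the
    // expected routing can be asserted without constructing a full provider.
    fn routes_to_responses_api(model: &str) -> bool {
        let model_lc = model.to_lowercase();
        model_lc.contains("gpt-5") || model_lc.contains("codex")
    }

    #[test]
    fn gpt5_and_codex_models_use_responses_api() {
        assert!(routes_to_responses_api("gpt-5-nano"));
        assert!(routes_to_responses_api("GPT-5"));
        assert!(routes_to_responses_api("codex-mini-latest"));
        assert!(!routes_to_responses_api("gpt-4o"));
        assert!(!routes_to_responses_api("o3-mini"));
    }
}
```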
@@ -383,4 +395,115 @@ impl super::Provider for OpenAIProvider {
         Ok(Box::pin(stream))
     }
 
+    async fn chat_responses_stream(
+        &self,
+        request: UnifiedRequest,
+    ) -> Result<BoxStream<'static, Result<ProviderStreamChunk, AppError>>, AppError> {
+        // Build a simple `input` string by concatenating message parts.
+        let messages_json = helpers::messages_to_openai_json(&request.messages).await?;
+        let mut inputs: Vec<String> = Vec::new();
+        for m in &messages_json {
+            let role = m["role"].as_str().unwrap_or("");
+            let parts = m.get("content").and_then(|c| c.as_array()).cloned().unwrap_or_default();
+            let mut text_parts = Vec::new();
+            for p in parts {
+                if let Some(t) = p.get("text").and_then(|v| v.as_str()) {
+                    text_parts.push(t.to_string());
+                }
+            }
+            inputs.push(format!("{}: {}", role, text_parts.join("")));
+        }
+        let input_text = inputs.join("\n");
+
+        let body = serde_json::json!({
+            "model": request.model,
+            "input": input_text,
+            "stream": true
+        });
+
+        let url = format!("{}/responses", self.config.base_url);
+        let api_key = self.api_key.clone();
+        let model = request.model.clone();
+
+        let es = reqwest_eventsource::EventSource::new(
+            self.client
+                .post(&url)
+                .header("Authorization", format!("Bearer {}", api_key))
+                .json(&body),
+        )
+        .map_err(|e| AppError::ProviderError(format!("Failed to create EventSource for Responses API: {}", e)))?;
+
+        let stream = async_stream::try_stream! {
+            let mut es = es;
+            while let Some(event) = es.next().await {
+                match event {
+                    Ok(reqwest_eventsource::Event::Message(msg)) => {
+                        if msg.data == "[DONE]" {
+                            break;
+                        }
+                        let chunk: serde_json::Value = serde_json::from_str(&msg.data)
+                            .map_err(|e| AppError::ProviderError(format!("Failed to parse Responses stream chunk: {}", e)))?;
+                        // Try standard OpenAI parsing first
+                        if let Some(p_chunk) = helpers::parse_openai_stream_chunk(&chunk, &model, None) {
+                            yield p_chunk?;
+                        } else {
+                            // Responses API specific parsing for streaming
+                            // Often it follows a similar structure to the non-streaming response but in chunks
+                            let mut content = String::new();
+                            // Check for output[0].content[0].text (similar to non-stream)
+                            if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) {
+                                if let Some(first) = output.get(0) {
+                                    if let Some(contents) = first.get("content").and_then(|c| c.as_array()) {
+                                        for item in contents {
+                                            if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
+                                                content.push_str(text);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            // Check for candidates[0].content.parts[0].text (Gemini-like, which OpenAI sometimes uses for v1/responses)
+                            if content.is_empty() {
+                                if let Some(cands) = chunk.get("candidates").and_then(|c| c.as_array()) {
+                                    if let Some(c0) = cands.get(0) {
+                                        if let Some(content_obj) = c0.get("content") {
+                                            if let Some(parts) = content_obj.get("parts").and_then(|p| p.as_array()) {
+                                                for p in parts {
+                                                    if let Some(t) = p.get("text").and_then(|v| v.as_str()) {
+                                                        content.push_str(t);
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            if !content.is_empty() {
+                                yield ProviderStreamChunk {
+                                    content,
+                                    reasoning_content: None,
+                                    finish_reason: None,
+                                    tool_calls: None,
+                                    model: model.clone(),
+                                    usage: None,
+                                };
+                            }
+                        }
+                    }
+                    Ok(_) => continue,
+                    Err(e) => {
+                        Err(AppError::ProviderError(format!("Responses stream error: {}", e)))?;
+                    }
+                }
+            }
+        };
+
+        Ok(Box::pin(stream))
+    }
 }
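Since chat_responses_stream yields the same ProviderStreamChunk items as the existing chat-completions stream, consuming it is identical for callers. A hedged usage sketch, assuming a provider instance and a prepared UnifiedRequest already exist (error handling abbreviated):

```rust
use futures::StreamExt;

// Sketch only: drain the responses-style stream and print content deltas as
// they arrive. `provider` is any type implementing the Provider trait above.
async fn print_responses_stream(
    provider: &dyn Provider,
    request: UnifiedRequest,
) -> Result<(), AppError> {
    let mut stream = provider.chat_responses_stream(request).await?;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        print!("{}", chunk.content);
        if chunk.finish_reason.is_some() {
            break;
        }
    }
    Ok(())
}
```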