Compare commits
2 Commits
d0be16d8e3
...
441270317c
| Author | SHA1 | Date | |
|---|---|---|---|
| 441270317c | |||
| 2e4318d84b |
@@ -43,6 +43,7 @@ impl OpenAIProvider {
|
|||||||
pub fn parse_tool_uses_json(text: &str) -> Vec<crate::models::ToolCall> {
|
pub fn parse_tool_uses_json(text: &str) -> Vec<crate::models::ToolCall> {
|
||||||
let mut calls = Vec::new();
|
let mut calls = Vec::new();
|
||||||
if let Some(start) = text.find("{\"tool_uses\":") {
|
if let Some(start) = text.find("{\"tool_uses\":") {
|
||||||
|
// ... (rest of method unchanged)
|
||||||
// Find the end of the JSON block by matching braces
|
// Find the end of the JSON block by matching braces
|
||||||
let sub = &text[start..];
|
let sub = &text[start..];
|
||||||
let mut brace_count = 0;
|
let mut brace_count = 0;
|
||||||
@@ -87,6 +88,27 @@ impl OpenAIProvider {
|
|||||||
}
|
}
|
||||||
calls
|
calls
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Strips internal metadata prefixes like 'to=multi_tool_use.parallel' from model responses.
|
||||||
|
pub fn strip_internal_metadata(text: &str) -> String {
|
||||||
|
let mut result = text.to_string();
|
||||||
|
|
||||||
|
// Patterns to strip
|
||||||
|
let patterns = [
|
||||||
|
"to=multi_tool_use.parallel",
|
||||||
|
"to=functions.multi_tool_use",
|
||||||
|
"ส่งเงินบาทไทยjson", // User reported Thai text preamble
|
||||||
|
];
|
||||||
|
|
||||||
|
for p in patterns {
|
||||||
|
if let Some(start) = result.find(p) {
|
||||||
|
// Remove the pattern and any whitespace around it
|
||||||
|
result.replace_range(start..start + p.len(), "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result.trim().to_string()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -404,6 +426,8 @@ impl super::Provider for OpenAIProvider {
|
|||||||
}
|
}
|
||||||
tool_calls.extend(embedded_calls);
|
tool_calls.extend(embedded_calls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
content_text = Self::strip_internal_metadata(&content_text);
|
||||||
|
|
||||||
Ok(ProviderResponse {
|
Ok(ProviderResponse {
|
||||||
content: content_text,
|
content: content_text,
|
||||||
@@ -704,6 +728,8 @@ impl super::Provider for OpenAIProvider {
|
|||||||
|
|
||||||
let stream = async_stream::try_stream! {
|
let stream = async_stream::try_stream! {
|
||||||
let mut es = es;
|
let mut es = es;
|
||||||
|
let mut content_buffer = String::new();
|
||||||
|
|
||||||
while let Some(event) = es.next().await {
|
while let Some(event) = es.next().await {
|
||||||
match event {
|
match event {
|
||||||
Ok(reqwest_eventsource::Event::Message(msg)) => {
|
Ok(reqwest_eventsource::Event::Message(msg)) => {
|
||||||
@@ -719,7 +745,6 @@ impl super::Provider for OpenAIProvider {
|
|||||||
yield p_chunk?;
|
yield p_chunk?;
|
||||||
} else {
|
} else {
|
||||||
// Responses API specific parsing for streaming
|
// Responses API specific parsing for streaming
|
||||||
let mut content = String::new();
|
|
||||||
let mut finish_reason = None;
|
let mut finish_reason = None;
|
||||||
let mut tool_calls = None;
|
let mut tool_calls = None;
|
||||||
|
|
||||||
@@ -728,7 +753,7 @@ impl super::Provider for OpenAIProvider {
|
|||||||
match event_type {
|
match event_type {
|
||||||
"response.output_text.delta" => {
|
"response.output_text.delta" => {
|
||||||
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
|
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
|
||||||
content.push_str(delta);
|
content_buffer.push_str(delta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"response.item.delta" => {
|
"response.item.delta" => {
|
||||||
@@ -752,67 +777,106 @@ impl super::Provider for OpenAIProvider {
|
|||||||
}]);
|
}]);
|
||||||
} else if t == "message" {
|
} else if t == "message" {
|
||||||
if let Some(text) = delta.get("text").and_then(|v| v.as_str()) {
|
if let Some(text) = delta.get("text").and_then(|v| v.as_str()) {
|
||||||
content.push_str(text);
|
content_buffer.push_str(text);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"response.output_text.done" | "response.item.done" => {
|
"response.output_text.done" | "response.item.done" | "response.done" => {
|
||||||
finish_reason = Some("stop".to_string());
|
finish_reason = Some("stop".to_string());
|
||||||
}
|
}
|
||||||
"response.done" => {
|
_ => {}
|
||||||
finish_reason = Some("stop".to_string());
|
}
|
||||||
}
|
|
||||||
_ => {
|
// Process content_buffer to extract embedded tool calls or yield text
|
||||||
// Fallback to older nested structure if present
|
if !content_buffer.is_empty() {
|
||||||
if let Some(output) = chunk.get("output").and_then(|o| o.as_array()) {
|
// If we see the start of a tool call block, we wait for the full block
|
||||||
for out in output {
|
if content_buffer.contains("{\"tool_uses\":") {
|
||||||
if let Some(contents) = out.get("content").and_then(|c| c.as_array()) {
|
let embedded_calls = Self::parse_tool_uses_json(&content_buffer);
|
||||||
for item in contents {
|
if !embedded_calls.is_empty() {
|
||||||
if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
|
if let Some(start) = content_buffer.find("{\"tool_uses\":") {
|
||||||
content.push_str(text);
|
// Yield text before the JSON block
|
||||||
} else if let Some(delta) = item.get("delta").and_then(|d| d.get("text")).and_then(|t| t.as_str()) {
|
let preamble = content_buffer[..start].to_string();
|
||||||
content.push_str(delta);
|
let stripped_preamble = Self::strip_internal_metadata(&preamble);
|
||||||
|
if !stripped_preamble.is_empty() {
|
||||||
|
yield ProviderStreamChunk {
|
||||||
|
content: stripped_preamble,
|
||||||
|
reasoning_content: None,
|
||||||
|
finish_reason: None,
|
||||||
|
tool_calls: None,
|
||||||
|
model: model.clone(),
|
||||||
|
usage: None,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Yield the tool calls
|
||||||
|
// ... (rest of tool call yielding unchanged)
|
||||||
|
let deltas: Vec<crate::models::ToolCallDelta> = embedded_calls.into_iter().enumerate().map(|(idx, tc)| {
|
||||||
|
crate::models::ToolCallDelta {
|
||||||
|
index: idx as u32,
|
||||||
|
id: Some(tc.id),
|
||||||
|
call_type: Some("function".to_string()),
|
||||||
|
function: Some(crate::models::FunctionCallDelta {
|
||||||
|
name: Some(tc.function.name),
|
||||||
|
arguments: Some(tc.function.arguments),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
yield ProviderStreamChunk {
|
||||||
|
content: String::new(),
|
||||||
|
reasoning_content: None,
|
||||||
|
finish_reason: None,
|
||||||
|
tool_calls: Some(deltas),
|
||||||
|
model: model.clone(),
|
||||||
|
usage: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Remove the processed part from buffer
|
||||||
|
// We need to find the end index correctly
|
||||||
|
let sub = &content_buffer[start..];
|
||||||
|
let mut brace_count = 0;
|
||||||
|
let mut end_idx = 0;
|
||||||
|
for (i, c) in sub.char_indices() {
|
||||||
|
if c == '{' { brace_count += 1; }
|
||||||
|
else if c == '}' {
|
||||||
|
brace_count -= 1;
|
||||||
|
if brace_count == 0 {
|
||||||
|
end_idx = start + i + 1;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if end_idx > 0 {
|
||||||
|
content_buffer = content_buffer[end_idx..].to_string();
|
||||||
|
} else {
|
||||||
|
content_buffer.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
// If we have "{"tool_uses":" but no full block yet, we just wait (don't yield)
|
||||||
}
|
} else if content_buffer.contains("to=multi_tool_use.parallel") {
|
||||||
|
// Wait for the JSON block that usually follows
|
||||||
// GPT-5.4 parallel tool calls might be embedded in content as a JSON block
|
|
||||||
let embedded_calls = Self::parse_tool_uses_json(&content);
|
|
||||||
|
|
||||||
if !embedded_calls.is_empty() {
|
|
||||||
// Strip the JSON part from content to keep it clean
|
|
||||||
if let Some(start) = content.find("{\"tool_uses\":") {
|
|
||||||
content = content[..start].to_string();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert ToolCall to ToolCallDelta for streaming
|
|
||||||
let deltas: Vec<crate::models::ToolCallDelta> = embedded_calls.into_iter().enumerate().map(|(idx, tc)| {
|
|
||||||
crate::models::ToolCallDelta {
|
|
||||||
index: idx as u32,
|
|
||||||
id: Some(tc.id),
|
|
||||||
call_type: Some("function".to_string()),
|
|
||||||
function: Some(crate::models::FunctionCallDelta {
|
|
||||||
name: Some(tc.function.name),
|
|
||||||
arguments: Some(tc.function.arguments),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
if let Some(ref mut existing) = tool_calls {
|
|
||||||
existing.extend(deltas);
|
|
||||||
} else {
|
} else {
|
||||||
tool_calls = Some(deltas);
|
// Standard text, yield and clear buffer
|
||||||
|
let content = std::mem::take(&mut content_buffer);
|
||||||
|
let stripped_content = Self::strip_internal_metadata(&content);
|
||||||
|
if !stripped_content.is_empty() {
|
||||||
|
yield ProviderStreamChunk {
|
||||||
|
content: stripped_content,
|
||||||
|
reasoning_content: None,
|
||||||
|
finish_reason: None,
|
||||||
|
tool_calls: None,
|
||||||
|
model: model.clone(),
|
||||||
|
usage: None,
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !content.is_empty() || finish_reason.is_some() || tool_calls.is_some() {
|
if finish_reason.is_some() || tool_calls.is_some() {
|
||||||
yield ProviderStreamChunk {
|
yield ProviderStreamChunk {
|
||||||
content,
|
content: String::new(),
|
||||||
reasoning_content: None,
|
reasoning_content: None,
|
||||||
finish_reason,
|
finish_reason,
|
||||||
tool_calls,
|
tool_calls,
|
||||||
@@ -825,11 +889,16 @@ impl super::Provider for OpenAIProvider {
|
|||||||
Ok(_) => continue,
|
Ok(_) => continue,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Attempt to probe for the actual error body
|
// Attempt to probe for the actual error body
|
||||||
|
let mut probe_body_no_stream = probe_body.clone();
|
||||||
|
if let Some(obj) = probe_body_no_stream.as_object_mut() {
|
||||||
|
obj.remove("stream");
|
||||||
|
}
|
||||||
|
|
||||||
let probe_resp = probe_client
|
let probe_resp = probe_client
|
||||||
.post(&url)
|
.post(&url)
|
||||||
.header("Authorization", format!("Bearer {}", api_key))
|
.header("Authorization", format!("Bearer {}", api_key))
|
||||||
.header("Accept", "application/json") // Ask for JSON during probe
|
.header("Accept", "application/json")
|
||||||
.json(&probe_body)
|
.json(&probe_body_no_stream)
|
||||||
.send()
|
.send()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user