feat: add cache token tracking and cache-aware cost calculation
Track cache_read_tokens and cache_write_tokens end-to-end: parse from provider responses (OpenAI, DeepSeek, Grok, Gemini), persist to SQLite, apply cache-aware pricing from the model registry, and surface in API responses and the dashboard.

- Add cache fields to ProviderResponse, StreamUsage, RequestLog structs
- Parse cached_tokens (OpenAI/Grok), prompt_cache_hit/miss (DeepSeek), cachedContentTokenCount (Gemini) from provider responses
- Send stream_options.include_usage for streaming; capture real usage from final SSE chunk in AggregatingStream
- ALTER TABLE migration for cache_read_tokens/cache_write_tokens columns
- Cache-aware cost formula using registry cache_read/cache_write rates
- Update Provider trait calculate_cost signature across all providers
- Add cache_read_tokens/cache_write_tokens to Usage API response
- Dashboard: cache hit rate card, cache columns in pricing and usage tables, cache token aggregation in SQL queries
- Remove API debug panel and verbose console logging from api.js
- Bump static asset cache-bust to v5
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use super::{ProviderResponse, ProviderStreamChunk};
|
||||
use super::{ProviderResponse, ProviderStreamChunk, StreamUsage};
|
||||
use crate::errors::AppError;
|
||||
use crate::models::{ContentPart, ToolCall, ToolCallDelta, UnifiedMessage, UnifiedRequest};
|
||||
use futures::stream::{BoxStream, StreamExt};
|
||||
@@ -156,6 +156,8 @@ pub async fn messages_to_openai_json_text_only(
|
||||
|
||||
/// Build an OpenAI-compatible request body from a UnifiedRequest and pre-converted messages.
|
||||
/// Includes tools and tool_choice when present.
|
||||
/// When streaming, adds `stream_options.include_usage: true` so providers report
|
||||
/// token counts in the final SSE chunk.
|
||||
pub fn build_openai_body(
|
||||
request: &UnifiedRequest,
|
||||
messages_json: Vec<serde_json::Value>,
|
||||
@@ -167,6 +169,10 @@ pub fn build_openai_body(
|
||||
"stream": stream,
|
||||
});
|
||||
|
||||
if stream {
|
||||
body["stream_options"] = serde_json::json!({ "include_usage": true });
|
||||
}
|
||||
|
||||
if let Some(temp) = request.temperature {
|
||||
body["temperature"] = serde_json::json!(temp);
|
||||
}
|
||||
@@ -185,6 +191,9 @@ pub fn build_openai_body(
|
||||
|
||||
/// Parse an OpenAI-compatible chat completion response JSON into a ProviderResponse.
|
||||
/// Extracts tool_calls from the message when present.
|
||||
/// Extracts cache token counts from:
|
||||
/// - OpenAI/Grok: `usage.prompt_tokens_details.cached_tokens`
|
||||
/// - DeepSeek: `usage.prompt_cache_hit_tokens` / `usage.prompt_cache_miss_tokens`
|
||||
pub fn parse_openai_response(resp_json: &Value, model: String) -> Result<ProviderResponse, AppError> {
|
||||
let choice = resp_json["choices"]
|
||||
.get(0)
|
||||
@@ -204,6 +213,17 @@ pub fn parse_openai_response(resp_json: &Value, model: String) -> Result<Provide
|
||||
let completion_tokens = usage["completion_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
let total_tokens = usage["total_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
|
||||
// Extract cache tokens — try OpenAI/Grok format first, then DeepSeek format
|
||||
let cache_read_tokens = usage["prompt_tokens_details"]["cached_tokens"]
|
||||
.as_u64()
|
||||
// DeepSeek uses a different field name
|
||||
.or_else(|| usage["prompt_cache_hit_tokens"].as_u64())
|
||||
.unwrap_or(0) as u32;
|
||||
|
||||
// DeepSeek reports cache_write as prompt_cache_miss_tokens (tokens written to cache for future use).
|
||||
// OpenAI doesn't report cache_write in this location, but may in the future.
|
||||
let cache_write_tokens = usage["prompt_cache_miss_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
|
||||
Ok(ProviderResponse {
|
||||
content,
|
||||
reasoning_content,
|
||||
@@ -211,6 +231,8 @@ pub fn parse_openai_response(resp_json: &Value, model: String) -> Result<Provide
|
||||
prompt_tokens,
|
||||
completion_tokens,
|
||||
total_tokens,
|
||||
cache_read_tokens,
|
||||
cache_write_tokens,
|
||||
model,
|
||||
})
|
||||
}
|
||||
@@ -220,6 +242,9 @@ pub fn parse_openai_response(resp_json: &Value, model: String) -> Result<Provide
|
||||
/// The optional `reasoning_field` allows overriding the field name for
|
||||
/// reasoning content (e.g., "thought" for Ollama).
|
||||
/// Parses tool_calls deltas from streaming chunks when present.
|
||||
/// When `stream_options.include_usage: true` was sent, the provider sends a
|
||||
/// final chunk with `usage` data — this is parsed into `StreamUsage` and
|
||||
/// attached to the yielded `ProviderStreamChunk`.
|
||||
pub fn create_openai_stream(
|
||||
es: reqwest_eventsource::EventSource,
|
||||
model: String,
|
||||
@@ -239,6 +264,34 @@ pub fn create_openai_stream(
|
||||
let chunk: Value = serde_json::from_str(&msg.data)
|
||||
.map_err(|e| AppError::ProviderError(format!("Failed to parse stream chunk: {}", e)))?;
|
||||
|
||||
// Parse usage from the final chunk (sent when stream_options.include_usage is true).
|
||||
// This chunk may have an empty `choices` array.
|
||||
let stream_usage = chunk.get("usage").and_then(|u| {
|
||||
if u.is_null() {
|
||||
return None;
|
||||
}
|
||||
let prompt_tokens = u["prompt_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
let completion_tokens = u["completion_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
let total_tokens = u["total_tokens"].as_u64().unwrap_or(0) as u32;
|
||||
|
||||
let cache_read_tokens = u["prompt_tokens_details"]["cached_tokens"]
|
||||
.as_u64()
|
||||
.or_else(|| u["prompt_cache_hit_tokens"].as_u64())
|
||||
.unwrap_or(0) as u32;
|
||||
|
||||
let cache_write_tokens = u["prompt_cache_miss_tokens"]
|
||||
.as_u64()
|
||||
.unwrap_or(0) as u32;
|
||||
|
||||
Some(StreamUsage {
|
||||
prompt_tokens,
|
||||
completion_tokens,
|
||||
total_tokens,
|
||||
cache_read_tokens,
|
||||
cache_write_tokens,
|
||||
})
|
||||
});
|
||||
|
||||
if let Some(choice) = chunk["choices"].get(0) {
|
||||
let delta = &choice["delta"];
|
||||
let content = delta["content"].as_str().unwrap_or_default().to_string();
|
||||
@@ -259,6 +312,18 @@ pub fn create_openai_stream(
|
||||
finish_reason,
|
||||
tool_calls,
|
||||
model: model.clone(),
|
||||
usage: stream_usage,
|
||||
};
|
||||
} else if stream_usage.is_some() {
|
||||
// Final usage-only chunk (empty choices array) — yield it so
|
||||
// AggregatingStream can capture the real token counts.
|
||||
yield ProviderStreamChunk {
|
||||
content: String::new(),
|
||||
reasoning_content: None,
|
||||
finish_reason: None,
|
||||
tool_calls: None,
|
||||
model: model.clone(),
|
||||
usage: stream_usage,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -274,10 +339,20 @@ pub fn create_openai_stream(
|
||||
}
|
||||
|
||||
/// Calculate cost using the model registry first, then falling back to provider pricing config.
|
||||
///
|
||||
/// When the registry provides `cache_read` / `cache_write` rates, the formula is:
|
||||
/// (prompt_tokens - cache_read_tokens) * input_rate
|
||||
/// + cache_read_tokens * cache_read_rate
|
||||
/// + cache_write_tokens * cache_write_rate (if applicable)
|
||||
/// + completion_tokens * output_rate
|
||||
///
|
||||
/// All rates are per-token (the registry stores per-million-token rates).
|
||||
pub fn calculate_cost_with_registry(
|
||||
model: &str,
|
||||
prompt_tokens: u32,
|
||||
completion_tokens: u32,
|
||||
cache_read_tokens: u32,
|
||||
cache_write_tokens: u32,
|
||||
registry: &crate::models::registry::ModelRegistry,
|
||||
pricing: &[crate::config::ModelPricing],
|
||||
default_prompt_rate: f64,
|
||||
@@ -286,10 +361,25 @@ pub fn calculate_cost_with_registry(
|
||||
if let Some(metadata) = registry.find_model(model)
|
||||
&& let Some(cost) = &metadata.cost
|
||||
{
|
||||
return (prompt_tokens as f64 * cost.input / 1_000_000.0)
|
||||
let non_cached_prompt = prompt_tokens.saturating_sub(cache_read_tokens);
|
||||
let mut total = (non_cached_prompt as f64 * cost.input / 1_000_000.0)
|
||||
+ (completion_tokens as f64 * cost.output / 1_000_000.0);
|
||||
|
||||
if let Some(cache_read_rate) = cost.cache_read {
|
||||
total += cache_read_tokens as f64 * cache_read_rate / 1_000_000.0;
|
||||
} else {
|
||||
// No cache_read rate — charge cached tokens at full input rate
|
||||
total += cache_read_tokens as f64 * cost.input / 1_000_000.0;
|
||||
}
|
||||
|
||||
if let Some(cache_write_rate) = cost.cache_write {
|
||||
total += cache_write_tokens as f64 * cache_write_rate / 1_000_000.0;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
// Fallback: no registry entry — use provider pricing config (no cache awareness)
|
||||
let (prompt_rate, completion_rate) = pricing
|
||||
.iter()
|
||||
.find(|p| model.contains(&p.model))
|
||||
|
||||
Reference in New Issue
Block a user