use axum::{ extract::{Query, State}, response::Json, }; use chrono; use serde::Deserialize; use serde_json; use sqlx::Row; use tracing::warn; use super::{ApiResponse, DashboardState}; /// Query parameters for time-based filtering on usage endpoints. #[derive(Debug, Deserialize, Default)] pub(super) struct UsagePeriodFilter { /// Preset period: "today", "24h", "7d", "30d", "all" (default: "all") pub period: Option, /// Custom range start (ISO 8601, e.g. "2025-06-01T00:00:00Z") pub from: Option, /// Custom range end (ISO 8601) pub to: Option, } impl UsagePeriodFilter { /// Returns `(sql_fragment, bind_values)` for a WHERE clause. /// The fragment is either empty (no filter) or " AND timestamp >= ? [AND timestamp <= ?]". fn to_sql(&self) -> (String, Vec) { let period = self.period.as_deref().unwrap_or("all"); if period == "custom" { let mut clause = String::new(); let mut binds = Vec::new(); if let Some(ref from) = self.from { clause.push_str(" AND timestamp >= ?"); binds.push(from.clone()); } if let Some(ref to) = self.to { clause.push_str(" AND timestamp <= ?"); binds.push(to.clone()); } return (clause, binds); } let now = chrono::Utc::now(); let cutoff = match period { "today" => { // Start of today (UTC) let today = now.format("%Y-%m-%dT00:00:00Z").to_string(); Some(today) } "24h" => Some((now - chrono::Duration::hours(24)).to_rfc3339()), "7d" => Some((now - chrono::Duration::days(7)).to_rfc3339()), "30d" => Some((now - chrono::Duration::days(30)).to_rfc3339()), _ => None, // "all" or unrecognized }; match cutoff { Some(ts) => (" AND timestamp >= ?".to_string(), vec![ts]), None => (String::new(), vec![]), } } /// Determine the time-series granularity label for grouping. fn granularity(&self) -> &'static str { match self.period.as_deref().unwrap_or("all") { "today" | "24h" => "hour", _ => "day", } } } pub(super) async fn handle_usage_summary( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); // Total stats (filtered by period) let period_sql = format!( r#" SELECT COUNT(*) as total_requests, COALESCE(SUM(total_tokens), 0) as total_tokens, COALESCE(SUM(cost), 0.0) as total_cost, COUNT(DISTINCT client_id) as active_clients, COALESCE(SUM(cache_read_tokens), 0) as total_cache_read, COALESCE(SUM(cache_write_tokens), 0) as total_cache_write FROM llm_requests WHERE 1=1 {} "#, period_clause ); let mut q = sqlx::query(&period_sql); for b in &period_binds { q = q.bind(b); } let total_stats = q.fetch_one(pool); // Today's stats let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); let today_stats = sqlx::query( r#" SELECT COUNT(*) as today_requests, COALESCE(SUM(total_tokens), 0) as today_tokens, COALESCE(SUM(cost), 0.0) as today_cost FROM llm_requests WHERE strftime('%Y-%m-%d', timestamp) = ? "#, ) .bind(today) .fetch_one(pool); // Error stats let error_stats = sqlx::query( r#" SELECT COUNT(*) as total, SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors FROM llm_requests "#, ) .fetch_one(pool); // Average response time let avg_response = sqlx::query( r#" SELECT COALESCE(AVG(duration_ms), 0.0) as avg_duration FROM llm_requests WHERE status = 'success' "#, ) .fetch_one(pool); match tokio::join!(total_stats, today_stats, error_stats, avg_response) { (Ok(t), Ok(d), Ok(e), Ok(a)) => { let total_requests: i64 = t.get("total_requests"); let total_tokens: i64 = t.get("total_tokens"); let total_cost: f64 = t.get("total_cost"); let active_clients: i64 = t.get("active_clients"); let total_cache_read: i64 = t.get("total_cache_read"); let total_cache_write: i64 = t.get("total_cache_write"); let today_requests: i64 = d.get("today_requests"); let today_cost: f64 = d.get("today_cost"); let total_count: i64 = e.get("total"); let error_count: i64 = e.get("errors"); let error_rate = if total_count > 0 { (error_count as f64 / total_count as f64) * 100.0 } else { 0.0 }; let avg_response_time: f64 = a.get("avg_duration"); Json(ApiResponse::success(serde_json::json!({ "total_requests": total_requests, "total_tokens": total_tokens, "total_cost": total_cost, "active_clients": active_clients, "today_requests": today_requests, "today_cost": today_cost, "error_rate": error_rate, "avg_response_time": avg_response_time, "total_cache_read_tokens": total_cache_read, "total_cache_write_tokens": total_cache_write, }))) } _ => Json(ApiResponse::error("Failed to fetch usage statistics".to_string())), } } pub(super) async fn handle_time_series( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); let granularity = filter.granularity(); // Determine the strftime format and default lookback let (strftime_fmt, _label_key, default_lookback) = match granularity { "hour" => ("%H:00", "hour", chrono::Duration::hours(24)), _ => ("%Y-%m-%d", "day", chrono::Duration::days(30)), }; // If no period filter, apply a sensible default lookback let (clause, binds) = if period_clause.is_empty() { let cutoff = (chrono::Utc::now() - default_lookback).to_rfc3339(); (" AND timestamp >= ?".to_string(), vec![cutoff]) } else { (period_clause, period_binds) }; let sql = format!( r#" SELECT strftime('{strftime_fmt}', timestamp) as bucket, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0.0) as cost FROM llm_requests WHERE 1=1 {clause} GROUP BY bucket ORDER BY bucket "#, ); let mut q = sqlx::query(&sql); for b in &binds { q = q.bind(b); } let result = q.fetch_all(pool).await; match result { Ok(rows) => { let mut series = Vec::new(); for row in rows { let bucket: String = row.get("bucket"); let requests: i64 = row.get("requests"); let tokens: i64 = row.get("tokens"); let cost: f64 = row.get("cost"); series.push(serde_json::json!({ "time": bucket, "requests": requests, "tokens": tokens, "cost": cost, })); } Json(ApiResponse::success(serde_json::json!({ "series": series, "period": filter.period.as_deref().unwrap_or("all"), "granularity": granularity, }))) } Err(e) => { warn!("Failed to fetch time series data: {}", e); Json(ApiResponse::error("Failed to fetch time series data".to_string())) } } } pub(super) async fn handle_clients_usage( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); let sql = format!( r#" SELECT client_id, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0.0) as cost, MAX(timestamp) as last_request FROM llm_requests WHERE 1=1 {} GROUP BY client_id ORDER BY requests DESC "#, period_clause ); let mut q = sqlx::query(&sql); for b in &period_binds { q = q.bind(b); } let result = q.fetch_all(pool).await; match result { Ok(rows) => { let mut client_usage = Vec::new(); for row in rows { let client_id: String = row.get("client_id"); let requests: i64 = row.get("requests"); let tokens: i64 = row.get("tokens"); let cost: f64 = row.get("cost"); let last_request: Option> = row.get("last_request"); client_usage.push(serde_json::json!({ "client_id": client_id, "client_name": client_id, "requests": requests, "tokens": tokens, "cost": cost, "last_request": last_request, })); } Json(ApiResponse::success(serde_json::json!(client_usage))) } Err(e) => { warn!("Failed to fetch client usage data: {}", e); Json(ApiResponse::error("Failed to fetch client usage data".to_string())) } } } pub(super) async fn handle_providers_usage( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); let sql = format!( r#" SELECT provider, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0.0) as cost, COALESCE(SUM(cache_read_tokens), 0) as cache_read, COALESCE(SUM(cache_write_tokens), 0) as cache_write FROM llm_requests WHERE 1=1 {} GROUP BY provider ORDER BY requests DESC "#, period_clause ); let mut q = sqlx::query(&sql); for b in &period_binds { q = q.bind(b); } let result = q.fetch_all(pool).await; match result { Ok(rows) => { let mut provider_usage = Vec::new(); for row in rows { let provider: String = row.get("provider"); let requests: i64 = row.get("requests"); let tokens: i64 = row.get("tokens"); let cost: f64 = row.get("cost"); let cache_read: i64 = row.get("cache_read"); let cache_write: i64 = row.get("cache_write"); provider_usage.push(serde_json::json!({ "provider": provider, "requests": requests, "tokens": tokens, "cost": cost, "cache_read_tokens": cache_read, "cache_write_tokens": cache_write, })); } Json(ApiResponse::success(serde_json::json!(provider_usage))) } Err(e) => { warn!("Failed to fetch provider usage data: {}", e); Json(ApiResponse::error("Failed to fetch provider usage data".to_string())) } } } pub(super) async fn handle_detailed_usage( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); let sql = format!( r#" SELECT strftime('%Y-%m-%d', timestamp) as date, client_id, provider, model, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0.0) as cost, COALESCE(SUM(cache_read_tokens), 0) as cache_read, COALESCE(SUM(cache_write_tokens), 0) as cache_write FROM llm_requests WHERE 1=1 {} GROUP BY date, client_id, provider, model ORDER BY date DESC LIMIT 200 "#, period_clause ); let mut q = sqlx::query(&sql); for b in &period_binds { q = q.bind(b); } let result = q.fetch_all(pool).await; match result { Ok(rows) => { let usage: Vec = rows .into_iter() .map(|row| { serde_json::json!({ "date": row.get::("date"), "client": row.get::("client_id"), "provider": row.get::("provider"), "model": row.get::("model"), "requests": row.get::("requests"), "tokens": row.get::("tokens"), "cost": row.get::("cost"), "cache_read_tokens": row.get::("cache_read"), "cache_write_tokens": row.get::("cache_write"), }) }) .collect(); Json(ApiResponse::success(serde_json::json!(usage))) } Err(e) => { warn!("Failed to fetch detailed usage: {}", e); Json(ApiResponse::error("Failed to fetch detailed usage".to_string())) } } } pub(super) async fn handle_analytics_breakdown( State(state): State, Query(filter): Query, ) -> Json> { let pool = &state.app_state.db_pool; let (period_clause, period_binds) = filter.to_sql(); // Model breakdown let model_sql = format!( "SELECT model as label, COUNT(*) as value FROM llm_requests WHERE 1=1 {} GROUP BY model ORDER BY value DESC", period_clause ); let mut mq = sqlx::query(&model_sql); for b in &period_binds { mq = mq.bind(b); } let models = mq.fetch_all(pool); // Client breakdown let client_sql = format!( "SELECT client_id as label, COUNT(*) as value FROM llm_requests WHERE 1=1 {} GROUP BY client_id ORDER BY value DESC", period_clause ); let mut cq = sqlx::query(&client_sql); for b in &period_binds { cq = cq.bind(b); } let clients = cq.fetch_all(pool); match tokio::join!(models, clients) { (Ok(m_rows), Ok(c_rows)) => { let model_breakdown: Vec = m_rows .into_iter() .map(|r| serde_json::json!({ "label": r.get::("label"), "value": r.get::("value") })) .collect(); let client_breakdown: Vec = c_rows .into_iter() .map(|r| serde_json::json!({ "label": r.get::("label"), "value": r.get::("value") })) .collect(); Json(ApiResponse::success(serde_json::json!({ "models": model_breakdown, "clients": client_breakdown }))) } _ => Json(ApiResponse::error("Failed to fetch analytics breakdown".to_string())), } }