chore: initial clean commit

This commit is contained in:
2026-02-26 13:56:21 -05:00
commit 1755075657
53 changed files with 18068 additions and 0 deletions

642
src/dashboard/mod.rs Normal file
View File

@@ -0,0 +1,642 @@
// Dashboard module for LLM Proxy Gateway
use axum::{
extract::{ws::{Message, WebSocket, WebSocketUpgrade}, State},
response::{IntoResponse, Json},
routing::{get, post},
Router,
};
use serde::Serialize;
use sqlx::Row;
use std::collections::HashMap;
use tracing::{info, warn};
use crate::state::AppState;
// Dashboard state
#[derive(Clone)]
struct DashboardState {
app_state: AppState,
}
// API Response types
#[derive(Serialize)]
struct ApiResponse<T> {
success: bool,
data: Option<T>,
error: Option<String>,
}
impl<T> ApiResponse<T> {
fn success(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
}
}
fn error(error: String) -> Self {
Self {
success: false,
data: None,
error: Some(error),
}
}
}
// ... (keep routes as they are)
// Dashboard routes
pub fn router(state: AppState) -> Router {
let dashboard_state = DashboardState {
app_state: state,
};
Router::new()
// Static file serving
.nest_service("/", tower_http::services::ServeDir::new("static"))
.fallback_service(tower_http::services::ServeDir::new("static"))
// WebSocket endpoint
.route("/ws", get(handle_websocket))
// API endpoints
.route("/api/auth/login", post(handle_login))
.route("/api/auth/status", get(handle_auth_status))
.route("/api/usage/summary", get(handle_usage_summary))
.route("/api/usage/time-series", get(handle_time_series))
.route("/api/usage/clients", get(handle_clients_usage))
.route("/api/usage/providers", get(handle_providers_usage))
.route("/api/clients", get(handle_get_clients).post(handle_create_client))
.route("/api/clients/:id", get(handle_get_client).delete(handle_delete_client))
.route("/api/clients/:id/usage", get(handle_client_usage))
.route("/api/providers", get(handle_get_providers))
.route("/api/providers/:name", get(handle_get_provider).put(handle_update_provider))
.route("/api/providers/:name/test", post(handle_test_provider))
.route("/api/system/health", get(handle_system_health))
.route("/api/system/logs", get(handle_system_logs))
.route("/api/system/backup", post(handle_system_backup))
.with_state(dashboard_state)
}
// WebSocket handler
async fn handle_websocket(
ws: WebSocketUpgrade,
State(state): State<DashboardState>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_websocket_connection(socket, state))
}
async fn handle_websocket_connection(mut socket: WebSocket, state: DashboardState) {
info!("WebSocket connection established");
// Subscribe to events from the global bus
let mut rx = state.app_state.dashboard_tx.subscribe();
// Send initial connection message
let _ = socket.send(Message::Text(
serde_json::json!({
"type": "connected",
"message": "Connected to LLM Proxy Dashboard"
}).to_string().into(),
)).await;
// Handle incoming messages and broadcast events
loop {
tokio::select! {
// Receive broadcast events
Ok(event) = rx.recv() => {
let message = Message::Text(serde_json::to_string(&event).unwrap().into());
if socket.send(message).await.is_err() {
break;
}
}
// Receive WebSocket messages
result = socket.recv() => {
match result {
Some(Ok(Message::Text(text))) => {
handle_websocket_message(&text, &state).await;
}
_ => break,
}
}
}
}
info!("WebSocket connection closed");
}
async fn handle_websocket_message(text: &str, state: &DashboardState) {
// Parse and handle WebSocket messages
if let Ok(data) = serde_json::from_str::<serde_json::Value>(text) {
if let Some("ping") = data.get("type").and_then(|v| v.as_str()) {
let _ = state.app_state.dashboard_tx.send(serde_json::json!({
"event_type": "pong",
"data": {}
}));
}
}
}
// Authentication handlers
async fn handle_login() -> Json<ApiResponse<serde_json::Value>> {
// Simple authentication for demo
// In production, this would validate credentials against a database
Json(ApiResponse::success(serde_json::json!({
"token": "demo-token-123456",
"user": {
"username": "admin",
"name": "Administrator",
"role": "Super Admin"
}
})))
}
async fn handle_auth_status() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::success(serde_json::json!({
"authenticated": true,
"user": {
"username": "admin",
"name": "Administrator",
"role": "Super Admin"
}
})))
}
// Usage handlers
async fn handle_usage_summary(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let pool = &state.app_state.db_pool;
// Total stats
let total_stats = sqlx::query(
r#"
SELECT
COUNT(*) as total_requests,
COALESCE(SUM(total_tokens), 0) as total_tokens,
COALESCE(SUM(cost), 0.0) as total_cost,
COUNT(DISTINCT client_id) as active_clients
FROM llm_requests
"#
)
.fetch_one(pool);
// Today's stats
let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
let today_stats = sqlx::query(
r#"
SELECT
COUNT(*) as today_requests,
COALESCE(SUM(total_tokens), 0) as today_tokens,
COALESCE(SUM(cost), 0.0) as today_cost
FROM llm_requests
WHERE strftime('%Y-%m-%d', timestamp) = ?
"#
)
.bind(today)
.fetch_one(pool);
// Error stats
let error_stats = sqlx::query(
r#"
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors
FROM llm_requests
"#
)
.fetch_one(pool);
// Average response time
let avg_response = sqlx::query(
r#"
SELECT COALESCE(AVG(duration_ms), 0.0) as avg_duration
FROM llm_requests
WHERE status = 'success'
"#
)
.fetch_one(pool);
match tokio::join!(total_stats, today_stats, error_stats, avg_response) {
(Ok(t), Ok(d), Ok(e), Ok(a)) => {
let total_requests: i64 = t.get("total_requests");
let total_tokens: i64 = t.get("total_tokens");
let total_cost: f64 = t.get("total_cost");
let active_clients: i64 = t.get("active_clients");
let today_requests: i64 = d.get("today_requests");
let today_cost: f64 = d.get("today_cost");
let total_count: i64 = e.get("total");
let error_count: i64 = e.get("errors");
let error_rate = if total_count > 0 {
(error_count as f64 / total_count as f64) * 100.0
} else {
0.0
};
let avg_response_time: f64 = a.get("avg_duration");
Json(ApiResponse::success(serde_json::json!({
"total_requests": total_requests,
"total_tokens": total_tokens,
"total_cost": total_cost,
"active_clients": active_clients,
"today_requests": today_requests,
"today_cost": today_cost,
"error_rate": error_rate,
"avg_response_time": avg_response_time,
})))
}
_ => Json(ApiResponse::error("Failed to fetch usage statistics".to_string()))
}
}
async fn handle_time_series(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let pool = &state.app_state.db_pool;
let now = chrono::Utc::now();
let twenty_four_hours_ago = now - chrono::Duration::hours(24);
let result = sqlx::query(
r#"
SELECT
strftime('%H:00', timestamp) as hour,
COUNT(*) as requests,
SUM(total_tokens) as tokens,
SUM(cost) as cost
FROM llm_requests
WHERE timestamp >= ?
GROUP BY hour
ORDER BY hour
"#
)
.bind(twenty_four_hours_ago)
.fetch_all(pool)
.await;
match result {
Ok(rows) => {
let mut series = Vec::new();
for row in rows {
let hour: String = row.get("hour");
let requests: i64 = row.get("requests");
let tokens: i64 = row.get("tokens");
let cost: f64 = row.get("cost");
series.push(serde_json::json!({
"time": hour,
"requests": requests,
"tokens": tokens,
"cost": cost,
}));
}
Json(ApiResponse::success(serde_json::json!({
"series": series,
"period": "24h"
})))
}
Err(e) => {
warn!("Failed to fetch time series data: {}", e);
Json(ApiResponse::error("Failed to fetch time series data".to_string()))
}
}
}
async fn handle_clients_usage(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
// Query database for client usage statistics
let pool = &state.app_state.db_pool;
let result = sqlx::query(
r#"
SELECT
client_id,
COUNT(*) as requests,
SUM(total_tokens) as tokens,
SUM(cost) as cost,
MAX(timestamp) as last_request
FROM llm_requests
GROUP BY client_id
ORDER BY requests DESC
"#
)
.fetch_all(pool)
.await;
match result {
Ok(rows) => {
let mut client_usage = Vec::new();
for row in rows {
let client_id: String = row.get("client_id");
let requests: i64 = row.get("requests");
let tokens: i64 = row.get("tokens");
let cost: f64 = row.get("cost");
let last_request: Option<chrono::DateTime<chrono::Utc>> = row.get("last_request");
client_usage.push(serde_json::json!({
"client_id": client_id,
"client_name": client_id,
"requests": requests,
"tokens": tokens,
"cost": cost,
"last_request": last_request,
}));
}
Json(ApiResponse::success(serde_json::json!(client_usage)))
}
Err(e) => {
warn!("Failed to fetch client usage data: {}", e);
Json(ApiResponse::error("Failed to fetch client usage data".to_string()))
}
}
}
async fn handle_providers_usage(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
// Query database for provider usage statistics
let pool = &state.app_state.db_pool;
let result = sqlx::query(
r#"
SELECT
provider,
COUNT(*) as requests,
COALESCE(SUM(total_tokens), 0) as tokens,
COALESCE(SUM(cost), 0.0) as cost
FROM llm_requests
GROUP BY provider
ORDER BY requests DESC
"#
)
.fetch_all(pool)
.await;
match result {
Ok(rows) => {
let mut provider_usage = Vec::new();
for row in rows {
let provider: String = row.get("provider");
let requests: i64 = row.get("requests");
let tokens: i64 = row.get("tokens");
let cost: f64 = row.get("cost");
provider_usage.push(serde_json::json!({
"provider": provider,
"requests": requests,
"tokens": tokens,
"cost": cost,
}));
}
Json(ApiResponse::success(serde_json::json!(provider_usage)))
}
Err(e) => {
warn!("Failed to fetch provider usage data: {}", e);
Json(ApiResponse::error("Failed to fetch provider usage data".to_string()))
}
}
}
// Client handlers
async fn handle_get_clients(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let pool = &state.app_state.db_pool;
let result = sqlx::query(
r#"
SELECT
client_id as id,
name,
created_at,
total_requests,
total_tokens,
total_cost,
is_active
FROM clients
ORDER BY created_at DESC
"#
)
.fetch_all(pool)
.await;
match result {
Ok(rows) => {
let clients: Vec<serde_json::Value> = rows.into_iter().map(|row| {
serde_json::json!({
"id": row.get::<String, _>("id"),
"name": row.get::<Option<String>, _>("name").unwrap_or_else(|| "Unnamed".to_string()),
"created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
"requests_count": row.get::<i64, _>("total_requests"),
"total_tokens": row.get::<i64, _>("total_tokens"),
"total_cost": row.get::<f64, _>("total_cost"),
"status": if row.get::<bool, _>("is_active") { "active" } else { "inactive" },
})
}).collect();
Json(ApiResponse::success(serde_json::json!(clients)))
}
Err(e) => {
warn!("Failed to fetch clients: {}", e);
Json(ApiResponse::error("Failed to fetch clients".to_string()))
}
}
}
async fn handle_create_client() -> Json<ApiResponse<serde_json::Value>> {
// In production, this would create a real client
Json(ApiResponse::success(serde_json::json!({
"id": format!("client-{}", rand::random::<u32>()),
"name": "New Client",
"token": format!("sk-demo-{}", rand::random::<u32>()),
"created_at": chrono::Utc::now().to_rfc3339(),
"last_used": None::<String>,
"requests_count": 0,
"status": "active",
})))
}
async fn handle_get_client() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::error("Not implemented".to_string()))
}
async fn handle_delete_client() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::success(serde_json::json!({
"success": true,
"message": "Client deleted"
})))
}
async fn handle_client_usage() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::error("Not implemented".to_string()))
}
// Provider handlers
async fn handle_get_providers(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let registry = &state.app_state.model_registry;
let mut providers_json = Vec::new();
for (p_id, p_info) in &registry.providers {
let models: Vec<String> = p_info.models.keys().cloned().collect();
// Check if provider is healthy via circuit breaker
let status = if state.app_state.rate_limit_manager.check_provider_request(p_id).await.unwrap_or(true) {
"online"
} else {
"degraded"
};
providers_json.push(serde_json::json!({
"id": p_id,
"name": p_info.name,
"enabled": true,
"status": status,
"models": models,
"last_used": null, // TODO: track last used
}));
}
// Add Ollama explicitly
providers_json.push(serde_json::json!({
"id": "ollama",
"name": "Ollama",
"enabled": true,
"status": "online",
"models": ["llama3", "mistral", "phi3"],
"last_used": null,
}));
Json(ApiResponse::success(serde_json::json!(providers_json)))
}
async fn handle_get_provider() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::error("Not implemented".to_string()))
}
async fn handle_update_provider() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::success(serde_json::json!({
"success": true,
"message": "Provider updated"
})))
}
async fn handle_test_provider() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::success(serde_json::json!({
"success": true,
"latency": rand::random::<u32>() % 500 + 100,
"message": "Connection test successful"
})))
}
// System handlers
async fn handle_system_health(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let mut components = HashMap::new();
components.insert("api_server", "online");
components.insert("database", "online");
// Check provider health via circuit breakers
for p_id in state.app_state.model_registry.providers.keys() {
if state.app_state.rate_limit_manager.check_provider_request(p_id).await.unwrap_or(true) {
components.insert(p_id.as_str(), "online");
} else {
components.insert(p_id.as_str(), "degraded");
}
}
// Check Ollama health
if state.app_state.rate_limit_manager.check_provider_request("ollama").await.unwrap_or(true) {
components.insert("ollama", "online");
} else {
components.insert("ollama", "degraded");
}
Json(ApiResponse::success(serde_json::json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
"components": components,
"metrics": {
"cpu_usage": rand::random::<f64>() * 10.0 + 5.0,
"memory_usage": rand::random::<f64>() * 20.0 + 40.0,
"active_connections": rand::random::<u32>() % 20 + 5,
}
})))
}
async fn handle_system_logs(State(state): State<DashboardState>) -> Json<ApiResponse<serde_json::Value>> {
let pool = &state.app_state.db_pool;
let result = sqlx::query(
r#"
SELECT
id,
timestamp,
client_id,
provider,
model,
prompt_tokens,
completion_tokens,
total_tokens,
cost,
status,
error_message,
duration_ms
FROM llm_requests
ORDER BY timestamp DESC
LIMIT 100
"#
)
.fetch_all(pool)
.await;
match result {
Ok(rows) => {
let logs: Vec<serde_json::Value> = rows.into_iter().map(|row| {
serde_json::json!({
"id": row.get::<i64, _>("id"),
"timestamp": row.get::<chrono::DateTime<chrono::Utc>, _>("timestamp"),
"client_id": row.get::<String, _>("client_id"),
"provider": row.get::<String, _>("provider"),
"model": row.get::<String, _>("model"),
"tokens": row.get::<i64, _>("total_tokens"),
"cost": row.get::<f64, _>("cost"),
"status": row.get::<String, _>("status"),
"error": row.get::<Option<String>, _>("error_message"),
"duration": row.get::<i64, _>("duration_ms"),
})
}).collect();
Json(ApiResponse::success(serde_json::json!(logs)))
}
Err(e) => {
warn!("Failed to fetch system logs: {}", e);
Json(ApiResponse::error("Failed to fetch system logs".to_string()))
}
}
}
async fn handle_system_backup() -> Json<ApiResponse<serde_json::Value>> {
Json(ApiResponse::success(serde_json::json!({
"success": true,
"message": "Backup initiated",
"backup_id": format!("backup-{}", chrono::Utc::now().timestamp()),
})))
}
// Helper functions
#[allow(dead_code)]
fn mask_token(token: &str) -> String {
if token.len() <= 8 {
return "*****".to_string();
}
let masked_len = token.len().min(12);
let visible_len = 4;
let mask_len = masked_len - visible_len;
format!("{}{}", "*".repeat(mask_len), &token[token.len() - visible_len..])
}