feat: Phase 2 - reliability & observability
- Circuit breaker: proper thresholds (3 failures, 30s timeout) - HTTP timeouts: 30s on all providers (was no timeout) - Structured logging: slog replaces fmt.Printf throughout - Stream errors: propagated as SSE error events to client - Registry fetch: retry with backoff (3 attempts) - Registry reads in dashboard protected by RWMutex
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var level = slog.LevelInfo
|
||||
|
||||
func init() {
|
||||
env := os.Getenv("LLM_PROXY_LOG_LEVEL")
|
||||
switch strings.ToLower(env) {
|
||||
case "debug":
|
||||
level = slog.LevelDebug
|
||||
case "warn":
|
||||
level = slog.LevelWarn
|
||||
case "error":
|
||||
level = slog.LevelError
|
||||
}
|
||||
|
||||
h := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: level,
|
||||
})
|
||||
slog.SetDefault(slog.New(h))
|
||||
}
|
||||
|
||||
// Warn is a helper to emit structured warnings.
|
||||
func Warn(msg string, args ...any) {
|
||||
slog.Warn(msg, args...)
|
||||
}
|
||||
|
||||
// Error is a helper to emit structured errors.
|
||||
func Error(msg string, args ...any) {
|
||||
slog.Error(msg, args...)
|
||||
}
|
||||
|
||||
// Debug is a helper to emit structured debug messages.
|
||||
func Debug(msg string, args ...any) {
|
||||
slog.Debug(msg, args...)
|
||||
}
|
||||
|
||||
// Ctx wraps slog with context.
|
||||
func Ctx(ctx context.Context) *slog.Logger {
|
||||
return slog.Default()
|
||||
}
|
||||
@@ -26,12 +26,12 @@ type ChatCompletionRequest struct {
|
||||
}
|
||||
|
||||
type ChatMessage struct {
|
||||
Role string `json:"role"` // "system", "user", "assistant", "tool"
|
||||
Content interface{} `json:"content"`
|
||||
ReasoningContent *string `json:"reasoning_content,omitempty"`
|
||||
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
|
||||
Name *string `json:"name,omitempty"`
|
||||
ToolCallID *string `json:"tool_call_id,omitempty"`
|
||||
Role string `json:"role"` // "system", "user", "assistant", "tool"
|
||||
Content interface{} `json:"content"`
|
||||
ReasoningContent *string `json:"reasoning_content,omitempty"`
|
||||
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
|
||||
Name *string `json:"name,omitempty"`
|
||||
ToolCallID *string `json:"tool_call_id,omitempty"`
|
||||
}
|
||||
|
||||
type ContentPart struct {
|
||||
@@ -53,9 +53,9 @@ type Tool struct {
|
||||
}
|
||||
|
||||
type FunctionDef struct {
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
Parameters json.RawMessage `json:"parameters,omitempty"`
|
||||
Parameters json.RawMessage `json:"parameters,omitempty"`
|
||||
}
|
||||
|
||||
type ToolCall struct {
|
||||
@@ -116,6 +116,7 @@ type ChatCompletionStreamResponse struct {
|
||||
Model string `json:"model"`
|
||||
Choices []ChatStreamChoice `json:"choices"`
|
||||
Usage *Usage `json:"usage,omitempty"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type ChatStreamChoice struct {
|
||||
|
||||
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/sony/gobreaker"
|
||||
"gophergate/internal/models"
|
||||
@@ -13,8 +14,20 @@ type CircuitBreakerProvider struct {
|
||||
}
|
||||
|
||||
func NewCircuitBreakerProvider(p Provider) Provider {
|
||||
name := p.Name()
|
||||
var maxRequests uint32 = 5
|
||||
var interval = 60 * time.Second
|
||||
var timeout = 30 * time.Second
|
||||
|
||||
settings := gobreaker.Settings{
|
||||
Name: p.Name(),
|
||||
Name: name,
|
||||
MaxRequests: maxRequests,
|
||||
Interval: interval,
|
||||
Timeout: timeout,
|
||||
ReadyToTrip: func(counts gobreaker.Counts) bool {
|
||||
// Trip after 3 consecutive failures
|
||||
return counts.ConsecutiveFailures > 3
|
||||
},
|
||||
}
|
||||
return &CircuitBreakerProvider{
|
||||
provider: p,
|
||||
|
||||
@@ -3,6 +3,7 @@ package providers
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -21,7 +22,7 @@ type DeepSeekProvider struct {
|
||||
|
||||
func NewDeepSeekProvider(cfg config.DeepSeekConfig, apiKey string) *DeepSeekProvider {
|
||||
return &DeepSeekProvider{
|
||||
client: resty.New(),
|
||||
client: resty.New().SetTimeout(30 * time.Second),
|
||||
config: cfg,
|
||||
apiKey: apiKey,
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -19,7 +20,7 @@ type GeminiProvider struct {
|
||||
|
||||
func NewGeminiProvider(cfg config.GeminiConfig, apiKey string) *GeminiProvider {
|
||||
return &GeminiProvider{
|
||||
client: resty.New(),
|
||||
client: resty.New().SetTimeout(30 * time.Second),
|
||||
config: cfg,
|
||||
apiKey: apiKey,
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
@@ -18,7 +19,7 @@ type GrokProvider struct {
|
||||
|
||||
func NewGrokProvider(cfg config.GrokConfig, apiKey string) *GrokProvider {
|
||||
return &GrokProvider{
|
||||
client: resty.New(),
|
||||
client: resty.New().SetTimeout(30 * time.Second),
|
||||
config: cfg,
|
||||
apiKey: apiKey,
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -19,7 +20,7 @@ type MoonshotProvider struct {
|
||||
|
||||
func NewMoonshotProvider(cfg config.MoonshotConfig, apiKey string) *MoonshotProvider {
|
||||
return &MoonshotProvider{
|
||||
client: resty.New(),
|
||||
client: resty.New().SetTimeout(30 * time.Second),
|
||||
config: cfg,
|
||||
apiKey: strings.TrimSpace(apiKey),
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/go-resty/resty/v2"
|
||||
"gophergate/internal/config"
|
||||
"gophergate/internal/models"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type OllamaProvider struct {
|
||||
|
||||
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"gophergate/internal/db"
|
||||
"gophergate/internal/models"
|
||||
"gophergate/internal/utils"
|
||||
"log/slog"
|
||||
|
||||
"github.com/shirou/gopsutil/v3/cpu"
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
@@ -879,6 +880,7 @@ func (s *Server) handleGetProviders(c *gin.Context) {
|
||||
|
||||
// Get models for this provider from registry
|
||||
var models []string
|
||||
s.registryMu.RLock()
|
||||
if s.registry != nil {
|
||||
registryID := id
|
||||
if id == "gemini" {
|
||||
@@ -897,6 +899,7 @@ func (s *Server) handleGetProviders(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
s.registryMu.RUnlock()
|
||||
|
||||
// If it's ollama, also include models from config
|
||||
if id == "ollama" {
|
||||
@@ -1060,6 +1063,7 @@ func (s *Server) handleGetModels(c *gin.Context) {
|
||||
}
|
||||
|
||||
var result []gin.H
|
||||
s.registryMu.RLock()
|
||||
if s.registry != nil {
|
||||
for pID, pInfo := range s.registry.Providers {
|
||||
proxyProvider, allowed := allowedRegistryProviders[pID]
|
||||
@@ -1210,6 +1214,7 @@ func (s *Server) handleUpdateModel(c *gin.Context) {
|
||||
|
||||
// Find provider for this model
|
||||
providerID := "unknown"
|
||||
s.registryMu.RLock()
|
||||
if s.registry != nil {
|
||||
for pID, pInfo := range s.registry.Providers {
|
||||
if _, ok := pInfo.Models[id]; ok {
|
||||
@@ -1388,6 +1393,7 @@ func (s *Server) handleSystemMetrics(c *gin.Context) {
|
||||
func (s *Server) handleGetSettings(c *gin.Context) {
|
||||
providerCount := 0
|
||||
modelCount := 0
|
||||
s.registryMu.RLock()
|
||||
if s.registry != nil {
|
||||
providerCount = len(s.registry.Providers)
|
||||
for _, p := range s.registry.Providers {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"gophergate/internal/config"
|
||||
"gophergate/internal/db"
|
||||
"gophergate/internal/middleware"
|
||||
"log/slog"
|
||||
"gophergate/internal/models"
|
||||
"gophergate/internal/providers"
|
||||
"gophergate/internal/utils"
|
||||
|
||||
+28
-21
@@ -6,38 +6,48 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"gophergate/internal/models"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"gophergate/internal/models"
|
||||
)
|
||||
|
||||
const ModelsDevURL = "https://models.dev/api.json"
|
||||
|
||||
func FetchRegistry() (*models.ModelRegistry, error) {
|
||||
log.Printf("Fetching model registry from %s", ModelsDevURL)
|
||||
|
||||
client := resty.New().SetTimeout(10 * time.Second)
|
||||
resp, err := client.R().Get(ModelsDevURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch registry: %w", err)
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < 3; attempt++ {
|
||||
if attempt > 0 {
|
||||
backoff := time.Duration(1<<attempt) * time.Second
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
resp, err := client.R().Get(ModelsDevURL)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("attempt %d: %w", attempt+1, err)
|
||||
continue
|
||||
}
|
||||
if !resp.IsSuccess() {
|
||||
lastErr = fmt.Errorf("attempt %d: HTTP %d", attempt+1, resp.StatusCode())
|
||||
continue
|
||||
}
|
||||
|
||||
var providers map[string]models.ProviderInfo
|
||||
if err := json.Unmarshal(resp.Body(), &providers); err != nil {
|
||||
lastErr = fmt.Errorf("attempt %d: unmarshal: %w", attempt+1, err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Println("Successfully loaded model registry")
|
||||
return &models.ModelRegistry{Providers: providers}, nil
|
||||
}
|
||||
|
||||
if !resp.IsSuccess() {
|
||||
return nil, fmt.Errorf("failed to fetch registry: HTTP %d", resp.StatusCode())
|
||||
}
|
||||
|
||||
var providers map[string]models.ProviderInfo
|
||||
if err := json.Unmarshal(resp.Body(), &providers); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal registry: %w", err)
|
||||
}
|
||||
|
||||
log.Println("Successfully loaded model registry")
|
||||
return &models.ModelRegistry{Providers: providers}, nil
|
||||
return nil, fmt.Errorf("failed to fetch registry after 3 attempts: %w", lastErr)
|
||||
}
|
||||
|
||||
func CalculateCost(registry *models.ModelRegistry, modelID string, promptTokens, completionTokens, reasoningTokens, cacheRead, cacheWrite uint32) float64 {
|
||||
meta := registry.FindModel(modelID)
|
||||
if meta == nil || meta.Cost == nil {
|
||||
log.Printf("[DEBUG] CalculateCost: model %s not found or has no cost metadata", modelID)
|
||||
return 0.0
|
||||
}
|
||||
|
||||
@@ -62,8 +72,5 @@ func CalculateCost(registry *models.ModelRegistry, modelID string, promptTokens,
|
||||
cost += float64(cacheWrite) * (*meta.Cost.CacheWrite) / 1000000.0
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] CalculateCost: model=%s, uncached=%d, completion=%d, reasoning=%d, cache_read=%d, cache_write=%d, cost=%f (input_rate=%f, output_rate=%f)",
|
||||
modelID, uncachedTokens, completionTokens, reasoningTokens, cacheRead, cacheWrite, cost, meta.Cost.Input, meta.Cost.Output)
|
||||
|
||||
return cost
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user