e5ef39f327
Add full Responses API endpoint alongside existing Chat Completions, with identical logging/tracking/cost pipeline. New: - internal/models/responses.go — request/response/stream types + ToUsage() bridge - internal/providers/openai_responses.go — OpenAI Responses/ResponsesStream Modified: - provider.go — Responses()+ResponsesStream() added to Provider interface - helpers.go — BuildOpenAIResponsesBody, parsers, SSE stream reader - circuit_breaker.go — CB wraps Responses, passthrough for stream - server.go — POST /v1/responses route + handleResponses handler - all non-OpenAI providers — stub methods with clear error messages Logging: ResponsesUsage.ToUsage() bridges to models.Usage, feeding same logRequest() -> DB insert -> dashboard WS -> client stats -> cost calc pipeline. No schema or logger changes needed.
82 lines
2.5 KiB
Go
82 lines
2.5 KiB
Go
package providers
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/sony/gobreaker"
|
|
"gophergate/internal/models"
|
|
)
|
|
|
|
type CircuitBreakerProvider struct {
|
|
provider Provider
|
|
cb *gobreaker.CircuitBreaker
|
|
}
|
|
|
|
func NewCircuitBreakerProvider(p Provider) Provider {
|
|
name := p.Name()
|
|
var maxRequests uint32 = 5
|
|
var interval = 60 * time.Second
|
|
var timeout = 5 * time.Minute
|
|
|
|
settings := gobreaker.Settings{
|
|
Name: name,
|
|
MaxRequests: maxRequests,
|
|
Interval: interval,
|
|
Timeout: timeout,
|
|
ReadyToTrip: func(counts gobreaker.Counts) bool {
|
|
// Trip after 3 consecutive failures
|
|
return counts.ConsecutiveFailures > 3
|
|
},
|
|
}
|
|
return &CircuitBreakerProvider{
|
|
provider: p,
|
|
cb: gobreaker.NewCircuitBreaker(settings),
|
|
}
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) Name() string {
|
|
return cbp.provider.Name()
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) ChatCompletion(ctx context.Context, req *models.UnifiedRequest) (*models.ChatCompletionResponse, error) {
|
|
result, err := cbp.cb.Execute(func() (interface{}, error) {
|
|
return cbp.provider.ChatCompletion(ctx, req)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.(*models.ChatCompletionResponse), nil
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) ChatCompletionStream(ctx context.Context, req *models.UnifiedRequest) (<-chan *models.ChatCompletionStreamResponse, error) {
|
|
// Circuit breaker for streaming is tricky. We'll just call the provider directly.
|
|
// Future: Implement a way to track stream failures in the circuit breaker.
|
|
return cbp.provider.ChatCompletionStream(ctx, req)
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) {
|
|
result, err := cbp.cb.Execute(func() (interface{}, error) {
|
|
return cbp.provider.ImageGeneration(ctx, req)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.(*models.ImageGenerationResponse), nil
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) {
|
|
result, err := cbp.cb.Execute(func() (interface{}, error) {
|
|
return cbp.provider.Responses(ctx, req)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.(*models.ResponsesResponse), nil
|
|
}
|
|
|
|
func (cbp *CircuitBreakerProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) {
|
|
// Circuit breaker passthrough for streaming (same pattern as ChatCompletionStream)
|
|
return cbp.provider.ResponsesStream(ctx, req)
|
|
}
|