diff --git a/internal/models/responses.go b/internal/models/responses.go new file mode 100644 index 00000000..4d8477eb --- /dev/null +++ b/internal/models/responses.go @@ -0,0 +1,141 @@ +package models + +import "encoding/json" + +// Responses API request types + +// ResponsesRequest maps to POST /v1/responses body (OpenAI Responses API format). +// The `input` field can be a string or an array of message objects. +type ResponsesRequest struct { + Model string `json:"model"` + Input json.RawMessage `json:"input"` // string or []ResponseInputMessage + Instructions string `json:"instructions,omitempty"` + Temperature *float64 `json:"temperature,omitempty"` + MaxOutputTokens *uint32 `json:"max_output_tokens,omitempty"` + TopP *float64 `json:"top_p,omitempty"` + Stream *bool `json:"stream,omitempty"` + Tools json.RawMessage `json:"tools,omitempty"` + ToolChoice json.RawMessage `json:"tool_choice,omitempty"` + Store *bool `json:"store,omitempty"` +} + +// ResponseInputMessage represents a single message in the input array. +type ResponseInputMessage struct { + Role string `json:"role"` + Content json.RawMessage `json:"content"` // string or []ContentPart +} + +// Responses API response types + +// ResponsesResponse maps to OpenAI /v1/responses response. +type ResponsesResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Model string `json:"model"` + Output []ResponsesOutputItem `json:"output"` + Usage *ResponsesUsage `json:"usage,omitempty"` +} + +// ResponsesOutputItem represents an item in the output array. +// For messages: type="message", role, content[]. +// For function calls: type="function_call", id, name, arguments, status. +type ResponsesOutputItem struct { + Type string `json:"type"` + Role string `json:"role,omitempty"` + Content []ResponsesOutputContent `json:"content,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Arguments string `json:"arguments,omitempty"` + Status string `json:"status,omitempty"` +} + +// ResponsesOutputContent represents content parts within an output message. +type ResponsesOutputContent struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Annotations []json.RawMessage `json:"annotations,omitempty"` +} + +// ResponsesUsage maps to the usage block in Responses API. +type ResponsesUsage struct { + InputTokens uint32 `json:"input_tokens"` + OutputTokens uint32 `json:"output_tokens"` + TotalTokens uint32 `json:"total_tokens"` + InputTokensDetails *ResponsesInputTokensDetails `json:"input_tokens_details,omitempty"` + OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"` +} + +// ResponsesInputTokensDetails maps input token details. +type ResponsesInputTokensDetails struct { + CachedTokens uint32 `json:"cached_tokens"` +} + +// ResponsesOutputTokensDetails maps output token details. +type ResponsesOutputTokensDetails struct { + ReasoningTokens uint32 `json:"reasoning_tokens"` +} + +// ToUsage converts ResponsesUsage to the unified Usage model. +func (u *ResponsesUsage) ToUsage() *Usage { + usage := &Usage{ + PromptTokens: u.InputTokens, + CompletionTokens: u.OutputTokens, + TotalTokens: u.TotalTokens, + } + if u.InputTokensDetails != nil && u.InputTokensDetails.CachedTokens > 0 { + usage.CacheReadTokens = &u.InputTokensDetails.CachedTokens + } + if u.OutputTokensDetails != nil && u.OutputTokensDetails.ReasoningTokens > 0 { + usage.ReasoningTokens = &u.OutputTokensDetails.ReasoningTokens + } + return usage +} + +// ResponsesStreamChunk represents an SSE chunk from the Responses streaming endpoint. +type ResponsesStreamChunk struct { + Type string `json:"type"` + Response *ResponsesStreamPayload `json:"response,omitempty"` + Item *ResponsesStreamPayloadItem `json:"item,omitempty"` + Delta *ResponsesStreamDelta `json:"delta,omitempty"` +} + +// ResponsesStreamPayload represents the "response" field in some SSE chunks. +type ResponsesStreamPayload struct { + Object string `json:"object"` + ID string `json:"id"` + Model string `json:"model"` + Usage *ResponsesUsage `json:"usage,omitempty"` +} + +// ResponsesStreamPayloadItem represents the "item" field in SSE chunks. +type ResponsesStreamPayloadItem struct { + Type string `json:"type"` + Role string `json:"role,omitempty"` + Content []ResponsesOutputContent `json:"content,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` +} + +// ResponsesStreamDelta represents a content delta in streaming. +type ResponsesStreamDelta struct { + ContentIndex int `json:"content_index"` + Type string `json:"type"` + Text string `json:"text,omitempty"` +} + +// UnifiedResponsesRequest is the internal unified format for Responses API. +type UnifiedResponsesRequest struct { + ClientID string + Model string + Input string // normalized input text + InputMessages []ResponseInputMessage // structured input messages (if provided as array) + Instructions string + Temperature *float64 + MaxOutputTokens *uint32 + TopP *float64 + Stream bool + Tools json.RawMessage + ToolChoice json.RawMessage + Store bool +} diff --git a/internal/providers/circuit_breaker.go b/internal/providers/circuit_breaker.go index 0c9af99d..bcfb23c3 100644 --- a/internal/providers/circuit_breaker.go +++ b/internal/providers/circuit_breaker.go @@ -64,3 +64,18 @@ func (cbp *CircuitBreakerProvider) ImageGeneration(ctx context.Context, req *mod } return result.(*models.ImageGenerationResponse), nil } + +func (cbp *CircuitBreakerProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + result, err := cbp.cb.Execute(func() (interface{}, error) { + return cbp.provider.Responses(ctx, req) + }) + if err != nil { + return nil, err + } + return result.(*models.ResponsesResponse), nil +} + +func (cbp *CircuitBreakerProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + // Circuit breaker passthrough for streaming (same pattern as ChatCompletionStream) + return cbp.provider.ResponsesStream(ctx, req) +} diff --git a/internal/providers/deepseek.go b/internal/providers/deepseek.go index 994c5f1f..a9e4070f 100644 --- a/internal/providers/deepseek.go +++ b/internal/providers/deepseek.go @@ -223,3 +223,11 @@ func StreamDeepSeek(ctx io.ReadCloser, ch chan<- *models.ChatCompletionStreamRes func (p *DeepSeekProvider) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) { return nil, fmt.Errorf("deepseek does not support image generation") } + +func (p *DeepSeekProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + return nil, fmt.Errorf("responses API not supported by deepseek") +} + +func (p *DeepSeekProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + return nil, fmt.Errorf("responses API not supported by deepseek") +} diff --git a/internal/providers/gemini.go b/internal/providers/gemini.go index 7c743a04..157f00dd 100644 --- a/internal/providers/gemini.go +++ b/internal/providers/gemini.go @@ -185,6 +185,14 @@ func sizeToGeminiAspectRatio(size string) string { } } +func (p *GeminiProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + return nil, fmt.Errorf("responses API not supported by gemini") +} + +func (p *GeminiProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + return nil, fmt.Errorf("responses API not supported by gemini") +} + func (p *GeminiProvider) ChatCompletion(ctx context.Context, req *models.UnifiedRequest) (*models.ChatCompletionResponse, error) { // Gemini mapping var contents []GeminiContent diff --git a/internal/providers/grok.go b/internal/providers/grok.go index 6d127e78..b3a9460f 100644 --- a/internal/providers/grok.go +++ b/internal/providers/grok.go @@ -98,3 +98,11 @@ func (p *GrokProvider) ChatCompletionStream(ctx context.Context, req *models.Uni func (p *GrokProvider) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) { return nil, fmt.Errorf("grok does not support image generation") } + +func (p *GrokProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + return nil, fmt.Errorf("responses API not supported by grok") +} + +func (p *GrokProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + return nil, fmt.Errorf("responses API not supported by grok") +} diff --git a/internal/providers/helpers.go b/internal/providers/helpers.go index a0369aea..a21250ad 100644 --- a/internal/providers/helpers.go +++ b/internal/providers/helpers.go @@ -133,6 +133,133 @@ func BuildOpenAIBody(request *models.UnifiedRequest, messagesJSON []interface{}, return body } +// BuildOpenAIResponsesBody builds the request body for the Responses API endpoint. +func BuildOpenAIResponsesBody(req *models.ResponsesRequest, stream bool) map[string]interface{} { + body := map[string]interface{}{ + "model": req.Model, + "stream": stream, + } + + // The input field can be a string or a structured array. + // Try to preserve the original format. + if req.Input != nil { + // Try as string first + var inputStr string + if err := json.Unmarshal(req.Input, &inputStr); err == nil { + body["input"] = inputStr + } else { + // Try as array of messages + var inputArr []interface{} + if err := json.Unmarshal(req.Input, &inputArr); err == nil { + body["input"] = inputArr + } + } + } + + if req.Instructions != "" { + body["instructions"] = req.Instructions + } + if req.Temperature != nil { + body["temperature"] = *req.Temperature + } + if req.MaxOutputTokens != nil { + body["max_output_tokens"] = *req.MaxOutputTokens + } + if req.TopP != nil { + body["top_p"] = *req.TopP + } + if req.Tools != nil { + var tools interface{} + if err := json.Unmarshal(req.Tools, &tools); err == nil { + body["tools"] = tools + } + } + if req.ToolChoice != nil { + var toolChoice interface{} + if err := json.Unmarshal(req.ToolChoice, &toolChoice); err == nil { + body["tool_choice"] = toolChoice + } + } + if req.Store != nil { + body["store"] = *req.Store + } + + if stream { + body["stream_options"] = map[string]interface{}{ + "include_usage": true, + } + } + + return body +} + +// ParseOpenAIResponsesResponse parses a raw JSON map into a ResponsesResponse. +func ParseOpenAIResponsesResponse(respJSON map[string]interface{}, model string) (*models.ResponsesResponse, error) { + data, err := json.Marshal(respJSON) + if err != nil { + return nil, err + } + + var resp models.ResponsesResponse + if err := json.Unmarshal(data, &resp); err != nil { + return nil, err + } + + // Re-parse usage with the detailed tokens + if usageData, ok := respJSON["usage"]; ok { + var responsesUsage models.ResponsesUsage + usageBytes, _ := json.Marshal(usageData) + if err := json.Unmarshal(usageBytes, &responsesUsage); err == nil { + resp.Usage = &responsesUsage + } + } + + return &resp, nil +} + +// ParseOpenAIResponsesStreamChunk parses a single SSE line into a ResponsesStreamChunk. +// Returns the chunk, whether this is the [DONE] signal, and any error. +func ParseOpenAIResponsesStreamChunk(line string) (*models.ResponsesStreamChunk, bool, error) { + if line == "" { + return nil, false, nil + } + if !strings.HasPrefix(line, "data: ") { + return nil, false, nil + } + + data := strings.TrimPrefix(line, "data: ") + if data == "[DONE]" { + return nil, true, nil + } + + var chunk models.ResponsesStreamChunk + if err := json.Unmarshal([]byte(data), &chunk); err != nil { + return nil, false, fmt.Errorf("failed to unmarshal responses stream chunk: %w", err) + } + + return &chunk, false, nil +} + +// StreamOpenAIResponses reads SSE chunks from the body and sends them to the channel. +func StreamOpenAIResponses(ctx io.ReadCloser, ch chan<- *models.ResponsesStreamChunk) error { + defer ctx.Close() + scanner := bufio.NewScanner(ctx) + for scanner.Scan() { + line := scanner.Text() + chunk, done, err := ParseOpenAIResponsesStreamChunk(line) + if err != nil { + return err + } + if done { + break + } + if chunk != nil { + ch <- chunk + } + } + return scanner.Err() +} + type openAIUsage struct { PromptTokens uint32 `json:"prompt_tokens"` CompletionTokens uint32 `json:"completion_tokens"` diff --git a/internal/providers/moonshot.go b/internal/providers/moonshot.go index d937a311..cf36a03c 100644 --- a/internal/providers/moonshot.go +++ b/internal/providers/moonshot.go @@ -117,3 +117,11 @@ func (p *MoonshotProvider) ChatCompletionStream(ctx context.Context, req *models func (p *MoonshotProvider) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) { return nil, fmt.Errorf("moonshot does not support image generation") } + +func (p *MoonshotProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + return nil, fmt.Errorf("responses API not supported by moonshot") +} + +func (p *MoonshotProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + return nil, fmt.Errorf("responses API not supported by moonshot") +} diff --git a/internal/providers/ollama.go b/internal/providers/ollama.go index 528b8581..641fb210 100644 --- a/internal/providers/ollama.go +++ b/internal/providers/ollama.go @@ -253,3 +253,11 @@ func StreamOllama(ctx io.ReadCloser, ch chan<- *models.ChatCompletionStreamRespo func (p *OllamaProvider) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) { return nil, fmt.Errorf("ollama does not support image generation") } + +func (p *OllamaProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + return nil, fmt.Errorf("responses API not supported by ollama") +} + +func (p *OllamaProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + return nil, fmt.Errorf("responses API not supported by ollama") +} diff --git a/internal/providers/openai_responses.go b/internal/providers/openai_responses.go new file mode 100644 index 00000000..f8829eb2 --- /dev/null +++ b/internal/providers/openai_responses.go @@ -0,0 +1,70 @@ +package providers + +import ( + "context" + "encoding/json" + "fmt" + + "gophergate/internal/models" +) + +// Responses sends a non-streaming request to OpenAI's /v1/responses endpoint. +func (p *OpenAIProvider) Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) { + // Determine if streaming was requested + stream := req.Stream != nil && *req.Stream + + body := BuildOpenAIResponsesBody(req, stream) + + resp, err := p.client.R(). + SetContext(ctx). + SetHeader("Authorization", "Bearer "+p.apiKey). + SetBody(body). + Post(fmt.Sprintf("%s/responses", p.config.BaseURL)) + + if err != nil { + return nil, fmt.Errorf("responses request failed: %w", err) + } + + if !resp.IsSuccess() { + return nil, fmt.Errorf("OpenAI Responses API error (%d): %s", resp.StatusCode(), resp.String()) + } + + var respJSON map[string]interface{} + if err := json.Unmarshal(resp.Body(), &respJSON); err != nil { + return nil, fmt.Errorf("failed to parse responses response: %w", err) + } + + return ParseOpenAIResponsesResponse(respJSON, req.Model) +} + +// ResponsesStream sends a streaming request to OpenAI's /v1/responses endpoint. +func (p *OpenAIProvider) ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) { + body := BuildOpenAIResponsesBody(req, true) + + resp, err := p.client.R(). + SetContext(ctx). + SetHeader("Authorization", "Bearer "+p.apiKey). + SetBody(body). + SetDoNotParseResponse(true). + Post(fmt.Sprintf("%s/responses", p.config.BaseURL)) + + if err != nil { + return nil, fmt.Errorf("responses stream request failed: %w", err) + } + + if !resp.IsSuccess() { + return nil, fmt.Errorf("OpenAI Responses API error (%d): %s", resp.StatusCode(), resp.String()) + } + + ch := make(chan *models.ResponsesStreamChunk) + + go func() { + defer close(ch) + err := StreamOpenAIResponses(resp.RawBody(), ch) + if err != nil { + fmt.Printf("Responses stream error: %v\n", err) + } + }() + + return ch, nil +} diff --git a/internal/providers/provider.go b/internal/providers/provider.go index a43b1bd4..2ba7abde 100644 --- a/internal/providers/provider.go +++ b/internal/providers/provider.go @@ -11,4 +11,6 @@ type Provider interface { ChatCompletion(ctx context.Context, req *models.UnifiedRequest) (*models.ChatCompletionResponse, error) ChatCompletionStream(ctx context.Context, req *models.UnifiedRequest) (<-chan *models.ChatCompletionStreamResponse, error) ImageGeneration(ctx context.Context, req *models.ImageGenerationRequest) (*models.ImageGenerationResponse, error) + Responses(ctx context.Context, req *models.ResponsesRequest) (*models.ResponsesResponse, error) + ResponsesStream(ctx context.Context, req *models.ResponsesRequest) (<-chan *models.ResponsesStreamChunk, error) } diff --git a/internal/server/server.go b/internal/server/server.go index 6b975a2c..ec4c1453 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -189,7 +189,7 @@ func (s *Server) setupRoutes() { v1.POST("/chat/completions", s.handleChatCompletions) v1.POST("/images/generations", s.handleImageGenerations) v1.GET("/models", s.handleListModels) - v1.GET("/responses", s.handleListResponses) + v1.POST("/responses", s.handleResponses) } // Dashboard API Group @@ -246,9 +246,117 @@ func (s *Server) setupRoutes() { }) } -func (s *Server) handleListResponses(c *gin.Context) { - // This is a placeholder for the /v1/responses endpoint - c.JSON(http.StatusOK, gin.H{"data": []interface{}{}}) +func (s *Server) handleResponses(c *gin.Context) { + startTime := time.Now() + var req models.ResponsesRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Select provider based on model name + providerName := "openai" // default for Responses API + modelLower := strings.ToLower(req.Model) + if strings.HasPrefix(modelLower, "gemini/") || strings.Contains(modelLower, "gemini") || strings.HasPrefix(modelLower, "google/") { + providerName = "gemini" + } else if strings.HasPrefix(modelLower, "deepseek/") || (strings.Contains(modelLower, "deepseek") && !strings.Contains(modelLower, "ollama")) { + providerName = "deepseek" + } else if strings.HasPrefix(modelLower, "moonshot/") || strings.Contains(modelLower, "kimi") || strings.Contains(modelLower, "moonshot") { + providerName = "moonshot" + } else if strings.HasPrefix(modelLower, "grok/") || strings.Contains(modelLower, "grok") { + providerName = "grok" + } else if strings.HasPrefix(modelLower, "ollama/") || + strings.Contains(modelLower, "glm-") || + strings.Contains(modelLower, "qwen") || + strings.Contains(modelLower, "gemma") || + strings.Contains(modelLower, "llama") || + strings.Contains(modelLower, "mistral") || + strings.Contains(modelLower, "phi") || + strings.Contains(modelLower, "yi") || + strings.Contains(modelLower, "codellama") || + strings.Contains(modelLower, "command-r") { + providerName = "ollama" + } + + provider, ok := s.providers[providerName] + if !ok { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Provider %s not enabled or supported", providerName)}) + return + } + + // Strip common prefixes from model name + modelID := req.Model + prefixes := []string{"gemini/", "google/", "openai/", "deepseek/", "moonshot/", "grok/", "ollama/"} + for _, p := range prefixes { + if strings.HasPrefix(modelID, p) { + modelID = strings.TrimPrefix(modelID, p) + break + } + } + + // Use the stripped model name for the actual API call + req.Model = modelID + + clientID := "default" + if auth, ok := c.Get("auth"); ok { + if authInfo, ok := auth.(models.AuthInfo); ok { + clientID = authInfo.ClientID + } + } + + stream := req.Stream != nil && *req.Stream + + if stream { + ch, err := provider.ResponsesStream(c.Request.Context(), &req) + if err != nil { + s.logRequest(startTime, clientID, providerName, req.Model, nil, err, false) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + + var lastUsage *models.ResponsesUsage + c.Stream(func(w io.Writer) bool { + chunk, ok := <-ch + if !ok { + fmt.Fprintf(w, "data: [DONE]\n\n") + if lastUsage != nil { + s.logRequest(startTime, clientID, providerName, req.Model, lastUsage.ToUsage(), nil, false) + } else { + s.logRequest(startTime, clientID, providerName, req.Model, nil, nil, false) + } + return false + } + // Capture usage from the response payload in streaming chunks + if chunk.Response != nil && chunk.Response.Usage != nil { + lastUsage = chunk.Response.Usage + } + data, err := json.Marshal(chunk) + if err != nil { + return false + } + fmt.Fprintf(w, "data: %s\n\n", data) + return true + }) + return + } + + resp, err := provider.Responses(c.Request.Context(), &req) + if err != nil { + s.logRequest(startTime, clientID, providerName, req.Model, nil, err, false) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + if resp.Usage != nil { + s.logRequest(startTime, clientID, providerName, req.Model, resp.Usage.ToUsage(), nil, false) + } else { + s.logRequest(startTime, clientID, providerName, req.Model, nil, nil, false) + } + c.JSON(http.StatusOK, resp) } func (s *Server) handleListModels(c *gin.Context) {