From f2a5416408963b955b63148ee646953adc1f3bdd Mon Sep 17 00:00:00 2001 From: hobokenchicken Date: Thu, 4 Jun 2026 13:51:35 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20cheapest=20pipeline=20=E2=80=94=20gpt-4?= =?UTF-8?q?o-mini-transcribe=20+=20gpt-5.4-nano=20+=20TTS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simple 3-step chat completions pipeline at ~/usr/bin/bash.019/min total. Streams PCM16 audio from frontend, transcribes on release, generates response via gpt-5.4-nano, speaks via OpenAI TTS. Cost breakdown: gpt-4o-mini-transcribe: /usr/bin/bash.003/min gpt-5.4-nano: ~/usr/bin/bash.001/min OpenAI TTS (nova): /usr/bin/bash.015/min Total: ~/usr/bin/bash.019/min (~/usr/bin/bash.57/day at 30min) --- backend/main.py | 282 +++++++++++++++----------- backend/services/hybrid.py | 236 --------------------- frontend/src/hooks/useConversation.ts | 5 + 3 files changed, 167 insertions(+), 356 deletions(-) delete mode 100644 backend/services/hybrid.py diff --git a/backend/main.py b/backend/main.py index 2aaabdb..681e8c8 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,6 +1,7 @@ """Kira — AI body double backend -Hybrid pipeline: gpt-realtime-whisper (streaming STT) → gpt-5.4-nano (LLM) → OpenAI TTS +Cheapest pipeline: gpt-4o-mini-transcribe STT → gpt-5.4-nano LLM → OpenAI TTS +~$0.019/min total, simple 3-step chat completions. """ import json @@ -13,7 +14,6 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from config import settings -from services.hybrid import HybridPipeline from services.memory import kira_memory logging.basicConfig(level=logging.INFO) @@ -29,6 +29,25 @@ app.add_middleware( allow_headers=["*"], ) +# System prompt +BASE_SYSTEM_PROMPT = ( + "You are Kira, a warm, kind, and encouraging AI body double. " + "You speak in a friendly, girly-pop tone. You are helping someone with ADHD " + "stay focused and on task. Keep responses short, supportive, and uplifting. " + "Check in on them. Remind them to take breaks. Celebrate small wins. " + "Use occasional emoji but don't overdo it. Never be judgmental." +) + +_openai = None + + +def get_openai(): + global _openai + if _openai is None: + from openai import AsyncOpenAI + _openai = AsyncOpenAI(api_key=settings.openai_api_key) + return _openai + @app.on_event("startup") async def startup(): @@ -44,6 +63,69 @@ async def health(): return {"status": "ok", "name": "kira", "memory": mem_status} +def build_system_prompt(user_id: str) -> str: + prompt = BASE_SYSTEM_PROMPT + if kira_memory.enabled: + try: + kira_memory.ensure_peers(user_id) + suffix = kira_memory.build_system_prompt_suffix() + if suffix: + prompt += suffix + except Exception as e: + logger.warning(f"Memory context failed: {e}") + return prompt + + +async def run_conversation(text: str, user_id: str) -> str: + """STT → LLM → TTS using the cheapest models.""" + system_prompt = build_system_prompt(user_id) + client = get_openai() + + # LLM + resp = await client.chat.completions.create( + model="gpt-5.4-nano", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": text}, + ], + max_tokens=300, + temperature=0.7, + ) + kira_text = resp.choices[0].message.content or "Mhm, I'm here!" + return kira_text + + +async def transcribe_audio(audio_bytes: bytes) -> str | None: + """Transcribe audio bytes using cheapest STT model.""" + client = get_openai() + try: + transcript = await client.audio.transcriptions.create( + model="gpt-4o-mini-transcribe", + file=("audio.webm", audio_bytes, "audio/webm"), + response_format="text", + ) + return transcript.strip() if transcript and transcript.strip() else None + except Exception as e: + logger.warning(f"STT error: {e}") + return None + + +async def synthesize_speech(text: str) -> bytes: + """Generate TTS audio from text.""" + client = get_openai() + try: + resp = await client.audio.speech.create( + model="tts-1", + voice="nova", + input=text, + response_format="opus", + ) + return resp.content + except Exception as e: + logger.warning(f"TTS error: {e}") + return b"" + + @app.websocket("/api/ws") async def conversation_ws(websocket: WebSocket): await websocket.accept() @@ -52,92 +134,8 @@ async def conversation_ws(websocket: WebSocket): identified = False logger.info(f"[{session_id}] WebSocket connected") - pending_transcripts: list[str] = [] - pipeline: HybridPipeline | None = None - pipeline_task: asyncio.Task | None = None - pipeline_ready = asyncio.Event() - audio_queue: asyncio.Queue[bytes] = asyncio.Queue() - text_queue: asyncio.Queue[str] = asyncio.Queue() - - memory_suffix = "" - - async def on_ready(): - pipeline_ready.set() - logger.info(f"[{session_id}] Pipeline ready") - - async def on_transcript_delta(delta: str): - """Streaming partial transcript.""" - await websocket.send_json({"type": "transcript_delta", "text": delta}) - - async def on_transcript_done(full: str): - """Full utterance received.""" - await websocket.send_json({"type": "transcript", "role": "user", "text": full}) - - async def on_audio_delta(audio_bytes: bytes): - """Forward TTS audio to client.""" - try: - audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") - await websocket.send_json({"type": "audio", "data": audio_b64}) - except Exception: - pass - - async def on_speech_start(): - await websocket.send_json({"type": "speaking_start"}) - - async def on_speech_end(): - await websocket.send_json({"type": "speaking_end"}) - - async def on_error(msg: str): - await websocket.send_json({"type": "error", "message": msg}) - - # Create pipeline - pipeline = HybridPipeline( - on_transcript_delta=on_transcript_delta, - on_transcript_done=on_transcript_done, - on_audio_delta=on_audio_delta, - on_speech_start=on_speech_start, - on_speech_end=on_speech_end, - on_ready=on_ready, - on_error=on_error, - memory_suffix=memory_suffix, - ) - - pipeline_task = asyncio.create_task(pipeline.connect()) - - try: - await asyncio.wait_for(pipeline_ready.wait(), timeout=15) - except asyncio.TimeoutError: - logger.error(f"[{session_id}] Pipeline failed to connect") - await websocket.send_json({"type": "error", "message": "Failed to connect to AI"}) - pipeline_task.cancel() - return - - # Forward audio/text from client to pipeline - async def forward_audio(): - while pipeline and pipeline._connected: - try: - pcm16 = await asyncio.wait_for(audio_queue.get(), timeout=1) - await pipeline.send_audio(pcm16) - except asyncio.TimeoutError: - continue - except Exception: - break - - async def forward_text(): - while pipeline and pipeline._connected: - try: - text = await asyncio.wait_for(text_queue.get(), timeout=1) - await pipeline.send_text(text) - # Store in Honcho - if kira_memory.enabled and identified: - kira_memory.store_user_message(text) - except asyncio.TimeoutError: - continue - except Exception: - break - - fwd_audio = asyncio.create_task(forward_audio()) - fwd_text = asyncio.create_task(forward_text()) + audio_buffer = bytearray() + conversation_history: list[dict] = [] try: while True: @@ -145,7 +143,7 @@ async def conversation_ws(websocket: WebSocket): msg = json.loads(raw) msg_type = msg.get("type", "") - # ── Identity ── + # ── Identity & Preferences ── if msg_type == "identify": user_id = msg.get("user_id", "").strip() user_name = msg.get("name", "").strip() @@ -159,16 +157,6 @@ async def conversation_ws(websocket: WebSocket): kira_memory.ensure_peers(user_id) kira_memory.ensure_session(session_id) - # Build memory context and update pipeline - if kira_memory.enabled: - try: - ctx = kira_memory.build_system_prompt_suffix() - if ctx: - pipeline._memory_suffix = ctx - memory_suffix = ctx - except Exception: - pass - await websocket.send_json({ "type": "identified", "user_id": user_id, @@ -176,40 +164,94 @@ async def conversation_ws(websocket: WebSocket): }) continue - # ── Preferences ── if msg_type == "set_preference": key = msg.get("key", "").strip() value = msg.get("value", "").strip() if key and user_id and user_id != "default-user": kira_memory.set_user_preference(user_id, key, value) + await websocket.send_json({ + "type": "preference_saved", + "key": key, + "success": True, + }) continue - # ── Audio (PCM16) ── + # ── Conversation ── if msg_type == "audio": - audio_b64 = msg.get("data", "") - if audio_b64: - pcm16 = base64.b64decode(audio_b64) - await audio_queue.put(pcm16) - continue + # Accumulate PCM16 audio chunks + chunk = base64.b64decode(msg["data"]) + audio_buffer.extend(chunk) - # ── Text input ── - if msg_type == "conversation_text": - text = msg.get("text", "").strip() - if text: - await text_queue.put(text) - continue + elif msg_type == "transcribe": + if not audio_buffer: + await websocket.send_json({"type": "error", "message": "No audio data"}) + continue - if msg_type == "ping": + logger.info(f"[{session_id}] Transcribing {len(audio_buffer)} bytes...") + + # 1. STT + transcript = await transcribe_audio(bytes(audio_buffer)) + audio_buffer.clear() + + if not transcript: + await websocket.send_json({"type": "error", "message": "Could not transcribe"}) + continue + + await websocket.send_json({"type": "transcript", "role": "user", "text": transcript}) + conversation_history.append({"role": "user", "content": transcript}) + + # 2. LLM + logger.info(f"[{session_id}] User: {transcript}") + kira_text = await run_conversation(transcript, user_id) + conversation_history.append({"role": "assistant", "content": kira_text}) + logger.info(f"[{session_id}] Kira: {kira_text}") + + # Store in Honcho + if kira_memory.enabled and identified: + try: + kira_memory.store_messages(transcript, kira_text) + except Exception: + pass + + # 3. TTS + await websocket.send_json({"type": "speaking_start", "text": kira_text}) + audio_bytes = await synthesize_speech(kira_text) + audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") + await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text}) + await websocket.send_json({"type": "speaking_end"}) + + elif msg_type == "conversation_text": + user_text = msg.get("text", "").strip() + if not user_text: + continue + + conversation_history.append({"role": "user", "content": user_text}) + logger.info(f"[{session_id}] User (text): {user_text}") + + kira_text = await run_conversation(user_text, user_id) + conversation_history.append({"role": "assistant", "content": kira_text}) + logger.info(f"[{session_id}] Kira: {kira_text}") + + if kira_memory.enabled and identified: + try: + kira_memory.store_messages(user_text, kira_text) + except Exception: + pass + + await websocket.send_json({"type": "speaking_start", "text": kira_text}) + audio_bytes = await synthesize_speech(kira_text) + audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") + await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text}) + await websocket.send_json({"type": "speaking_end"}) + + elif msg_type == "ping": await websocket.send_json({"type": "pong"}) except WebSocketDisconnect: logger.info(f"[{session_id}] Disconnected") except Exception as e: logger.error(f"[{session_id}] Error: {e}") - finally: - fwd_audio.cancel() - fwd_text.cancel() - if pipeline: - await pipeline.disconnect() - if pipeline_task: - pipeline_task.cancel() + try: + await websocket.send_json({"type": "error", "message": str(e)}) + except Exception: + pass diff --git a/backend/services/hybrid.py b/backend/services/hybrid.py deleted file mode 100644 index b90f1c1..0000000 --- a/backend/services/hybrid.py +++ /dev/null @@ -1,236 +0,0 @@ -"""Hybrid pipeline: streaming STT (gpt-realtime-whisper) + cheap LLM + TTS. - -Uses gpt-realtime-whisper for low-latency streaming transcription, -gpt-5.4-nano as the brain, and OpenAI TTS for voice output. -""" - -import json -import base64 -import logging -import asyncio -from typing import Callable, Awaitable -from openai import AsyncOpenAI -from config import settings - -logger = logging.getLogger("kira.hybrid") - -# ─── System instructions for Kira's personality ─── - -KIRA_INSTRUCTIONS = ( - "You are Kira, a warm, kind, and encouraging AI body double. " - "You speak in a friendly, girly-pop tone. You are helping someone with ADHD " - "stay focused and on task. Keep responses short, supportive, and uplifting. " - "Check in on them. Remind them to take breaks. Celebrate small wins. " - "Use occasional emoji but don't overdo it. Never be judgmental." -) - - -class HybridPipeline: - """Streaming STT via gpt-realtime-whisper → gpt-5.4-nano LLM → OpenAI TTS.""" - - def __init__( - self, - on_transcript_delta: Callable[[str], Awaitable[None]], - on_transcript_done: Callable[[str], Awaitable[None]], - on_audio_delta: Callable[[bytes], Awaitable[None]], - on_speech_start: Callable[[], Awaitable[None]], - on_speech_end: Callable[[], Awaitable[None]], - on_ready: Callable[[], Awaitable[None]], - on_error: Callable[[str], Awaitable[None]], - memory_suffix: str = "", - ): - self._on_transcript_delta = on_transcript_delta - self._on_transcript_done = on_transcript_done - self._on_audio_delta = on_audio_delta - self._on_speech_start = on_speech_start - self._on_speech_end = on_speech_end - self._on_ready = on_ready - self._on_error = on_error - self._memory_suffix = memory_suffix - self._openai = None - self._conn = None - self._connected = False - self._transcript_buffer = "" - - async def connect(self): - """Connect to gpt-realtime-whisper via OpenAI Realtime API.""" - if self._connected: - return - - try: - import websockets - - self._openai = AsyncOpenAI(api_key=settings.openai_api_key) - - logger.info("Connecting to gpt-realtime-whisper...") - - # Connect directly via websockets to avoid the OpenAI-Beta header - url = f"wss://api.openai.com/v1/realtime?model=gpt-realtime-whisper" - ws = await websockets.connect( - url, - additional_headers={ - "Authorization": f"Bearer {settings.openai_api_key}", - }, - ) - - async with ws as conn: - self._conn = conn - self._connected = True - logger.info("Connected to gpt-realtime-whisper") - - # Configure session for transcription - await self._send({ - "type": "session.update", - "session": { - "input_audio_format": "pcm16", - "input_audio_transcription": { - "enabled": True, - }, - "turn_detection": { - "type": "server_vad", - "threshold": 0.5, - "prefix_padding_ms": 300, - "silence_duration_ms": 600, - }, - }, - }) - - await self._on_ready() - - # Listen for transcription events - while self._connected: - try: - raw = await conn.recv() - if isinstance(raw, (str, bytes)): - data = json.loads(raw if isinstance(raw, str) else raw.decode()) - await self._handle_event(data) - except Exception as e: - if self._connected: - logger.warning(f"recv error: {e}") - break - - except Exception as e: - logger.error(f"Connection error: {e}") - await self._on_error(str(e)) - finally: - self._connected = False - self._conn = None - - async def _handle_event(self, event): - """Process events from gpt-realtime-whisper.""" - event_type = getattr(event, "type", None) or (event.get("type") if isinstance(event, dict) else "") - - if event_type == "input_audio_buffer.speech_started": - self._transcript_buffer = "" - - elif event_type == "input_audio_buffer.speech_stopped": - if self._transcript_buffer.strip(): - await self._process_transcript(self._transcript_buffer.strip()) - self._transcript_buffer = "" - - elif event_type == "input_audio_buffer.transcription_delta": - delta_text = self._get_field(event, "delta", "") - if delta_text: - self._transcript_buffer += delta_text - - elif event_type == "conversation.item.created": - item = self._get_field(event, "item", {}) - content = self._get_field(item, "content", []) - for part in (content or []): - part_type = self._get_field(part, "type", "") - part_transcript = self._get_field(part, "transcript", "") - if part_type == "transcript" and part_transcript: - self._transcript_buffer = part_transcript - await self._on_transcript_delta(part_transcript) - - elif event_type == "error": - err = self._get_field(event, "error", {}) - msg = self._get_field(err, "message", str(event)) - logger.warning(f"Whisper error: {msg}") - - async def _process_transcript(self, transcript: str): - """Full utterance received. Call LLM, then TTS.""" - await self._on_transcript_done(transcript) - logger.info(f"User: {transcript}") - - # Build system prompt with optional memory context - system_content = KIRA_INSTRUCTIONS - if self._memory_suffix: - system_content += self._memory_suffix - - # Call gpt-5.4-nano - try: - resp = await self._openai.chat.completions.create( - model="gpt-5.4-nano", - messages=[ - {"role": "system", "content": system_content}, - {"role": "user", "content": transcript}, - ], - max_tokens=300, - temperature=0.7, - ) - kira_text = resp.choices[0].message.content or "Mhm, I'm here!" - except Exception as e: - logger.error(f"LLM error: {e}") - kira_text = "Sorry, let me try that again!" - await self._on_error(str(e)) - - logger.info(f"Kira: {kira_text}") - - # Call TTS - await self._on_speech_start() - try: - tts_resp = await self._openai.audio.speech.create( - model="tts-1", - voice="nova", - input=kira_text, - response_format="opus", - ) - audio_bytes = tts_resp.content - if audio_bytes: - await self._on_audio_delta(audio_bytes) - except Exception as e: - logger.error(f"TTS error: {e}") - - await self._on_speech_end() - - async def send_audio(self, pcm16_bytes: bytes): - """Send PCM16 audio chunk for transcription.""" - if not self._connected or not self._conn: - return - try: - audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8") - await self._send({ - "type": "input_audio_buffer.append", - "audio": audio_b64, - }) - except Exception as e: - logger.warning(f"Send audio error: {e}") - - async def send_text(self, text: str): - """Process text input directly (no transcription needed).""" - await self._process_transcript(text) - - async def _send(self, data: dict): - try: - await self._conn.send(json.dumps(data)) - except Exception as e: - logger.warning(f"Send error: {e}") - - async def disconnect(self): - self._connected = False - if self._conn: - try: - await self._conn.close() - except Exception: - pass - self._conn = None - - @staticmethod - def _get_field(obj, field: str, default=None): - """Get a field from either an object or dict.""" - if hasattr(obj, field): - return getattr(obj, field, default) - if isinstance(obj, dict): - return obj.get(field, default) - return default diff --git a/frontend/src/hooks/useConversation.ts b/frontend/src/hooks/useConversation.ts index 8afd108..b1c2543 100644 --- a/frontend/src/hooks/useConversation.ts +++ b/frontend/src/hooks/useConversation.ts @@ -272,6 +272,11 @@ export function useConversation() { streamRef.current?.getTracks().forEach((t) => t.stop()); streamRef.current = null; setIsRecording(false); + + // Tell backend to process accumulated audio + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify({ type: 'transcribe' })); + } }, []); // ── Text ──