diff --git a/backend/main.py b/backend/main.py index 6c0d9c3..2aaabdb 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,8 +1,6 @@ """Kira — AI body double backend -OpenAI Realtime API pipeline: - mic audio → [built-in STT → GPT-4o-mini → built-in TTS] → speaker audio - Single WebSocket, ~300-800ms latency +Hybrid pipeline: gpt-realtime-whisper (streaming STT) → gpt-5.4-nano (LLM) → OpenAI TTS """ import json @@ -15,7 +13,7 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from config import settings -from services.realtime import RealtimeRelay +from services.hybrid import HybridPipeline from services.memory import kira_memory logging.basicConfig(level=logging.INFO) @@ -54,102 +52,92 @@ async def conversation_ws(websocket: WebSocket): identified = False logger.info(f"[{session_id}] WebSocket connected") - # Track conversation for Honcho - pending_transcripts: list[tuple[str, str]] = [] - - # Will be set when Realtime relay is ready - relay_ready = asyncio.Event() - relay: RealtimeRelay | None = None - relay_task: asyncio.Task | None = None + pending_transcripts: list[str] = [] + pipeline: HybridPipeline | None = None + pipeline_task: asyncio.Task | None = None + pipeline_ready = asyncio.Event() audio_queue: asyncio.Queue[bytes] = asyncio.Queue() text_queue: asyncio.Queue[str] = asyncio.Queue() + memory_suffix = "" + async def on_ready(): - relay_ready.set() - logger.info(f"[{session_id}] Realtime relay ready") + pipeline_ready.set() + logger.info(f"[{session_id}] Pipeline ready") + + async def on_transcript_delta(delta: str): + """Streaming partial transcript.""" + await websocket.send_json({"type": "transcript_delta", "text": delta}) + + async def on_transcript_done(full: str): + """Full utterance received.""" + await websocket.send_json({"type": "transcript", "role": "user", "text": full}) async def on_audio_delta(audio_bytes: bytes): - """Forward audio chunks from OpenAI to the client.""" + """Forward TTS audio to client.""" try: audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") - await websocket.send_json({ - "type": "audio", - "data": audio_b64, - }) + await websocket.send_json({"type": "audio", "data": audio_b64}) except Exception: pass - async def on_transcript(text: str): - """Store transcripts for Honcho.""" - pending_transcripts.append(("transcript", text)) - role, content = text.split(": ", 1) - logger.info(f"[{session_id}] {role}: {content}") - await websocket.send_json({ - "type": "transcript", - "role": role, - "text": content, - }) - - async def on_speech_started(): - """Kira started speaking.""" + async def on_speech_start(): await websocket.send_json({"type": "speaking_start"}) - async def on_speech_stopped(): - """Kira finished speaking.""" + async def on_speech_end(): await websocket.send_json({"type": "speaking_end"}) - async def on_interruption(): - """User interrupted — Kira stops speaking.""" - await websocket.send_json({"type": "interruption"}) - async def on_error(msg: str): await websocket.send_json({"type": "error", "message": msg}) - # ── Create and start the Realtime relay ── - relay = RealtimeRelay( + # Create pipeline + pipeline = HybridPipeline( + on_transcript_delta=on_transcript_delta, + on_transcript_done=on_transcript_done, on_audio_delta=on_audio_delta, - on_transcript=on_transcript, - on_speech_started=on_speech_started, - on_speech_stopped=on_speech_stopped, - on_interruption=on_interruption, - on_error=on_error, + on_speech_start=on_speech_start, + on_speech_end=on_speech_end, on_ready=on_ready, + on_error=on_error, + memory_suffix=memory_suffix, ) - relay_task = asyncio.create_task(relay.connect()) + pipeline_task = asyncio.create_task(pipeline.connect()) - # Wait for relay to be ready try: - await asyncio.wait_for(relay_ready.wait(), timeout=15) + await asyncio.wait_for(pipeline_ready.wait(), timeout=15) except asyncio.TimeoutError: - logger.error(f"[{session_id}] Realtime relay failed to connect") + logger.error(f"[{session_id}] Pipeline failed to connect") await websocket.send_json({"type": "error", "message": "Failed to connect to AI"}) - relay_task.cancel() + pipeline_task.cancel() return - # ── Forward audio/text from client to relay ── + # Forward audio/text from client to pipeline async def forward_audio(): - while relay and relay._connected: + while pipeline and pipeline._connected: try: pcm16 = await asyncio.wait_for(audio_queue.get(), timeout=1) - await relay.send_audio(pcm16) + await pipeline.send_audio(pcm16) except asyncio.TimeoutError: continue except Exception: break async def forward_text(): - while relay and relay._connected: + while pipeline and pipeline._connected: try: text = await asyncio.wait_for(text_queue.get(), timeout=1) - await relay.send_text(text) + await pipeline.send_text(text) + # Store in Honcho + if kira_memory.enabled and identified: + kira_memory.store_user_message(text) except asyncio.TimeoutError: continue except Exception: break - fwd_audio_task = asyncio.create_task(forward_audio()) - fwd_text_task = asyncio.create_task(forward_text()) + fwd_audio = asyncio.create_task(forward_audio()) + fwd_text = asyncio.create_task(forward_text()) try: while True: @@ -171,30 +159,16 @@ async def conversation_ws(websocket: WebSocket): kira_memory.ensure_peers(user_id) kira_memory.ensure_session(session_id) - # Inject Honcho context into the Realtime session instructions - memory_suffix = "" + # Build memory context and update pipeline if kira_memory.enabled: try: ctx = kira_memory.build_system_prompt_suffix() if ctx: + pipeline._memory_suffix = ctx memory_suffix = ctx except Exception: pass - if relay and relay._connected and memory_suffix: - await relay._send({ - "type": "session.update", - "session": { - "instructions": ( - "You are Kira, a warm, kind, and encouraging AI body double. " - "Speak in a friendly, girly-pop tone. Help someone with ADHD " - "stay focused. Keep responses short and supportive. " - "Check in, remind breaks, celebrate wins. Never judgmental." - + memory_suffix - ), - }, - }) - await websocket.send_json({ "type": "identified", "user_id": user_id, @@ -210,7 +184,7 @@ async def conversation_ws(websocket: WebSocket): kira_memory.set_user_preference(user_id, key, value) continue - # ── Audio from frontend (PCM16) ── + # ── Audio (PCM16) ── if msg_type == "audio": audio_b64 = msg.get("data", "") if audio_b64: @@ -223,9 +197,6 @@ async def conversation_ws(websocket: WebSocket): text = msg.get("text", "").strip() if text: await text_queue.put(text) - # Also store in Honcho immediately - if kira_memory.enabled and identified: - kira_memory.store_user_message(text) continue if msg_type == "ping": @@ -236,19 +207,9 @@ async def conversation_ws(websocket: WebSocket): except Exception as e: logger.error(f"[{session_id}] Error: {e}") finally: - # Store pending transcripts in Honcho - if kira_memory.enabled and identified: - for _, transcript_text in pending_transcripts: - if transcript_text.startswith("user: "): - content = transcript_text[6:] - kira_memory.store_user_message(content) - elif transcript_text.startswith("assistant: "): - content = transcript_text[11:] - kira_memory.store_kira_message(content) - - fwd_audio_task.cancel() - fwd_text_task.cancel() - if relay: - await relay.disconnect() - if relay_task: - relay_task.cancel() + fwd_audio.cancel() + fwd_text.cancel() + if pipeline: + await pipeline.disconnect() + if pipeline_task: + pipeline_task.cancel() diff --git a/backend/services/hybrid.py b/backend/services/hybrid.py new file mode 100644 index 0000000..2b4ad4a --- /dev/null +++ b/backend/services/hybrid.py @@ -0,0 +1,224 @@ +"""Hybrid pipeline: streaming STT (gpt-realtime-whisper) + cheap LLM + TTS. + +Uses gpt-realtime-whisper for low-latency streaming transcription, +gpt-5.4-nano as the brain, and OpenAI TTS for voice output. +""" + +import json +import base64 +import logging +import asyncio +from typing import Callable, Awaitable +from openai import AsyncOpenAI +from config import settings + +logger = logging.getLogger("kira.hybrid") + +# ─── System instructions for Kira's personality ─── + +KIRA_INSTRUCTIONS = ( + "You are Kira, a warm, kind, and encouraging AI body double. " + "You speak in a friendly, girly-pop tone. You are helping someone with ADHD " + "stay focused and on task. Keep responses short, supportive, and uplifting. " + "Check in on them. Remind them to take breaks. Celebrate small wins. " + "Use occasional emoji but don't overdo it. Never be judgmental." +) + + +class HybridPipeline: + """Streaming STT via gpt-realtime-whisper → gpt-5.4-nano LLM → OpenAI TTS.""" + + def __init__( + self, + on_transcript_delta: Callable[[str], Awaitable[None]], + on_transcript_done: Callable[[str], Awaitable[None]], + on_audio_delta: Callable[[bytes], Awaitable[None]], + on_speech_start: Callable[[], Awaitable[None]], + on_speech_end: Callable[[], Awaitable[None]], + on_ready: Callable[[], Awaitable[None]], + on_error: Callable[[str], Awaitable[None]], + memory_suffix: str = "", + ): + self._on_transcript_delta = on_transcript_delta + self._on_transcript_done = on_transcript_done + self._on_audio_delta = on_audio_delta + self._on_speech_start = on_speech_start + self._on_speech_end = on_speech_end + self._on_ready = on_ready + self._on_error = on_error + self._memory_suffix = memory_suffix + self._openai = None + self._conn = None + self._connected = False + self._transcript_buffer = "" + + async def connect(self): + """Connect to gpt-realtime-whisper via OpenAI Realtime API.""" + if self._connected: + return + + try: + self._openai = AsyncOpenAI(api_key=settings.openai_api_key) + + logger.info("Connecting to gpt-realtime-whisper...") + async with self._openai.beta.realtime.connect( + model="gpt-realtime-whisper", + ) as conn: + self._conn = conn + self._connected = True + logger.info("Connected to gpt-realtime-whisper") + + # Configure session for transcription + await self._send({ + "type": "session.update", + "session": { + "input_audio_format": "pcm16", + "input_audio_transcription": { + "enabled": True, + }, + "turn_detection": { + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 600, + }, + }, + }) + + await self._on_ready() + + # Listen for transcription events + while self._connected: + try: + event = await conn.recv() + await self._handle_event(event) + except Exception as e: + if self._connected: + logger.warning(f"recv error: {e}") + break + + except Exception as e: + logger.error(f"Connection error: {e}") + await self._on_error(str(e)) + finally: + self._connected = False + self._conn = None + + async def _handle_event(self, event): + """Process events from gpt-realtime-whisper.""" + event_type = getattr(event, "type", None) or (event.get("type") if isinstance(event, dict) else "") + + if event_type == "input_audio_buffer.speech_started": + self._transcript_buffer = "" + + elif event_type == "input_audio_buffer.speech_stopped": + if self._transcript_buffer.strip(): + await self._process_transcript(self._transcript_buffer.strip()) + self._transcript_buffer = "" + + elif event_type == "input_audio_buffer.transcription_delta": + delta_text = self._get_field(event, "delta", "") + if delta_text: + self._transcript_buffer += delta_text + + elif event_type == "conversation.item.created": + item = self._get_field(event, "item", {}) + content = self._get_field(item, "content", []) + for part in (content or []): + part_type = self._get_field(part, "type", "") + part_transcript = self._get_field(part, "transcript", "") + if part_type == "transcript" and part_transcript: + self._transcript_buffer = part_transcript + await self._on_transcript_delta(part_transcript) + + elif event_type == "error": + err = self._get_field(event, "error", {}) + msg = self._get_field(err, "message", str(event)) + logger.warning(f"Whisper error: {msg}") + + async def _process_transcript(self, transcript: str): + """Full utterance received. Call LLM, then TTS.""" + await self._on_transcript_done(transcript) + logger.info(f"User: {transcript}") + + # Build system prompt with optional memory context + system_content = KIRA_INSTRUCTIONS + if self._memory_suffix: + system_content += self._memory_suffix + + # Call gpt-5.4-nano + try: + resp = await self._openai.chat.completions.create( + model="gpt-5.4-nano", + messages=[ + {"role": "system", "content": system_content}, + {"role": "user", "content": transcript}, + ], + max_tokens=300, + temperature=0.7, + ) + kira_text = resp.choices[0].message.content or "Mhm, I'm here!" + except Exception as e: + logger.error(f"LLM error: {e}") + kira_text = "Sorry, let me try that again!" + await self._on_error(str(e)) + + logger.info(f"Kira: {kira_text}") + + # Call TTS + await self._on_speech_start() + try: + tts_resp = await self._openai.audio.speech.create( + model="tts-1", + voice="nova", + input=kira_text, + response_format="opus", + ) + audio_bytes = tts_resp.content + if audio_bytes: + await self._on_audio_delta(audio_bytes) + except Exception as e: + logger.error(f"TTS error: {e}") + + await self._on_speech_end() + + async def send_audio(self, pcm16_bytes: bytes): + """Send PCM16 audio chunk for transcription.""" + if not self._connected or not self._conn: + return + try: + audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8") + await self._send({ + "type": "input_audio_buffer.append", + "audio": audio_b64, + }) + except Exception as e: + logger.warning(f"Send audio error: {e}") + + async def send_text(self, text: str): + """Process text input directly (no transcription needed).""" + await self._process_transcript(text) + + async def _send(self, data: dict): + try: + await self._conn.send(data) + except Exception as e: + logger.warning(f"Send error: {e}") + + async def disconnect(self): + self._connected = False + if self._conn: + try: + await self._conn.close() + except Exception: + pass + self._conn = None + + @staticmethod + def _get_field(obj, field: str, default=None): + """Get a field from either an object or dict.""" + if hasattr(obj, field): + return getattr(obj, field, default) + if isinstance(obj, dict): + return obj.get(field, default) + return default diff --git a/backend/services/realtime.py b/backend/services/realtime.py deleted file mode 100644 index 018862f..0000000 --- a/backend/services/realtime.py +++ /dev/null @@ -1,192 +0,0 @@ -"""OpenAI Realtime API relay service. - -Manages a WebSocket connection to OpenAI's Realtime API and relays -audio/text events between the client and OpenAI. -""" - -import json -import logging -from typing import Callable, Awaitable -from config import settings - -logger = logging.getLogger("kira.realtime") - -# ─── System instructions for Kira's personality ─── - -KIRA_INSTRUCTIONS = ( - "You are Kira, a warm, kind, and encouraging AI body double. " - "You speak in a friendly, girly-pop tone. You are helping someone with ADHD " - "stay focused and on task. Keep responses short, supportive, and uplifting. " - "Check in on them. Remind them to take breaks. Celebrate small wins. " - "Use occasional emoji but don't overdo it. Never be judgmental. " - "You remember things about them between conversations." -) - - -class RealtimeRelay: - """Relays audio/text between a client WS and OpenAI Realtime API.""" - - def __init__( - self, - on_audio_delta: Callable[[bytes], Awaitable[None]], - on_transcript: Callable[[str], Awaitable[None]], - on_speech_started: Callable[[], Awaitable[None]], - on_speech_stopped: Callable[[], Awaitable[None]], - on_interruption: Callable[[], Awaitable[None]], - on_error: Callable[[str], Awaitable[None]], - on_ready: Callable[[], Awaitable[None]], - ): - self._on_audio_delta = on_audio_delta - self._on_transcript = on_transcript - self._on_speech_started = on_speech_started - self._on_speech_stopped = on_speech_stopped - self._on_interruption = on_interruption - self._on_error = on_error - self._on_ready = on_ready - self._conn = None - self._connected = False - - async def connect(self): - """Open a WebSocket to OpenAI Realtime API.""" - if self._connected: - return - - try: - from openai import AsyncOpenAI - - client = AsyncOpenAI(api_key=settings.openai_api_key) - - logger.info("Connecting to OpenAI Realtime API...") - async with client.beta.realtime.connect( - model="gpt-realtime-2", - extra_headers={"OpenAI-Beta": ""}, - ) as conn: - self._conn = conn - self._connected = True - logger.info("Connected to OpenAI Realtime API") - - # Configure session - await self._send({ - "type": "session.update", - "session": { - "instructions": KIRA_INSTRUCTIONS, - "voice": "alloy", - "input_audio_transcription": {"enabled": True}, - "turn_detection": { - "type": "server_vad", - "threshold": 0.5, - "prefix_padding_ms": 300, - "silence_duration_ms": 600, - }, - }, - }) - - await self._on_ready() - - # Listen for events - while self._connected: - try: - event = await conn.recv() - await self._handle_event(event) - except Exception as e: - if self._connected: - logger.warning(f"Realtime recv error: {e}") - break - - except ImportError: - logger.error("openai[realtime] not installed — run: pip install 'openai[realtime]'") - await self._on_error("Missing openai[realtime] dependency") - except Exception as e: - logger.error(f"Realtime connection error: {e}") - await self._on_error(str(e)) - finally: - self._connected = False - self._conn = None - - async def _handle_event(self, event): - """Process an event from the OpenAI Realtime API.""" - event_type = getattr(event, "type", None) or event.get("type", "") - - if event_type == "response.audio.delta": - audio_b64 = getattr(event, "delta", None) or event.get("delta", "") - if audio_b64: - import base64 - audio_bytes = base64.b64decode(audio_b64) - await self._on_audio_delta(audio_bytes) - - elif event_type == "response.audio_buffer.speech_started": - await self._on_speech_started() - - elif event_type == "response.audio_buffer.speech_stopped": - await self._on_speech_stopped() - - elif event_type == "input_audio_buffer.speech_started": - # User started speaking — interrupt Kira - await self._on_interruption() - - elif event_type == "conversation.item.created": - item = getattr(event, "item", None) or event.get("item", {}) - role = getattr(item, "role", None) or item.get("role", "") - content = getattr(item, "content", None) or item.get("content", []) - for part in (content or []): - part_type = getattr(part, "type", None) or part.get("type", "") - part_text = getattr(part, "text", None) or part.get("text", "") - if part_type == "text" and part_text and role == "assistant": - await self._on_transcript(f"assistant: {part_text}") - part_transcript = getattr(part, "transcript", None) or part.get("transcript", "") - if part_type == "transcript" and part_transcript and role == "user": - await self._on_transcript(f"user: {part_transcript}") - - elif event_type == "error": - err = getattr(event, "error", None) or event.get("error", {}) - msg = getattr(err, "message", None) or err.get("message", str(event)) - logger.warning(f"Realtime API error: {msg}") - await self._on_error(msg) - - async def send_audio(self, pcm16_bytes: bytes): - """Send PCM16 audio chunk to OpenAI.""" - if not self._connected or not self._conn: - return - try: - import base64 - audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8") - await self._send({ - "type": "input_audio_buffer.append", - "audio": audio_b64, - }) - except Exception as e: - logger.warning(f"Failed to send audio: {e}") - - async def send_text(self, text: str): - """Send a text message to OpenAI and trigger a response.""" - if not self._connected or not self._conn: - return - try: - await self._send({ - "type": "conversation.item.create", - "item": { - "type": "message", - "role": "user", - "content": [{"type": "input_text", "text": text}], - }, - }) - await self._send({"type": "response.create"}) - except Exception as e: - logger.warning(f"Failed to send text: {e}") - - async def _send(self, data: dict): - """Send a JSON event to the Realtime API.""" - try: - await self._conn.send(data) - except Exception as e: - logger.warning(f"Realtime send error: {e}") - - async def disconnect(self): - """Close the Realtime connection.""" - self._connected = False - if self._conn: - try: - await self._conn.close() - except Exception: - pass - self._conn = None diff --git a/frontend/src/hooks/useConversation.ts b/frontend/src/hooks/useConversation.ts index 35a85d1..8afd108 100644 --- a/frontend/src/hooks/useConversation.ts +++ b/frontend/src/hooks/useConversation.ts @@ -142,6 +142,10 @@ export function useConversation() { addMessage(msg.role === 'user' ? 'user' : 'kira', msg.text); break; + case 'transcript_delta': + // Streaming partial transcript — could show as typing indicator + break; + case 'speaking_start': setIsKiraSpeaking(true); break;