"""OpenAI Realtime API relay service.

Manages a WebSocket connection to OpenAI's Realtime API and relays
audio/text events between the client and OpenAI.
"""

import base64
import json
import logging
from collections.abc import Mapping
from typing import Any, Awaitable, Callable

from config import settings

logger = logging.getLogger("kira.realtime")

# ─── System instructions for Kira's personality ───
KIRA_INSTRUCTIONS = (
    "You are Kira, a warm, kind, and encouraging AI body double. "
    "You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
    "stay focused and on task. Keep responses short, supportive, and uplifting. "
    "Check in on them. Remind them to take breaks. Celebrate small wins. "
    "Use occasional emoji but don't overdo it. Never be judgmental. "
    "You remember things about them between conversations."
)


def _field(obj: Any, name: str, default: Any = None) -> Any:
    """Read *name* from either a mapping or an attribute-style object.

    Events are handled in both shapes (plain dicts and objects exposing
    attributes). The previous ``getattr(...) or obj.get(...)`` pattern raised
    ``AttributeError`` on objects without ``.get`` whenever the attribute was
    missing or falsy; this accessor never calls ``.get`` on a non-mapping.

    Args:
        obj: A mapping, an arbitrary object, or ``None``.
        name: Key / attribute name to look up.
        default: Value returned when the field is absent or ``None``.

    Returns:
        The field value, or *default* if it is missing or ``None``.
    """
    if isinstance(obj, Mapping):
        value = obj.get(name)
    else:
        value = getattr(obj, name, None)
    return default if value is None else value


class RealtimeRelay:
    """Relays audio/text between a client WS and OpenAI Realtime API.

    All ``on_*`` constructor arguments are async callbacks that this class
    awaits when the corresponding event is observed.
    """

    def __init__(
        self,
        on_audio_delta: Callable[[bytes], Awaitable[None]],
        on_transcript: Callable[[str], Awaitable[None]],
        on_speech_started: Callable[[], Awaitable[None]],
        on_speech_stopped: Callable[[], Awaitable[None]],
        on_interruption: Callable[[], Awaitable[None]],
        on_error: Callable[[str], Awaitable[None]],
        on_ready: Callable[[], Awaitable[None]],
    ):
        """Store the callbacks; no connection is opened until ``connect()``.

        Args:
            on_audio_delta: Awaited with decoded audio bytes for each
                ``response.audio.delta`` event.
            on_transcript: Awaited with ``"assistant: ..."`` / ``"user: ..."``
                lines extracted from ``conversation.item.created`` events.
            on_speech_started: Awaited on
                ``response.audio_buffer.speech_started``.
            on_speech_stopped: Awaited on
                ``response.audio_buffer.speech_stopped``.
            on_interruption: Awaited on ``input_audio_buffer.speech_started``.
            on_error: Awaited with a message for API ``error`` events and
                connection failures.
            on_ready: Awaited once after the session configuration is sent.
        """
        self._on_audio_delta = on_audio_delta
        self._on_transcript = on_transcript
        self._on_speech_started = on_speech_started
        self._on_speech_stopped = on_speech_stopped
        self._on_interruption = on_interruption
        self._on_error = on_error
        self._on_ready = on_ready
        self._conn = None  # live connection object while connected, else None
        self._connected = False

    async def connect(self) -> None:
        """Open a WebSocket to OpenAI Realtime API and pump events.

        This coroutine does not return until the connection ends: it sends
        the session configuration, awaits ``on_ready``, then receives and
        dispatches events until ``disconnect()`` is called or an exception
        occurs. Connection failures are reported through ``on_error`` rather
        than raised. Returns immediately if already connected.
        """
        if self._connected:
            return

        try:
            # Imported lazily so a missing optional dependency is reported
            # through on_error instead of failing at module import time.
            from openai import AsyncOpenAI

            client = AsyncOpenAI(api_key=settings.openai_api_key)
            logger.info("Connecting to OpenAI Realtime API...")

            async with client.beta.realtime.connect(
                model="gpt-realtime-2",
                extra_headers={"OpenAI-Beta": ""},
            ) as conn:
                self._conn = conn
                self._connected = True
                logger.info("Connected to OpenAI Realtime API")

                # Configure session
                # NOTE(review): confirm the "input_audio_transcription"
                # payload shape against the Realtime API session reference.
                await self._send({
                    "type": "session.update",
                    "session": {
                        "instructions": KIRA_INSTRUCTIONS,
                        "voice": "alloy",
                        "input_audio_transcription": {"enabled": True},
                        "turn_detection": {
                            "type": "server_vad",
                            "threshold": 0.5,
                            "prefix_padding_ms": 300,
                            "silence_duration_ms": 600,
                        },
                    },
                })

                await self._on_ready()

                # Listen for events
                while self._connected:
                    try:
                        event = await conn.recv()
                        await self._handle_event(event)
                    except Exception as e:
                        # After disconnect() a failing recv is expected, so
                        # only warn while we still believe we are connected.
                        if self._connected:
                            logger.warning("Realtime recv error: %s", e)
                        break

        except ImportError:
            logger.error("openai[realtime] not installed — run: pip install 'openai[realtime]'")
            await self._on_error("Missing openai[realtime] dependency")
        except Exception as e:
            logger.error("Realtime connection error: %s", e)
            await self._on_error(str(e))
        finally:
            self._connected = False
            self._conn = None

    async def _handle_event(self, event) -> None:
        """Process an event from the OpenAI Realtime API.

        Args:
            event: A server event, either a mapping or an object exposing the
                same fields as attributes. Unrecognised types are ignored.
        """
        event_type = _field(event, "type", "")

        if event_type == "response.audio.delta":
            audio_b64 = _field(event, "delta", "")
            if audio_b64:
                await self._on_audio_delta(base64.b64decode(audio_b64))

        # NOTE(review): confirm the two "response.audio_buffer.*" names
        # against the Realtime API server-event reference.
        elif event_type == "response.audio_buffer.speech_started":
            await self._on_speech_started()

        elif event_type == "response.audio_buffer.speech_stopped":
            await self._on_speech_stopped()

        elif event_type == "input_audio_buffer.speech_started":
            # User started speaking — interrupt Kira
            await self._on_interruption()

        elif event_type == "conversation.item.created":
            await self._relay_item_transcripts(_field(event, "item", {}))

        elif event_type == "error":
            err = _field(event, "error", {})
            # Fall back to the whole event when no usable message is present.
            msg = _field(err, "message") or str(event)
            logger.warning("Realtime API error: %s", msg)
            await self._on_error(msg)

    async def _relay_item_transcripts(self, item) -> None:
        """Forward text/transcript parts of a conversation item as transcript lines."""
        role = _field(item, "role", "")
        for part in _field(item, "content", []) or []:
            part_type = _field(part, "type", "")

            part_text = _field(part, "text", "")
            if part_type == "text" and part_text and role == "assistant":
                await self._on_transcript(f"assistant: {part_text}")

            part_transcript = _field(part, "transcript", "")
            if part_type == "transcript" and part_transcript and role == "user":
                await self._on_transcript(f"user: {part_transcript}")

    async def send_audio(self, pcm16_bytes: bytes) -> None:
        """Send PCM16 audio chunk to OpenAI.

        The chunk is base64-encoded and appended to the input audio buffer.
        Does nothing when not connected; failures are logged, not raised.
        """
        if not self._connected or not self._conn:
            return
        try:
            audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8")
            await self._send({
                "type": "input_audio_buffer.append",
                "audio": audio_b64,
            })
        except Exception as e:
            logger.warning("Failed to send audio: %s", e)

    async def send_text(self, text: str) -> None:
        """Send a text message to OpenAI and trigger a response.

        Creates a user message item, then requests a response. Does nothing
        when not connected; failures are logged, not raised.
        """
        if not self._connected or not self._conn:
            return
        try:
            await self._send({
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": text}],
                },
            })
            await self._send({"type": "response.create"})
        except Exception as e:
            logger.warning("Failed to send text: %s", e)

    async def _send(self, data: dict) -> None:
        """Send a JSON event to the Realtime API.

        Best-effort: any exception (including the connection having been
        cleared concurrently) is logged as a warning and swallowed.
        """
        try:
            await self._conn.send(data)
        except Exception as e:
            logger.warning("Realtime send error: %s", e)

    async def disconnect(self) -> None:
        """Close the Realtime connection.

        Clears the connected flag first so the receive loop in ``connect()``
        stops quietly, then closes the connection on a best-effort basis.
        """
        self._connected = False
        if self._conn:
            try:
                await self._conn.close()
            except Exception as e:
                # Closing is best-effort, but keep a trace for debugging.
                logger.debug("Ignoring error while closing Realtime connection: %s", e)
            self._conn = None