feat: hybrid pipeline — gpt-realtime-whisper + gpt-5.4-nano + TTS

Hybrid approach gives streaming STT at ~/usr/bin/bash.017/min + cheap brain
at ~/usr/bin/bash.001/min + TTS at ~/usr/bin/bash.015/min = ~/usr/bin/bash.033/min total.

- gpt-realtime-whisper handles streaming transcription with VAD
- gpt-5.4-nano handles response generation (chat completions)
- OpenAI TTS (nova) for voice output
- Server VAD detects utterance boundaries
- Honcho memory context injected into system prompt
- Removed old full Realtime relay service
This commit is contained in:
2026-06-04 13:48:06 -04:00
parent 1c15d42e06
commit 274d04ea10
4 changed files with 281 additions and 284 deletions
+53 -92
View File
@@ -1,8 +1,6 @@
"""Kira — AI body double backend
OpenAI Realtime API pipeline:
mic audio → [built-in STT → GPT-4o-mini → built-in TTS] → speaker audio
Single WebSocket, ~300-800ms latency
Hybrid pipeline: gpt-realtime-whisper (streaming STT) → gpt-5.4-nano (LLM) → OpenAI TTS
"""
import json
@@ -15,7 +13,7 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from services.realtime import RealtimeRelay
from services.hybrid import HybridPipeline
from services.memory import kira_memory
logging.basicConfig(level=logging.INFO)
@@ -54,102 +52,92 @@ async def conversation_ws(websocket: WebSocket):
identified = False
logger.info(f"[{session_id}] WebSocket connected")
# Track conversation for Honcho
pending_transcripts: list[tuple[str, str]] = []
# Will be set when Realtime relay is ready
relay_ready = asyncio.Event()
relay: RealtimeRelay | None = None
relay_task: asyncio.Task | None = None
pending_transcripts: list[str] = []
pipeline: HybridPipeline | None = None
pipeline_task: asyncio.Task | None = None
pipeline_ready = asyncio.Event()
audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
text_queue: asyncio.Queue[str] = asyncio.Queue()
memory_suffix = ""
async def on_ready():
relay_ready.set()
logger.info(f"[{session_id}] Realtime relay ready")
pipeline_ready.set()
logger.info(f"[{session_id}] Pipeline ready")
async def on_transcript_delta(delta: str):
"""Streaming partial transcript."""
await websocket.send_json({"type": "transcript_delta", "text": delta})
async def on_transcript_done(full: str):
"""Full utterance received."""
await websocket.send_json({"type": "transcript", "role": "user", "text": full})
async def on_audio_delta(audio_bytes: bytes):
"""Forward audio chunks from OpenAI to the client."""
"""Forward TTS audio to client."""
try:
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
await websocket.send_json({
"type": "audio",
"data": audio_b64,
})
await websocket.send_json({"type": "audio", "data": audio_b64})
except Exception:
pass
async def on_transcript(text: str):
"""Store transcripts for Honcho."""
pending_transcripts.append(("transcript", text))
role, content = text.split(": ", 1)
logger.info(f"[{session_id}] {role}: {content}")
await websocket.send_json({
"type": "transcript",
"role": role,
"text": content,
})
async def on_speech_started():
"""Kira started speaking."""
async def on_speech_start():
await websocket.send_json({"type": "speaking_start"})
async def on_speech_stopped():
"""Kira finished speaking."""
async def on_speech_end():
await websocket.send_json({"type": "speaking_end"})
async def on_interruption():
"""User interrupted — Kira stops speaking."""
await websocket.send_json({"type": "interruption"})
async def on_error(msg: str):
await websocket.send_json({"type": "error", "message": msg})
# ── Create and start the Realtime relay ──
relay = RealtimeRelay(
# Create pipeline
pipeline = HybridPipeline(
on_transcript_delta=on_transcript_delta,
on_transcript_done=on_transcript_done,
on_audio_delta=on_audio_delta,
on_transcript=on_transcript,
on_speech_started=on_speech_started,
on_speech_stopped=on_speech_stopped,
on_interruption=on_interruption,
on_error=on_error,
on_speech_start=on_speech_start,
on_speech_end=on_speech_end,
on_ready=on_ready,
on_error=on_error,
memory_suffix=memory_suffix,
)
relay_task = asyncio.create_task(relay.connect())
pipeline_task = asyncio.create_task(pipeline.connect())
# Wait for relay to be ready
try:
await asyncio.wait_for(relay_ready.wait(), timeout=15)
await asyncio.wait_for(pipeline_ready.wait(), timeout=15)
except asyncio.TimeoutError:
logger.error(f"[{session_id}] Realtime relay failed to connect")
logger.error(f"[{session_id}] Pipeline failed to connect")
await websocket.send_json({"type": "error", "message": "Failed to connect to AI"})
relay_task.cancel()
pipeline_task.cancel()
return
# ── Forward audio/text from client to relay ──
# Forward audio/text from client to pipeline
async def forward_audio():
while relay and relay._connected:
while pipeline and pipeline._connected:
try:
pcm16 = await asyncio.wait_for(audio_queue.get(), timeout=1)
await relay.send_audio(pcm16)
await pipeline.send_audio(pcm16)
except asyncio.TimeoutError:
continue
except Exception:
break
async def forward_text():
while relay and relay._connected:
while pipeline and pipeline._connected:
try:
text = await asyncio.wait_for(text_queue.get(), timeout=1)
await relay.send_text(text)
await pipeline.send_text(text)
# Store in Honcho
if kira_memory.enabled and identified:
kira_memory.store_user_message(text)
except asyncio.TimeoutError:
continue
except Exception:
break
fwd_audio_task = asyncio.create_task(forward_audio())
fwd_text_task = asyncio.create_task(forward_text())
fwd_audio = asyncio.create_task(forward_audio())
fwd_text = asyncio.create_task(forward_text())
try:
while True:
@@ -171,30 +159,16 @@ async def conversation_ws(websocket: WebSocket):
kira_memory.ensure_peers(user_id)
kira_memory.ensure_session(session_id)
# Inject Honcho context into the Realtime session instructions
memory_suffix = ""
# Build memory context and update pipeline
if kira_memory.enabled:
try:
ctx = kira_memory.build_system_prompt_suffix()
if ctx:
pipeline._memory_suffix = ctx
memory_suffix = ctx
except Exception:
pass
if relay and relay._connected and memory_suffix:
await relay._send({
"type": "session.update",
"session": {
"instructions": (
"You are Kira, a warm, kind, and encouraging AI body double. "
"Speak in a friendly, girly-pop tone. Help someone with ADHD "
"stay focused. Keep responses short and supportive. "
"Check in, remind breaks, celebrate wins. Never judgmental."
+ memory_suffix
),
},
})
await websocket.send_json({
"type": "identified",
"user_id": user_id,
@@ -210,7 +184,7 @@ async def conversation_ws(websocket: WebSocket):
kira_memory.set_user_preference(user_id, key, value)
continue
# ── Audio from frontend (PCM16) ──
# ── Audio (PCM16) ──
if msg_type == "audio":
audio_b64 = msg.get("data", "")
if audio_b64:
@@ -223,9 +197,6 @@ async def conversation_ws(websocket: WebSocket):
text = msg.get("text", "").strip()
if text:
await text_queue.put(text)
# Also store in Honcho immediately
if kira_memory.enabled and identified:
kira_memory.store_user_message(text)
continue
if msg_type == "ping":
@@ -236,19 +207,9 @@ async def conversation_ws(websocket: WebSocket):
except Exception as e:
logger.error(f"[{session_id}] Error: {e}")
finally:
# Store pending transcripts in Honcho
if kira_memory.enabled and identified:
for _, transcript_text in pending_transcripts:
if transcript_text.startswith("user: "):
content = transcript_text[6:]
kira_memory.store_user_message(content)
elif transcript_text.startswith("assistant: "):
content = transcript_text[11:]
kira_memory.store_kira_message(content)
fwd_audio_task.cancel()
fwd_text_task.cancel()
if relay:
await relay.disconnect()
if relay_task:
relay_task.cancel()
fwd_audio.cancel()
fwd_text.cancel()
if pipeline:
await pipeline.disconnect()
if pipeline_task:
pipeline_task.cancel()
+224
View File
@@ -0,0 +1,224 @@
"""Hybrid pipeline: streaming STT (gpt-realtime-whisper) + cheap LLM + TTS.
Uses gpt-realtime-whisper for low-latency streaming transcription,
gpt-5.4-nano as the brain, and OpenAI TTS for voice output.
"""
import json
import base64
import logging
import asyncio
from typing import Callable, Awaitable
from openai import AsyncOpenAI
from config import settings
logger = logging.getLogger("kira.hybrid")
# ─── System instructions for Kira's personality ───
KIRA_INSTRUCTIONS = (
"You are Kira, a warm, kind, and encouraging AI body double. "
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
"stay focused and on task. Keep responses short, supportive, and uplifting. "
"Check in on them. Remind them to take breaks. Celebrate small wins. "
"Use occasional emoji but don't overdo it. Never be judgmental."
)
class HybridPipeline:
"""Streaming STT via gpt-realtime-whisper → gpt-5.4-nano LLM → OpenAI TTS."""
def __init__(
self,
on_transcript_delta: Callable[[str], Awaitable[None]],
on_transcript_done: Callable[[str], Awaitable[None]],
on_audio_delta: Callable[[bytes], Awaitable[None]],
on_speech_start: Callable[[], Awaitable[None]],
on_speech_end: Callable[[], Awaitable[None]],
on_ready: Callable[[], Awaitable[None]],
on_error: Callable[[str], Awaitable[None]],
memory_suffix: str = "",
):
self._on_transcript_delta = on_transcript_delta
self._on_transcript_done = on_transcript_done
self._on_audio_delta = on_audio_delta
self._on_speech_start = on_speech_start
self._on_speech_end = on_speech_end
self._on_ready = on_ready
self._on_error = on_error
self._memory_suffix = memory_suffix
self._openai = None
self._conn = None
self._connected = False
self._transcript_buffer = ""
async def connect(self):
"""Connect to gpt-realtime-whisper via OpenAI Realtime API."""
if self._connected:
return
try:
self._openai = AsyncOpenAI(api_key=settings.openai_api_key)
logger.info("Connecting to gpt-realtime-whisper...")
async with self._openai.beta.realtime.connect(
model="gpt-realtime-whisper",
) as conn:
self._conn = conn
self._connected = True
logger.info("Connected to gpt-realtime-whisper")
# Configure session for transcription
await self._send({
"type": "session.update",
"session": {
"input_audio_format": "pcm16",
"input_audio_transcription": {
"enabled": True,
},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 600,
},
},
})
await self._on_ready()
# Listen for transcription events
while self._connected:
try:
event = await conn.recv()
await self._handle_event(event)
except Exception as e:
if self._connected:
logger.warning(f"recv error: {e}")
break
except Exception as e:
logger.error(f"Connection error: {e}")
await self._on_error(str(e))
finally:
self._connected = False
self._conn = None
async def _handle_event(self, event):
"""Process events from gpt-realtime-whisper."""
event_type = getattr(event, "type", None) or (event.get("type") if isinstance(event, dict) else "")
if event_type == "input_audio_buffer.speech_started":
self._transcript_buffer = ""
elif event_type == "input_audio_buffer.speech_stopped":
if self._transcript_buffer.strip():
await self._process_transcript(self._transcript_buffer.strip())
self._transcript_buffer = ""
elif event_type == "input_audio_buffer.transcription_delta":
delta_text = self._get_field(event, "delta", "")
if delta_text:
self._transcript_buffer += delta_text
elif event_type == "conversation.item.created":
item = self._get_field(event, "item", {})
content = self._get_field(item, "content", [])
for part in (content or []):
part_type = self._get_field(part, "type", "")
part_transcript = self._get_field(part, "transcript", "")
if part_type == "transcript" and part_transcript:
self._transcript_buffer = part_transcript
await self._on_transcript_delta(part_transcript)
elif event_type == "error":
err = self._get_field(event, "error", {})
msg = self._get_field(err, "message", str(event))
logger.warning(f"Whisper error: {msg}")
async def _process_transcript(self, transcript: str):
"""Full utterance received. Call LLM, then TTS."""
await self._on_transcript_done(transcript)
logger.info(f"User: {transcript}")
# Build system prompt with optional memory context
system_content = KIRA_INSTRUCTIONS
if self._memory_suffix:
system_content += self._memory_suffix
# Call gpt-5.4-nano
try:
resp = await self._openai.chat.completions.create(
model="gpt-5.4-nano",
messages=[
{"role": "system", "content": system_content},
{"role": "user", "content": transcript},
],
max_tokens=300,
temperature=0.7,
)
kira_text = resp.choices[0].message.content or "Mhm, I'm here!"
except Exception as e:
logger.error(f"LLM error: {e}")
kira_text = "Sorry, let me try that again!"
await self._on_error(str(e))
logger.info(f"Kira: {kira_text}")
# Call TTS
await self._on_speech_start()
try:
tts_resp = await self._openai.audio.speech.create(
model="tts-1",
voice="nova",
input=kira_text,
response_format="opus",
)
audio_bytes = tts_resp.content
if audio_bytes:
await self._on_audio_delta(audio_bytes)
except Exception as e:
logger.error(f"TTS error: {e}")
await self._on_speech_end()
async def send_audio(self, pcm16_bytes: bytes):
"""Send PCM16 audio chunk for transcription."""
if not self._connected or not self._conn:
return
try:
audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8")
await self._send({
"type": "input_audio_buffer.append",
"audio": audio_b64,
})
except Exception as e:
logger.warning(f"Send audio error: {e}")
async def send_text(self, text: str):
"""Process text input directly (no transcription needed)."""
await self._process_transcript(text)
async def _send(self, data: dict):
try:
await self._conn.send(data)
except Exception as e:
logger.warning(f"Send error: {e}")
async def disconnect(self):
self._connected = False
if self._conn:
try:
await self._conn.close()
except Exception:
pass
self._conn = None
@staticmethod
def _get_field(obj, field: str, default=None):
"""Get a field from either an object or dict."""
if hasattr(obj, field):
return getattr(obj, field, default)
if isinstance(obj, dict):
return obj.get(field, default)
return default
-192
View File
@@ -1,192 +0,0 @@
"""OpenAI Realtime API relay service.
Manages a WebSocket connection to OpenAI's Realtime API and relays
audio/text events between the client and OpenAI.
"""
import json
import logging
from typing import Callable, Awaitable
from config import settings
logger = logging.getLogger("kira.realtime")
# ─── System instructions for Kira's personality ───
KIRA_INSTRUCTIONS = (
"You are Kira, a warm, kind, and encouraging AI body double. "
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
"stay focused and on task. Keep responses short, supportive, and uplifting. "
"Check in on them. Remind them to take breaks. Celebrate small wins. "
"Use occasional emoji but don't overdo it. Never be judgmental. "
"You remember things about them between conversations."
)
class RealtimeRelay:
"""Relays audio/text between a client WS and OpenAI Realtime API."""
def __init__(
self,
on_audio_delta: Callable[[bytes], Awaitable[None]],
on_transcript: Callable[[str], Awaitable[None]],
on_speech_started: Callable[[], Awaitable[None]],
on_speech_stopped: Callable[[], Awaitable[None]],
on_interruption: Callable[[], Awaitable[None]],
on_error: Callable[[str], Awaitable[None]],
on_ready: Callable[[], Awaitable[None]],
):
self._on_audio_delta = on_audio_delta
self._on_transcript = on_transcript
self._on_speech_started = on_speech_started
self._on_speech_stopped = on_speech_stopped
self._on_interruption = on_interruption
self._on_error = on_error
self._on_ready = on_ready
self._conn = None
self._connected = False
async def connect(self):
"""Open a WebSocket to OpenAI Realtime API."""
if self._connected:
return
try:
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=settings.openai_api_key)
logger.info("Connecting to OpenAI Realtime API...")
async with client.beta.realtime.connect(
model="gpt-realtime-2",
extra_headers={"OpenAI-Beta": ""},
) as conn:
self._conn = conn
self._connected = True
logger.info("Connected to OpenAI Realtime API")
# Configure session
await self._send({
"type": "session.update",
"session": {
"instructions": KIRA_INSTRUCTIONS,
"voice": "alloy",
"input_audio_transcription": {"enabled": True},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 600,
},
},
})
await self._on_ready()
# Listen for events
while self._connected:
try:
event = await conn.recv()
await self._handle_event(event)
except Exception as e:
if self._connected:
logger.warning(f"Realtime recv error: {e}")
break
except ImportError:
logger.error("openai[realtime] not installed — run: pip install 'openai[realtime]'")
await self._on_error("Missing openai[realtime] dependency")
except Exception as e:
logger.error(f"Realtime connection error: {e}")
await self._on_error(str(e))
finally:
self._connected = False
self._conn = None
async def _handle_event(self, event):
"""Process an event from the OpenAI Realtime API."""
event_type = getattr(event, "type", None) or event.get("type", "")
if event_type == "response.audio.delta":
audio_b64 = getattr(event, "delta", None) or event.get("delta", "")
if audio_b64:
import base64
audio_bytes = base64.b64decode(audio_b64)
await self._on_audio_delta(audio_bytes)
elif event_type == "response.audio_buffer.speech_started":
await self._on_speech_started()
elif event_type == "response.audio_buffer.speech_stopped":
await self._on_speech_stopped()
elif event_type == "input_audio_buffer.speech_started":
# User started speaking — interrupt Kira
await self._on_interruption()
elif event_type == "conversation.item.created":
item = getattr(event, "item", None) or event.get("item", {})
role = getattr(item, "role", None) or item.get("role", "")
content = getattr(item, "content", None) or item.get("content", [])
for part in (content or []):
part_type = getattr(part, "type", None) or part.get("type", "")
part_text = getattr(part, "text", None) or part.get("text", "")
if part_type == "text" and part_text and role == "assistant":
await self._on_transcript(f"assistant: {part_text}")
part_transcript = getattr(part, "transcript", None) or part.get("transcript", "")
if part_type == "transcript" and part_transcript and role == "user":
await self._on_transcript(f"user: {part_transcript}")
elif event_type == "error":
err = getattr(event, "error", None) or event.get("error", {})
msg = getattr(err, "message", None) or err.get("message", str(event))
logger.warning(f"Realtime API error: {msg}")
await self._on_error(msg)
async def send_audio(self, pcm16_bytes: bytes):
"""Send PCM16 audio chunk to OpenAI."""
if not self._connected or not self._conn:
return
try:
import base64
audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8")
await self._send({
"type": "input_audio_buffer.append",
"audio": audio_b64,
})
except Exception as e:
logger.warning(f"Failed to send audio: {e}")
async def send_text(self, text: str):
"""Send a text message to OpenAI and trigger a response."""
if not self._connected or not self._conn:
return
try:
await self._send({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": text}],
},
})
await self._send({"type": "response.create"})
except Exception as e:
logger.warning(f"Failed to send text: {e}")
async def _send(self, data: dict):
"""Send a JSON event to the Realtime API."""
try:
await self._conn.send(data)
except Exception as e:
logger.warning(f"Realtime send error: {e}")
async def disconnect(self):
"""Close the Realtime connection."""
self._connected = False
if self._conn:
try:
await self._conn.close()
except Exception:
pass
self._conn = None