feat: OpenAI Realtime API pipeline
Replaced the 3-step sequential pipeline (Whisper STT → DeepSeek LLM → OpenAI TTS) with a single OpenAI Realtime API WebSocket using gpt-4o-mini-realtime-preview. - ~300-800ms latency vs 1-3s - Server VAD for automatic turn detection - Streaming audio chunks during playback - Interruptions: user can speak over Kira mid-response - Honcho memory still injected into session instructions - Frontend captures PCM16 mono 24kHz via AudioContext - Backend relays client ↔ OpenAI Realtime API - Supports both voice (PCM16) and text input
This commit is contained in:
+183
-172
@@ -1,26 +1,21 @@
|
||||
"""Kira — AI body double backend
|
||||
|
||||
Real-time speech-to-speech pipeline:
|
||||
mic audio → Whisper API → text → DeepSeek LLM → response text → OpenAI TTS → audio
|
||||
|
||||
Honcho memory integration:
|
||||
Cross-session user context injected into LLM prompts,
|
||||
conversation exchanges stored for continuous learning.
|
||||
User preferences (name, scene, outfit, accessory) persisted in peer metadata.
|
||||
OpenAI Realtime API pipeline:
|
||||
mic audio → [built-in STT → GPT-4o-mini → built-in TTS] → speaker audio
|
||||
Single WebSocket, ~300-800ms latency
|
||||
"""
|
||||
|
||||
import json
|
||||
import base64
|
||||
import uuid
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from config import settings
|
||||
from services.stt import transcribe_audio
|
||||
from services.llm import get_kira_response
|
||||
from services.tts import synthesize_speech
|
||||
from services.realtime import RealtimeRelay
|
||||
from services.memory import kira_memory
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -36,24 +31,13 @@ app.add_middleware(
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# ─── Base system prompt (static part) ───
|
||||
BASE_SYSTEM_PROMPT = (
|
||||
"You are Kira, a warm, kind, and encouraging AI body double. "
|
||||
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
|
||||
"stay focused and on task. Keep responses short, supportive, and uplifting. "
|
||||
"Check in on them. Remind them to take breaks. Celebrate small wins. "
|
||||
"Use occasional emoji but don't overdo it. Never be judgmental. "
|
||||
"You remember things about them between conversations."
|
||||
)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup():
|
||||
"""Initialize Honcho memory on app startup."""
|
||||
if kira_memory.init():
|
||||
logger.info("Honcho memory initialized")
|
||||
else:
|
||||
logger.info("Honcho memory not configured — running without memory")
|
||||
logger.info("Honcho memory not configured")
|
||||
|
||||
|
||||
@app.get("/api/health")
|
||||
@@ -62,61 +46,6 @@ async def health():
|
||||
return {"status": "ok", "name": "kira", "memory": mem_status}
|
||||
|
||||
|
||||
def build_system_prompt(user_id: str) -> dict:
|
||||
"""Build system prompt with Honcho memory context injected."""
|
||||
base = BASE_SYSTEM_PROMPT
|
||||
|
||||
if kira_memory.enabled:
|
||||
try:
|
||||
kira_memory.ensure_peers(user_id)
|
||||
memory_suffix = kira_memory.build_system_prompt_suffix()
|
||||
if memory_suffix:
|
||||
base += memory_suffix
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to build memory context: {e}")
|
||||
|
||||
return {"role": "system", "content": base}
|
||||
|
||||
|
||||
def handle_identify(msg: dict, session_id: str) -> dict | None:
|
||||
"""Handle user identification. Returns user preferences or None."""
|
||||
user_id = msg.get("user_id", "").strip()
|
||||
if not user_id:
|
||||
return {"type": "error", "message": "user_id is required"}
|
||||
|
||||
user_name = msg.get("name", "").strip()
|
||||
if user_name:
|
||||
kira_memory.set_user_preference(user_id, "name", user_name)
|
||||
|
||||
prefs = kira_memory.get_user_preferences(user_id)
|
||||
logger.info(f"[{session_id}] Identified as {user_id} (name={user_name or prefs.get('name', '')})")
|
||||
|
||||
return {
|
||||
"type": "identified",
|
||||
"user_id": user_id,
|
||||
"preferences": prefs,
|
||||
}
|
||||
|
||||
|
||||
def handle_set_preference(msg: dict, session_id: str, user_id: str) -> dict | None:
|
||||
"""Handle preference update. Returns success status."""
|
||||
if not user_id or user_id == "default-user":
|
||||
return {"type": "error", "message": "Must identify first"}
|
||||
|
||||
key = msg.get("key", "").strip()
|
||||
value = msg.get("value", "").strip()
|
||||
|
||||
if not key:
|
||||
return {"type": "error", "message": "key is required"}
|
||||
|
||||
ok = kira_memory.set_user_preference(user_id, key, value)
|
||||
return {
|
||||
"type": "preference_saved",
|
||||
"key": key,
|
||||
"success": ok,
|
||||
}
|
||||
|
||||
|
||||
@app.websocket("/api/ws")
|
||||
async def conversation_ws(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
@@ -125,8 +54,102 @@ async def conversation_ws(websocket: WebSocket):
|
||||
identified = False
|
||||
logger.info(f"[{session_id}] WebSocket connected")
|
||||
|
||||
audio_buffer = bytearray()
|
||||
conversation_history: list[dict] = []
|
||||
# Track conversation for Honcho
|
||||
pending_transcripts: list[tuple[str, str]] = []
|
||||
|
||||
# Will be set when Realtime relay is ready
|
||||
relay_ready = asyncio.Event()
|
||||
relay: RealtimeRelay | None = None
|
||||
relay_task: asyncio.Task | None = None
|
||||
audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
|
||||
text_queue: asyncio.Queue[str] = asyncio.Queue()
|
||||
|
||||
async def on_ready():
|
||||
relay_ready.set()
|
||||
logger.info(f"[{session_id}] Realtime relay ready")
|
||||
|
||||
async def on_audio_delta(audio_bytes: bytes):
|
||||
"""Forward audio chunks from OpenAI to the client."""
|
||||
try:
|
||||
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
|
||||
await websocket.send_json({
|
||||
"type": "audio",
|
||||
"data": audio_b64,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def on_transcript(text: str):
|
||||
"""Store transcripts for Honcho."""
|
||||
pending_transcripts.append(("transcript", text))
|
||||
role, content = text.split(": ", 1)
|
||||
logger.info(f"[{session_id}] {role}: {content}")
|
||||
await websocket.send_json({
|
||||
"type": "transcript",
|
||||
"role": role,
|
||||
"text": content,
|
||||
})
|
||||
|
||||
async def on_speech_started():
|
||||
"""Kira started speaking."""
|
||||
await websocket.send_json({"type": "speaking_start"})
|
||||
|
||||
async def on_speech_stopped():
|
||||
"""Kira finished speaking."""
|
||||
await websocket.send_json({"type": "speaking_end"})
|
||||
|
||||
async def on_interruption():
|
||||
"""User interrupted — Kira stops speaking."""
|
||||
await websocket.send_json({"type": "interruption"})
|
||||
|
||||
async def on_error(msg: str):
|
||||
await websocket.send_json({"type": "error", "message": msg})
|
||||
|
||||
# ── Create and start the Realtime relay ──
|
||||
relay = RealtimeRelay(
|
||||
on_audio_delta=on_audio_delta,
|
||||
on_transcript=on_transcript,
|
||||
on_speech_started=on_speech_started,
|
||||
on_speech_stopped=on_speech_stopped,
|
||||
on_interruption=on_interruption,
|
||||
on_error=on_error,
|
||||
on_ready=on_ready,
|
||||
)
|
||||
|
||||
relay_task = asyncio.create_task(relay.connect())
|
||||
|
||||
# Wait for relay to be ready
|
||||
try:
|
||||
await asyncio.wait_for(relay_ready.wait(), timeout=15)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"[{session_id}] Realtime relay failed to connect")
|
||||
await websocket.send_json({"type": "error", "message": "Failed to connect to AI"})
|
||||
relay_task.cancel()
|
||||
return
|
||||
|
||||
# ── Forward audio/text from client to relay ──
|
||||
async def forward_audio():
|
||||
while relay and relay._connected:
|
||||
try:
|
||||
pcm16 = await asyncio.wait_for(audio_queue.get(), timeout=1)
|
||||
await relay.send_audio(pcm16)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
break
|
||||
|
||||
async def forward_text():
|
||||
while relay and relay._connected:
|
||||
try:
|
||||
text = await asyncio.wait_for(text_queue.get(), timeout=1)
|
||||
await relay.send_text(text)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
break
|
||||
|
||||
fwd_audio_task = asyncio.create_task(forward_audio())
|
||||
fwd_text_task = asyncio.create_task(forward_text())
|
||||
|
||||
try:
|
||||
while True:
|
||||
@@ -134,110 +157,98 @@ async def conversation_ws(websocket: WebSocket):
|
||||
msg = json.loads(raw)
|
||||
msg_type = msg.get("type", "")
|
||||
|
||||
# ── Identity & Preferences ──
|
||||
|
||||
# ── Identity ──
|
||||
if msg_type == "identify":
|
||||
response = handle_identify(msg, session_id)
|
||||
if response:
|
||||
await websocket.send_json(response)
|
||||
if response["type"] == "identified":
|
||||
user_id = response["user_id"]
|
||||
identified = True
|
||||
# Set up Honcho for this user
|
||||
if kira_memory.enabled:
|
||||
try:
|
||||
kira_memory.ensure_peers(user_id)
|
||||
kira_memory.ensure_session(session_id)
|
||||
logger.info(f"[{session_id}] Honcho session ready for {user_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[{session_id}] Honcho setup failed: {e}")
|
||||
user_id = msg.get("user_id", "").strip()
|
||||
user_name = msg.get("name", "").strip()
|
||||
if user_name and user_id:
|
||||
kira_memory.set_user_preference(user_id, "name", user_name)
|
||||
|
||||
prefs = kira_memory.get_user_preferences(user_id)
|
||||
identified = True
|
||||
|
||||
if kira_memory.enabled:
|
||||
kira_memory.ensure_peers(user_id)
|
||||
kira_memory.ensure_session(session_id)
|
||||
|
||||
# Inject Honcho context into the Realtime session instructions
|
||||
memory_suffix = ""
|
||||
if kira_memory.enabled:
|
||||
try:
|
||||
ctx = kira_memory.build_system_prompt_suffix()
|
||||
if ctx:
|
||||
memory_suffix = ctx
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if relay and relay._connected and memory_suffix:
|
||||
await relay._send({
|
||||
"type": "session.update",
|
||||
"session": {
|
||||
"instructions": (
|
||||
"You are Kira, a warm, kind, and encouraging AI body double. "
|
||||
"Speak in a friendly, girly-pop tone. Help someone with ADHD "
|
||||
"stay focused. Keep responses short and supportive. "
|
||||
"Check in, remind breaks, celebrate wins. Never judgmental."
|
||||
+ memory_suffix
|
||||
),
|
||||
},
|
||||
})
|
||||
|
||||
await websocket.send_json({
|
||||
"type": "identified",
|
||||
"user_id": user_id,
|
||||
"preferences": prefs,
|
||||
})
|
||||
continue
|
||||
|
||||
# ── Preferences ──
|
||||
if msg_type == "set_preference":
|
||||
response = handle_set_preference(msg, session_id, user_id)
|
||||
if response:
|
||||
await websocket.send_json(response)
|
||||
key = msg.get("key", "").strip()
|
||||
value = msg.get("value", "").strip()
|
||||
if key and user_id and user_id != "default-user":
|
||||
kira_memory.set_user_preference(user_id, key, value)
|
||||
continue
|
||||
|
||||
# ── Conversation ──
|
||||
# ── Audio from frontend (PCM16) ──
|
||||
if msg_type == "audio":
|
||||
audio_b64 = msg.get("data", "")
|
||||
if audio_b64:
|
||||
pcm16 = base64.b64decode(audio_b64)
|
||||
await audio_queue.put(pcm16)
|
||||
continue
|
||||
|
||||
system_prompt = build_system_prompt(user_id)
|
||||
# ── Text input ──
|
||||
if msg_type == "conversation_text":
|
||||
text = msg.get("text", "").strip()
|
||||
if text:
|
||||
await text_queue.put(text)
|
||||
# Also store in Honcho immediately
|
||||
if kira_memory.enabled and identified:
|
||||
kira_memory.store_user_message(text)
|
||||
continue
|
||||
|
||||
if msg_type == "audio_chunk":
|
||||
chunk = base64.b64decode(msg["data"])
|
||||
audio_buffer.extend(chunk)
|
||||
|
||||
elif msg_type == "transcribe":
|
||||
if not audio_buffer:
|
||||
await websocket.send_json({"type": "error", "message": "No audio data"})
|
||||
continue
|
||||
|
||||
logger.info(f"[{session_id}] Transcribing {len(audio_buffer)} bytes...")
|
||||
|
||||
transcript = await transcribe_audio(bytes(audio_buffer))
|
||||
audio_buffer.clear()
|
||||
|
||||
if not transcript:
|
||||
await websocket.send_json({"type": "error", "message": "Could not transcribe audio"})
|
||||
continue
|
||||
|
||||
await websocket.send_json({"type": "transcript", "text": transcript})
|
||||
|
||||
logger.info(f"[{session_id}] User: {transcript}")
|
||||
conversation_history.append({"role": "user", "content": transcript})
|
||||
|
||||
messages = [system_prompt] + conversation_history[-10:]
|
||||
kira_text = await get_kira_response(messages)
|
||||
|
||||
conversation_history.append({"role": "assistant", "content": kira_text})
|
||||
logger.info(f"[{session_id}] Kira: {kira_text}")
|
||||
|
||||
if kira_memory.enabled and identified:
|
||||
try:
|
||||
kira_memory.store_messages(transcript, kira_text)
|
||||
except Exception as e:
|
||||
logger.warning(f"[{session_id}] Failed to store messages: {e}")
|
||||
|
||||
await websocket.send_json({"type": "speaking_start", "text": kira_text})
|
||||
audio_bytes = await synthesize_speech(kira_text)
|
||||
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
|
||||
await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text})
|
||||
await websocket.send_json({"type": "speaking_end"})
|
||||
|
||||
elif msg_type == "ping":
|
||||
if msg_type == "ping":
|
||||
await websocket.send_json({"type": "pong"})
|
||||
|
||||
elif msg_type == "conversation_text":
|
||||
user_text = msg.get("text", "").strip()
|
||||
if not user_text:
|
||||
continue
|
||||
|
||||
logger.info(f"[{session_id}] User (text): {user_text}")
|
||||
conversation_history.append({"role": "user", "content": user_text})
|
||||
|
||||
messages = [system_prompt] + conversation_history[-10:]
|
||||
kira_text = await get_kira_response(messages)
|
||||
|
||||
conversation_history.append({"role": "assistant", "content": kira_text})
|
||||
logger.info(f"[{session_id}] Kira: {kira_text}")
|
||||
|
||||
if kira_memory.enabled and identified:
|
||||
try:
|
||||
kira_memory.store_messages(user_text, kira_text)
|
||||
except Exception as e:
|
||||
logger.warning(f"[{session_id}] Failed to store messages: {e}")
|
||||
|
||||
await websocket.send_json({"type": "speaking_start", "text": kira_text})
|
||||
audio_bytes = await synthesize_speech(kira_text)
|
||||
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
|
||||
await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text})
|
||||
await websocket.send_json({"type": "speaking_end"})
|
||||
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"[{session_id}] Disconnected")
|
||||
except Exception as e:
|
||||
logger.error(f"[{session_id}] Error: {e}")
|
||||
try:
|
||||
await websocket.send_json({"type": "error", "message": str(e)})
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# Store pending transcripts in Honcho
|
||||
if kira_memory.enabled and identified:
|
||||
for _, transcript_text in pending_transcripts:
|
||||
if transcript_text.startswith("user: "):
|
||||
content = transcript_text[6:]
|
||||
kira_memory.store_user_message(content)
|
||||
elif transcript_text.startswith("assistant: "):
|
||||
content = transcript_text[11:]
|
||||
kira_memory.store_kira_message(content)
|
||||
|
||||
fwd_audio_task.cancel()
|
||||
fwd_text_task.cancel()
|
||||
if relay:
|
||||
await relay.disconnect()
|
||||
if relay_task:
|
||||
relay_task.cancel()
|
||||
|
||||
Reference in New Issue
Block a user