feat(audio): Gemini Live API replaces Whisper+GPT+ElevenLabs

Single WebSocket proxy: frontend PCM16 16kHz → backend → Gemini Live API
Gemini returns PCM16 24kHz audio + text. Playback via Web Audio API queue.
Removed OpenAI/DeepSeek deps. Model: gemini-3.1-flash-live-preview.
Voice: Aoede. Streaming bidirectional audio with silence gating.
This commit is contained in:
2026-06-05 23:36:29 -04:00
parent d2bde65645
commit 83a990e838
6 changed files with 331 additions and 286 deletions
+202 -153
View File
@@ -1,6 +1,7 @@
"""Kira — AI body double backend
REST STT (gpt-4o-transcribe) → gpt-5.4-nano + Honcho → streaming TTS (sage)
Gemini Live API (gemini-3.1-flash-live-preview) for real-time voice.
Text chat still goes through Gemini generateContent REST endpoint.
"""
import json
@@ -8,7 +9,9 @@ import base64
import uuid
import logging
import asyncio
import struct
import websockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
@@ -33,18 +36,12 @@ BASE_SYSTEM_PROMPT = (
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
"stay focused and on task. Keep responses short, supportive, and uplifting. "
"Check in on them. Remind them to take breaks. Celebrate small wins. "
"Use occasional emoji but don't overdo it. Never be judgmental."
"Use occasional emoji but don't overdo it. Never be judgmental. "
"You are speaking out loud via voice, so keep natural conversational flow."
)
_openai = None
def get_openai():
global _openai
if _openai is None:
from openai import AsyncOpenAI
_openai = AsyncOpenAI(api_key=settings.openai_api_key)
return _openai
GEMINI_WS_URL = "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.BidiGenerateContent"
GEMINI_MODEL = "models/gemini-3.1-flash-live-preview"
@app.on_event("startup")
@@ -61,171 +58,223 @@ async def health():
return {"status": "ok", "name": "kira", "memory": mem_status}
async def transcribe_audio(client, audio_b64: str) -> str | None:
"""REST transcription via gpt-4o-transcribe (full utterance blob)."""
try:
audio_bytes = base64.b64decode(audio_b64)
import io
audio_file = io.BytesIO(audio_bytes)
audio_file.name = "audio.webm"
transcript = await client.audio.transcriptions.create(
model="gpt-4o-transcribe",
file=audio_file,
)
return transcript.text.strip() if transcript.text else None
except Exception as e:
logger.error(f"Transcription error: {e}")
return None
async def get_kira_response(client, user_text: str, memory_suffix: str) -> str:
"""Get Kira's response from gpt-5.4-nano."""
system_prompt = BASE_SYSTEM_PROMPT
if memory_suffix:
system_prompt += memory_suffix
resp = await client.chat.completions.create(
model="gpt-5.4-nano",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text},
],
max_completion_tokens=300,
temperature=0.7,
)
return resp.choices[0].message.content or "Mhm, I'm here! ✨"
async def stream_tts(client, text: str, websocket: WebSocket):
"""Stream TTS audio as Opus chunks over WebSocket."""
await websocket.send_json({"type": "speaking_start", "text": text})
async with client.audio.speech.with_streaming_response.create(
model="tts-1",
voice="sage",
input=text,
response_format="opus",
) as tts_resp:
async for chunk in tts_resp.iter_bytes():
if chunk:
b64 = base64.b64encode(chunk).decode("utf-8")
await websocket.send_json({"type": "audio", "data": b64})
await websocket.send_json({"type": "speaking_end"})
@app.websocket("/api/ws")
async def conversation_ws(websocket: WebSocket):
async def gemini_voice_ws(websocket: WebSocket):
"""WebSocket proxy between frontend and Gemini Live API.
Protocol (frontend ↔ this proxy):
{"type": "audio", "data": "<base64 PCM16 16kHz>"}
{"type": "conversation_text", "text": "..."}
{"type": "identify", "user_id": "...", "name": "..."}
{"type": "ping"}
{"type": "audio", "data": "<base64 PCM16 24kHz>"}
{"type": "transcript", "role": "user"|"kira", "text": "..."}
{"type": "turn_complete"}
{"type": "interrupted"}
{"type": "error", "message": "..."}
"""
await websocket.accept()
session_id = str(uuid.uuid4())[:8]
user_id = "default-user"
identified = False
memory_suffix = ""
logger.info(f"[{session_id}] WebSocket connected")
conversation_history: list[dict] = []
gemini_ws = None
gemini_task = None
frontend_task = None
try:
while True:
raw = await websocket.receive_text()
msg = json.loads(raw)
msg_type = msg.get("type", "")
# ── Connect to Gemini Live API ──
gemini_url = f"{GEMINI_WS_URL}?key={settings.gemini_api_key}"
gemini_ws = await websockets.connect(gemini_url, max_size=2**24)
# ── Identity & Preferences ──
if msg_type == "identify":
user_id = msg.get("user_id", "").strip()
user_name = msg.get("name", "").strip()
if user_name and user_id:
kira_memory.set_user_preference(user_id, "name", user_name)
# ── Send setup ──
system_prompt = BASE_SYSTEM_PROMPT
setup_msg = {
"setup": {
"model": GEMINI_MODEL,
"generationConfig": {
"responseModalities": ["AUDIO", "TEXT"],
"speechConfig": {
"voiceConfig": {
"prebuiltVoiceConfig": {
"voiceName": "Aoede"
}
}
},
},
"systemInstruction": {
"parts": [{"text": system_prompt}]
},
}
}
await gemini_ws.send(json.dumps(setup_msg))
logger.info(f"[{session_id}] Connected to Gemini Live API")
prefs = kira_memory.get_user_preferences(user_id)
identified = True
# Wait for setup complete
raw = await asyncio.wait_for(gemini_ws.recv(), timeout=10)
setup_resp = json.loads(raw)
if "setupComplete" in setup_resp:
logger.info(f"[{session_id}] Gemini setup complete")
else:
logger.warning(f"[{session_id}] Unexpected setup response: {list(setup_resp.keys())}")
if kira_memory.enabled:
kira_memory.ensure_peers(user_id)
kira_memory.ensure_session(session_id)
try:
ctx = kira_memory.build_system_prompt_suffix()
if ctx:
memory_suffix = ctx
except Exception:
# ── Gemini → Frontend relay ──
async def relay_gemini():
try:
async for raw in gemini_ws:
msg = json.loads(raw)
if "serverContent" in msg:
sc = msg["serverContent"]
model_turn = sc.get("modelTurn", {})
parts = model_turn.get("parts", [])
for part in parts:
# Text response
if "text" in part:
await websocket.send_json({
"type": "transcript",
"role": "kira",
"text": part["text"],
})
# Audio response (PCM16 24kHz)
if "inlineData" in part:
audio_data = part["inlineData"].get("data", "")
if audio_data:
await websocket.send_json({
"type": "audio",
"data": audio_data,
})
# Turn complete
if sc.get("turnComplete"):
await websocket.send_json({"type": "turn_complete"})
# Interrupted
if sc.get("interrupted"):
await websocket.send_json({"type": "interrupted"})
elif "toolCall" in msg:
pass # future: tool use
elif "toolCallCancellation" in msg:
pass
await websocket.send_json({
"type": "identified",
"user_id": user_id,
"preferences": prefs,
})
continue
elif "error" in msg:
err = msg["error"]
logger.error(f"[{session_id}] Gemini error: {err}")
await websocket.send_json({
"type": "error",
"message": str(err.get("message", err)),
})
if msg_type == "set_preference":
key = msg.get("key", "").strip()
value = msg.get("value", "").strip()
if key and user_id and user_id != "default-user":
kira_memory.set_user_preference(user_id, key, value)
await websocket.send_json({"type": "preference_saved", "key": key, "success": True})
continue
except websockets.exceptions.ConnectionClosed:
logger.info(f"[{session_id}] Gemini WS closed")
except Exception as e:
logger.error(f"[{session_id}] Gemini relay error: {e}")
# ── Audio (full webm/opus blob from MediaRecorder) → REST STT → LLM → TTS ──
if msg_type == "audio":
audio_b64 = msg.get("data", "")
if not audio_b64:
continue
# ── Frontend → Gemini relay ──
async def relay_frontend():
nonlocal user_id, memory_suffix
try:
while True:
raw = await websocket.receive_text()
msg = json.loads(raw)
msg_type = msg.get("type", "")
client = get_openai()
if msg_type == "identify":
user_id = msg.get("user_id", "default-user").strip()
user_name = msg.get("name", "").strip()
if user_name and user_id:
kira_memory.set_user_preference(user_id, "name", user_name)
prefs = kira_memory.get_user_preferences(user_id)
if kira_memory.enabled:
kira_memory.ensure_peers(user_id)
kira_memory.ensure_session(session_id)
try:
ctx = kira_memory.build_system_prompt_suffix()
if ctx:
memory_suffix = ctx
except Exception:
pass
await websocket.send_json({
"type": "identified",
"user_id": user_id,
"preferences": prefs,
})
continue
# STT (REST)
transcript = await transcribe_audio(client, audio_b64)
if not transcript:
await websocket.send_json({"type": "transcript", "role": "user", "text": "(could not transcribe)"})
await websocket.send_json({"type": "error", "message": "Could not transcribe audio"})
continue
if msg_type == "set_preference":
key = msg.get("key", "").strip()
value = msg.get("value", "").strip()
if key and user_id and user_id != "default-user":
kira_memory.set_user_preference(user_id, key, value)
await websocket.send_json({"type": "preference_saved", "key": key, "success": True})
continue
logger.info(f"[{session_id}] User: {transcript}")
await websocket.send_json({"type": "transcript_delta", "text": transcript})
await websocket.send_json({"type": "transcript", "role": "user", "text": transcript})
conversation_history.append({"role": "user", "content": transcript})
if msg_type == "audio":
# Forward PCM16 audio to Gemini as realtimeInput
audio_b64 = msg.get("data", "")
if audio_b64 and gemini_ws and gemini_ws.state.name == "OPEN":
gemini_msg = {
"realtimeInput": {
"audio": {
"mimeType": "audio/pcm;rate=16000",
"data": audio_b64,
}
}
}
await gemini_ws.send(json.dumps(gemini_msg))
continue
# LLM
kira_text = await get_kira_response(client, transcript, memory_suffix)
conversation_history.append({"role": "assistant", "content": kira_text})
logger.info(f"[{session_id}] Kira: {kira_text}")
if msg_type == "conversation_text":
text = msg.get("text", "").strip()
if not text:
continue
logger.info(f"[{session_id}] User (text): {text}")
# Send as a text turn to Gemini
if gemini_ws and gemini_ws.state.name == "OPEN":
user_part = {"text": text}
if memory_suffix:
user_part = {"text": f"[Context: {memory_suffix}]\n{text}"}
gemini_msg = {
"clientContent": {
"turns": [{"role": "user", "parts": [user_part]}],
"turnComplete": True,
}
}
await gemini_ws.send(json.dumps(gemini_msg))
await websocket.send_json({
"type": "transcript",
"role": "user",
"text": text,
})
continue
# Store in Honcho
if kira_memory.enabled and identified:
try:
kira_memory.store_messages(transcript, kira_text)
except Exception:
pass
if msg_type == "ping":
await websocket.send_json({"type": "pong"})
# TTS (streaming)
await stream_tts(client, kira_text, websocket)
continue
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"[{session_id}] Frontend relay error: {e}")
# ── Text input → direct LLM + TTS ──
if msg_type == "conversation_text":
text = msg.get("text", "").strip()
if not text:
continue
gemini_task = asyncio.create_task(relay_gemini())
frontend_task = asyncio.create_task(relay_frontend())
logger.info(f"[{session_id}] User (text): {text}")
conversation_history.append({"role": "user", "content": text})
# Wait for either to finish
done, pending = await asyncio.wait(
[gemini_task, frontend_task],
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
client = get_openai()
kira_text = await get_kira_response(client, text, memory_suffix)
conversation_history.append({"role": "assistant", "content": kira_text})
logger.info(f"[{session_id}] Kira: {kira_text}")
if kira_memory.enabled and identified:
try:
kira_memory.store_messages(text, kira_text)
except Exception:
pass
await stream_tts(client, kira_text, websocket)
continue
if msg_type == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
logger.info(f"[{session_id}] Disconnected")
except Exception as e:
logger.error(f"[{session_id}] Error: {e}")
logger.error(f"[{session_id}] Connection error: {e}")
finally:
if gemini_ws:
await gemini_ws.close()
logger.info(f"[{session_id}] Disconnected")