feat: cheapest pipeline — gpt-4o-mini-transcribe + gpt-5.4-nano + TTS

Simple 3-step chat completions pipeline at ~$0.019/min total.
Streams PCM16 audio from frontend, transcribes on release,
generates response via gpt-5.4-nano, speaks via OpenAI TTS.

Cost breakdown:
  gpt-4o-mini-transcribe: $0.003/min
  gpt-5.4-nano:          ~$0.001/min
  OpenAI TTS (nova):     $0.015/min
  Total:                 ~$0.019/min (~$0.57/day at 30min)
This commit is contained in:
2026-06-04 13:51:35 -04:00
parent 66e799a655
commit f2a5416408
3 changed files with 167 additions and 356 deletions
+162 -120
View File
@@ -1,6 +1,7 @@
"""Kira — AI body double backend
Hybrid pipeline: gpt-realtime-whisper (streaming STT) → gpt-5.4-nano (LLM) → OpenAI TTS
Cheapest pipeline: gpt-4o-mini-transcribe STT → gpt-5.4-nano LLM → OpenAI TTS
~$0.019/min total, simple 3-step chat completions.
"""
import json
@@ -13,7 +14,6 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from services.hybrid import HybridPipeline
from services.memory import kira_memory
logging.basicConfig(level=logging.INFO)
@@ -29,6 +29,25 @@ app.add_middleware(
allow_headers=["*"],
)
# System prompt: the fixed persona text for Kira. build_system_prompt() starts
# from this string and appends the memory-derived suffix when one is available.
BASE_SYSTEM_PROMPT = (
"You are Kira, a warm, kind, and encouraging AI body double. "
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
"stay focused and on task. Keep responses short, supportive, and uplifting. "
"Check in on them. Remind them to take breaks. Celebrate small wins. "
"Use occasional emoji but don't overdo it. Never be judgmental."
)
# Module-level cache for the shared client; filled in by get_openai().
_openai = None


def get_openai():
    """Return the shared AsyncOpenAI client, creating it on first use.

    The ``openai`` import is deferred until the first call. The client is
    built with ``settings.openai_api_key`` and stored in the module-level
    ``_openai`` so later calls reuse the same instance.
    """
    global _openai
    if _openai is not None:
        return _openai

    from openai import AsyncOpenAI

    _openai = AsyncOpenAI(api_key=settings.openai_api_key)
    return _openai
@app.on_event("startup")
async def startup():
@@ -44,6 +63,69 @@ async def health():
return {"status": "ok", "name": "kira", "memory": mem_status}
def build_system_prompt(user_id: str) -> str:
    """Compose the system prompt used for one LLM call.

    Starts from ``BASE_SYSTEM_PROMPT``. When ``kira_memory`` is enabled, the
    peers for ``user_id`` are ensured and any non-empty value returned by
    ``kira_memory.build_system_prompt_suffix()`` is appended. If the memory
    step raises, the failure is logged as a warning and whatever prompt was
    built so far is returned.
    """
    system_prompt = BASE_SYSTEM_PROMPT
    if not kira_memory.enabled:
        return system_prompt

    try:
        kira_memory.ensure_peers(user_id)
        memory_context = kira_memory.build_system_prompt_suffix()
        if memory_context:
            system_prompt = system_prompt + memory_context
    except Exception as exc:
        logger.warning(f"Memory context failed: {exc}")
    return system_prompt
async def run_conversation(text: str, user_id: str) -> str:
    """Generate Kira's text reply to one user utterance (the LLM step only).

    Transcription and speech synthesis are handled separately by
    ``transcribe_audio`` and ``synthesize_speech``; this function only calls
    chat completions. Each call is single-turn: only the system prompt and
    ``text`` are sent to the model.

    Args:
        text: The user's utterance (typed, or already transcribed).
        user_id: Forwarded to ``build_system_prompt`` to build the system
            prompt for this call.

    Returns:
        The model's reply. ``"Mhm, I'm here!"`` when the model returns no
        content, or ``"Sorry, let me try that again!"`` when the request
        itself fails.
    """
    system_prompt = build_system_prompt(user_id)
    client = get_openai()

    try:
        resp = await client.chat.completions.create(
            model="gpt-5.4-nano",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text},
            ],
            max_tokens=300,
            temperature=0.7,
        )
        # An empty ``choices`` list would otherwise surface as an IndexError.
        reply = resp.choices[0].message.content if resp.choices else None
    except Exception as e:
        # Best-effort, like transcribe_audio/synthesize_speech: one failed LLM
        # request should produce a spoken apology rather than propagate to
        # the caller.
        logger.warning(f"LLM error: {e}")
        return "Sorry, let me try that again!"

    return reply or "Mhm, I'm here!"
async def transcribe_audio(audio_bytes: bytes) -> str | None:
"""Transcribe audio bytes using cheapest STT model."""
client = get_openai()
try:
transcript = await client.audio.transcriptions.create(
model="gpt-4o-mini-transcribe",
file=("audio.webm", audio_bytes, "audio/webm"),
response_format="text",
)
return transcript.strip() if transcript and transcript.strip() else None
except Exception as e:
logger.warning(f"STT error: {e}")
return None
async def synthesize_speech(text: str) -> bytes:
    """Turn ``text`` into spoken audio via OpenAI TTS.

    Requests the ``tts-1`` model with the ``nova`` voice in Opus format and
    returns the response's ``content``. Any failure is logged as a warning
    and ``b""`` is returned instead of raising.
    """
    tts_request = {
        "model": "tts-1",
        "voice": "nova",
        "input": text,
        "response_format": "opus",
    }
    client = get_openai()
    try:
        speech = await client.audio.speech.create(**tts_request)
        return speech.content
    except Exception as e:
        logger.warning(f"TTS error: {e}")
        return b""
@app.websocket("/api/ws")
async def conversation_ws(websocket: WebSocket):
await websocket.accept()
@@ -52,92 +134,8 @@ async def conversation_ws(websocket: WebSocket):
identified = False
logger.info(f"[{session_id}] WebSocket connected")
pending_transcripts: list[str] = []
pipeline: HybridPipeline | None = None
pipeline_task: asyncio.Task | None = None
pipeline_ready = asyncio.Event()
audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
text_queue: asyncio.Queue[str] = asyncio.Queue()
memory_suffix = ""
async def on_ready():
pipeline_ready.set()
logger.info(f"[{session_id}] Pipeline ready")
async def on_transcript_delta(delta: str):
"""Streaming partial transcript."""
await websocket.send_json({"type": "transcript_delta", "text": delta})
async def on_transcript_done(full: str):
"""Full utterance received."""
await websocket.send_json({"type": "transcript", "role": "user", "text": full})
async def on_audio_delta(audio_bytes: bytes):
"""Forward TTS audio to client."""
try:
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
await websocket.send_json({"type": "audio", "data": audio_b64})
except Exception:
pass
async def on_speech_start():
await websocket.send_json({"type": "speaking_start"})
async def on_speech_end():
await websocket.send_json({"type": "speaking_end"})
async def on_error(msg: str):
await websocket.send_json({"type": "error", "message": msg})
# Create pipeline
pipeline = HybridPipeline(
on_transcript_delta=on_transcript_delta,
on_transcript_done=on_transcript_done,
on_audio_delta=on_audio_delta,
on_speech_start=on_speech_start,
on_speech_end=on_speech_end,
on_ready=on_ready,
on_error=on_error,
memory_suffix=memory_suffix,
)
pipeline_task = asyncio.create_task(pipeline.connect())
try:
await asyncio.wait_for(pipeline_ready.wait(), timeout=15)
except asyncio.TimeoutError:
logger.error(f"[{session_id}] Pipeline failed to connect")
await websocket.send_json({"type": "error", "message": "Failed to connect to AI"})
pipeline_task.cancel()
return
# Forward audio/text from client to pipeline
async def forward_audio():
while pipeline and pipeline._connected:
try:
pcm16 = await asyncio.wait_for(audio_queue.get(), timeout=1)
await pipeline.send_audio(pcm16)
except asyncio.TimeoutError:
continue
except Exception:
break
async def forward_text():
while pipeline and pipeline._connected:
try:
text = await asyncio.wait_for(text_queue.get(), timeout=1)
await pipeline.send_text(text)
# Store in Honcho
if kira_memory.enabled and identified:
kira_memory.store_user_message(text)
except asyncio.TimeoutError:
continue
except Exception:
break
fwd_audio = asyncio.create_task(forward_audio())
fwd_text = asyncio.create_task(forward_text())
audio_buffer = bytearray()
conversation_history: list[dict] = []
try:
while True:
@@ -145,7 +143,7 @@ async def conversation_ws(websocket: WebSocket):
msg = json.loads(raw)
msg_type = msg.get("type", "")
# ── Identity ──
# ── Identity & Preferences ──
if msg_type == "identify":
user_id = msg.get("user_id", "").strip()
user_name = msg.get("name", "").strip()
@@ -159,16 +157,6 @@ async def conversation_ws(websocket: WebSocket):
kira_memory.ensure_peers(user_id)
kira_memory.ensure_session(session_id)
# Build memory context and update pipeline
if kira_memory.enabled:
try:
ctx = kira_memory.build_system_prompt_suffix()
if ctx:
pipeline._memory_suffix = ctx
memory_suffix = ctx
except Exception:
pass
await websocket.send_json({
"type": "identified",
"user_id": user_id,
@@ -176,40 +164,94 @@ async def conversation_ws(websocket: WebSocket):
})
continue
# ── Preferences ──
if msg_type == "set_preference":
key = msg.get("key", "").strip()
value = msg.get("value", "").strip()
if key and user_id and user_id != "default-user":
kira_memory.set_user_preference(user_id, key, value)
await websocket.send_json({
"type": "preference_saved",
"key": key,
"success": True,
})
continue
# ── Audio (PCM16) ──
# ── Conversation ──
if msg_type == "audio":
audio_b64 = msg.get("data", "")
if audio_b64:
pcm16 = base64.b64decode(audio_b64)
await audio_queue.put(pcm16)
continue
# Accumulate PCM16 audio chunks
chunk = base64.b64decode(msg["data"])
audio_buffer.extend(chunk)
# ── Text input ──
if msg_type == "conversation_text":
text = msg.get("text", "").strip()
if text:
await text_queue.put(text)
continue
elif msg_type == "transcribe":
if not audio_buffer:
await websocket.send_json({"type": "error", "message": "No audio data"})
continue
if msg_type == "ping":
logger.info(f"[{session_id}] Transcribing {len(audio_buffer)} bytes...")
# 1. STT
transcript = await transcribe_audio(bytes(audio_buffer))
audio_buffer.clear()
if not transcript:
await websocket.send_json({"type": "error", "message": "Could not transcribe"})
continue
await websocket.send_json({"type": "transcript", "role": "user", "text": transcript})
conversation_history.append({"role": "user", "content": transcript})
# 2. LLM
logger.info(f"[{session_id}] User: {transcript}")
kira_text = await run_conversation(transcript, user_id)
conversation_history.append({"role": "assistant", "content": kira_text})
logger.info(f"[{session_id}] Kira: {kira_text}")
# Store in Honcho
if kira_memory.enabled and identified:
try:
kira_memory.store_messages(transcript, kira_text)
except Exception:
pass
# 3. TTS
await websocket.send_json({"type": "speaking_start", "text": kira_text})
audio_bytes = await synthesize_speech(kira_text)
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text})
await websocket.send_json({"type": "speaking_end"})
elif msg_type == "conversation_text":
user_text = msg.get("text", "").strip()
if not user_text:
continue
conversation_history.append({"role": "user", "content": user_text})
logger.info(f"[{session_id}] User (text): {user_text}")
kira_text = await run_conversation(user_text, user_id)
conversation_history.append({"role": "assistant", "content": kira_text})
logger.info(f"[{session_id}] Kira: {kira_text}")
if kira_memory.enabled and identified:
try:
kira_memory.store_messages(user_text, kira_text)
except Exception:
pass
await websocket.send_json({"type": "speaking_start", "text": kira_text})
audio_bytes = await synthesize_speech(kira_text)
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
await websocket.send_json({"type": "audio", "data": audio_b64, "text": kira_text})
await websocket.send_json({"type": "speaking_end"})
elif msg_type == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
logger.info(f"[{session_id}] Disconnected")
except Exception as e:
logger.error(f"[{session_id}] Error: {e}")
finally:
fwd_audio.cancel()
fwd_text.cancel()
if pipeline:
await pipeline.disconnect()
if pipeline_task:
pipeline_task.cancel()
try:
await websocket.send_json({"type": "error", "message": str(e)})
except Exception:
pass
-236
View File
@@ -1,236 +0,0 @@
"""Hybrid pipeline: streaming STT (gpt-realtime-whisper) + cheap LLM + TTS.
Uses gpt-realtime-whisper for low-latency streaming transcription,
gpt-5.4-nano as the brain, and OpenAI TTS for voice output.
"""
import json
import base64
import logging
import asyncio
from typing import Callable, Awaitable
from openai import AsyncOpenAI
from config import settings
logger = logging.getLogger("kira.hybrid")
# System instructions for Kira's personality. HybridPipeline._process_transcript
# sends this as the system message, with the memory suffix appended when set.
KIRA_INSTRUCTIONS = (
"You are Kira, a warm, kind, and encouraging AI body double. "
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
"stay focused and on task. Keep responses short, supportive, and uplifting. "
"Check in on them. Remind them to take breaks. Celebrate small wins. "
"Use occasional emoji but don't overdo it. Never be judgmental."
)
class HybridPipeline:
    """Streaming STT via gpt-realtime-whisper → gpt-5.4-nano LLM → OpenAI TTS.

    ``connect()`` opens the transcription WebSocket and runs the receive loop
    until the connection drops or ``disconnect()`` is called. Results are
    reported through the async callbacks passed to the constructor.
    """

    def __init__(
        self,
        on_transcript_delta: Callable[[str], Awaitable[None]],
        on_transcript_done: Callable[[str], Awaitable[None]],
        on_audio_delta: Callable[[bytes], Awaitable[None]],
        on_speech_start: Callable[[], Awaitable[None]],
        on_speech_end: Callable[[], Awaitable[None]],
        on_ready: Callable[[], Awaitable[None]],
        on_error: Callable[[str], Awaitable[None]],
        memory_suffix: str = "",
    ):
        """Store the callbacks and initialise connection state.

        Args:
            on_transcript_delta: Called with transcript text found in a
                ``conversation.item.created`` event.
            on_transcript_done: Called with the full utterance before the
                LLM request is made.
            on_audio_delta: Called with the TTS audio bytes.
            on_speech_start: Called just before the TTS request.
            on_speech_end: Called after the TTS step, whether or not it
                succeeded.
            on_ready: Called once the session configuration has been sent.
            on_error: Called with the error text on connection or LLM failure.
            memory_suffix: Text appended to ``KIRA_INSTRUCTIONS`` when
                building the system message.
        """
        self._on_transcript_delta = on_transcript_delta
        self._on_transcript_done = on_transcript_done
        self._on_audio_delta = on_audio_delta
        self._on_speech_start = on_speech_start
        self._on_speech_end = on_speech_end
        self._on_ready = on_ready
        self._on_error = on_error
        self._memory_suffix = memory_suffix
        self._openai = None
        self._conn = None
        self._connected = False
        self._transcript_buffer = ""

    async def connect(self):
        """Connect to gpt-realtime-whisper and process events until closed.

        Does nothing if already connected. Connection errors are logged and
        reported through ``on_error``; the connection state is always reset
        on exit.
        """
        if self._connected:
            return
        try:
            import websockets

            self._openai = AsyncOpenAI(api_key=settings.openai_api_key)
            logger.info("Connecting to gpt-realtime-whisper...")
            # Connect directly via websockets to avoid the OpenAI-Beta header
            url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-whisper"
            async with websockets.connect(
                url,
                additional_headers={
                    "Authorization": f"Bearer {settings.openai_api_key}",
                },
            ) as conn:
                self._conn = conn
                self._connected = True
                logger.info("Connected to gpt-realtime-whisper")

                # Configure session for transcription
                await self._send({
                    "type": "session.update",
                    "session": {
                        "input_audio_format": "pcm16",
                        "input_audio_transcription": {
                            "enabled": True,
                        },
                        "turn_detection": {
                            "type": "server_vad",
                            "threshold": 0.5,
                            "prefix_padding_ms": 300,
                            "silence_duration_ms": 600,
                        },
                    },
                })
                await self._on_ready()

                # Listen for transcription events
                while self._connected:
                    try:
                        raw = await conn.recv()
                        if isinstance(raw, (str, bytes)):
                            data = json.loads(raw if isinstance(raw, str) else raw.decode())
                            await self._handle_event(data)
                    except Exception as e:
                        # Only log when the drop was not caused by disconnect().
                        if self._connected:
                            logger.warning(f"recv error: {e}")
                        break
        except Exception as e:
            logger.error(f"Connection error: {e}")
            await self._on_error(str(e))
        finally:
            self._connected = False
            self._conn = None

    async def _handle_event(self, event):
        """Process one event (dict or attribute-style object) from the socket."""
        event_type = self._get_field(event, "type", "") or ""

        if event_type == "input_audio_buffer.speech_started":
            self._transcript_buffer = ""
        elif event_type == "input_audio_buffer.speech_stopped":
            # End of utterance: hand the buffered text to the LLM/TTS step.
            utterance = self._transcript_buffer.strip()
            if utterance:
                await self._process_transcript(utterance)
            self._transcript_buffer = ""
        elif event_type == "input_audio_buffer.transcription_delta":
            delta_text = self._get_field(event, "delta", "")
            if delta_text:
                self._transcript_buffer += delta_text
        elif event_type == "conversation.item.created":
            item = self._get_field(event, "item", {})
            content = self._get_field(item, "content", [])
            for part in (content or []):
                part_type = self._get_field(part, "type", "")
                part_transcript = self._get_field(part, "transcript", "")
                if part_type == "transcript" and part_transcript:
                    # A transcript part replaces whatever was buffered so far.
                    self._transcript_buffer = part_transcript
                    await self._on_transcript_delta(part_transcript)
        elif event_type == "error":
            err = self._get_field(event, "error", {})
            msg = self._get_field(err, "message", str(event))
            logger.warning(f"Whisper error: {msg}")

    async def _process_transcript(self, transcript: str):
        """Full utterance received. Call LLM, then TTS.

        An LLM failure is logged, reported via ``on_error`` and replaced by a
        fixed apology that is still spoken. A TTS failure is logged only.
        ``on_speech_start``/``on_speech_end`` always bracket the TTS step.
        """
        await self._on_transcript_done(transcript)
        logger.info(f"User: {transcript}")

        # Build system prompt with optional memory context
        system_content = KIRA_INSTRUCTIONS
        if self._memory_suffix:
            system_content += self._memory_suffix

        # Call gpt-5.4-nano
        try:
            resp = await self._openai.chat.completions.create(
                model="gpt-5.4-nano",
                messages=[
                    {"role": "system", "content": system_content},
                    {"role": "user", "content": transcript},
                ],
                max_tokens=300,
                temperature=0.7,
            )
            kira_text = resp.choices[0].message.content or "Mhm, I'm here!"
        except Exception as e:
            logger.error(f"LLM error: {e}")
            kira_text = "Sorry, let me try that again!"
            await self._on_error(str(e))
        logger.info(f"Kira: {kira_text}")

        # Call TTS
        await self._on_speech_start()
        try:
            tts_resp = await self._openai.audio.speech.create(
                model="tts-1",
                voice="nova",
                input=kira_text,
                response_format="opus",
            )
            audio_bytes = tts_resp.content
            if audio_bytes:
                await self._on_audio_delta(audio_bytes)
        except Exception as e:
            logger.error(f"TTS error: {e}")
        await self._on_speech_end()

    async def send_audio(self, pcm16_bytes: bytes):
        """Send PCM16 audio chunk for transcription (no-op when disconnected)."""
        if not self._connected or not self._conn:
            return
        try:
            audio_b64 = base64.b64encode(pcm16_bytes).decode("utf-8")
            await self._send({
                "type": "input_audio_buffer.append",
                "audio": audio_b64,
            })
        except Exception as e:
            logger.warning(f"Send audio error: {e}")

    async def send_text(self, text: str):
        """Process text input directly (no transcription needed)."""
        await self._process_transcript(text)

    async def _send(self, data: dict):
        """JSON-encode ``data`` and send it; failures are logged, not raised."""
        try:
            await self._conn.send(json.dumps(data))
        except Exception as e:
            logger.warning(f"Send error: {e}")

    async def disconnect(self):
        """Mark the pipeline disconnected and close the socket if one is open."""
        self._connected = False
        if self._conn:
            try:
                await self._conn.close()
            except Exception as e:
                # Closing is best-effort; keep a trace instead of hiding it.
                logger.debug(f"Close error: {e}")
            self._conn = None

    @staticmethod
    def _get_field(obj, field: str, default=None):
        """Get a field from either an object or dict.

        Dicts are checked first, by key. Testing ``hasattr`` first would let
        dict attributes such as ``items`` or ``get`` shadow keys of the same
        name and return a bound method instead of the stored value.
        """
        if isinstance(obj, dict):
            return obj.get(field, default)
        return getattr(obj, field, default)