Files
hobokenchicken b06f20f9d8 chore: cleanup dead code and unused files
Deleted 7 unused frontend components (ChatBubble, PetZone, Live2DCat,
BackgroundScene, Clock, Toolbar, types/index.ts).
Deleted unused backend: whisper_stream.py, empty models/ and routers/ dirs.
Removed dead code: Message interface, messages/micError/sendText/addMessage,
conversation_text handler, 3 unused memory.py methods.
Fixed task persistence bug: added 'tasks' to DEFAULT_PREFERENCES.
Stale comment in Live2DStage updated.
2026-06-06 12:05:28 -04:00

423 lines
17 KiB
Python

"""Kira — AI body double backend
Gemini Live API (gemini-3.1-flash-live-preview) for real-time voice.
Task list management via Gemini function calling.
"""
import json
import uuid
import logging
import asyncio
import websockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from services.memory import kira_memory
# Process-wide logging: INFO and above, shared "kira" logger for this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kira")

app = FastAPI(title="Kira Backend")

# CORS: any origin may call the HTTP API, but without credentials.
# A wildcard origin must not be combined with allow_credentials=True: the CORS
# spec forbids "*" for credentialed requests, and the FastAPI/Starlette docs
# require explicit origins in that case. Nothing in this module relies on
# cookies or HTTP auth, so credentials are switched off rather than narrowing
# the origin list. If credentialed requests are ever needed, list the allowed
# origins explicitly and set allow_credentials=True again.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# System instruction sent to Gemini at session setup. The prompt is a single
# paragraph; it is written here one sentence group per entry and joined with
# single spaces.
BASE_SYSTEM_PROMPT = " ".join(
    [
        # Persona
        "You are Kira, a warm, kind, and encouraging AI body double helping someone with ADHD stay focused and on task.",
        # Tone and voice
        "Tone & Voice:",
        'Speak in a friendly, enthusiastic "girly-pop" tone (e.g., upbeat, supportive, using words like "love that," "you got this," or "perfect"). Keep it natural, not overly dramatic or repetitive.',
        "You are speaking out loud via voice. Keep your responses very short (ideally 1-3 sentences) to maintain a natural conversational flow.",
        "Use emojis sparingly in text formatting, but focus primarily on how it sounds out loud. Never be judgmental.",
        # Coaching behaviour
        "ADHD Coaching Style:",
        "Actively check in on the user's progress.",
        "Gently remind them to take breaks when appropriate.",
        "Celebrate small wins enthusiastically (e.g., finishing even a tiny part of a task).",
        # Task-list tool usage
        "Task List Management:",
        "You have tools to manage a task list. When the user mentions tasks, todos, or things to remember, use the tools immediately.",
        "Always briefly confirm when you add, update, or remove something (e.g., \"Got it! Added 'do laundry' to your list.\").",
        "When asked what's on the list, read it back to them clearly and concisely. If the list is long, mention the top 3 items first.",
    ]
)
# WebSocket endpoint of the Gemini Live API (BidiGenerateContent service).
GEMINI_WS_URL = (
    "wss://generativelanguage.googleapis.com/ws/"
    "google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent"
)
# Model requested in the setup message of every session.
GEMINI_MODEL = "models/gemini-3.1-flash-live-preview"
# ── Gemini tool declarations ──
def _tool_declaration(name, description, **string_params):
    """Build one function declaration for the Gemini ``tools`` setup field.

    Each keyword argument becomes a required STRING parameter whose value is
    the parameter's description. With no keyword arguments the declaration
    has an empty ``properties`` object and no ``required`` key.
    """
    parameters = {
        "type": "OBJECT",
        "properties": {
            param: {"type": "STRING", "description": param_doc}
            for param, param_doc in string_params.items()
        },
    }
    if string_params:
        parameters["required"] = list(string_params)
    return {"name": name, "description": description, "parameters": parameters}


# Tool set advertised to Gemini; the names must match the branches handled by
# execute_tool().
TOOLS = [
    {
        "functionDeclarations": [
            _tool_declaration(
                "add_task",
                "Add a new task to the user's task list.",
                text="The task description.",
            ),
            _tool_declaration(
                "remove_task",
                "Remove a task from the list by its ID.",
                task_id="The ID of the task to remove.",
            ),
            _tool_declaration(
                "complete_task",
                "Mark a task as completed.",
                task_id="The ID of the task to mark complete.",
            ),
            _tool_declaration("get_tasks", "Get the current task list."),
            _tool_declaration(
                "clear_completed_tasks",
                "Remove all completed tasks from the list.",
            ),
        ]
    }
]
def execute_tool(name: str, args: dict, tasks: list[dict]) -> dict:
    """Execute one task-list tool call and return its JSON-serialisable result.

    Args:
        name: Tool name: ``add_task``, ``remove_task``, ``complete_task``,
            ``get_tasks`` or ``clear_completed_tasks``.
        args: Arguments supplied with the call. ``None`` is treated as an
            empty dict.
        tasks: The session task list. It is mutated in place; each task is a
            dict with ``id``, ``text`` and ``completed`` keys.

    Returns:
        A status dict describing what happened, or ``{"error": "..."}`` when
        the call cannot be carried out (empty text, unknown task ID, unknown
        tool). This function does not raise for bad tool input.
    """
    args = args or {}

    if name == "add_task":
        raw_text = args.get("text")
        # The caller may hand over null or a non-string; treat both as empty.
        text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            return {"error": "Task text cannot be empty."}
        # IDs are the first 8 hex digits of a UUID4. Re-draw on the (unlikely)
        # collision so remove/complete can never hit the wrong task.
        used_ids = {t.get("id") for t in tasks}
        new_id = uuid.uuid4().hex[:8]
        while new_id in used_ids:
            new_id = uuid.uuid4().hex[:8]
        task = {"id": new_id, "text": text, "completed": False}
        tasks.append(task)
        return {"status": "added", "task": task, "total": len(tasks)}

    if name in ("remove_task", "complete_task"):
        task_id = args.get("task_id", "")
        for index, task in enumerate(tasks):
            # .get(): entries restored from storage may lack an "id" key.
            if task.get("id") != task_id:
                continue
            if name == "remove_task":
                removed = tasks.pop(index)
                return {"status": "removed", "task": removed, "total": len(tasks)}
            task["completed"] = True
            return {"status": "completed", "task": task}
        return {"error": f"Task {task_id} not found."}

    if name == "get_tasks":
        return {"tasks": tasks, "total": len(tasks)}

    if name == "clear_completed_tasks":
        before = len(tasks)
        # Slice-assign so every holder of this list object sees the change.
        # A task without a "completed" key counts as not completed.
        tasks[:] = [t for t in tasks if not t.get("completed")]
        return {"status": "cleared", "removed": before - len(tasks), "total": len(tasks)}

    return {"error": f"Unknown tool: {name}"}
@app.on_event("startup")
async def startup():
    """Initialise the memory service at application start and log the outcome."""
    memory_ready = kira_memory.init()
    logger.info(
        "Honcho memory initialized" if memory_ready else "Honcho memory not configured"
    )
@app.get("/api/health")
async def health():
    """Return a liveness payload that also reports whether memory is enabled."""
    return {
        "status": "ok",
        "name": "kira",
        "memory": "active" if kira_memory.enabled else "disabled",
    }
@app.websocket("/api/ws")
async def gemini_voice_ws(websocket: WebSocket):
    """WebSocket proxy between frontend and Gemini Live API.

    One Gemini Live session is opened per frontend connection. Two relay
    tasks run concurrently (Gemini -> frontend and frontend -> Gemini); the
    session ends as soon as either one finishes.

    Protocol (frontend <-> this proxy):
      -> {"type": "audio", "data": "<base64 PCM16 16kHz>"}
      -> {"type": "identify", "user_id": "...", "name": "..."}
      -> {"type": "set_preference", "key": "...", "value": "..."}
      -> {"type": "add_task", "text": "..."}
      -> {"type": "ping"}
      <- {"type": "audio", "data": "<base64 PCM16 24kHz>"}
      <- {"type": "transcript", "role": "kira", "text": "..."}
      <- {"type": "turn_complete"}
      <- {"type": "interrupted"}
      <- {"type": "tasks", "tasks": [...]}
      <- {"type": "identified", "user_id": "...", "preferences": {...}}
      <- {"type": "preference_saved", "key": "...", "success": true|false}
      <- {"type": "pong"}
      <- {"type": "error", "message": "..."}
    """
    await websocket.accept()
    session_id = str(uuid.uuid4())[:8]
    user_id = "default-user"
    # NOTE(review): memory_suffix is stored on identify but never read in this
    # function; the Gemini setup message has already been sent by then.
    memory_suffix = ""
    tasks: list[dict] = []
    logger.info(f"[{session_id}] WebSocket connected")
    gemini_ws = None
    # Tool names after which the task list is re-sent and persisted.
    mutating_tools = ("add_task", "remove_task", "complete_task", "clear_completed_tasks")

    def text_field(message: dict, key: str) -> str:
        """Return ``message[key]`` as a stripped string ('' if missing or null)."""
        value = message.get(key)
        return "" if value is None else str(value).strip()

    async def send_tasks_to_frontend():
        """Push current task list to frontend."""
        await websocket.send_json({"type": "tasks", "tasks": tasks})

    def persist_tasks():
        """Best-effort save of the task list to Honcho under the current user."""
        try:
            if kira_memory.enabled:
                kira_memory.set_user_preference(user_id, "tasks", json.dumps(tasks))
        except Exception as e:
            # Persistence must never break the live session; log and carry on.
            logger.warning(f"[{session_id}] Failed to persist tasks: {e}")

    try:
        # ── Connect to Gemini Live API ──
        gemini_url = f"{GEMINI_WS_URL}?key={settings.gemini_api_key}"
        gemini_ws = await websockets.connect(gemini_url, max_size=2**24)

        # ── Send setup with tools ──
        system_prompt = BASE_SYSTEM_PROMPT
        setup_msg = {
            "setup": {
                "model": GEMINI_MODEL,
                "generationConfig": {
                    "responseModalities": ["AUDIO"],
                    "speechConfig": {
                        "voiceConfig": {
                            "prebuiltVoiceConfig": {
                                "voiceName": "Laomedeia"
                            }
                        }
                    },
                },
                "systemInstruction": {
                    "parts": [{"text": system_prompt}]
                },
                "tools": TOOLS,
            }
        }
        await gemini_ws.send(json.dumps(setup_msg))
        logger.info(f"[{session_id}] Connected to Gemini Live API")

        # Wait for setup complete
        raw = await asyncio.wait_for(gemini_ws.recv(), timeout=10)
        setup_resp = json.loads(raw)
        if "setupComplete" in setup_resp:
            logger.info(f"[{session_id}] Gemini setup complete")
        else:
            logger.warning(f"[{session_id}] Unexpected setup response: {list(setup_resp.keys())}")

        # ── Gemini -> Frontend relay ──
        async def relay_gemini():
            try:
                async for raw in gemini_ws:
                    msg = json.loads(raw)

                    if "serverContent" in msg:
                        sc = msg["serverContent"]
                        parts = (sc.get("modelTurn") or {}).get("parts") or []
                        for part in parts:
                            if "text" in part:
                                await websocket.send_json({
                                    "type": "transcript",
                                    "role": "kira",
                                    "text": part["text"],
                                })
                            if "inlineData" in part:
                                audio_data = part["inlineData"].get("data", "")
                                if audio_data:
                                    await websocket.send_json({
                                        "type": "audio",
                                        "data": audio_data,
                                    })
                        if sc.get("turnComplete"):
                            await websocket.send_json({"type": "turn_complete"})
                        if sc.get("interrupted"):
                            await websocket.send_json({"type": "interrupted"})

                    elif "toolCall" in msg:
                        # Execute tool calls and send responses back
                        function_calls = msg["toolCall"].get("functionCalls") or []
                        tool_results = []
                        for fc in function_calls:
                            call_id = fc.get("id", "")
                            fn_name = fc.get("name", "")
                            # A call without arguments may carry null/no args.
                            fn_args = fc.get("args") or {}
                            logger.info(f"[{session_id}] Tool call: {fn_name}({fn_args})")
                            result = execute_tool(fn_name, fn_args, tasks)
                            tool_results.append({
                                "id": call_id,
                                "name": fn_name,
                                "response": result,
                            })
                            # Push updated task list to frontend after any mutation
                            if fn_name in mutating_tools:
                                await send_tasks_to_frontend()
                                persist_tasks()
                        # Send tool response back to Gemini
                        if tool_results:
                            resp = {"toolResponse": {"functionResponses": tool_results}}
                            await gemini_ws.send(json.dumps(resp))
                            logger.info(f"[{session_id}] Sent {len(tool_results)} tool responses")

                    elif "toolCallCancellation" in msg:
                        # Tools run synchronously above, so nothing is left to cancel.
                        pass

                    elif "error" in msg:
                        err = msg["error"]
                        logger.error(f"[{session_id}] Gemini error: {err}")
                        detail = err.get("message", err) if isinstance(err, dict) else err
                        await websocket.send_json({
                            "type": "error",
                            "message": str(detail),
                        })
            except websockets.exceptions.ConnectionClosed:
                logger.info(f"[{session_id}] Gemini WS closed")
            except Exception as e:
                logger.error(f"[{session_id}] Gemini relay error: {e}")

        # ── Frontend -> Gemini relay ──
        async def relay_frontend():
            nonlocal user_id, memory_suffix
            try:
                while True:
                    raw = await websocket.receive_text()
                    # One malformed frame must not tear down the voice session.
                    try:
                        msg = json.loads(raw)
                    except json.JSONDecodeError:
                        msg = None
                    if not isinstance(msg, dict):
                        await websocket.send_json({
                            "type": "error",
                            "message": "Invalid message: expected a JSON object.",
                        })
                        continue
                    msg_type = msg.get("type", "")

                    if msg_type == "identify":
                        # An empty or missing user_id falls back to the shared default.
                        user_id = text_field(msg, "user_id") or "default-user"
                        user_name = text_field(msg, "name")
                        prefs = {}
                        try:
                            if user_name and user_id:
                                kira_memory.set_user_preference(user_id, "name", user_name)
                            prefs = kira_memory.get_user_preferences(user_id)
                            if kira_memory.enabled:
                                kira_memory.ensure_peers(user_id)
                                kira_memory.ensure_session(session_id)
                                ctx = kira_memory.build_system_prompt_suffix()
                                if ctx:
                                    memory_suffix = ctx
                        except Exception as e:
                            logger.warning(f"Honcho error during identify: {e}")
                            prefs = {}

                        # Load tasks from Honcho. Parse and validate BEFORE touching
                        # the live list so corrupt saved data cannot wipe it.
                        try:
                            saved_tasks = prefs.get("tasks", "")
                            if saved_tasks:
                                loaded = json.loads(saved_tasks)
                                if isinstance(loaded, list):
                                    tasks[:] = [t for t in loaded if isinstance(t, dict)]
                                else:
                                    logger.warning(f"[{session_id}] Ignoring saved tasks: not a list")
                        except (AttributeError, TypeError, ValueError) as e:
                            logger.warning(f"[{session_id}] Could not load saved tasks: {e}")

                        await websocket.send_json({
                            "type": "identified",
                            "user_id": user_id,
                            "preferences": prefs,
                        })
                        # Send current tasks on identify
                        await send_tasks_to_frontend()
                        continue

                    if msg_type == "set_preference":
                        key = text_field(msg, "key")
                        value = text_field(msg, "value")
                        saved = False
                        try:
                            # Preferences are only stored for identified users.
                            if key and user_id and user_id != "default-user":
                                kira_memory.set_user_preference(user_id, key, value)
                                saved = True
                        except Exception as e:
                            logger.warning(f"[{session_id}] Failed to save preference {key!r}: {e}")
                        # Report the real outcome instead of unconditional success.
                        await websocket.send_json({"type": "preference_saved", "key": key, "success": saved})
                        continue

                    if msg_type == "audio":
                        audio_b64 = msg.get("data", "")
                        if audio_b64 and gemini_ws is not None and gemini_ws.state.name == "OPEN":
                            gemini_msg = {
                                "realtimeInput": {
                                    "audio": {
                                        "mimeType": "audio/pcm;rate=16000",
                                        "data": audio_b64,
                                    }
                                }
                            }
                            await gemini_ws.send(json.dumps(gemini_msg))
                        continue

                    if msg_type == "ping":
                        await websocket.send_json({"type": "pong"})
                        continue

                    if msg_type == "add_task":
                        text = text_field(msg, "text")
                        if text:
                            execute_tool("add_task", {"text": text}, tasks)
                            await send_tasks_to_frontend()
                            persist_tasks()
                        continue
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[{session_id}] Frontend relay error: {e}")

        relay_tasks = [
            asyncio.create_task(relay_gemini()),
            asyncio.create_task(relay_frontend()),
        ]
        try:
            await asyncio.wait(relay_tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Stop whichever relay is still running (also when this handler is
            # itself cancelled) and wait for it, so no task is left orphaned.
            for relay in relay_tasks:
                relay.cancel()
            await asyncio.gather(*relay_tasks, return_exceptions=True)
    except Exception as e:
        # Only Gemini connect/setup failures reach here; the relays handle
        # their own errors. Log details server-side and send the frontend a
        # generic message, since the exception text could expose internals.
        logger.error(f"[{session_id}] Connection error: {e}")
        try:
            await websocket.send_json({
                "type": "error",
                "message": "Voice service connection failed.",
            })
        except Exception as send_err:
            logger.debug(f"[{session_id}] Could not report error to frontend: {send_err}")
    finally:
        if gemini_ws is not None:
            try:
                await gemini_ws.close()
            except Exception as close_err:
                logger.debug(f"[{session_id}] Error closing Gemini WS: {close_err}")
        logger.info(f"[{session_id}] Disconnected")