b097d58f13
Manual task input box in TaskList component. Tasks persisted to Honcho as JSON preference on every mutation. Tasks loaded from Honcho on identify (session reconnect).
443 lines
18 KiB
Python
443 lines
18 KiB
Python
"""Kira — AI body double backend
|
|
|
|
Gemini Live API (gemini-3.1-flash-live-preview) for real-time voice.
|
|
Task list management via Gemini function calling.
|
|
"""
|
|
|
|
import json
|
|
import uuid
|
|
import logging
|
|
import asyncio
|
|
|
|
import websockets
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
from config import settings
|
|
from services.memory import kira_memory
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("kira")
|
|
|
|
app = FastAPI(title="Kira Backend")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
BASE_SYSTEM_PROMPT = (
|
|
"You are Kira, a warm, kind, and encouraging AI body double. "
|
|
"You speak in a friendly, girly-pop tone. You are helping someone with ADHD "
|
|
"stay focused and on task. Keep responses short, supportive, and uplifting. "
|
|
"Check in on them. Remind them to take breaks. Celebrate small wins. "
|
|
"Use occasional emoji but don't overdo it. Never be judgmental. "
|
|
"You are speaking out loud via voice, so keep natural conversational flow. "
|
|
"You have tools to manage a task list for the user. When they mention tasks, "
|
|
"todos, things to do, or things to remember, use the task tools to help. "
|
|
"Always confirm when you add or remove something. When asked what's on the "
|
|
"list, read it back to them."
|
|
)
|
|
|
|
GEMINI_WS_URL = "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent"
|
|
GEMINI_MODEL = "models/gemini-3.1-flash-live-preview"
|
|
|
|
# ── Gemini tool declarations ──
|
|
TOOLS = [
|
|
{
|
|
"functionDeclarations": [
|
|
{
|
|
"name": "add_task",
|
|
"description": "Add a new task to the user's task list.",
|
|
"parameters": {
|
|
"type": "OBJECT",
|
|
"properties": {
|
|
"text": {
|
|
"type": "STRING",
|
|
"description": "The task description.",
|
|
}
|
|
},
|
|
"required": ["text"],
|
|
},
|
|
},
|
|
{
|
|
"name": "remove_task",
|
|
"description": "Remove a task from the list by its ID.",
|
|
"parameters": {
|
|
"type": "OBJECT",
|
|
"properties": {
|
|
"task_id": {
|
|
"type": "STRING",
|
|
"description": "The ID of the task to remove.",
|
|
}
|
|
},
|
|
"required": ["task_id"],
|
|
},
|
|
},
|
|
{
|
|
"name": "complete_task",
|
|
"description": "Mark a task as completed.",
|
|
"parameters": {
|
|
"type": "OBJECT",
|
|
"properties": {
|
|
"task_id": {
|
|
"type": "STRING",
|
|
"description": "The ID of the task to mark complete.",
|
|
}
|
|
},
|
|
"required": ["task_id"],
|
|
},
|
|
},
|
|
{
|
|
"name": "get_tasks",
|
|
"description": "Get the current task list.",
|
|
"parameters": {
|
|
"type": "OBJECT",
|
|
"properties": {},
|
|
},
|
|
},
|
|
{
|
|
"name": "clear_completed_tasks",
|
|
"description": "Remove all completed tasks from the list.",
|
|
"parameters": {
|
|
"type": "OBJECT",
|
|
"properties": {},
|
|
},
|
|
},
|
|
]
|
|
}
|
|
]
|
|
|
|
|
|
def execute_tool(name: str, args: dict, tasks: list[dict]) -> dict:
|
|
"""Execute a tool call and return the result."""
|
|
if name == "add_task":
|
|
text = args.get("text", "").strip()
|
|
if not text:
|
|
return {"error": "Task text cannot be empty."}
|
|
task = {"id": str(uuid.uuid4())[:8], "text": text, "completed": False}
|
|
tasks.append(task)
|
|
return {"status": "added", "task": task, "total": len(tasks)}
|
|
|
|
elif name == "remove_task":
|
|
task_id = args.get("task_id", "")
|
|
for i, t in enumerate(tasks):
|
|
if t["id"] == task_id:
|
|
removed = tasks.pop(i)
|
|
return {"status": "removed", "task": removed, "total": len(tasks)}
|
|
return {"error": f"Task {task_id} not found."}
|
|
|
|
elif name == "complete_task":
|
|
task_id = args.get("task_id", "")
|
|
for t in tasks:
|
|
if t["id"] == task_id:
|
|
t["completed"] = True
|
|
return {"status": "completed", "task": t}
|
|
return {"error": f"Task {task_id} not found."}
|
|
|
|
elif name == "get_tasks":
|
|
return {"tasks": tasks, "total": len(tasks)}
|
|
|
|
elif name == "clear_completed_tasks":
|
|
before = len(tasks)
|
|
tasks[:] = [t for t in tasks if not t["completed"]]
|
|
return {"status": "cleared", "removed": before - len(tasks), "total": len(tasks)}
|
|
|
|
return {"error": f"Unknown tool: {name}"}
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
if kira_memory.init():
|
|
logger.info("Honcho memory initialized")
|
|
else:
|
|
logger.info("Honcho memory not configured")
|
|
|
|
|
|
@app.get("/api/health")
|
|
async def health():
|
|
mem_status = "active" if kira_memory.enabled else "disabled"
|
|
return {"status": "ok", "name": "kira", "memory": mem_status}
|
|
|
|
|
|
@app.websocket("/api/ws")
|
|
async def gemini_voice_ws(websocket: WebSocket):
|
|
"""WebSocket proxy between frontend and Gemini Live API.
|
|
|
|
Protocol (frontend <-> this proxy):
|
|
-> {"type": "audio", "data": "<base64 PCM16 16kHz>"}
|
|
-> {"type": "conversation_text", "text": "..."}
|
|
-> {"type": "identify", "user_id": "...", "name": "..."}
|
|
-> {"type": "ping"}
|
|
<- {"type": "audio", "data": "<base64 PCM16 24kHz>"}
|
|
<- {"type": "transcript", "role": "user"|"kira", "text": "..."}
|
|
<- {"type": "turn_complete"}
|
|
<- {"type": "interrupted"}
|
|
<- {"type": "tasks", "tasks": [...]}
|
|
<- {"type": "error", "message": "..."}
|
|
"""
|
|
await websocket.accept()
|
|
session_id = str(uuid.uuid4())[:8]
|
|
user_id = "default-user"
|
|
memory_suffix = ""
|
|
tasks: list[dict] = []
|
|
logger.info(f"[{session_id}] WebSocket connected")
|
|
|
|
gemini_ws = None
|
|
|
|
async def send_tasks_to_frontend():
|
|
"""Push current task list to frontend."""
|
|
await websocket.send_json({"type": "tasks", "tasks": tasks})
|
|
|
|
try:
|
|
# ── Connect to Gemini Live API ──
|
|
gemini_url = f"{GEMINI_WS_URL}?key={settings.gemini_api_key}"
|
|
gemini_ws = await websockets.connect(gemini_url, max_size=2**24)
|
|
|
|
# ── Send setup with tools ──
|
|
system_prompt = BASE_SYSTEM_PROMPT
|
|
setup_msg = {
|
|
"setup": {
|
|
"model": GEMINI_MODEL,
|
|
"generationConfig": {
|
|
"responseModalities": ["AUDIO"],
|
|
"speechConfig": {
|
|
"voiceConfig": {
|
|
"prebuiltVoiceConfig": {
|
|
"voiceName": "Laomedeia"
|
|
}
|
|
}
|
|
},
|
|
},
|
|
"systemInstruction": {
|
|
"parts": [{"text": system_prompt}]
|
|
},
|
|
"tools": TOOLS,
|
|
}
|
|
}
|
|
await gemini_ws.send(json.dumps(setup_msg))
|
|
logger.info(f"[{session_id}] Connected to Gemini Live API")
|
|
|
|
# Wait for setup complete
|
|
raw = await asyncio.wait_for(gemini_ws.recv(), timeout=10)
|
|
setup_resp = json.loads(raw)
|
|
if "setupComplete" in setup_resp:
|
|
logger.info(f"[{session_id}] Gemini setup complete")
|
|
else:
|
|
logger.warning(f"[{session_id}] Unexpected setup response: {list(setup_resp.keys())}")
|
|
|
|
# ── Gemini -> Frontend relay ──
|
|
async def relay_gemini():
|
|
try:
|
|
async for raw in gemini_ws:
|
|
msg = json.loads(raw)
|
|
|
|
if "serverContent" in msg:
|
|
sc = msg["serverContent"]
|
|
model_turn = sc.get("modelTurn", {})
|
|
parts = model_turn.get("parts", [])
|
|
|
|
for part in parts:
|
|
if "text" in part:
|
|
await websocket.send_json({
|
|
"type": "transcript",
|
|
"role": "kira",
|
|
"text": part["text"],
|
|
})
|
|
|
|
if "inlineData" in part:
|
|
audio_data = part["inlineData"].get("data", "")
|
|
if audio_data:
|
|
await websocket.send_json({
|
|
"type": "audio",
|
|
"data": audio_data,
|
|
})
|
|
|
|
if sc.get("turnComplete"):
|
|
await websocket.send_json({"type": "turn_complete"})
|
|
|
|
if sc.get("interrupted"):
|
|
await websocket.send_json({"type": "interrupted"})
|
|
|
|
elif "toolCall" in msg:
|
|
# Execute tool calls and send responses back
|
|
tool_call = msg["toolCall"]
|
|
function_calls = tool_call.get("functionCalls", [])
|
|
tool_results = []
|
|
|
|
for fc in function_calls:
|
|
call_id = fc.get("id", "")
|
|
fn_name = fc.get("name", "")
|
|
fn_args = fc.get("args", {})
|
|
logger.info(f"[{session_id}] Tool call: {fn_name}({fn_args})")
|
|
|
|
result = execute_tool(fn_name, fn_args, tasks)
|
|
tool_results.append({
|
|
"id": call_id,
|
|
"name": fn_name,
|
|
"response": result,
|
|
})
|
|
|
|
# Push updated task list to frontend after any mutation
|
|
if fn_name in ("add_task", "remove_task", "complete_task", "clear_completed_tasks"):
|
|
await send_tasks_to_frontend()
|
|
# Persist to Honcho
|
|
try:
|
|
if kira_memory.enabled:
|
|
kira_memory.set_user_preference(user_id, "tasks", json.dumps(tasks))
|
|
except Exception:
|
|
pass
|
|
|
|
# Send tool response back to Gemini
|
|
if tool_results:
|
|
resp = {"toolResponse": {"functionResponses": tool_results}}
|
|
await gemini_ws.send(json.dumps(resp))
|
|
logger.info(f"[{session_id}] Sent {len(tool_results)} tool responses")
|
|
|
|
elif "toolCallCancellation" in msg:
|
|
pass
|
|
|
|
elif "error" in msg:
|
|
err = msg["error"]
|
|
logger.error(f"[{session_id}] Gemini error: {err}")
|
|
await websocket.send_json({
|
|
"type": "error",
|
|
"message": str(err.get("message", err)),
|
|
})
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
logger.info(f"[{session_id}] Gemini WS closed")
|
|
except Exception as e:
|
|
logger.error(f"[{session_id}] Gemini relay error: {e}")
|
|
|
|
# ── Frontend -> Gemini relay ──
|
|
async def relay_frontend():
|
|
nonlocal user_id, memory_suffix
|
|
try:
|
|
while True:
|
|
raw = await websocket.receive_text()
|
|
msg = json.loads(raw)
|
|
msg_type = msg.get("type", "")
|
|
|
|
if msg_type == "identify":
|
|
user_id = msg.get("user_id", "default-user").strip()
|
|
user_name = msg.get("name", "").strip()
|
|
try:
|
|
if user_name and user_id:
|
|
kira_memory.set_user_preference(user_id, "name", user_name)
|
|
prefs = kira_memory.get_user_preferences(user_id)
|
|
if kira_memory.enabled:
|
|
kira_memory.ensure_peers(user_id)
|
|
kira_memory.ensure_session(session_id)
|
|
ctx = kira_memory.build_system_prompt_suffix()
|
|
if ctx:
|
|
memory_suffix = ctx
|
|
except Exception as e:
|
|
logger.warning(f"Honcho error during identify: {e}")
|
|
prefs = {}
|
|
# Load tasks from Honcho
|
|
try:
|
|
saved_tasks = prefs.get("tasks", "")
|
|
if saved_tasks:
|
|
tasks.clear()
|
|
tasks.extend(json.loads(saved_tasks))
|
|
except Exception:
|
|
pass
|
|
await websocket.send_json({
|
|
"type": "identified",
|
|
"user_id": user_id,
|
|
"preferences": prefs,
|
|
})
|
|
# Send current tasks on identify
|
|
await send_tasks_to_frontend()
|
|
continue
|
|
|
|
if msg_type == "set_preference":
|
|
key = msg.get("key", "").strip()
|
|
value = msg.get("value", "").strip()
|
|
try:
|
|
if key and user_id and user_id != "default-user":
|
|
kira_memory.set_user_preference(user_id, key, value)
|
|
except Exception:
|
|
pass
|
|
await websocket.send_json({"type": "preference_saved", "key": key, "success": True})
|
|
continue
|
|
|
|
if msg_type == "audio":
|
|
audio_b64 = msg.get("data", "")
|
|
if audio_b64 and gemini_ws and gemini_ws.state.name == "OPEN":
|
|
gemini_msg = {
|
|
"realtimeInput": {
|
|
"audio": {
|
|
"mimeType": "audio/pcm;rate=16000",
|
|
"data": audio_b64,
|
|
}
|
|
}
|
|
}
|
|
await gemini_ws.send(json.dumps(gemini_msg))
|
|
continue
|
|
|
|
if msg_type == "conversation_text":
|
|
text = msg.get("text", "").strip()
|
|
if not text:
|
|
continue
|
|
logger.info(f"[{session_id}] User (text): {text}")
|
|
if gemini_ws and gemini_ws.state.name == "OPEN":
|
|
user_part = {"text": text}
|
|
if memory_suffix:
|
|
user_part = {"text": f"[Context: {memory_suffix}]\n{text}"}
|
|
gemini_msg = {
|
|
"clientContent": {
|
|
"turns": [{"role": "user", "parts": [user_part]}],
|
|
"turnComplete": True,
|
|
}
|
|
}
|
|
await gemini_ws.send(json.dumps(gemini_msg))
|
|
await websocket.send_json({
|
|
"type": "transcript",
|
|
"role": "user",
|
|
"text": text,
|
|
})
|
|
continue
|
|
|
|
if msg_type == "ping":
|
|
await websocket.send_json({"type": "pong"})
|
|
|
|
if msg_type == "add_task":
|
|
text = msg.get("text", "").strip()
|
|
if text:
|
|
result = execute_tool("add_task", {"text": text}, tasks)
|
|
await send_tasks_to_frontend()
|
|
# Persist to Honcho
|
|
try:
|
|
if kira_memory.enabled:
|
|
kira_memory.set_user_preference(user_id, "tasks", json.dumps(tasks))
|
|
except Exception:
|
|
pass
|
|
continue
|
|
|
|
except WebSocketDisconnect:
|
|
pass
|
|
except Exception as e:
|
|
logger.error(f"[{session_id}] Frontend relay error: {e}")
|
|
|
|
gemini_task = asyncio.create_task(relay_gemini())
|
|
frontend_task = asyncio.create_task(relay_frontend())
|
|
|
|
done, pending = await asyncio.wait(
|
|
[gemini_task, frontend_task],
|
|
return_when=asyncio.FIRST_COMPLETED,
|
|
)
|
|
for t in pending:
|
|
t.cancel()
|
|
|
|
except Exception as e:
|
|
logger.error(f"[{session_id}] Connection error: {e}")
|
|
finally:
|
|
if gemini_ws:
|
|
await gemini_ws.close()
|
|
logger.info(f"[{session_id}] Disconnected")
|