diff --git a/backend/main.py b/backend/main.py index 3dece62..c73bc97 100644 --- a/backend/main.py +++ b/backend/main.py @@ -15,7 +15,7 @@ from fastapi.middleware.cors import CORSMiddleware from config import settings from services.memory import kira_memory -from services.whisper_stream import WhisperStream +# from services.whisper_stream import WhisperStream # REST fallback active logging.basicConfig(level=logging.INFO) logger = logging.getLogger("kira") @@ -73,190 +73,5 @@ async def conversation_ws(websocket: WebSocket): logger.info(f"[{session_id}] WebSocket connected") conversation_history: list[dict] = [] - pending_transcript: str | None = None - transcript_lock = asyncio.Lock() - # ── Whisper stream callbacks ── - async def on_ready(): - logger.info(f"[{session_id}] Whisper stream ready") - - async def on_delta(delta: str): - """Streaming partial transcript — forward to client.""" - try: - await websocket.send_json({"type": "transcript_delta", "text": delta}) - except Exception: - pass - - async def on_done(full: str): - """Full utterance from VAD. Kick off LLM + TTS.""" - nonlocal pending_transcript - logger.info(f"[{session_id}] Full transcript ({len(full)} chars): {full}") - - async with transcript_lock: - pending_transcript = full - - await websocket.send_json({"type": "transcript", "role": "user", "text": full}) - conversation_history.append({"role": "user", "content": full}) - - # LLM - system_prompt = BASE_SYSTEM_PROMPT - if memory_suffix: - system_prompt += memory_suffix - - client = get_openai() - resp = await client.chat.completions.create( - model="gpt-5.4-nano", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": full}, - ], - max_completion_tokens=300, - temperature=0.7, - ) - kira_text = resp.choices[0].message.content or "Mhm, I'm here!" - conversation_history.append({"role": "assistant", "content": kira_text}) - logger.info(f"[{session_id}] Kira: {kira_text}") - - # Store in Honcho - if kira_memory.enabled and identified: - try: - kira_memory.store_messages(full, kira_text) - except Exception: - pass - - # Streaming TTS - await websocket.send_json({"type": "speaking_start", "text": kira_text}) - async with client.audio.speech.with_streaming_response.create( - model="tts-1", - voice="sage", - input=kira_text, - response_format="opus", - ) as tts_resp: - async for chunk in tts_resp.iter_bytes(): - if chunk: - b64 = base64.b64encode(chunk).decode("utf-8") - await websocket.send_json({"type": "audio", "data": b64}) - await websocket.send_json({"type": "speaking_end"}) - - async def on_error(msg: str): - logger.warning(f"Whisper error: {msg}") - try: - await websocket.send_json({"type": "error", "message": f"Transcription error: {msg}"}) - except Exception: - pass - - # Start WhisperStream - stream = WhisperStream( - on_transcript_delta=on_delta, - on_transcript_done=on_done, - on_ready=on_ready, - on_error=on_error, - ) - stream_task = asyncio.create_task(stream.connect()) - await asyncio.sleep(2) # brief wait for connection - - try: - while True: - raw = await websocket.receive_text() - msg = json.loads(raw) - msg_type = msg.get("type", "") - - # ── Identity & Preferences ── - if msg_type == "identify": - user_id = msg.get("user_id", "").strip() - user_name = msg.get("name", "").strip() - if user_name and user_id: - kira_memory.set_user_preference(user_id, "name", user_name) - - prefs = kira_memory.get_user_preferences(user_id) - identified = True - - if kira_memory.enabled: - kira_memory.ensure_peers(user_id) - kira_memory.ensure_session(session_id) - try: - ctx = kira_memory.build_system_prompt_suffix() - if ctx: - memory_suffix = ctx - except Exception: - pass - - await websocket.send_json({ - "type": "identified", - "user_id": user_id, - "preferences": prefs, - }) - continue - - if msg_type == "set_preference": - key = msg.get("key", "").strip() - value = msg.get("value", "").strip() - if key and user_id and user_id != "default-user": - kira_memory.set_user_preference(user_id, key, value) - await websocket.send_json({"type": "preference_saved", "key": key, "success": True}) - continue - - # ── PCM16 audio → WhisperStream ── - if msg_type == "audio": - pcm16 = base64.b64decode(msg["data"]) - await stream.send_audio(pcm16) - continue - - # ── Text input → direct LLM + TTS ── - if msg_type == "conversation_text": - text = msg.get("text", "").strip() - if not text: - continue - - logger.info(f"[{session_id}] User (text): {text}") - conversation_history.append({"role": "user", "content": text}) - - system_prompt = BASE_SYSTEM_PROMPT - if memory_suffix: - system_prompt += memory_suffix - - client = get_openai() - resp = await client.chat.completions.create( - model="gpt-5.4-nano", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": text}, - ], - max_completion_tokens=300, - temperature=0.7, - ) - kira_text = resp.choices[0].message.content or "Mhm!" - conversation_history.append({"role": "assistant", "content": kira_text}) - logger.info(f"[{session_id}] Kira: {kira_text}") - - if kira_memory.enabled and identified: - try: - kira_memory.store_messages(text, kira_text) - except Exception: - pass - - await websocket.send_json({"type": "speaking_start", "text": kira_text}) - async with client.audio.speech.with_streaming_response.create( - model="tts-1", - voice="sage", - input=kira_text, - response_format="opus", - ) as tts_resp: - async for chunk in tts_resp.iter_bytes(): - if chunk: - b64 = base64.b64encode(chunk).decode("utf-8") - await websocket.send_json({"type": "audio", "data": b64}) - await websocket.send_json({"type": "speaking_end"}) - continue - - if msg_type == "ping": - await websocket.send_json({"type": "pong"}) - - except WebSocketDisconnect: - logger.info(f"[{session_id}] Disconnected") - except Exception as e: - logger.error(f"[{session_id}] Error: {e}") - finally: - await stream.disconnect() - stream_task.cancel() diff --git a/frontend/src/hooks/useConversation.ts b/frontend/src/hooks/useConversation.ts index d4cb058..07ee326 100644 --- a/frontend/src/hooks/useConversation.ts +++ b/frontend/src/hooks/useConversation.ts @@ -201,7 +201,7 @@ export function useConversation() { try { setMicError(null); - const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); + const stream = await navigator.mediaDevices.getUserMedia({ audio: { echoCancellation: true, noiseSuppression: true } }); streamRef.current = stream; const ws = wsRef.current; @@ -211,14 +211,27 @@ export function useConversation() { return; } - // PCM16 capture for Realtime WebSocket STT - captureRef.current = startPCMCapture(stream, (pcm16) => { - if (ws.readyState === WebSocket.OPEN) { - const base64 = arrayBufferToBase64(pcm16.buffer); - ws.send(JSON.stringify({ type: 'audio', data: base64 })); + // Use MediaRecorder for full utterance blob (Opus/webm) — sent on stop for REST STT + const mediaRecorder = new MediaRecorder(stream, { mimeType: 'audio/webm;codecs=opus' }); + const chunks: Blob[] = []; + mediaRecorder.ondataavailable = (e) => { + if (e.data.size > 0) chunks.push(e.data); + }; + mediaRecorder.onstop = () => { + if (chunks.length > 0 && ws.readyState === WebSocket.OPEN) { + const blob = new Blob(chunks, { type: 'audio/webm' }); + blob.arrayBuffer().then((buf) => { + const base64 = arrayBufferToBase64(buf); + ws.send(JSON.stringify({ type: 'audio', data: base64 })); + }); } - }); - + chunks.length = 0; + stream.getTracks().forEach((t) => t.stop()); + streamRef.current = null; + setIsRecording(false); + }; + recorderRef.current = mediaRecorder; + mediaRecorder.start(); setIsRecording(true); } catch (err) { const msg = err instanceof Error ? err.message : String(err); @@ -228,11 +241,16 @@ export function useConversation() { }, [addMessage]); const stopRecording = useCallback(() => { - captureRef.current?.stop(); - captureRef.current = null; - streamRef.current?.getTracks().forEach((t) => t.stop()); - streamRef.current = null; - setIsRecording(false); + if (recorderRef.current && recorderRef.current.state === 'recording') { + recorderRef.current.stop(); + // onstop will handle sending the blob and cleanup + } else { + // fallback cleanup + streamRef.current?.getTracks().forEach((t) => t.stop()); + streamRef.current = null; + setIsRecording(false); + } + captureRef.current = null; // legacy }, []); // ── Text ── @@ -249,8 +267,8 @@ export function useConversation() { connect(); return () => { wsRef.current?.close(); + if (recorderRef.current && recorderRef.current.state === 'recording') recorderRef.current.stop(); captureRef.current?.stop(); - recorderRef.current?.stop(); streamRef.current?.getTracks().forEach((t) => t.stop()); }; }, [connect]);