191b7ad9b5
- URL now uses ?model=gpt-realtime-whisper (was invalid gpt-4o-mini-realtime-preview) - Cleaned session.update (removed modalities that may not apply) - Expanded _handle to catch input_audio_transcription.delta and .completed events - on_error now forwards transcription errors to frontend client - Per AUDIT + PLAN item 1
159 lines
5.6 KiB
Python
159 lines
5.6 KiB
Python
"""Realtime streaming transcription service via gpt-realtime-whisper.

Connects to OpenAI Realtime API via WebSocket, configures the session
for pure transcription (no model responses), and streams word-level
transcript deltas back. Full utterances are then processed by the
cheap LLM + TTS pipeline.
"""
|
|
|
|
import json
|
|
import base64
|
|
import logging
|
|
import asyncio
|
|
from typing import Callable, Awaitable
|
|
from config import settings
|
|
|
|
# Module-level logger; every message from this service is emitted under the
# "kira.whisper" logger name.
logger = logging.getLogger("kira.whisper")
|
|
|
|
|
|
class WhisperStream:
    """Streaming transcription via gpt-realtime-whisper over WebSocket.

    The stream is driven by four async callbacks supplied by the caller:

    * ``on_transcript_delta(text)`` -- called for every partial transcript
      event, and once more with the final text just before
      ``on_transcript_done``.
    * ``on_transcript_done(text)`` -- called with the stripped utterance text
      when a final transcript arrives, or when speech stops while a partial
      transcript is still pending.
    * ``on_ready()`` -- called once the session configuration has been sent.
    * ``on_error(message)`` -- called for server ``error`` events and for any
      exception that aborts ``connect``.

    ``connect`` owns the socket and only returns when the session ends, so it
    is meant to be run as a long-lived task; ``send_audio`` and ``disconnect``
    are called from elsewhere while it runs.
    """

    _REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime-whisper"

    # Event names carrying a partial transcript.
    _DELTA_EVENTS = frozenset({
        "conversation.item.input_audio_transcription.delta",
        "input_audio_buffer.transcription.delta",
    })

    # Event names that may carry a finished transcript.
    _FINAL_EVENTS = frozenset({
        "conversation.item.input_audio_transcription.completed",
        "input_audio_buffer.transcription.completed",
        "conversation.item.created",
    })

    def __init__(
        self,
        on_transcript_delta: Callable[[str], Awaitable[None]],
        on_transcript_done: Callable[[str], Awaitable[None]],
        on_ready: Callable[[], Awaitable[None]],
        on_error: Callable[[str], Awaitable[None]],
    ):
        """Store the callbacks; no network activity happens here."""
        self._on_delta = on_transcript_delta
        self._on_done = on_transcript_done
        self._on_ready = on_ready
        self._on_error = on_error
        self._conn = None          # live WebSocket connection, or None
        self._connected = False    # True only while connect() holds a socket
        self._transcript = ""      # most recent partial text of the utterance

    async def connect(self):
        """Open the session and pump server events until it ends.

        Does nothing if a session is already active. Any exception raised
        while connecting, configuring or running the session is logged and
        forwarded to ``on_error`` instead of propagating. The connection
        state is always reset on exit.
        """
        if self._connected:
            return

        try:
            # Imported lazily so a missing dependency is reported through
            # on_error rather than failing at module import time.
            import websockets

            async with websockets.connect(
                self._REALTIME_URL,
                additional_headers={
                    "Authorization": f"Bearer {settings.openai_api_key}",
                },
            ) as conn:
                self._conn = conn
                self._connected = True
                logger.info("Connected to Realtime transcription session")

                # Configure: transcribe only with gpt-realtime-whisper,
                # no model responses.
                await self._send(self._session_update())
                await self._on_ready()
                await self._receive_loop(conn)

        except Exception as e:
            logger.error("Whisper stream error: %s", e)
            await self._on_error(str(e))
        finally:
            self._connected = False
            self._conn = None

    @staticmethod
    def _session_update() -> dict:
        """Build the ``session.update`` payload sent right after connecting."""
        return {
            "type": "session.update",
            "session": {
                "input_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "gpt-realtime-whisper",
                    "enabled": True,
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 600,
                },
            },
        }

    async def _receive_loop(self, conn):
        """Read frames from ``conn`` and dispatch them until the socket fails.

        A transport failure ends the loop. A frame that cannot be decoded, or
        a handler/callback that raises, is logged and skipped so one bad
        message does not tear down the whole transcription session.
        """
        while self._connected:
            try:
                raw = await conn.recv()
            except Exception as e:
                # Stay quiet when disconnect() caused the failure.
                if self._connected:
                    logger.warning("recv: %s", e)
                break

            if isinstance(raw, bytes):
                raw = raw.decode(errors="replace")
            elif not isinstance(raw, str):
                continue

            try:
                data = json.loads(raw)
            except ValueError as e:
                logger.warning("Ignoring non-JSON frame: %s", e)
                continue
            if not isinstance(data, dict):
                continue

            try:
                await self._handle(data)
            except Exception as e:
                logger.warning("Failed to handle realtime event: %s", e)

    @staticmethod
    def _as_text(value) -> str:
        """Return ``value`` if it is a string, otherwise an empty string."""
        return value if isinstance(value, str) else ""

    @classmethod
    def _transcript_from_item(cls, item) -> str:
        """Return the text of the first transcript/text part of ``item``.

        Anything that is not shaped like ``{"content": [{...}, ...]}`` yields
        an empty string instead of raising.
        """
        if not isinstance(item, dict):
            return ""
        content = item.get("content")
        if not isinstance(content, (list, tuple)):
            return ""
        for part in content:
            if isinstance(part, dict) and part.get("type") in ("transcript", "text"):
                return cls._as_text(part.get("transcript")) or cls._as_text(part.get("text"))
        return ""

    async def _handle(self, data: dict):
        """Dispatch one decoded server event to the matching callback."""
        et = data.get("type", "")

        if et == "input_audio_buffer.speech_started":
            self._transcript = ""
            logger.debug("speech_started")

        elif et in self._DELTA_EVENTS:
            # Partial streaming transcript.
            delta = self._as_text(data.get("delta")) or self._as_text(data.get("transcript"))
            if delta:
                # NOTE(review): this keeps only the latest delta. If the server
                # sends incremental (non-cumulative) deltas this should append
                # instead -- confirm against the API before changing.
                self._transcript = delta
                await self._on_delta(delta)

        elif et in self._FINAL_EVENTS:
            # Final transcript, either top-level or inside the created item.
            transcript = self._as_text(data.get("transcript"))
            if not transcript:
                transcript = self._transcript_from_item(data.get("item"))
            if transcript:
                self._transcript = transcript
                await self._on_delta(transcript)
                await self._on_done(transcript.strip())
                self._transcript = ""

        elif et == "input_audio_buffer.speech_stopped":
            # Flush whatever partial text is pending for this utterance.
            pending = self._transcript.strip()
            if pending:
                await self._on_done(pending)
                self._transcript = ""

        elif et == "error":
            err = data.get("error")
            if isinstance(err, dict):
                msg = err.get("message") or str(data)
            elif isinstance(err, str) and err:
                msg = err
            else:
                # Missing or unexpected payload: report the raw event.
                msg = str(data)
            msg = str(msg)
            logger.warning("Whisper error: %s", msg)
            await self._on_error(msg)

    async def send_audio(self, pcm16_bytes: bytes):
        """Append one chunk of audio to the server-side input buffer.

        The chunk is base64-encoded and sent as ``input_audio_buffer.append``.
        Silently does nothing when not connected or when the chunk is empty;
        failures are logged, never raised.
        """
        if not self._connected or not pcm16_bytes:
            return
        try:
            b64 = base64.b64encode(pcm16_bytes).decode("utf-8")
            await self._send({"type": "input_audio_buffer.append", "audio": b64})
        except Exception as e:
            logger.warning("send audio: %s", e)

    async def _send(self, data: dict):
        """Serialize ``data`` as JSON and send it; log instead of raising."""
        conn = self._conn
        if conn is None:
            logger.warning("send: not connected")
            return
        try:
            await conn.send(json.dumps(data))
        except Exception as e:
            logger.warning("send: %s", e)

    async def disconnect(self):
        """Mark the stream closed and close the socket if one is open."""
        self._connected = False
        conn, self._conn = self._conn, None
        if conn is None:
            return
        try:
            await conn.close()
        except Exception as e:
            # Best effort: the socket may already be gone.
            logger.debug("close: %s", e)
|