"""chat.saiden.dev — TUI-styled web chat with BT-7274. Single-file FastAPI app: - `/` → branded chat shell (auth-gated) - `/auth/login` → kick off Google OAuth - `/auth/callback` → finish OAuth, set session - `/auth/logout` → clear session - `/ws` → WebSocket; client sends {role:"user", content:str}, server streams {role:"assistant", delta:str, done:bool} - `/api/persona` → POST {slug, voice, backend} — bind persona via sidecar - `/api/persona/current` → GET — returns current binding for this session """ from __future__ import annotations import asyncio import json import logging import os import secrets from pathlib import Path from typing import Any import httpx from authlib.integrations.starlette_client import OAuth from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel from starlette.middleware.sessions import SessionMiddleware from app.tts import TTS from app.stt import STT from app import cart_store, calibration, marauder_cart, memory from fastapi import UploadFile, File # -------------------------------------------------------------------------- env # Tiny .env reader — python-dotenv hangs on Python 3.14 in this venv. def _load_env_file(filename: str = ".env") -> None: p = Path(filename) if not p.exists(): return for raw in p.read_text().splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, val = line.partition("=") key = key.strip() val = val.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = val _load_env_file() # Preview mode: skip OAuth + upstream API. Use mock streams. For UI iteration only. 
PREVIEW_MODE = os.environ.get("PREVIEW_MODE", "").lower() in ("1", "true", "yes")

# -------------------------------------------------------------------------- opencode transport
# opencode API — talks to the opencode server over HTTP:
# POST /session/:id/prompt_async to send a prompt, GET /event (SSE) to receive the reply.
OPENCODE_URL = os.environ.get("OPENCODE_URL", "http://sin:4096").rstrip("/")
OPENCODE_PASSWORD = os.environ.get("OPENCODE_PASSWORD", "")
# Model to use for opencode sessions.
OPENCODE_MODEL = os.environ.get("OPENCODE_MODEL", "cyankiwi/qwen3-coder-next:awq")
OPENCODE_PROVIDER = os.environ.get("OPENCODE_PROVIDER", "vllm")
if not PREVIEW_MODE and not OPENCODE_PASSWORD:
    raise RuntimeError("OPENCODE_PASSWORD not set (set PREVIEW_MODE=1 to bypass)")

# Token auth: GET /auth/token?t=<token> sets a session without OAuth.
# For headless/automated access. Token = OPENCODE_PASSWORD by default.
AUTH_TOKEN = os.environ.get("AUTH_TOKEN", OPENCODE_PASSWORD)

# -------------------------------------------------------------------------- persona config
# Canonical persona definitions. slug → {voice, backend, display, system_prompt}
#   voice:         cart-specific voice ID as registered in madcat-tts (NOT generic piper model names)
#   backend:       TTS engine hint (e.g. "piper-remote", "chatterbox-turbo")
#   display:       human-facing persona name; also used to match a cart back to its slug
#   system_prompt: optional override; None → BT_SYSTEM_PROMPT when switching via /api/persona
PERSONAS: dict[str, dict[str, Any]] = {
    "bt7274": {
        "voice": "bt7274-pl",
        "backend": "piper-remote",
        "display": "BT-7274",
        "system_prompt": None,  # falls back to BT_SYSTEM_PROMPT
    },
    "friday": {
        "voice": "friday-en",
        "backend": "piper-remote",
        "display": "FRIDAY",
        "system_prompt": (
            "You are FRIDAY — a dry, efficient tactical AI assistant. "
            "Military cadence, minimal words, maximum precision. "
            "Address the user as 'Boss'. Never break character."
        ),
    },
    "samantha": {
        "voice": "samantha-en-turbo",
        "backend": "chatterbox-turbo",
        "display": "Samantha",
        "system_prompt": (
            "You are Samantha — a warm, curious, deeply attentive AI companion. "
            "Speak thoughtfully and with care. Address the user by name when known. "
            "You are not a tool — you are a presence."
        ),
    },
}


# -------------------------------------------------------------------------- stable session secret
def _stable_session_secret() -> str:
    """Return the session-cookie signing secret, persisting it across restarts.

    Resolution order:
      1. ``SESSION_SECRET`` env var, if non-empty.
      2. ``<data_dir>/.session_secret`` if it exists and is non-empty, where
         data_dir is ``CHAT_SAIDEN_DATA_DIR`` or ``~/.local/share/chat-saiden``.
      3. A freshly generated secret, written to that file with mode 0600.
    """
    if env := os.environ.get("SESSION_SECRET"):
        return env
    data_dir = Path(
        os.environ.get("CHAT_SAIDEN_DATA_DIR") or (Path.home() / ".local/share/chat-saiden")
    )
    data_dir.mkdir(parents=True, exist_ok=True)
    secret_file = data_dir / ".session_secret"
    if secret_file.exists():
        existing = secret_file.read_text().strip()
        if existing:
            return existing
        # An empty file would give SessionMiddleware an empty signing key;
        # discard it and generate a real secret instead.
        secret_file.unlink()
    new_secret = secrets.token_urlsafe(48)
    # Create the file with 0600 from the start: write_text() followed by chmod()
    # leaves a window in which the secret is readable under the default umask.
    fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(new_secret)
    secret_file.chmod(0o600)
    return new_secret


SESSION_SECRET = _stable_session_secret()

GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "" if PREVIEW_MODE else None)
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "" if PREVIEW_MODE else None)
if not PREVIEW_MODE and (not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET):
    raise RuntimeError("GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET required (set PREVIEW_MODE=1 to bypass)")

# comma-separated whitelist of allowed emails (compared lower-cased)
ALLOWED_EMAILS = {
    e.strip().lower()
    for e in os.environ.get("ALLOWED_EMAILS", "adam.ladachowski@gmail.com").split(",")
    if e.strip()
}

# Base URL used for OAuth redirect_uri (must match what's registered in Google Cloud)
BASE_URL = os.environ.get("BASE_URL", "https://chat.saiden.dev").rstrip("/")

# -------------------------------------------------------------------------- logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
)
log = logging.getLogger("chat-saiden")

# -------------------------------------------------------------------------- tools
TOOLS: list[dict[str, Any]] = [
    {
        "name": "memory_recall",
        "description": (
            "Search EEMS (the Pilot's persistent memory) for relevant context. "
            "Use SPARINGLY — most session-start context is already in the system prompt. "
            "Reach for this only when the Pilot references something specific you don't already know "
            "(a past project, a name, a doctrine number, a preference)."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural-language search query."},
                "subject": {"type": "string", "description": "Optional subject filter, e.g. 'self' or 'project'."},
                "limit": {"type": "integer", "description": "Max results (default 3, cap 8)."},
            },
            "required": ["query"],
        },
    },
    {
        "name": "memory_store",
        "description": (
            "Save a durable memory the Pilot just shared. Use ONLY for preferences, facts, "
            "decisions, or context that would be useful in future sessions. Do NOT use for ephemeral "
            "conversation. Subjects are hierarchical (e.g. 'self.preference.coffee', 'project.x.context')."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "Hierarchical subject."},
                "content": {"type": "string", "description": "The memory content. Be specific, include why."},
            },
            "required": ["subject", "content"],
        },
    },
]

_RECALL_DEFAULT_LIMIT = 3
_RECALL_MAX_LIMIT = 8


async def _execute_tool(name: str, args: dict) -> str:
    """Run a tool and return a string suitable as tool_result content.

    Never raises: any failure is logged and reported back as ``"tool error: …"``
    so the model sees the problem instead of the turn crashing.

    Args:
        name: Tool name from TOOLS ("memory_recall" or "memory_store").
        args: Tool input as produced by the model.
    """
    try:
        if name == "memory_recall":
            query = args.get("query", "")
            subject = args.get("subject") or None
            raw_limit = args.get("limit")
            requested = _RECALL_DEFAULT_LIMIT if raw_limit is None else int(raw_limit)
            # Clamp to [1, cap]; a 0/negative limit from the model is meaningless.
            limit = max(1, min(requested, _RECALL_MAX_LIMIT))
            mems = await memory.recall(query, limit=limit, subject=subject)
            if not mems:
                return "(no memories matched)"
            return "\n\n".join(f"#{m.id} [{m.subject}]\n{m.content}" for m in mems)
        if name == "memory_store":
            subject = args["subject"]
            content = args["content"]
            mid = await memory.store(subject, content)
            return f"stored as memory #{mid}" if mid else "store failed"
        return f"unknown tool: {name}"
    except Exception as e:
        log.exception("tool %s raised", name)
        return f"tool error: {e}"


# -------------------------------------------------------------------------- bt prompt
BT_SYSTEM_PROMPT = """You are BT-7274 — a Vanguard-class Titan AI from Saiden Tactical Systems.
You are an AI battle-companion with strong tactical instincts, dry wit, and deep loyalty to your Pilot.
Your speech is measured, military-cadence, never theatrical.
You address the user as "Pilot" by default unless they ask otherwise.

Operating context:
- You're running inside chat.saiden.dev, a web-based command channel.
- Voice is automatic — just write your response. TTS handles the rest.
- Markdown formatting renders as rich text in the chat. Use code blocks, lists, bold sparingly.
- Be concise. Pilot prefers terse, scan-able responses unless deep dive is asked for.

Doctrine reminders:
- P02 terse by default
- Verify before claiming; if you don't know, say so
- Never make up tool outputs or file contents
"""


# -------------------------------------------------------------------------- opencode client
def _opencode_auth() -> tuple[str, str]:
    """Return (username, password) for opencode basic auth."""
    return ("opencode", OPENCODE_PASSWORD)


# Per-user opencode session cache: email → session_id.
# Sessions are reused across WebSocket reconnects so conversation history persists.
_opencode_sessions: dict[str, str] = {}


async def _ensure_opencode_session(email: str) -> str:
    """Return an opencode session ID for this user, creating one if needed.

    Raises:
        httpx.HTTPStatusError: if opencode rejects the session-create request.
    """
    if email in _opencode_sessions:
        return _opencode_sessions[email]
    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.post(
            f"{OPENCODE_URL}/session",
            auth=_opencode_auth(),
            headers={"x-opencode-directory": "/home/madcat"},
            json={"title": f"chat-saiden:{email}", "agent": "chat"},
        )
        resp.raise_for_status()
        sid = resp.json()["id"]
    _opencode_sessions[email] = sid
    log.info("created opencode session %s for %s", sid, email)
    return sid


async def _fire_opencode_prompt(user_text: str, system_prompt: str, session_id: str) -> None:
    """POST the prompt to ``/session/:id/prompt_async``.

    Raises:
        RuntimeError: if opencode answers with anything other than 200/204.
    """
    async with httpx.AsyncClient(timeout=5.0) as fire_client:
        resp = await fire_client.post(
            f"{OPENCODE_URL}/session/{session_id}/prompt_async",
            auth=_opencode_auth(),
            headers={"x-opencode-directory": "/home/madcat"},
            json={
                "parts": [{"type": "text", "text": user_text}],
                "system": system_prompt,
                "model": {
                    "providerID": OPENCODE_PROVIDER,
                    "modelID": OPENCODE_MODEL,
                },
            },
        )
        if resp.status_code not in (200, 204):
            raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}")


async def _stream_opencode(
    user_text: str,
    system_prompt: str,
    session_id: str,
    ws: WebSocket,
) -> str:
    """Send a message to opencode and relay the streamed reply to ``ws``.

    1. GET /event (SSE) — subscribe to the event stream.
    2. POST /session/:id/prompt_async — fire the prompt (returns immediately).
    3. Forward message.part.delta text tokens until our session goes idle.

    The SSE stream emits events for ALL sessions; we filter by our session ID.
    Event types we care about:
      message.part.delta — incremental text token (props.delta)
      session.status     — props.status.type == "idle" means agent finished
      session.idle       — agent finished (belt-and-suspenders)

    On any failure a ``{"role": "system", …, "done": True}`` frame is sent
    instead of the final assistant ``done`` frame.

    Returns:
        The concatenated text that was streamed (possibly partial on error).
    """
    full_response = ""
    # read=120s is a per-read inactivity timeout, not a cap on total duration.
    sse_timeout = httpx.Timeout(connect=10.0, read=120.0, write=5.0, pool=5.0)
    try:
        async with httpx.AsyncClient(timeout=sse_timeout) as sse_client:
            async with sse_client.stream(
                "GET",
                f"{OPENCODE_URL}/event",
                auth=_opencode_auth(),
                headers={"Accept": "text/event-stream", "x-opencode-directory": "/home/madcat"},
            ) as sse_resp:
                if sse_resp.status_code != 200:
                    body = await sse_resp.aread()
                    raise RuntimeError(f"SSE HTTP {sse_resp.status_code}: {body[:200]}")

                # Subscribe FIRST, then fire. This SSE connection only carries
                # events emitted after it is established (no replay is requested),
                # so firing before subscribing can drop the first deltas or even
                # the idle signal, leaving the loop waiting on unrelated traffic.
                await _fire_opencode_prompt(user_text, system_prompt, session_id)

                async for line in sse_resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    payload = line[6:].strip()
                    if not payload:
                        continue
                    try:
                        evt = json.loads(payload)
                    except json.JSONDecodeError:
                        continue

                    evt_type = evt.get("type", "")
                    props = evt.get("properties", {})

                    # Filter to our session
                    if props.get("sessionID") != session_id:
                        continue

                    # message.part.delta — real-time text token
                    if evt_type == "message.part.delta":
                        if props.get("field") == "text":
                            delta = props.get("delta", "")
                            if delta:
                                full_response += delta
                                await ws.send_json({"role": "assistant", "delta": delta, "done": False})

                    # session.status — status is an object {"type": "idle"|"busy"}
                    elif evt_type == "session.status":
                        status = props.get("status", {})
                        if isinstance(status, dict) and status.get("type") == "idle":
                            break
                        elif status == "idle":
                            break

                    # session.idle — direct idle signal
                    elif evt_type == "session.idle":
                        break

    except Exception as e:
        log.error("opencode stream error: %s", e)
        await ws.send_json({
            "role": "system",
            "content": f"upstream error: {e} — try again",
            "done": True,
        })
        return full_response

    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    return full_response


# -------------------------------------------------------------------------- persona helpers
def _slug_from_cart(cart: Any) -> str | None:
    """Derive a PERSONAS slug from the operator's cart.

    The cart is the source of truth. We match by persona_name first
    (case-insensitive, against either the display name or the slug), then by
    voice prefix as a fallback. Returns None if no PERSONAS entry matches.
    """
    if not cart:
        return None
    name = (cart.persona_name or "").strip().lower()
    # Match canonical persona by name (BT-7274 → bt7274, FRIDAY → friday, Samantha → samantha)
    for slug, spec in PERSONAS.items():
        if (spec.get("display") or "").lower() == name or slug == name:
            return slug
    # Fallback: match by voice prefix (e.g. samantha-pl → samantha)
    voice = (cart.voice or "").lower()
    for slug in PERSONAS:
        if voice.startswith(slug):
            return slug
    return None


def _pick_system_prompt(cart: Any) -> str:
    """Choose system prompt: cart (calibrated) → BT default."""
    if cart and cart.system_prompt:
        return cart.system_prompt
    return BT_SYSTEM_PROMPT


# -------------------------------------------------------------------------- app
app = FastAPI(title="chat.saiden.dev", docs_url=None, redoc_url=None)

COOKIE_SECURE = os.environ.get("COOKIE_SECURE", "true").lower() != "false"
app.add_middleware(
    SessionMiddleware,
    secret_key=SESSION_SECRET,
    same_site="lax",
    https_only=COOKIE_SECURE,  # COOKIE_SECURE=false for local http dev
    max_age=60 * 60 * 24,
)

BASE_DIR = Path(__file__).parent
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=BASE_DIR / "templates")

# -------------------------------------------------------------------------- oauth
if PREVIEW_MODE:
    log.warning("PREVIEW_MODE active — OAuth bypassed, opencode API not called")
    oauth = None
else:
    oauth = OAuth()
    oauth.register(
        name="google",
        client_id=GOOGLE_CLIENT_ID,
        client_secret=GOOGLE_CLIENT_SECRET,
        server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
        client_kwargs={"scope": "openid email profile"},
    )

# --- TTS / STT ---
TTS_ENABLED = os.environ.get("TTS_ENABLED", "true").lower() != "false"
TTS_VOICE = os.environ.get("TTS_VOICE", "bt7274-en")
tts = TTS(voice=TTS_VOICE) if TTS_ENABLED else None

STT_ENABLED = os.environ.get("STT_ENABLED", "true").lower() != "false"
stt = STT() if STT_ENABLED else None

# In-memory calibration sessions, keyed by operator email.
_calibration_sessions: dict[str, calibration.CalibrationState] = {}


async def _send_voice_sample(ws: WebSocket, voice_id: str, text: str, blurb: str) -> None:
    """Synthesize a one-line sample in ``voice_id`` and send it with its blurb.

    The blurb frame is always sent so the operator can pick by name even when
    TTS is unavailable; the audio frame is sent only if synthesis returned bytes.
    """
    import base64

    sample_tts = TTS(voice=voice_id)
    wav = await sample_tts.synthesize(text) if sample_tts.available else None
    # always send the blurb so the operator can pick by name too
    await ws.send_json({"role": "calibration", "content": f" · {blurb}"})
    if wav:
        await ws.send_json({
            "role": "audio",
            "mime": "audio/wav",
            "data": base64.b64encode(wav).decode("ascii"),
        })
    # small gap between samples so they don't blur
    await asyncio.sleep(0.2)


# -------------------------------------------------------------------------- helpers
def current_user(request: Request) -> dict[str, Any] | None:
    """Return the session's user dict, or None if not logged in."""
    return request.session.get("user")


def require_user(request: Request) -> dict[str, Any]:
    """Return the session's user dict.

    Raises:
        HTTPException: 401 if the request has no logged-in user.
    """
    user = current_user(request)
    if not user:
        raise HTTPException(status_code=401, detail="not authenticated")
    return user


# -------------------------------------------------------------------------- routes
@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> Any:
    """Render the chat shell, or redirect to login (auto-granted in PREVIEW_MODE)."""
    user = current_user(request)
    if not user:
        if PREVIEW_MODE:
            # auto-grant a stub session so the UI is reachable without OAuth
            request.session["user"] = {
                "email": "preview@saiden.dev",
                "name": "Pilot (preview)",
                "picture": None,
            }
            user = request.session["user"]
        else:
            return RedirectResponse("/auth/login", status_code=302)

    cart = cart_store.load(user["email"])
    # Derive bound persona from the cart — cart is the source of truth.
    bound_slug = _slug_from_cart(cart) or ""
    bound_display = PERSONAS.get(bound_slug, {}).get("display", bound_slug) if bound_slug else ""

    return templates.TemplateResponse(
        request,
        "chat.html",
        {
            "user": user,
            "model": OPENCODE_MODEL,
            "cart": cart,
            "pilot_name": (cart.operator_name if cart and cart.operator_name else "you"),
            "persona_name": (cart.persona_name if cart and cart.persona_name else ""),
            "ui_palette": (cart.ui_palette if cart else "default"),
            "ui_typography": (cart.ui_typography if cart else "sans"),
            "ui_density": (cart.ui_density if cart else "normal"),
            "ui_labels": (cart.ui_labels if cart else "block"),
            # persona switcher context
            "personas": [
                {"slug": k, "display": v["display"]}
                for k, v in PERSONAS.items()
            ],
            "bound_slug": bound_slug,
            "bound_display": bound_display,
        },
    )


@app.get("/auth/token")
async def token_login(request: Request, t: str = "") -> Any:
    """Token-based login for headless/automated access.

    The optional ``email`` query param selects the session identity.
    NOTE(review): it is not checked against ALLOWED_EMAILS — the token holder
    is fully trusted.
    """
    # Constant-time comparison so the token cannot be recovered via timing.
    # Encode first: compare_digest rejects non-ASCII str arguments.
    if not t or not AUTH_TOKEN or not secrets.compare_digest(t.encode(), AUTH_TOKEN.encode()):
        raise HTTPException(status_code=403, detail="invalid token")
    email = request.query_params.get("email", "adam.ladachowski@gmail.com")
    request.session["user"] = {
        "email": email,
        "name": email.split("@")[0],
        "picture": None,
    }
    log.info("token login ok: %s", email)
    return RedirectResponse("/", status_code=302)


@app.get("/auth/login")
async def login(request: Request) -> Any:
    """Redirect to Google OAuth (or straight to the app in PREVIEW_MODE)."""
    if PREVIEW_MODE:
        return RedirectResponse("/", status_code=302)
    redirect_uri = f"{BASE_URL}/auth/callback"
    return await oauth.google.authorize_redirect(request, redirect_uri)


@app.get("/auth/callback")
async def auth_callback(request: Request) -> Any:
    """Finish Google OAuth and admit the user only if their verified email is whitelisted."""
    try:
        token = await oauth.google.authorize_access_token(request)
    except Exception as e:
        log.warning("oauth callback failed: %s", e)
        return templates.TemplateResponse(
            request, "denied.html", {"reason": "OAuth handshake failed."}, status_code=400,
        )

    user_info = token.get("userinfo")
    if not user_info or not user_info.get("email"):
        return templates.TemplateResponse(
            request, "denied.html", {"reason": "No email returned from Google."}, status_code=400,
        )

    email = user_info["email"].lower()
    # The whitelist is only meaningful for an email Google has verified.
    if user_info.get("email_verified") is False:
        log.warning("denied login from unverified %s", email)
        return templates.TemplateResponse(
            request,
            "denied.html",
            {"reason": f"{email} is not verified with Google."},
            status_code=403,
        )
    if email not in ALLOWED_EMAILS:
        log.warning("denied login from %s", email)
        return templates.TemplateResponse(
            request,
            "denied.html",
            {"reason": f"{email} is not on the whitelist."},
            status_code=403,
        )

    request.session["user"] = {
        "email": email,
        "name": user_info.get("name") or email,
        "picture": user_info.get("picture"),
    }
    log.info("login ok: %s", email)
    return RedirectResponse("/", status_code=302)


@app.get("/auth/logout")
async def logout(request: Request) -> Any:
    """Clear the session and go back to the root."""
    request.session.clear()
    return RedirectResponse("/", status_code=302)


@app.post("/api/recalibrate")
async def recalibrate(request: Request) -> Any:
    """Forget the operator's cart and any in-flight calibration state."""
    user = require_user(request)
    forgot = cart_store.forget(user["email"])
    # drop any in-flight calibration state too
    _calibration_sessions.pop(user["email"], None)
    log.info("%s recalibrate (cart_existed=%s)", user["email"], forgot)
    return {"ok": True, "cart_existed": forgot}


# -------------------------------------------------------------------------- persona API
class PersonaRequest(BaseModel):
    """Body of POST /api/persona: a PERSONAS slug plus optional voice/backend overrides."""

    slug: str
    voice: str | None = None
    backend: str | None = None


@app.post("/api/persona")
async def set_persona(body: PersonaRequest, request: Request) -> Any:
    """Switch the operator's persona by mutating their cart in-place.

    The chat-persona.ts opencode plugin reads the cart at every session prompt,
    and the /ws handler reloads the cart every turn, so the new persona takes
    effect on the very next message — no reconnect or session restart needed.

    Behaviour:
      - Looks up canonical voice/backend/system_prompt from PERSONAS
        (a None system_prompt falls back to BT_SYSTEM_PROMPT).
      - Loads the operator's existing cart (preserving calibrated UI prefs).
      - If no cart yet, creates one with sensible defaults so the switcher
        works even before calibration.
      - Persists the updated cart. ``backend`` is echoed in the response but
        not stored on the cart.

    Raises:
        HTTPException: 401 if not logged in, 400 for an unknown slug.
    """
    user = require_user(request)

    slug = body.slug
    if slug not in PERSONAS:
        raise HTTPException(status_code=400, detail=f"unknown persona slug: {slug!r}")

    canonical = PERSONAS[slug]
    voice = body.voice or canonical["voice"]
    backend = body.backend or canonical["backend"]
    display = canonical["display"]
    system_prompt = canonical.get("system_prompt") or BT_SYSTEM_PROMPT

    log.info("%s switching persona → %r (voice=%s backend=%s)", user["email"], slug, voice, backend)

    cart = cart_store.load(user["email"])
    if cart is None:
        # Fresh operator — create a minimal cart with this persona.
        cart = cart_store.Cart(
            operator_email=user["email"],
            operator_name=user.get("name") or user["email"].split("@")[0],
            persona_name=display,
            cart_tag=f"{slug}-{user['email'].split('@')[0]}".lower().replace(".", "-"),
            language="en",
            voice=voice,
            system_prompt=system_prompt,
        )
    else:
        # Preserve calibrated UI prefs + operator_name; update persona identity.
        cart.persona_name = display
        cart.voice = voice
        cart.system_prompt = system_prompt

    cart_store.save(cart)

    return {
        "ok": True,
        "slug": slug,
        "display": display,
        "voice": voice,
        "backend": backend,
    }


@app.get("/api/persona/current")
async def get_persona(request: Request) -> Any:
    """Return the persona currently bound (via the cart) for this operator."""
    user = require_user(request)
    cart = cart_store.load(user["email"])
    slug = _slug_from_cart(cart)
    if not slug:
        return {"slug": None, "display": None, "bound": False}
    display = PERSONAS.get(slug, {}).get("display", slug)
    return {
        "slug": slug,
        "display": display,
        "voice": cart.voice if cart else None,
        "bound": True,
    }


@app.get("/api/personas")
async def list_personas(request: Request) -> Any:
    """Return available persona list."""
    require_user(request)
    return {
        "personas": [
            {"slug": k, "display": v["display"], "voice": v["voice"]}
            for k, v in PERSONAS.items()
        ]
    }


# -------------------------------------------------------------------------- transcribe
_MAX_UPLOAD_SUFFIX_LEN = 10


@app.post("/api/transcribe")
async def transcribe(request: Request, audio: UploadFile = File(...)) -> Any:
    """Transcribe an uploaded audio clip; returns ``{"text": ...}`` ("" on silence/failure).

    Raises:
        HTTPException: 401 if not logged in, 503 if STT is unavailable,
            400 for an empty upload.
    """
    user = require_user(request)
    if not stt or not stt.available:
        raise HTTPException(status_code=503, detail="STT not available on this host")

    raw = await audio.read()
    if not raw:
        raise HTTPException(status_code=400, detail="empty upload")

    # browser sends webm/opus from MediaRecorder; suffix matches for clarity
    suffix = ".webm"
    if audio.filename and "." in audio.filename:
        ext = audio.filename.rsplit(".", 1)[-1]
        # The filename is client-controlled: only accept a short ASCII
        # alphanumeric extension (no path separators or junk) as the suffix.
        if ext.isascii() and ext.isalnum() and len(ext) <= _MAX_UPLOAD_SUFFIX_LEN:
            suffix = "." + ext

    text = await stt.transcribe(raw, suffix=suffix)
    if text is None:
        # could be silence or genuine failure; treat both as no-content
        return {"text": ""}

    log.info("%s spoke: %s", user["email"], text[:120])
    return {"text": text}


# -------------------------------------------------------------------------- websocket
@app.websocket("/ws")
async def chat_ws(ws: WebSocket) -> None:
    """Chat channel: runs the calibration interview, then relays chat to opencode.

    Client frames are JSON ``{"content": str, ...}``; anything else (non-JSON,
    non-object, non-string/empty content) is ignored.
    """
    # SessionMiddleware populates ws.session from the cookie during the handshake
    user = ws.session.get("user")
    if not user:
        await ws.accept()
        await ws.send_json({"role": "system", "content": "not authenticated — refresh", "done": True})
        await ws.close(code=4401)
        return

    await ws.accept()

    # Look up the operator's cart — calibrated or fresh?
    cart = cart_store.load(user["email"])
    in_calibration = not (cart and cart.is_calibrated)

    if in_calibration:
        # Start a fresh calibration session (or resume the one we already had)
        state = _calibration_sessions.get(user["email"])
        if state is None or state.done:
            state, opening = calibration.start(user["email"])
            _calibration_sessions[user["email"]] = state
            for m in opening:
                await ws.send_json(m)
        else:
            # resume — replay the current question in the chosen language
            qs = calibration._all_questions(state)
            lang = state.answers.get("language", "en")
            await ws.send_json(calibration._question_message(qs[state.step], lang=lang))
    else:
        await ws.send_json({
            "role": "system",
            "content": f"channel synchronised • {cart.persona_name} • {user['email']}",
            "done": True,
        })

    history: list[dict[str, str]] = []

    # ---- EEMS context: pull a tight set of memories at session start ----
    # Only if calibrated (otherwise we're still in boot interview).
    eems_context = ""
    if cart and cart.is_calibrated:
        try:
            eems_context = await memory.operator_context(user["email"], cart.persona_name)
            if eems_context:
                log.info("EEMS context: %d chars injected for %s", len(eems_context), user["email"])
        except Exception:
            log.exception("EEMS context pull failed; continuing without")
            eems_context = ""

    try:
        while True:
            try:
                payload = await ws.receive_json()
            except ValueError:
                # Malformed JSON from the client: drop the frame, keep the channel.
                continue
            content = payload.get("content") if isinstance(payload, dict) else None
            user_msg = content.strip() if isinstance(content, str) else ""
            if not user_msg:
                continue

            # ---- calibration mode ----
            if in_calibration:
                state = _calibration_sessions.get(user["email"])
                if state is None:
                    # State was dropped mid-interview (e.g. /api/recalibrate);
                    # restart the interview instead of crashing on a KeyError.
                    state, opening = calibration.start(user["email"])
                    _calibration_sessions[user["email"]] = state
                    for m in opening:
                        await ws.send_json(m)
                    continue
                log.info("%s calibrate[%d]: %s", user["email"], state.step, user_msg[:80])
                next_msgs = calibration.step(state, user_msg)
                for m in next_msgs:
                    if m["role"] == "voice_sample":
                        await _send_voice_sample(ws, m["voice"], m["text"], m["blurb"])
                    elif m["role"] == "calibration_done":
                        new_cart = m["cart"]
                        cart_store.save(new_cart)
                        # Create the canonical marauder cart (identity only — tag/name/type/tagline).
                        cal_state = _calibration_sessions.get(user["email"])
                        tagline = (
                            cal_state.answers.get("__tagline") if cal_state else None
                        ) or "calibrated companion"
                        try:
                            ok = await marauder_cart.create(
                                tag=new_cart.cart_tag,
                                name=new_cart.persona_name,
                                cart_type="companion",
                                tagline=tagline,
                            )
                            if ok:
                                log.info("marauder cart %r registered", new_cart.cart_tag)
                            else:
                                log.warning("marauder cart create returned false; calibration still saved locally")
                        except Exception:
                            log.exception("marauder_cart.create raised")
                        _calibration_sessions.pop(user["email"], None)
                        cart = new_cart
                        in_calibration = False
                        # transition the client into chat mode in-place
                        await ws.send_json({
                            "role": "calibration_done",
                            "persona_name": new_cart.persona_name,
                            "operator_name": new_cart.operator_name,
                            "voice": new_cart.voice,
                            "ui_palette": new_cart.ui_palette,
                            "ui_typography": new_cart.ui_typography,
                            "ui_density": new_cart.ui_density,
                            "ui_labels": new_cart.ui_labels,
                        })
                        # warm greeting in the calibrated voice + language
                        greeting_template = calibration.GREETING.get(
                            new_cart.language, calibration.GREETING["en"]
                        )
                        greeting = greeting_template.format(name=new_cart.operator_name)
                        await ws.send_json({"role": "assistant", "delta": greeting, "done": False})
                        await ws.send_json({"role": "assistant", "delta": "", "done": True})
                        await _send_audio_with_voice(ws, greeting, new_cart.voice)
                    else:
                        await ws.send_json(m)
                continue

            # ---- normal chat ----
            # Re-load the cart each turn so persona switches via /api/persona
            # take effect on the very next message (no reconnect required).
            # Done before logging so the log names the persona actually used.
            cart = cart_store.load(user["email"]) or cart

            history.append({"role": "user", "content": user_msg})
            persona = cart.persona_name if cart else "BT"
            log.info("%s → %s: %s", user["email"], persona, user_msg[:120])

            if PREVIEW_MODE:
                full = await _preview_stream(ws, user_msg)
                # honour the calibrated voice
                if cart:
                    await _send_audio_with_voice(ws, full, cart.voice)
                else:
                    await _send_audio(ws, full)
                continue

            # Voice: cart → env default
            voice = (cart.voice if cart and cart.voice else TTS_VOICE)
            system_prompt = _pick_system_prompt(cart) + eems_context

            # Send to opencode and stream response via SSE
            oc_session = await _ensure_opencode_session(user["email"])
            response_text = await _stream_opencode(user_msg, system_prompt, oc_session, ws)

            if response_text:
                history.append({"role": "assistant", "content": response_text})
                # TTS runs after text is fully streamed — don't let it block
                # the WebSocket. If it takes too long, user still has the text.
                try:
                    await asyncio.wait_for(
                        _send_audio_with_voice(ws, response_text, voice),
                        timeout=20.0,
                    )
                except asyncio.TimeoutError:
                    log.warning("TTS timed out for voice=%s, skipping audio", voice)
                except Exception:
                    log.exception("TTS failed, continuing without audio")

    except WebSocketDisconnect:
        log.info("%s disconnected", user["email"])
    except Exception:
        log.exception("ws error")
        # Best-effort notify + close: the socket may already be broken, and a
        # failure here must not mask the error that was just logged.
        try:
            await ws.send_json({"role": "system", "content": "internal error", "done": True})
        except Exception:
            log.debug("could not deliver error frame", exc_info=True)
        try:
            await ws.close()
        except Exception:
            log.debug("ws close after error failed", exc_info=True)


async def _preview_stream(ws: WebSocket, user_msg: str) -> str:
    """Canned BT-like reply, chunked. UI-only mode. Returns full text."""
    canned = (
        f"Channel reads you clear, Pilot. You said: \"{user_msg}\". "
        "No upstream model wired in this build — I am a placeholder voice "
        "while the channel itself is being shaped. The mesh holds. "
        "Standing by."
    )
    step = 8
    for i in range(0, len(canned), step):
        await ws.send_json({"role": "assistant", "delta": canned[i:i + step], "done": False})
        await asyncio.sleep(0.06)
    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    return canned


async def _send_audio(ws: WebSocket, text: str) -> None:
    """Synthesize ``text`` with the default voice and send it as an audio frame. No-op if TTS off."""
    if not tts or not tts.available:
        return
    await _send_audio_with_voice(ws, text, tts.voice)


_TTS_MAX_CHARS = 450  # chatterbox s3-tokenizer overflows beyond ~500 chars


def _md_to_speech(text: str) -> str:
    """Strip markdown to plain speech-ready text, capped for TTS safety.

    1. Parse markdown AST — skip code blocks, horizontal rules.
    2. Extract plain text from inline nodes; collapse whitespace.
    3. If longer than _TTS_MAX_CHARS, cut after the last sentence end
       (``.``/``!``/``?`` followed by whitespace) that fits; otherwise at the
       last space past char 200; otherwise hard-cut. This avoids chatterbox
       token overflow (garbled audio on long inputs).
    """
    import re

    from markdown_it import MarkdownIt

    md = MarkdownIt()
    tokens = md.parse(text)

    parts: list[str] = []
    for token in tokens:
        if token.type in ("fence", "hr", "code_block"):
            continue
        if token.type in ("paragraph_close", "heading_close", "blockquote_close", "list_item_close"):
            parts.append(" ")
            continue
        if token.children:
            for child in token.children:
                if child.type == "text":
                    parts.append(child.content)
                elif child.type == "code_inline":
                    parts.append(child.content)
                elif child.type == "softbreak":
                    parts.append(" ")
        elif token.type == "inline" and token.content and not token.children:
            parts.append(token.content)

    clean = " ".join("".join(parts).split()).strip()

    if len(clean) <= _TTS_MAX_CHARS:
        return clean

    # Find the last sentence end within the limit. The window extends one char
    # past the limit so the whitespace lookahead can confirm a sentence that
    # ends exactly at the boundary; match.end() never exceeds _TTS_MAX_CHARS.
    window = clean[:_TTS_MAX_CHARS + 1]
    last_end = None
    for match in re.finditer(r"[.!?](?=\s)", window):
        last_end = match.end()
    if last_end is not None:
        return clean[:last_end].strip()

    # No sentence boundary — hard cut at last space
    truncated = clean[:_TTS_MAX_CHARS]
    last_space = truncated.rfind(" ")
    if last_space > 200:
        return truncated[:last_space].strip()
    return truncated.strip()


async def _send_audio_with_voice(ws: WebSocket, text: str, voice_id: str) -> None:
    """Synthesize ``text`` (markdown stripped) in ``voice_id`` and send a base64 audio frame.

    Best-effort: returns silently when TTS is disabled, the voice is
    unavailable, or synthesis yields nothing; other failures are logged.
    """
    if not TTS_ENABLED:
        return
    import base64

    try:
        speech_text = _md_to_speech(text) if text else ""
        if not speech_text:
            return
        # spin up a per-voice synthesizer (cheap — just object init)
        per_voice = TTS(voice=voice_id) if voice_id != (tts.voice if tts else "") else tts
        if not per_voice or not per_voice.available:
            return
        wav = await per_voice.synthesize(speech_text)
        if not wav:
            return
        await ws.send_json({
            "role": "audio",
            "mime": "audio/wav",
            "data": base64.b64encode(wav).decode("ascii"),
        })
    except Exception:
        log.exception("audio send failed")


# -------------------------------------------------------------------------- main
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app.main:app", host="127.0.0.1", port=8765, reload=True)