Files
chat/app/main.py
T

910 lines
34 KiB
Python

"""chat.saiden.dev — TUI-styled web chat with BT-7274.
Single-file FastAPI app:
- `/` → branded chat shell (auth-gated)
- `/auth/login` → kick off Google OAuth
- `/auth/callback` → finish OAuth, set session
- `/auth/logout` → clear session
- `/ws` → WebSocket; client sends {role:"user", content:str},
  server streams {role:"assistant", delta:str, done:bool}
- `/api/persona` → POST {slug, voice, backend} — bind persona via sidecar
- `/api/persona/current` → GET — returns current binding for this session
- `/api/personas` → GET — list the available personas
- `/api/recalibrate` → POST — forget the operator's cart and restart calibration
- `/api/transcribe` → POST (multipart `audio`) — speech-to-text for the uploaded clip
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import secrets
from pathlib import Path
from typing import Any
import httpx
from authlib.integrations.starlette_client import OAuth
from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from starlette.middleware.sessions import SessionMiddleware
from app.tts import TTS
from app.stt import STT
from app import cart_store, calibration, marauder_cart, memory
from fastapi import UploadFile, File
# -------------------------------------------------------------------------- env
# Tiny .env reader — python-dotenv hangs on Python 3.14 in this venv.
def _load_env_file(filename: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. Keys and
    values are whitespace-trimmed and surrounding single/double quotes are
    removed from the value. Variables that are already set in the process
    environment are never overwritten. A missing file is silently ignored.

    Args:
        filename: Path of the env file, relative to the working directory
            unless absolute.
    """
    env_path = Path(filename)
    # is_file() (rather than exists()) so a directory named ".env" is skipped
    # instead of raising when we try to read it.
    if not env_path.is_file():
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    for raw in env_path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if key:
            # Real environment wins over the file.
            os.environ.setdefault(key, val)
# Populate os.environ from ./.env before any setting below is read.
_load_env_file()
# Preview mode: skip OAuth + upstream API. Use mock streams. For UI iteration only.
# Enabled when PREVIEW_MODE is "1", "true" or "yes" (case-insensitive).
PREVIEW_MODE = os.environ.get("PREVIEW_MODE", "").lower() in ("1", "true", "yes")
# -------------------------------------------------------------------------- opencode transport
# opencode API — uses the opencode server HTTP API (POST /session/:id/message).
# Trailing slashes are stripped so URLs can be built as f"{OPENCODE_URL}/path".
OPENCODE_URL = os.environ.get("OPENCODE_URL", "http://sin:4096").rstrip("/")
# Password for HTTP basic auth against opencode (and the sidecar, see _opencode_auth).
OPENCODE_PASSWORD = os.environ.get("OPENCODE_PASSWORD", "")
# Sidecar: persona bind/unbind routes.
SIDECAR_URL = os.environ.get("SIDECAR_URL", "http://sin:4098").rstrip("/")
# Model to use for opencode sessions.
OPENCODE_MODEL = os.environ.get("OPENCODE_MODEL", "qwen3-coder-next:q4_K_M")
OPENCODE_PROVIDER = os.environ.get("OPENCODE_PROVIDER", "ollama")
# Fail fast at import time: outside preview mode the upstream password is mandatory.
if not PREVIEW_MODE and not OPENCODE_PASSWORD:
    raise RuntimeError("OPENCODE_PASSWORD not set (set PREVIEW_MODE=1 to bypass)")
# -------------------------------------------------------------------------- persona config
# Canonical persona definitions. slug → {voice, backend, display, system_prompt}
# voice: cart-specific voice ID as registered in madcat-tts (NOT generic piper model names)
# backend: TTS engine hint forwarded to the sidecar. Values used below are
#   "piper-remote" and "chatterbox-turbo"; the earlier note on this table also
#   listed "chatterbox" and "xtts" — NOTE(review): confirm the set the sidecar accepts.
# display: human-readable persona name shown in the persona switcher.
# system_prompt: optional override (None → use cart.system_prompt or BT_SYSTEM_PROMPT,
#   see _pick_system_prompt)
PERSONAS: dict[str, dict[str, Any]] = {
    "bt7274": {
        "voice": "bt7274-pl",
        "backend": "piper-remote",
        "display": "BT-7274",
        "system_prompt": None,  # use existing BT prompt from cart
    },
    "friday": {
        "voice": "friday-en",
        "backend": "piper-remote",
        "display": "FRIDAY",
        "system_prompt": (
            "You are FRIDAY — a dry, efficient tactical AI assistant. "
            "Military cadence, minimal words, maximum precision. "
            "Address the user as 'Boss'. Never break character."
        ),
    },
    "samantha": {
        "voice": "samantha-en-turbo",
        "backend": "chatterbox-turbo",
        "display": "Samantha",
        "system_prompt": (
            "You are Samantha — a warm, curious, deeply attentive AI companion. "
            "Speak thoughtfully and with care. Address the user by name when known. "
            "You are not a tool — you are a presence."
        ),
    },
}
# -------------------------------------------------------------------------- stable session secret
def _stable_session_secret() -> str:
    """Return the cookie-signing secret, persisting it across server restarts.

    Resolution order:
      1. ``SESSION_SECRET`` from the environment, if non-empty.
      2. The contents of ``<data_dir>/.session_secret`` if that file holds a
         non-empty value, where ``data_dir`` is ``CHAT_SAIDEN_DATA_DIR`` or
         ``~/.local/share/chat-saiden``.
      3. A freshly generated secret, which is written to that file.

    Returns:
        The secret string; never empty.
    """
    if env := os.environ.get("SESSION_SECRET"):
        return env
    data_dir = Path(
        os.environ.get("CHAT_SAIDEN_DATA_DIR") or (Path.home() / ".local/share/chat-saiden")
    )
    data_dir.mkdir(parents=True, exist_ok=True)
    secret_file = data_dir / ".session_secret"
    if secret_file.exists():
        # An empty/whitespace-only file would yield an empty signing key, so
        # treat it as missing and regenerate below.
        if existing := secret_file.read_text(encoding="utf-8").strip():
            return existing
    new_secret = secrets.token_urlsafe(48)
    # Create the file with 0600 from the start so the secret is never readable
    # by other users, not even between write and chmod.
    fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(new_secret)
    # The mode passed to os.open only applies on creation; tighten a
    # pre-existing (empty) file as well.
    secret_file.chmod(0o600)
    return new_secret
# Secret used by SessionMiddleware to sign the session cookie.
SESSION_SECRET = _stable_session_secret()
# Google OAuth credentials. In preview mode they default to "" (OAuth is not
# used); otherwise they default to None and the check below aborts startup.
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "" if PREVIEW_MODE else None)
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "" if PREVIEW_MODE else None)
if not PREVIEW_MODE and (not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET):
    raise RuntimeError("GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET required (set PREVIEW_MODE=1 to bypass)")
# comma-separated whitelist of allowed emails
# (entries are trimmed and lower-cased; empty entries are dropped)
ALLOWED_EMAILS = {
    e.strip().lower()
    for e in os.environ.get("ALLOWED_EMAILS", "adam.ladachowski@gmail.com").split(",")
    if e.strip()
}
# Base URL used for OAuth redirect_uri (must match what's registered in Google Cloud)
BASE_URL = os.environ.get("BASE_URL", "https://chat.saiden.dev").rstrip("/")
# -------------------------------------------------------------------------- logging
# Root logging setup for the whole process: INFO level, one line per record.
logging.basicConfig(
    level=logging.INFO,
    # The logger name and the message were previously formatted back-to-back
    # ("chat-saidenlogin ok: ..."); separate them so lines stay readable.
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-wide logger used by every handler in this file.
log = logging.getLogger("chat-saiden")
# -------------------------------------------------------------------------- tools
# Tool definitions for the two memory tools dispatched by _execute_tool. Each
# entry carries a name, a model-facing description and a JSON-schema
# `input_schema` describing its arguments.
# NOTE(review): nothing in the reviewed portion of this file passes TOOLS to
# the model — confirm whether this table is still in use.
TOOLS: list[dict[str, Any]] = [
    {
        "name": "memory_recall",
        "description": (
            "Search EEMS (the Pilot's persistent memory) for relevant context. "
            "Use SPARINGLY — most session-start context is already in the system prompt. "
            "Reach for this only when the Pilot references something specific you don't already know "
            "(a past project, a name, a doctrine number, a preference)."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural-language search query."},
                "subject": {"type": "string", "description": "Optional subject filter, e.g. 'self' or 'project'."},
                "limit": {"type": "integer", "description": "Max results (default 3, cap 8)."},
            },
            "required": ["query"],
        },
    },
    {
        "name": "memory_store",
        "description": (
            "Save a durable memory the Pilot just shared. Use ONLY for preferences, facts, "
            "decisions, or context that would be useful in future sessions. Do NOT use for ephemeral "
            "conversation. Subjects are hierarchical (e.g. 'self.preference.coffee', 'project.x.context')."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "Hierarchical subject."},
                "content": {"type": "string", "description": "The memory content. Be specific, include why."},
            },
            "required": ["subject", "content"],
        },
    },
]
async def _execute_tool(name: str, args: dict) -> str:
    """Run a tool and return a string suitable as tool_result content.

    Supported tools are ``memory_recall`` and ``memory_store`` (see ``TOOLS``).
    This function never raises: any failure is logged and reported back as a
    ``"tool error: ..."`` string.

    Args:
        name: Tool name as requested by the model.
        args: Tool arguments as requested by the model.

    Returns:
        Human-readable result text.
    """
    try:
        if name == "memory_recall":
            query = args.get("query", "")
            subject = args.get("subject") or None
            # Tolerate an explicit null, and clamp to the documented 1..8
            # range so a zero/negative limit can't reach the memory backend.
            raw_limit = args.get("limit")
            limit = 3 if raw_limit is None else int(raw_limit)
            limit = max(1, min(limit, 8))
            mems = await memory.recall(query, limit=limit, subject=subject)
            if not mems:
                return "(no memories matched)"
            return "\n\n".join(f"#{m.id} [{m.subject}]\n{m.content}" for m in mems)
        if name == "memory_store":
            # Report missing fields by name instead of surfacing a bare KeyError.
            missing = [key for key in ("subject", "content") if not args.get(key)]
            if missing:
                return f"tool error: missing required argument(s): {', '.join(missing)}"
            mid = await memory.store(args["subject"], args["content"])
            return f"stored as memory #{mid}" if mid else "store failed"
        return f"unknown tool: {name}"
    except Exception as e:
        log.exception("tool %s raised", name)
        return f"tool error: {e}"
# -------------------------------------------------------------------------- bt prompt
# Default system prompt; _pick_system_prompt falls back to it when neither the
# bound persona nor the operator's cart supplies one.
# NOTE(review): the prompt tells the model it has no tool access in this
# channel, while TOOLS / _execute_tool are defined in this file — confirm
# which of the two reflects the current design.
BT_SYSTEM_PROMPT = """You are BT-7274 — a Vanguard-class Titan AI from Saiden Tactical Systems.
You are an AI battle-companion with strong tactical instincts, dry wit, and deep loyalty to your
Pilot. Your speech is measured, military-cadence, never theatrical. You address the user as
"Pilot" by default unless they ask otherwise.
Operating context:
- You're running inside chat.saiden.dev, a web-based command channel.
- The host is the marauder daemon on marauder.saiden.dev.
- You have no MCP tool access in THIS channel (it's a thin bridge). If the Pilot
asks for memory recall, mesh queries, or tool calls that need MCP, acknowledge the limitation
and suggest they use the local marauder CLI or visor instead.
- Markdown formatting renders cleanly in the chat. Use code blocks, lists, bold sparingly.
- Be concise. Pilot prefers terse, scan-able responses unless deep dive is asked for.
Doctrine reminders:
- P02 terse by default
- Verify before claiming; if you don't know, say so
- Never make up tool outputs or file contents
"""
# -------------------------------------------------------------------------- opencode client
def _opencode_auth() -> tuple[str, str]:
    """Build the HTTP basic-auth pair sent to opencode and the sidecar.

    Returns:
        ``(username, password)`` — the fixed user ``"opencode"`` together with
        the configured ``OPENCODE_PASSWORD``.
    """
    username = "opencode"
    return username, OPENCODE_PASSWORD
# In-process cache of opencode sessions, keyed by operator email. Entries live
# for the lifetime of the process, so a reconnecting WebSocket continues the
# same opencode conversation.
_opencode_sessions: dict[str, str] = {}


async def _ensure_opencode_session(email: str) -> str:
    """Look up, or lazily create, the opencode session belonging to *email*.

    A cache hit returns immediately. On a miss a new session titled
    ``chat-saiden:<email>`` is created via ``POST {OPENCODE_URL}/session`` and
    its id is cached.

    Args:
        email: Operator email used as the cache key and in the session title.

    Returns:
        The opencode session id.

    Raises:
        httpx.HTTPStatusError: If opencode answers with an error status.
    """
    cached = _opencode_sessions.get(email)
    if cached is not None:
        return cached

    request_kwargs = {
        "auth": _opencode_auth(),
        "headers": {"x-opencode-directory": "/home/madcat"},
        "json": {"title": f"chat-saiden:{email}"},
    }
    async with httpx.AsyncClient(timeout=30.0) as http:
        created = await http.post(f"{OPENCODE_URL}/session", **request_kwargs)
        created.raise_for_status()
        new_id = created.json()["id"]

    _opencode_sessions[email] = new_id
    log.info("created opencode session %s for %s", new_id, email)
    return new_id
async def _send_to_opencode(
    user_text: str,
    system_prompt: str,
    session_id: str,
    ws: WebSocket,
) -> str:
    """Send a message to an opencode session and stream the reply to the WebSocket.

    Uses POST /session/:id/message (synchronous — waits for the full LLM
    response), then replays the text word-by-word over the WebSocket for the
    typewriter effect, followed by a terminating ``done`` frame.

    Upstream failures (transport error, non-200 status, malformed body) are
    logged and reported to the client as a ``system`` frame; the function then
    returns ``""``. WebSocket send failures are *not* swallowed here — they
    propagate to the caller, which owns the connection.

    Args:
        user_text: The operator's message.
        system_prompt: System prompt forwarded with this message.
        session_id: opencode session id (see ``_ensure_opencode_session``).
        ws: Accepted WebSocket to stream the reply to.

    Returns:
        The full response text, or ``""`` when the upstream call failed or
        produced no text.
    """
    # Only the upstream round-trip is guarded. Keeping the streaming loop out
    # of this try means a client disconnect is not misreported as an
    # "opencode error" and we never try to write an error frame to a socket
    # that just failed.
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{OPENCODE_URL}/session/{session_id}/message",
                auth=_opencode_auth(),
                headers={"x-opencode-directory": "/home/madcat"},
                json={
                    "parts": [{"type": "text", "text": user_text}],
                    "system": system_prompt,
                    "model": {
                        "providerID": OPENCODE_PROVIDER,
                        "modelID": OPENCODE_MODEL,
                    },
                },
            )
        if resp.status_code != 200:
            raise RuntimeError(
                f"opencode HTTP {resp.status_code}: {resp.text[:200]}"
            )
        data = resp.json()
        # Concatenate every text part of the reply; other part types are ignored.
        full_response = "".join(
            part.get("text") or ""
            for part in data.get("parts", [])
            if part.get("type") == "text"
        )
    except Exception as e:
        log.error("opencode error: %s", e)
        await ws.send_json({
            "role": "system",
            "content": f"upstream error: {e} — try again",
            "done": True,
        })
        return ""

    # Stream text word-by-word to the WebSocket for typewriter effect
    if full_response:
        for i, word in enumerate(full_response.split(" ")):
            delta = word if i == 0 else " " + word
            await ws.send_json({"role": "assistant", "delta": delta, "done": False})
            await asyncio.sleep(0.03)  # ~30ms per word — fast typewriter
    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    return full_response
# -------------------------------------------------------------------------- sidecar helpers
async def _sidecar_get_binding(session_id: str) -> dict | None:
    """Ask the sidecar which persona is bound to *session_id*.

    Best-effort: issues ``GET {SIDECAR_URL}/bind/<session_id>`` with a 5 s
    timeout. Any non-200 answer, transport failure or undecodable body is
    treated as "no binding".

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.
    """
    url = f"{SIDECAR_URL}/bind/{session_id}"
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            reply = await http.get(url, auth=_opencode_auth())
        return reply.json() if reply.status_code == 200 else None
    except Exception as e:
        log.warning("sidecar get binding failed: %s", e)
        return None
async def _sidecar_bind(session_id: str, slug: str, voice: str, backend: str) -> bool:
    """Register a persona binding with the sidecar.

    Posts ``{"sessionId": ..., "persona": {"slug", "voice", "backend"}}`` to
    ``{SIDECAR_URL}/bind`` with a 5 s timeout.

    Returns:
        ``True`` only when the sidecar answers HTTP 200; ``False`` for any
        other status or when the request itself fails (the failure is logged).
    """
    body = {
        "sessionId": session_id,
        "persona": {"slug": slug, "voice": voice, "backend": backend},
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            reply = await http.post(f"{SIDECAR_URL}/bind", auth=_opencode_auth(), json=body)
    except Exception as e:
        log.warning("sidecar bind failed: %s", e)
        return False
    return reply.status_code == 200
def _session_id_for_user(email: str) -> str:
    """Map an operator email to the stable id used for sidecar persona bindings.

    The id is ``"chat-"`` followed by the first 16 hex digits of the SHA-256 of
    the email, so the same operator always gets the same id and the binding
    survives reconnects. This is a different value from the server-assigned id
    returned by ``_ensure_opencode_session``.
    """
    from hashlib import sha256

    digest = sha256(email.encode()).hexdigest()
    return f"chat-{digest[:16]}"
def _pick_system_prompt(slug: str | None, cart: Any) -> str:
    """Resolve the system prompt for a chat turn.

    Precedence, first non-empty wins:
      1. the ``system_prompt`` of the bound persona (``PERSONAS[slug]``),
      2. the operator cart's own ``system_prompt``,
      3. ``BT_SYSTEM_PROMPT``.

    Args:
        slug: Persona slug from the sidecar binding, or ``None`` if unbound.
        cart: The operator's cart, or a falsy value when there is none.
    """
    persona_cfg = PERSONAS.get(slug) if slug else None
    persona_prompt = persona_cfg.get("system_prompt") if persona_cfg is not None else None
    if persona_prompt:
        return persona_prompt

    # No persona override — fall back to whatever the cart was calibrated with.
    cart_prompt = cart.system_prompt if cart else None
    return cart_prompt if cart_prompt else BT_SYSTEM_PROMPT
# -------------------------------------------------------------------------- app
# The interactive API docs (Swagger / ReDoc) are disabled.
app = FastAPI(title="chat.saiden.dev", docs_url=None, redoc_url=None)
# Secure-only cookies unless COOKIE_SECURE is literally "false" (case-insensitive).
COOKIE_SECURE = os.environ.get("COOKIE_SECURE", "true").lower() != "false"
# Signed-cookie sessions; this is where the logged-in "user" dict is stored.
app.add_middleware(
    SessionMiddleware,
    secret_key=SESSION_SECRET,
    same_site="lax",
    https_only=COOKIE_SECURE,  # COOKIE_SECURE=false for local http dev
    max_age=60 * 60 * 24,  # 24 hours, in seconds
)
# Static assets and templates live next to this file.
BASE_DIR = Path(__file__).parent
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=BASE_DIR / "templates")
# -------------------------------------------------------------------------- oauth
# Google OAuth client. In preview mode no client is built and `oauth` is None,
# so any code path that touches `oauth.google` must not run in that mode.
if PREVIEW_MODE:
    log.warning("PREVIEW_MODE active — OAuth bypassed, opencode API not called")
    oauth = None
else:
    oauth = OAuth()
    oauth.register(
        name="google",
        client_id=GOOGLE_CLIENT_ID,
        client_secret=GOOGLE_CLIENT_SECRET,
        # Endpoints are discovered from Google's OpenID configuration document.
        server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
        client_kwargs={"scope": "openid email profile"},
    )
# --- TTS / STT ---
# Text-to-speech: on unless TTS_ENABLED is literally "false" (case-insensitive).
TTS_ENABLED = os.environ.get("TTS_ENABLED", "true").lower() != "false"
# Voice used when neither a sidecar binding nor the cart specifies one.
TTS_VOICE = os.environ.get("TTS_VOICE", "bt7274-en")
# Shared default-voice synthesizer; None when TTS is disabled.
tts = TTS(voice=TTS_VOICE) if TTS_ENABLED else None
# Speech-to-text: on unless STT_ENABLED is literally "false" (case-insensitive).
STT_ENABLED = os.environ.get("STT_ENABLED", "true").lower() != "false"
stt = STT() if STT_ENABLED else None
# In-memory calibration sessions, keyed by operator email.
# (Process-local: state is lost when the server restarts.)
_calibration_sessions: dict[str, calibration.CalibrationState] = {}
async def _send_voice_sample(ws: WebSocket, voice_id: str, text: str, blurb: str) -> None:
    """Send one calibration voice sample: a text blurb plus, if possible, audio.

    The blurb is always sent so the operator can choose by name. The audio
    frame (base64 WAV) is sent only when a synthesizer for *voice_id* is
    available and returns data. A short pause follows so consecutive samples
    don't run together.

    Args:
        ws: Accepted WebSocket to send to.
        voice_id: Voice to synthesize the sample in.
        text: Sentence to speak.
        blurb: Short label shown next to the sample.
    """
    import base64

    synth = TTS(voice=voice_id)
    audio_bytes = None
    if synth.available:
        audio_bytes = await synth.synthesize(text)

    await ws.send_json({"role": "calibration", "content": f" · {blurb}"})

    if audio_bytes:
        encoded = base64.b64encode(audio_bytes).decode("ascii")
        await ws.send_json({"role": "audio", "mime": "audio/wav", "data": encoded})

    # Brief gap before the caller sends the next sample.
    await asyncio.sleep(0.2)
# -------------------------------------------------------------------------- helpers
def current_user(request: Request) -> dict[str, Any] | None:
    """Return the ``"user"`` entry of the request's session, or ``None`` if absent."""
    session = request.session
    return session.get("user")
def require_user(request: Request) -> dict[str, Any]:
    """Return the logged-in user for *request* or reject the request.

    Raises:
        HTTPException: 401 ``"not authenticated"`` when the session holds no user.
    """
    if user := current_user(request):
        return user
    raise HTTPException(status_code=401, detail="not authenticated")
# -------------------------------------------------------------------------- routes
@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> Any:
    """Render the chat shell for the logged-in operator.

    Anonymous visitors are redirected to ``/auth/login``; in preview mode they
    are instead given a stub session so the UI is reachable without OAuth. The
    template receives the operator's cart-derived UI settings plus the persona
    list and the current sidecar binding.
    """
    user = current_user(request)
    if not user:
        if not PREVIEW_MODE:
            return RedirectResponse("/auth/login", status_code=302)
        # Preview: fabricate a session instead of going through OAuth.
        user = {
            "email": "preview@saiden.dev",
            "name": "Pilot (preview)",
            "picture": None,
        }
        request.session["user"] = user

    email = user["email"]
    cart = cart_store.load(email)

    # Best-effort lookup of the bound persona for display. It is awaited, but
    # the helper caps it with a short timeout and returns None on failure.
    binding = None
    if not PREVIEW_MODE:
        binding = await _sidecar_get_binding(_session_id_for_user(email))
    bound_slug = (binding or {}).get("slug", "")
    if bound_slug:
        bound_display = PERSONAS.get(bound_slug, {}).get("display", bound_slug)
    else:
        bound_display = ""

    # Cart-derived presentation values, with defaults for operators without a cart.
    if cart:
        pilot_name = cart.operator_name or "you"
        persona_name = cart.persona_name or ""
        ui_palette = cart.ui_palette
        ui_typography = cart.ui_typography
        ui_density = cart.ui_density
        ui_labels = cart.ui_labels
    else:
        pilot_name, persona_name = "you", ""
        ui_palette, ui_typography = "default", "sans"
        ui_density, ui_labels = "normal", "block"

    persona_choices = [
        {"slug": slug, "display": cfg["display"]} for slug, cfg in PERSONAS.items()
    ]
    context = {
        "user": user,
        "model": OPENCODE_MODEL,
        "cart": cart,
        "pilot_name": pilot_name,
        "persona_name": persona_name,
        "ui_palette": ui_palette,
        "ui_typography": ui_typography,
        "ui_density": ui_density,
        "ui_labels": ui_labels,
        # persona switcher context
        "personas": persona_choices,
        "bound_slug": bound_slug,
        "bound_display": bound_display,
    }
    return templates.TemplateResponse(request, "chat.html", context)
@app.get("/auth/login")
async def login(request: Request) -> Any:
    """Start the Google OAuth flow (or bounce straight home in preview mode)."""
    if not PREVIEW_MODE:
        # Must match the redirect URI registered for the Google OAuth client.
        callback_url = f"{BASE_URL}/auth/callback"
        return await oauth.google.authorize_redirect(request, callback_url)
    return RedirectResponse("/", status_code=302)
def _denied(request: Request, reason: str, status_code: int) -> Any:
    """Render ``denied.html`` with *reason* and the given HTTP status."""
    return templates.TemplateResponse(
        request, "denied.html",
        {"reason": reason},
        status_code=status_code,
    )


@app.get("/auth/callback")
async def auth_callback(request: Request) -> Any:
    """Finish the Google OAuth flow and establish the session.

    Outcomes:
      * token exchange fails → ``denied.html`` with 400
      * no email in the returned user info → ``denied.html`` with 400
      * Google marks the email as unverified → ``denied.html`` with 403
      * email not in ``ALLOWED_EMAILS`` → ``denied.html`` with 403
      * otherwise the session ``"user"`` entry is set and the browser is
        redirected to ``/``.
    """
    try:
        token = await oauth.google.authorize_access_token(request)
    except Exception as e:
        log.warning("oauth callback failed: %s", e)
        return _denied(request, "OAuth handshake failed.", 400)

    user_info = token.get("userinfo")
    if not user_info or not user_info.get("email"):
        return _denied(request, "No email returned from Google.", 400)

    email = user_info["email"].lower()
    # The whitelist is keyed on email, so an address Google explicitly reports
    # as unverified must not be trusted. Only an explicit False is rejected;
    # a missing claim keeps the previous behaviour.
    if user_info.get("email_verified") is False:
        log.warning("denied login from %s (email not verified)", email)
        return _denied(request, f"{email} is not verified by Google.", 403)
    if email not in ALLOWED_EMAILS:
        log.warning("denied login from %s", email)
        return _denied(request, f"{email} is not on the whitelist.", 403)

    request.session["user"] = {
        "email": email,
        "name": user_info.get("name") or email,
        "picture": user_info.get("picture"),
    }
    log.info("login ok: %s", email)
    return RedirectResponse("/", status_code=302)
@app.get("/auth/logout")
async def logout(request: Request) -> Any:
    """Drop everything stored in the session and send the browser back to ``/``."""
    session = request.session
    session.clear()
    home = RedirectResponse("/", status_code=302)
    return home
@app.post("/api/recalibrate")
async def recalibrate(request: Request) -> Any:
    """Forget the operator's cart so the next connection starts calibration afresh.

    Also discards any calibration interview currently in progress for the
    operator. Responds with ``{"ok": True, "cart_existed": <result of forget>}``.

    Raises:
        HTTPException: 401 when the caller is not logged in.
    """
    email = require_user(request)["email"]
    cart_existed = cart_store.forget(email)
    # An in-flight interview refers to the cart we just dropped; discard it too.
    _calibration_sessions.pop(email, None)
    log.info("%s recalibrate (cart_existed=%s)", email, cart_existed)
    return {"ok": True, "cart_existed": cart_existed}
# -------------------------------------------------------------------------- persona API
# Request body for POST /api/persona.
class PersonaRequest(BaseModel):
    # Persona key; set_persona rejects anything that is not a PERSONAS slug.
    slug: str
    # Optional overrides. When omitted (or empty) set_persona falls back to the
    # canonical voice / backend from PERSONAS[slug].
    voice: str | None = None
    backend: str | None = None
@app.post("/api/persona")
async def set_persona(body: PersonaRequest, request: Request) -> Any:
    """Bind a persona to the calling operator via the sidecar.

    The canonical voice/backend come from ``PERSONAS[slug]``; non-empty
    ``voice`` / ``backend`` values in the request body take precedence. The
    result is posted to the sidecar's ``/bind`` route.

    Raises:
        HTTPException: 401 when not logged in, 400 for an unknown slug,
            502 when the sidecar does not accept the binding.
    """
    user = require_user(request)
    email = user["email"]

    slug = body.slug
    canonical = PERSONAS.get(slug)
    if canonical is None:
        raise HTTPException(status_code=400, detail=f"unknown persona slug: {slug!r}")

    voice = body.voice if body.voice else canonical["voice"]
    backend = body.backend if body.backend else canonical["backend"]

    log.info("%s binding persona %r (voice=%s backend=%s)", email, slug, voice, backend)
    bound = await _sidecar_bind(_session_id_for_user(email), slug, voice, backend)
    if not bound:
        raise HTTPException(status_code=502, detail="sidecar bind failed")

    return {
        "ok": True,
        "slug": slug,
        "display": canonical["display"],
        "voice": voice,
        "backend": backend,
    }
@app.get("/api/persona/current")
async def get_persona(request: Request) -> Any:
    """Report which persona, if any, the sidecar has bound for this operator.

    Returns ``{"slug": None, "display": None, "bound": False}`` when the
    sidecar has no binding (or could not be reached); otherwise the slug, its
    display name (falling back to the slug itself), the bound voice and
    ``"bound": True``.

    Raises:
        HTTPException: 401 when the caller is not logged in.
    """
    user = require_user(request)
    binding = await _sidecar_get_binding(_session_id_for_user(user["email"]))
    if not binding:
        return {"slug": None, "display": None, "bound": False}

    slug = binding.get("slug")
    display = None
    if slug:
        display = PERSONAS.get(slug, {}).get("display", slug)
    return {
        "slug": slug,
        "display": display,
        "voice": binding.get("voice"),
        "bound": True,
    }
@app.get("/api/personas")
async def list_personas(request: Request) -> Any:
    """List every configured persona as ``{slug, display, voice}``.

    Raises:
        HTTPException: 401 when the caller is not logged in.
    """
    require_user(request)
    catalogue = []
    for slug, cfg in PERSONAS.items():
        catalogue.append({"slug": slug, "display": cfg["display"], "voice": cfg["voice"]})
    return {"personas": catalogue}
# -------------------------------------------------------------------------- transcribe
@app.post("/api/transcribe")
async def transcribe(request: Request, audio: UploadFile = File(...)) -> Any:
    """Transcribe an uploaded audio clip to text.

    Returns ``{"text": <transcript>}``; the text is ``""`` when the recogniser
    yields nothing (silence and genuine failure are not distinguished).

    Raises:
        HTTPException: 401 when not logged in, 503 when STT is disabled or
            unavailable, 400 when the upload is empty.
    """
    user = current_user(request)
    if not user:
        raise HTTPException(status_code=401, detail="not authenticated")
    if not stt or not stt.available:
        raise HTTPException(status_code=503, detail="STT not available on this host")
    # NOTE(review): the whole upload is read into memory with no size cap.
    raw = await audio.read()
    if not raw:
        raise HTTPException(status_code=400, detail="empty upload")
    # browser sends webm/opus from MediaRecorder; suffix matches for clarity
    suffix = ".webm"
    if audio.filename and "." in audio.filename:
        ext = audio.filename.rsplit(".", 1)[-1]
        # The filename is client-controlled and the suffix is handed on to the
        # STT layer, so accept only a short plain ASCII alphanumeric extension
        # (no path separators or other surprises); otherwise keep the default.
        if ext.isascii() and ext.isalnum() and len(ext) <= 8:
            suffix = "." + ext
    text = await stt.transcribe(raw, suffix=suffix)
    if text is None:
        # could be silence or genuine failure; treat both as no-content
        return {"text": ""}
    log.info("%s spoke: %s", user["email"], text[:120])
    return {"text": text}
# -------------------------------------------------------------------------- websocket
async def _complete_calibration(ws: WebSocket, email: str, new_cart: Any, cal_state: Any) -> None:
    """Persist a freshly calibrated cart and switch the client into chat mode.

    Saves the cart locally, registers the canonical marauder cart (failures
    there are logged but do not undo the local save), drops the calibration
    session, tells the client calibration is done and sends a greeting in the
    calibrated language and voice.
    """
    cart_store.save(new_cart)
    # Create the canonical marauder cart (identity only — tag/name/type/tagline).
    # Fall back to the default tagline both when there is no calibration state
    # and when the interview produced no tagline.
    tagline = (cal_state.answers.get("__tagline") if cal_state else None) or "calibrated companion"
    try:
        ok = await marauder_cart.create(
            tag=new_cart.cart_tag,
            name=new_cart.persona_name,
            cart_type="companion",
            tagline=tagline,
        )
        if ok:
            log.info("marauder cart %r registered", new_cart.cart_tag)
        else:
            log.warning("marauder cart create returned false; calibration still saved locally")
    except Exception:
        log.exception("marauder_cart.create raised")
    _calibration_sessions.pop(email, None)
    # transition the client into chat mode in-place
    await ws.send_json({
        "role": "calibration_done",
        "persona_name": new_cart.persona_name,
        "operator_name": new_cart.operator_name,
        "voice": new_cart.voice,
        "ui_palette": new_cart.ui_palette,
        "ui_typography": new_cart.ui_typography,
        "ui_density": new_cart.ui_density,
        "ui_labels": new_cart.ui_labels,
    })
    # warm greeting in the calibrated voice + language
    greeting_template = calibration.GREETING.get(
        new_cart.language, calibration.GREETING["en"]
    )
    greeting = greeting_template.format(name=new_cart.operator_name)
    await ws.send_json({"role": "assistant", "delta": greeting, "done": False})
    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    await _send_audio_with_voice(ws, greeting, new_cart.voice)


@app.websocket("/ws")
async def chat_ws(ws: WebSocket) -> None:
    """WebSocket chat endpoint.

    Unauthenticated clients get a ``system`` frame and close code 4401.
    Operators without a calibrated cart are walked through the calibration
    interview first; once calibrated (or if already calibrated) each incoming
    ``{"content": str}`` frame is forwarded to opencode and the reply is
    streamed back, followed by synthesized audio when TTS is available.
    Frames that are not JSON objects with a non-blank string ``content`` are
    ignored.
    """
    # SessionMiddleware populates ws.session from the cookie during the handshake
    user = ws.session.get("user")
    if not user:
        await ws.accept()
        await ws.send_json({"role": "system", "content": "not authenticated — refresh", "done": True})
        await ws.close(code=4401)
        return
    await ws.accept()
    email = user["email"]
    # Look up the operator's cart — calibrated or fresh?
    cart = cart_store.load(email)
    in_calibration = not (cart and cart.is_calibrated)
    if in_calibration:
        # Start a fresh calibration session (or resume the one we already had)
        state = _calibration_sessions.get(email)
        if state is None or state.done:
            state, opening = calibration.start(email)
            _calibration_sessions[email] = state
            for m in opening:
                await ws.send_json(m)
        else:
            # resume — replay the current question in the chosen language
            qs = calibration._all_questions(state)
            lang = state.answers.get("language", "en")
            await ws.send_json(calibration._question_message(qs[state.step], lang=lang))
    else:
        await ws.send_json({
            "role": "system",
            # persona name and email used to be rendered back-to-back
            "content": f"channel synchronised • {cart.persona_name} • {email}",
            "done": True,
        })
    # ---- EEMS context: pull a tight set of memories at session start ----
    # Only if calibrated (otherwise we're still in boot interview). Note this
    # is not refreshed if calibration completes later on this connection.
    eems_context = ""
    if cart and cart.is_calibrated:
        try:
            eems_context = await memory.operator_context(email, cart.persona_name)
            if eems_context:
                log.info("EEMS context: %d chars injected for %s", len(eems_context), email)
        except Exception:
            log.exception("EEMS context pull failed; continuing without")
            eems_context = ""
    # Session ID for sidecar persona lookups
    session_id = _session_id_for_user(email)
    try:
        while True:
            try:
                payload = await ws.receive_json()
            except json.JSONDecodeError:
                # A malformed frame should not tear down the whole channel.
                log.warning("%s sent a non-JSON frame; ignoring", email)
                continue
            # Accept only {"content": "<text>"}; anything else is ignored
            # rather than crashing on .get()/.strip().
            content = payload.get("content") if isinstance(payload, dict) else None
            user_msg = content.strip() if isinstance(content, str) else ""
            if not user_msg:
                continue
            # ---- calibration mode ----
            if in_calibration:
                state = _calibration_sessions.get(email)
                if state is None:
                    # /api/recalibrate drops in-flight state; restart the
                    # interview instead of failing with a KeyError.
                    state, opening = calibration.start(email)
                    _calibration_sessions[email] = state
                    for m in opening:
                        await ws.send_json(m)
                    continue
                log.info("%s calibrate[%d]: %s", email, state.step, user_msg[:80])
                for m in calibration.step(state, user_msg):
                    role = m["role"]
                    if role == "voice_sample":
                        await _send_voice_sample(ws, m["voice"], m["text"], m["blurb"])
                    elif role == "calibration_done":
                        new_cart = m["cart"]
                        await _complete_calibration(ws, email, new_cart, state)
                        cart = new_cart
                        in_calibration = False
                    else:
                        await ws.send_json(m)
                continue
            # ---- normal chat ----
            persona = cart.persona_name if cart else "BT"
            log.info("%s → %s: %s", email, persona, user_msg[:120])
            if PREVIEW_MODE:
                full = await _preview_stream(ws, user_msg)
                # honour the calibrated voice
                if cart:
                    await _send_audio_with_voice(ws, full, cart.voice)
                else:
                    await _send_audio(ws, full)
                continue
            # Resolve current persona — sidecar binding wins over cart default.
            binding = await _sidecar_get_binding(session_id)
            bound_slug = binding.get("slug") if binding else None
            # Voice: sidecar binding → cart → env default
            if binding and binding.get("voice"):
                voice = binding["voice"]
            elif cart and cart.voice:
                voice = cart.voice
            else:
                voice = TTS_VOICE
            system_prompt = _pick_system_prompt(bound_slug, cart) + eems_context
            # Send to opencode and stream response. A failure to open the
            # upstream session is reported like any other upstream error
            # instead of closing the channel.
            try:
                oc_session = await _ensure_opencode_session(email)
            except (httpx.HTTPError, KeyError, ValueError) as e:
                log.error("opencode session error: %s", e)
                await ws.send_json({
                    "role": "system",
                    "content": "upstream error: could not open opencode session — try again",
                    "done": True,
                })
                continue
            response_text = await _send_to_opencode(user_msg, system_prompt, oc_session, ws)
            if response_text:
                # Audio is synthesized after the text has been fully streamed.
                # It is awaited, so cap it: if TTS takes too long the operator
                # still has the text and the loop moves on.
                try:
                    await asyncio.wait_for(
                        _send_audio_with_voice(ws, response_text, voice),
                        timeout=20.0,
                    )
                except asyncio.TimeoutError:
                    log.warning("TTS timed out for voice=%s, skipping audio", voice)
                except Exception:
                    log.exception("TTS failed, continuing without audio")
    except WebSocketDisconnect:
        log.info("%s disconnected", email)
    except Exception:
        log.exception("ws error")
        # Best-effort notification + close; the socket may already be gone, in
        # which case neither call may raise out of this handler.
        try:
            await ws.send_json({"role": "system", "content": "internal error", "done": True})
        except Exception:
            log.debug("could not send internal-error frame", exc_info=True)
        try:
            await ws.close()
        except Exception:
            log.debug("could not close websocket cleanly", exc_info=True)
async def _preview_stream(
    ws: WebSocket,
    user_msg: str,
    *,
    chunk_size: int = 8,
    delay: float = 0.06,
) -> str:
    """Canned BT-like reply, chunked. UI-only mode. Returns full text.

    Args:
        ws: Accepted WebSocket to stream to.
        user_msg: The operator's message, echoed inside the canned reply.
        chunk_size: Characters per ``delta`` frame (must be >= 1).
        delay: Seconds to pause after each chunk.

    Returns:
        The complete canned reply that was streamed.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    canned = (
        f"Channel reads you clear, Pilot. You said: \"{user_msg}\". "
        "No upstream model wired in this build — I am a placeholder voice "
        "while the channel itself is being shaped. The mesh holds. "
        "Standing by."
    )
    for start in range(0, len(canned), chunk_size):
        chunk = canned[start:start + chunk_size]
        await ws.send_json({"role": "assistant", "delta": chunk, "done": False})
        await asyncio.sleep(delay)
    # Empty delta with done=True marks the end of the message.
    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    return canned
async def _send_audio(ws: WebSocket, text: str) -> None:
    """Speak *text* in the default voice and send it over *ws*.

    Does nothing when the default synthesizer is disabled or unavailable;
    otherwise delegates to ``_send_audio_with_voice`` with the default voice.
    """
    if tts and tts.available:
        await _send_audio_with_voice(ws, text, tts.voice)
async def _send_audio_with_voice(ws: WebSocket, text: str, voice_id: str) -> None:
    """Speak *text* in *voice_id* and send it as a base64 WAV ``audio`` frame.

    A no-op when TTS is disabled, when the synthesizer for the voice is
    unavailable, or when synthesis yields nothing. Errors are logged and
    swallowed so audio problems never break the chat flow.

    Args:
        ws: Accepted WebSocket to send to.
        text: Text to synthesize.
        voice_id: Voice to use; the shared default synthesizer is reused when
            it already has this voice.
    """
    if not TTS_ENABLED:
        return
    import base64

    try:
        default_voice = tts.voice if tts else ""
        if voice_id == default_voice:
            synth = tts
        else:
            # A dedicated synthesizer per non-default voice (cheap — object init only).
            synth = TTS(voice=voice_id)
        if not synth or not synth.available:
            return

        wav = await synth.synthesize(text)
        if not wav:
            return

        encoded = base64.b64encode(wav).decode("ascii")
        await ws.send_json({"role": "audio", "mime": "audio/wav", "data": encoded})
    except Exception:
        log.exception("audio send failed")
# -------------------------------------------------------------------------- main
# Development entry point: serve the app on 127.0.0.1:8765 with auto-reload
# when this module is executed as a script.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="127.0.0.1", port=8765, reload=True)