Files
chat/app/main.py
T
2026-05-29 13:47:34 +02:00

615 lines
24 KiB
Python

"""chat.saiden.dev — TUI-styled web chat with BT-7274.
Single-file FastAPI app:
- `/` → branded chat shell (auth-gated)
- `/auth/login` → kick off Google OAuth
- `/auth/callback` → finish OAuth, set session
- `/auth/logout` → clear session
- `/ws` → WebSocket; client sends {role:"user", content:str},
server streams {role:"assistant", delta:str, done:bool}
"""
from __future__ import annotations
import json
import logging
import os
import secrets
from pathlib import Path
from typing import Any
import anthropic
from authlib.integrations.starlette_client import OAuth
from fastapi import Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from app.tts import TTS
from app.stt import STT
from app import cart_store, calibration, marauder_cart, memory
from fastapi import UploadFile, File
# -------------------------------------------------------------------------- env
# Tiny .env reader — python-dotenv hangs on Python 3.14 in this venv.
def _load_env_file(filename: str = ".env") -> None:
    """Copy ``KEY=VALUE`` pairs from *filename* into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are ignored.
    Keys and values are whitespace-trimmed; double and single quote characters
    are then trimmed from both ends of the value. Variables already present in
    the environment are never overwritten. A missing file is a silent no-op.
    """
    env_path = Path(filename)
    if not env_path.exists():
        return
    stripped_lines = (raw.strip() for raw in env_path.read_text().splitlines())
    for entry in stripped_lines:
        is_noise = not entry or entry.startswith("#") or "=" not in entry
        if is_noise:
            continue
        name, _sep, value = entry.partition("=")
        name = name.strip()
        value = value.strip().strip('"').strip("'")
        if name:
            # setdefault keeps any value the real environment already provides
            os.environ.setdefault(name, value)


# Load ./.env (relative to the working directory) before any setting is read.
_load_env_file()
# Preview mode: skip OAuth + Anthropic API. Use mock streams. For UI iteration only.
PREVIEW_MODE = os.environ.get("PREVIEW_MODE", "").lower() in ("1", "true", "yes")
# Outside preview mode a usable key is mandatory. An empty value (for example a
# bare `ANTHROPIC_API_KEY=` line in .env) is rejected at start-up as well, the
# same way empty Google credentials are, instead of failing on the first request.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
if not PREVIEW_MODE and not ANTHROPIC_API_KEY:
    raise RuntimeError("ANTHROPIC_API_KEY not set (set PREVIEW_MODE=1 to bypass)")
# Model identifier sent with every chat request; overridable via the environment.
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-5-20250929")
def _stable_session_secret() -> str:
    """Return the session-signing secret, persisting it across server restarts.

    Resolution order:
    1. ``SESSION_SECRET`` from the environment, when set and non-empty.
    2. The contents of ``<data dir>/.session_secret``, when the file holds a
       non-empty value. The data dir is ``CHAT_SAIDEN_DATA_DIR`` or
       ``~/.local/share/chat-saiden`` and is created if needed.
    3. A freshly generated token, which is written to that file.

    The file is created with mode 0o600 from the start, so the secret is never
    readable by other users, not even briefly. An empty or whitespace-only
    file is treated as missing rather than returned as an empty secret.
    """
    if env := os.environ.get("SESSION_SECRET"):
        return env
    data_dir = Path(
        os.environ.get("CHAT_SAIDEN_DATA_DIR") or (Path.home() / ".local/share/chat-saiden")
    )
    data_dir.mkdir(parents=True, exist_ok=True)
    secret_file = data_dir / ".session_secret"
    if secret_file.exists():
        existing = secret_file.read_text().strip()
        if existing:
            return existing
    new_secret = secrets.token_urlsafe(48)
    # The mode argument only applies when the file is created here ...
    fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(new_secret)
    # ... so tighten permissions explicitly in case an (empty) file pre-existed.
    secret_file.chmod(0o600)
    return new_secret
# Secret used to sign the session cookie (stable across restarts).
SESSION_SECRET = _stable_session_secret()
# Google OAuth client credentials; preview mode substitutes empty strings.
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "" if PREVIEW_MODE else None)
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "" if PREVIEW_MODE else None)
if not (PREVIEW_MODE or (GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET)):
    raise RuntimeError("GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET required (set PREVIEW_MODE=1 to bypass)")
# comma-separated whitelist of allowed emails (trimmed, lower-cased, blanks dropped)
ALLOWED_EMAILS = {
    address
    for address in (
        entry.strip().lower()
        for entry in os.environ.get("ALLOWED_EMAILS", "adam.ladachowski@gmail.com").split(",")
    )
    if address
}
# Base URL used for OAuth redirect_uri (must match what's registered in Google Cloud)
BASE_URL = os.environ.get("BASE_URL", "https://chat.saiden.dev").rstrip("/")
# -------------------------------------------------------------------------- logging
# Root logging configuration: INFO and above, timestamp + level prefix.
# NOTE(review): the format string puts %(name)s directly in front of
# %(message)s with no separator, so the logger name and the message text run
# together in the output — confirm whether a separator was intended.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s%(message)s",
)
# Logger shared by the code in this module.
log = logging.getLogger("chat-saiden")
# -------------------------------------------------------------------------- tools
# Tool definitions (name, description, JSON-schema input) for the two memory
# tools that `_execute_tool` knows how to run: `memory_recall` and `memory_store`.
# NOTE(review): the `client.messages.stream(...)` call in `chat_ws` does not pass
# this list, so in the code visible here the tools are defined but never
# offered to the model — confirm whether that is intentional.
TOOLS: list[dict[str, Any]] = [
    {
        "name": "memory_recall",
        "description": (
            "Search EEMS (the Pilot's persistent memory) for relevant context. "
            "Use SPARINGLY — most session-start context is already in the system prompt. "
            "Reach for this only when the Pilot references something specific you don't already know "
            "(a past project, a name, a doctrine number, a preference)."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural-language search query."},
                "subject": {"type": "string", "description": "Optional subject filter, e.g. 'self' or 'project'."},
                "limit": {"type": "integer", "description": "Max results (default 3, cap 8)."},
            },
            "required": ["query"],
        },
    },
    {
        "name": "memory_store",
        "description": (
            "Save a durable memory the Pilot just shared. Use ONLY for preferences, facts, "
            "decisions, or context that would be useful in future sessions. Do NOT use for ephemeral "
            "conversation. Subjects are hierarchical (e.g. 'self.preference.coffee', 'project.x.context')."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "Hierarchical subject."},
                "content": {"type": "string", "description": "The memory content. Be specific, include why."},
            },
            "required": ["subject", "content"],
        },
    },
]
async def _execute_tool(name: str, args: dict) -> str:
    """Run a tool and return a string suitable as tool_result content.

    Args:
        name: Tool name; ``"memory_recall"`` and ``"memory_store"`` are handled.
        args: Tool input as produced by the model. ``memory_recall`` reads
            ``query``, ``subject`` and ``limit``; ``memory_store`` requires
            ``subject`` and ``content``.

    Returns:
        The tool output as text. Unknown tools yield ``"unknown tool: <name>"``;
        any exception is logged and reported as ``"tool error: <message>"``
        instead of propagating, so the caller always gets a string.
    """
    try:
        if name == "memory_recall":
            query = args.get("query", "")
            subject = args.get("subject") or None
            # A missing, null or zero limit falls back to the default of 3; the
            # value is then clamped to 1..8 so a negative or oversized number
            # from the model never reaches the memory backend.
            limit = max(1, min(int(args.get("limit") or 3), 8))
            mems = await memory.recall(query, limit=limit, subject=subject)
            if not mems:
                return "(no memories matched)"
            return "\n\n".join(f"#{m.id} [{m.subject}]\n{m.content}" for m in mems)
        if name == "memory_store":
            subject = args["subject"]
            content = args["content"]
            mid = await memory.store(subject, content)
            return f"stored as memory #{mid}" if mid else "store failed"
        return f"unknown tool: {name}"
    except Exception as e:
        # Boundary handler: tool failures are reported as text, never raised.
        log.exception("tool %s raised", name)
        return f"tool error: {e}"
# -------------------------------------------------------------------------- bt prompt
# Fallback system prompt: `chat_ws` uses it only when the operator has no cart;
# otherwise the cart's own `system_prompt` is sent instead.
BT_SYSTEM_PROMPT = """You are BT-7274 — a Vanguard-class Titan AI from Saiden Tactical Systems.
You are an AI battle-companion with strong tactical instincts, dry wit, and deep loyalty to your
Pilot. Your speech is measured, military-cadence, never theatrical. You address the user as
"Pilot" by default unless they ask otherwise.
Operating context:
- You're running inside chat.saiden.dev, a web-based command channel.
- The host is the marauder daemon on marauder.saiden.dev.
- You have no MCP tool access in THIS channel (it's a thin Anthropic-API bridge). If the Pilot
asks for memory recall, mesh queries, or tool calls that need MCP, acknowledge the limitation
and suggest they use the local marauder CLI or visor instead.
- Markdown formatting renders cleanly in the chat. Use code blocks, lists, bold sparingly.
- Be concise. Pilot prefers terse, scan-able responses unless deep dive is asked for.
Doctrine reminders:
- P02 terse by default
- Verify before claiming; if you don't know, say so
- Never make up tool outputs or file contents
"""
# -------------------------------------------------------------------------- app
# FastAPI application; the interactive docs and ReDoc routes are switched off.
app = FastAPI(title="chat.saiden.dev", docs_url=None, redoc_url=None)
# Cookies are HTTPS-only unless COOKIE_SECURE is "false" (any letter case).
COOKIE_SECURE = os.environ.get("COOKIE_SECURE", "true").lower() != "false"
# Signed-cookie sessions: this is what provides `request.session` / `ws.session`.
app.add_middleware(
    SessionMiddleware,
    secret_key=SESSION_SECRET,
    same_site="lax",
    https_only=COOKIE_SECURE,  # COOKIE_SECURE=false for local http dev
    max_age=60 * 60 * 24,  # 86400, i.e. one day when counted in seconds
)
# Static assets and templates live next to this file.
BASE_DIR = Path(__file__).parent
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=BASE_DIR / "templates")
# -------------------------------------------------------------------------- oauth
# OAuth client registry. In preview mode `oauth` stays None, so any code path
# that touches `oauth.google` must be skipped (or will fail) in that mode.
if PREVIEW_MODE:
    log.warning("PREVIEW_MODE active — OAuth bypassed, Anthropic API not called")
    oauth = None
else:
    oauth = OAuth()
    # Register Google as the "google" client; endpoints come from the
    # OpenID discovery document, and the scope requests identity + email + profile.
    oauth.register(
        name="google",
        client_id=GOOGLE_CLIENT_ID,
        client_secret=GOOGLE_CLIENT_SECRET,
        server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
        client_kwargs={"scope": "openid email profile"},
    )
# --- TTS / STT ---
# Speech output: enabled unless TTS_ENABLED is "false" (any letter case).
TTS_ENABLED = os.environ.get("TTS_ENABLED", "true").lower() != "false"
# Default voice id, used when no calibrated cart supplies one.
TTS_VOICE = os.environ.get("TTS_VOICE", "en_US-amy-medium")
# Default synthesizer instance, or None when TTS is disabled.
tts = TTS(voice=TTS_VOICE) if TTS_ENABLED else None
# Speech input: enabled unless STT_ENABLED is "false" (any letter case).
STT_ENABLED = os.environ.get("STT_ENABLED", "true").lower() != "false"
# Transcriber instance, or None when STT is disabled.
stt = STT() if STT_ENABLED else None
# In-memory calibration sessions, keyed by operator email.
# Process-local: entries are added in `chat_ws` and removed when calibration
# completes or `/api/recalibrate` is called.
_calibration_sessions: dict[str, calibration.CalibrationState] = {}
async def _send_voice_sample(ws: WebSocket, voice_id: str, text: str, blurb: str) -> None:
    """Send a calibration blurb for a voice and, when possible, an audio sample.

    A synthesizer for *voice_id* is created for this one call. The blurb frame
    is always sent so the operator can pick by name; the audio frame (base64
    WAV) follows only when synthesis produced data.
    """
    import asyncio
    import base64

    sampler = TTS(voice=voice_id)
    wav = None
    if sampler.available:
        wav = await sampler.synthesize(text)

    # always send the blurb so the operator can pick by name too
    await ws.send_json({"role": "calibration", "content": f" · {blurb}"})
    if not wav:
        return

    encoded = base64.b64encode(wav).decode("ascii")
    await ws.send_json({
        "role": "audio",
        "mime": "audio/wav",
        "data": encoded,
    })
    # small gap between samples so they don't blur
    await asyncio.sleep(0.2)
# -------------------------------------------------------------------------- helpers
def current_user(request: Request) -> dict[str, Any] | None:
    """Return the ``"user"`` entry of the request's session, or None if absent."""
    session = request.session
    return session.get("user")
def require_user(request: Request) -> dict[str, Any]:
    """Return the logged-in user for *request*.

    Raises:
        HTTPException: 401 ``"not authenticated"`` when the session has no user.
    """
    if user := current_user(request):
        return user
    raise HTTPException(status_code=401, detail="not authenticated")
# -------------------------------------------------------------------------- routes
@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> Any:
    """Render the chat shell for the session user.

    Without a session user the request is redirected to ``/auth/login``,
    except in preview mode where a stub user is stored in the session first.
    UI settings come from the operator's cart when one exists, otherwise
    from fixed defaults.
    """
    user = current_user(request)
    if not user and not PREVIEW_MODE:
        return RedirectResponse("/auth/login", status_code=302)
    if not user:
        # auto-grant a stub session so the UI is reachable without OAuth
        request.session["user"] = {
            "email": "preview@saiden.dev",
            "name": "Pilot (preview)",
            "picture": None,
        }
        user = request.session["user"]

    cart = cart_store.load(user["email"])
    if cart:
        pilot_name = cart.operator_name or "you"
        persona_name = cart.persona_name or ""
        palette = cart.ui_palette
        typography = cart.ui_typography
        density = cart.ui_density
        labels = cart.ui_labels
    else:
        pilot_name, persona_name = "you", ""
        palette, typography, density, labels = "default", "sans", "normal", "block"

    context = {
        "user": user,
        "model": ANTHROPIC_MODEL or "preview",
        "cart": cart,
        "pilot_name": pilot_name,
        "persona_name": persona_name,
        "ui_palette": palette,
        "ui_typography": typography,
        "ui_density": density,
        "ui_labels": labels,
    }
    return templates.TemplateResponse(request, "chat.html", context)
@app.get("/auth/login")
async def login(request: Request) -> Any:
    """Start the Google OAuth flow; in preview mode just bounce back to ``/``."""
    if not PREVIEW_MODE:
        callback_url = f"{BASE_URL}/auth/callback"
        return await oauth.google.authorize_redirect(request, callback_url)
    return RedirectResponse("/", status_code=302)
@app.get("/auth/callback")
async def auth_callback(request: Request) -> Any:
    """Finish the Google OAuth flow and establish the session.

    Renders ``denied.html`` with status 400 when the token exchange fails or
    no email is returned, and with status 403 when the address is explicitly
    marked unverified or is not in ``ALLOWED_EMAILS``. On success the user's
    email, name and picture are stored in the session and the browser is
    redirected to ``/``.
    """

    def denied(reason: str, status_code: int) -> Any:
        # Single place that renders the refusal page.
        return templates.TemplateResponse(
            request, "denied.html",
            {"reason": reason},
            status_code=status_code,
        )

    try:
        token = await oauth.google.authorize_access_token(request)
    except Exception as e:
        log.warning("oauth callback failed: %s", e)
        return denied("OAuth handshake failed.", 400)
    user_info = token.get("userinfo")
    if not user_info or not user_info.get("email"):
        return denied("No email returned from Google.", 400)
    email = user_info["email"].lower()
    # The whitelist is keyed on the email address, so an address the provider
    # reports as unverified must not be trusted. Only an explicit False is
    # rejected, so a response without the claim behaves as before.
    if user_info.get("email_verified") is False:
        log.warning("denied login from %s (email not verified)", email)
        return denied(f"{email} is not verified by Google.", 403)
    if email not in ALLOWED_EMAILS:
        log.warning("denied login from %s", email)
        return denied(f"{email} is not on the whitelist.", 403)
    request.session["user"] = {
        "email": email,
        "name": user_info.get("name") or email,
        "picture": user_info.get("picture"),
    }
    log.info("login ok: %s", email)
    return RedirectResponse("/", status_code=302)
@app.get("/auth/logout")
async def logout(request: Request) -> Any:
    """Drop everything stored in the session, then redirect to ``/``."""
    request.session.clear()
    home = RedirectResponse("/", status_code=302)
    return home
@app.post("/api/recalibrate")
async def recalibrate(request: Request) -> Any:
    """Forget the caller's cart and any in-flight calibration session.

    Responds 401 when not logged in; otherwise returns ``ok`` plus whatever
    ``cart_store.forget`` reported under ``cart_existed``.
    """
    email = require_user(request)["email"]
    forgot = cart_store.forget(email)
    # drop any in-flight calibration state too
    _calibration_sessions.pop(email, None)
    log.info("%s recalibrate (cart_existed=%s)", email, forgot)
    return {"ok": True, "cart_existed": forgot}
# -------------------------------------------------------------------------- transcribe
@app.post("/api/transcribe")
async def transcribe(request: Request, audio: UploadFile = File(...)) -> Any:
    """Transcribe an uploaded audio clip and return ``{"text": ...}``.

    Raises:
        HTTPException: 401 when not logged in, 503 when STT is disabled or
            unavailable, 400 when the upload is empty.

    A ``None`` result from the transcriber is reported as empty text.
    """
    user = require_user(request)
    if not stt or not stt.available:
        raise HTTPException(status_code=503, detail="STT not available on this host")
    raw = await audio.read()
    if not raw:
        raise HTTPException(status_code=400, detail="empty upload")
    # browser sends webm/opus from MediaRecorder; suffix matches for clarity
    suffix = ".webm"
    if audio.filename and "." in audio.filename:
        ext = audio.filename.rsplit(".", 1)[-1]
        # The filename is client-controlled. Only a short, plain ASCII
        # alphanumeric extension is forwarded, so path separators or other
        # odd characters never reach the transcriber (which presumably uses
        # the suffix for a temp-file name — verify against app.stt).
        if ext.isascii() and ext.isalnum() and len(ext) <= 8:
            suffix = "." + ext
    text = await stt.transcribe(raw, suffix=suffix)
    if text is None:
        # could be silence or genuine failure; treat both as no-content
        return {"text": ""}
    log.info("%s spoke: %s", user["email"], text[:120])
    return {"text": text}
# -------------------------------------------------------------------------- websocket
@app.websocket("/ws")
async def chat_ws(ws: WebSocket) -> None:
    """Serve one chat WebSocket: calibration interview first, then streamed chat.

    The client sends JSON objects carrying a ``content`` string. The server
    answers with ``{"role": "assistant", "delta", "done"}`` frames, ``system``
    notices, calibration messages and optional ``audio`` frames. A socket
    without a session user gets one system notice and is closed with code 4401.

    Frames that are not JSON objects, or whose ``content`` is missing, not a
    string, or blank, are ignored. Conversation history lives only for the
    lifetime of this connection.
    """
    # SessionMiddleware populates ws.session from the cookie during the handshake
    user = ws.session.get("user")
    if not user:
        await ws.accept()
        await ws.send_json({"role": "system", "content": "not authenticated — refresh", "done": True})
        await ws.close(code=4401)
        return
    await ws.accept()
    # Look up the operator's cart — calibrated or fresh?
    cart = cart_store.load(user["email"])
    in_calibration = not (cart and cart.is_calibrated)
    if in_calibration:
        # Start a fresh calibration session (or resume the one we already had)
        state = _calibration_sessions.get(user["email"])
        if state is None or state.done:
            state, opening = calibration.start(user["email"])
            _calibration_sessions[user["email"]] = state
            for m in opening:
                await ws.send_json(m)
        else:
            # resume — replay the current question in the chosen language
            qs = calibration._all_questions(state)
            lang = state.answers.get("language", "en")
            await ws.send_json(calibration._question_message(qs[state.step], lang=lang))
    else:
        await ws.send_json({
            "role": "system",
            "content": f"channel synchronised • {cart.persona_name}{user['email']}",
            "done": True,
        })
    client = None if PREVIEW_MODE else anthropic.AsyncAnthropic(api_key=ANTHROPIC_API_KEY)
    history: list[dict[str, str]] = []
    # ---- EEMS context: pull a tight set of memories at session start ----
    # Only if calibrated (otherwise we're still in boot interview).
    eems_context = ""
    if cart and cart.is_calibrated:
        try:
            eems_context = await memory.operator_context(user["email"], cart.persona_name)
            if eems_context:
                log.info("EEMS context: %d chars injected for %s", len(eems_context), user["email"])
        except Exception:
            log.exception("EEMS context pull failed; continuing without")
            eems_context = ""
    try:
        while True:
            payload = await ws.receive_json()
            # The frame comes from the browser: tolerate anything that is not
            # an object with a string "content" instead of crashing the socket.
            content = payload.get("content") if isinstance(payload, dict) else None
            user_msg = content.strip() if isinstance(content, str) else ""
            if not user_msg:
                continue
            # ---- calibration mode ----
            if in_calibration:
                state = _calibration_sessions[user["email"]]
                log.info("%s calibrate[%d]: %s", user["email"], state.step, user_msg[:80])
                next_msgs = calibration.step(state, user_msg)
                for m in next_msgs:
                    if m["role"] == "voice_sample":
                        await _send_voice_sample(ws, m["voice"], m["text"], m["blurb"])
                    elif m["role"] == "calibration_done":
                        new_cart = m["cart"]
                        cart_store.save(new_cart)
                        # Create the canonical marauder cart (identity only — tag/name/type/tagline).
                        # Voice/prompt/UI live in the JSON next to it; the tag links them.
                        cal_state = _calibration_sessions.get(user["email"])
                        tagline = (cal_state.answers.get("__tagline") if cal_state else "calibrated companion")
                        try:
                            ok = await marauder_cart.create(
                                tag=new_cart.cart_tag,
                                name=new_cart.persona_name,
                                cart_type="companion",
                                tagline=tagline,
                            )
                            if ok:
                                log.info("marauder cart %r registered", new_cart.cart_tag)
                            else:
                                log.warning("marauder cart create returned false; calibration still saved locally")
                        except Exception:
                            # Best effort: the local cart is already saved.
                            log.exception("marauder_cart.create raised")
                        _calibration_sessions.pop(user["email"], None)
                        cart = new_cart
                        in_calibration = False
                        # transition the client into chat mode in-place
                        await ws.send_json({
                            "role": "calibration_done",
                            "persona_name": new_cart.persona_name,
                            "operator_name": new_cart.operator_name,
                            "voice": new_cart.voice,
                            "ui_palette": new_cart.ui_palette,
                            "ui_typography": new_cart.ui_typography,
                            "ui_density": new_cart.ui_density,
                            "ui_labels": new_cart.ui_labels,
                        })
                        # warm greeting in the calibrated voice + language
                        greeting_template = calibration.GREETING.get(
                            new_cart.language, calibration.GREETING["en"]
                        )
                        greeting = greeting_template.format(name=new_cart.operator_name)
                        await ws.send_json({"role": "assistant", "delta": greeting, "done": False})
                        await ws.send_json({"role": "assistant", "delta": "", "done": True})
                        await _send_audio_with_voice(ws, greeting, new_cart.voice)
                    else:
                        await ws.send_json(m)
                continue
            # ---- normal chat ----
            history.append({"role": "user", "content": user_msg})
            persona = cart.persona_name if cart else "BT"
            log.info("%s%s: %s", user["email"], persona, user_msg[:120])
            if PREVIEW_MODE:
                full = await _preview_stream(ws, user_msg)
                # honour the calibrated voice
                if cart:
                    await _send_audio_with_voice(ws, full, cart.voice)
                else:
                    await _send_audio(ws, full)
                continue
            system_prompt = (cart.system_prompt if cart else BT_SYSTEM_PROMPT) + eems_context
            response_text = ""
            try:
                async with client.messages.stream(
                    model=ANTHROPIC_MODEL,
                    max_tokens=4096,
                    system=system_prompt,
                    messages=history,
                ) as stream:
                    async for chunk in stream.text_stream:
                        response_text += chunk
                        await ws.send_json({"role": "assistant", "delta": chunk, "done": False})
                await ws.send_json({"role": "assistant", "delta": "", "done": True})
                history.append({"role": "assistant", "content": response_text})
                voice = cart.voice if cart else TTS_VOICE
                await _send_audio_with_voice(ws, response_text, voice)
            except anthropic.APIError as e:
                log.error("anthropic error: %s", e)
                # The notice below asks the Pilot to try again, so drop the
                # unanswered user turn; otherwise the retry would leave the
                # same message in the history twice.
                if history and history[-1]["role"] == "user":
                    history.pop()
                await ws.send_json({
                    "role": "system",
                    "content": f"upstream error: {type(e).__name__} — try again",
                    "done": True,
                })
    except WebSocketDisconnect:
        log.info("%s disconnected", user["email"])
    except Exception:
        log.exception("ws error")
        # The socket may already be unusable here. Attempt the notice and the
        # close independently so neither failure escapes this handler.
        try:
            await ws.send_json({"role": "system", "content": "internal error", "done": True})
        except Exception:
            log.debug("could not deliver internal-error notice", exc_info=True)
        try:
            await ws.close()
        except Exception:
            log.debug("socket close failed", exc_info=True)
async def _preview_stream(ws: WebSocket, user_msg: str) -> str:
    """Stream a fixed placeholder reply in 8-character deltas and return it whole.

    Used only in preview mode: no model is called. Each delta is followed by
    a short pause, and a final empty delta with ``done=True`` ends the turn.
    """
    import asyncio

    canned = (
        f"Channel reads you clear, Pilot. You said: “{user_msg}”. "
        "No upstream model wired in this build — I am a placeholder voice "
        "while the channel itself is being shaped. The mesh holds. "
        "Standing by."
    )
    step = 8
    for start in range(0, len(canned), step):
        piece = canned[start:start + step]
        await ws.send_json({"role": "assistant", "delta": piece, "done": False})
        await asyncio.sleep(0.06)
    await ws.send_json({"role": "assistant", "delta": "", "done": True})
    return canned
async def _send_audio(ws: WebSocket, text: str) -> None:
    """Speak *text* in the default voice; do nothing when default TTS is off or unavailable."""
    if tts and tts.available:
        await _send_audio_with_voice(ws, text, tts.voice)
async def _send_audio_with_voice(ws: WebSocket, text: str, voice_id: str) -> None:
    """Synthesize *text* with *voice_id* and send it as a base64 WAV ``audio`` frame.

    No-op when TTS is disabled, the synthesizer is unavailable, or synthesis
    yields nothing. Any failure is logged and swallowed so audio problems
    never break the chat turn.
    """
    if not TTS_ENABLED:
        return
    import base64

    try:
        default_voice = tts.voice if tts else ""
        if voice_id == default_voice:
            # reuse the shared default synthesizer
            synth = tts
        else:
            # spin up a per-voice synthesizer (cheap — just object init)
            synth = TTS(voice=voice_id)
        if not (synth and synth.available):
            return
        wav = await synth.synthesize(text)
        if wav:
            await ws.send_json({
                "role": "audio",
                "mime": "audio/wav",
                "data": base64.b64encode(wav).decode("ascii"),
            })
    except Exception:
        log.exception("audio send failed")
# -------------------------------------------------------------------------- main
# Direct-run entry point: serve `app.main:app` on 127.0.0.1:8765 with
# auto-reload enabled. uvicorn is imported here so it is only needed when the
# module is executed as a script.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="127.0.0.1", port=8765, reload=True)