"""Subprocess wrapper around `marauder memory` CLI. Provides recall + store. Marauder's memory CLI returns table output by default and may or may not honour --json depending on subcommand. We parse what we get. Shared EEMS namespace: chat.saiden.dev reads and writes the same memories BT-on-CLI uses. One Pilot, one memory. """ from __future__ import annotations import asyncio import json import logging import re from dataclasses import dataclass from typing import Any log = logging.getLogger("chat-saiden.memory") @dataclass class Memory: id: int | None subject: str content: str classification: str = "standard" async def _run(*args: str, timeout: float = 10.0) -> tuple[int, str, str]: proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() return 124, "", "timeout" return proc.returncode or 0, out.decode("utf-8", "replace"), err.decode("utf-8", "replace") def _try_json(text: str) -> Any: """Try to extract JSON from output that might be mixed with log lines.""" if not text: return None # try the whole thing first try: return json.loads(text) except Exception: pass # find JSON object/array boundaries for opener, closer in [("{", "}"), ("[", "]")]: first = text.find(opener) last = text.rfind(closer) if first != -1 and last > first: try: return json.loads(text[first:last + 1]) except Exception: continue return None # Header line: "#3933 (0.8690) user.identity.nco-preference-..." _RECALL_HEADER = re.compile(r"^#(\d+)\s+\(([\d.]+)\)\s+(\S.*)$") async def recall(query: str, limit: int = 5, subject: str | None = None) -> list[Memory]: """Semantic recall. Returns up to `limit` memories ordered by similarity. `--json` is documented but not implemented for `marauder memory recall` in current builds, so we parse the table-ish text format instead. """ args = ["marauder", "memory", "recall", query, "--limit", str(limit)] if subject: args.extend(["--subject", subject]) code, out, err = await _run(*args) if code != 0: log.warning("memory recall failed (rc=%s): %s", code, err[:200]) return [] memories: list[Memory] = [] current: Memory | None = None body_lines: list[str] = [] def flush(): nonlocal current, body_lines if current is not None: current.content = "\n".join(body_lines).strip() memories.append(current) current = None body_lines = [] for raw in out.splitlines(): line = raw.rstrip() # skip embedding/sqlite log lines (ISO timestamps from tracing) if re.match(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", line): continue m = _RECALL_HEADER.match(line) if m: flush() current = Memory(id=int(m.group(1)), subject=m.group(3).strip(), content="") continue if current is not None and line.strip(): body_lines.append(line.strip()) elif current is not None and not line.strip() and body_lines: # blank line within body — keep separator body_lines.append("") flush() return memories _STORE_RX = re.compile(r"Stored memory #(\d+)") async def store(subject: str, content: str, classification: str | None = None) -> int | None: """Store a memory. Returns memory ID on success. Output is plain text 'Stored memory #NNNN ...'; we regex it.""" args = ["marauder", "memory", "store", subject, content] if classification: args.extend(["--classification", classification]) code, out, err = await _run(*args, timeout=20.0) if code != 0: log.warning("memory store failed (rc=%s): %s", code, err[:200]) return None m = _STORE_RX.search(out + " " + err) if m: return int(m.group(1)) log.debug("memory store output: %r / %r", out[:200], err[:200]) return None # ----------------------------------------------------------------- context shaping async def operator_context(operator_email: str, persona_name: str) -> str: """Pull a tight context block of memories relevant to the operator. Used to seed the system prompt at session start so the cart speaks with continuity.""" queries: list[tuple[str, str | None]] = [ # who the operator is ("operator preferences and self-description", "self"), # what they're working on (f"recent {persona_name} interactions and projects", None), # active doctrine that affects how the cart should behave ("doctrine that shapes how I talk to the pilot", "doctrine"), ] # Fire all recalls in parallel — each is a separate marauder subprocess results = await asyncio.gather( *[recall(q, limit=3, subject=subj) for q, subj in queries], return_exceptions=True, ) blocks: list[str] = [] for (q, _), memories in zip(queries, results): if isinstance(memories, Exception): continue for m in memories: if not m.content: continue blocks.append(f"— ({m.subject}) {m.content.strip()[:600]}") if not blocks: return "" return ( "\n\n## Pilot context (recalled from EEMS)\n" "Use these as background only. Don't recite. Refer naturally if useful.\n\n" + "\n".join(blocks[:8]) # cap so the prompt doesn't bloat )