fix(transport): rewrite to use opencode session API instead of nonexistent /v1/chat/completions
- POST /session/:id/message (synchronous) replaces broken SSE stream - Per-user opencode sessions with conversation history - Word-by-word typewriter streaming to WebSocket - TTS timeout wrapper prevents connection hangs
This commit is contained in:
+67
-49
@@ -60,16 +60,16 @@ PREVIEW_MODE = os.environ.get("PREVIEW_MODE", "").lower() in ("1", "true", "yes"
|
|||||||
|
|
||||||
# -------------------------------------------------------------------------- opencode transport
|
# -------------------------------------------------------------------------- opencode transport
|
||||||
|
|
||||||
# opencode API — replaces direct Anthropic calls.
|
# opencode API — uses the opencode server HTTP API (POST /session/:id/message).
|
||||||
# Exposes OpenAI-compatible /v1/chat/completions with SSE streaming.
|
|
||||||
OPENCODE_URL = os.environ.get("OPENCODE_URL", "http://sin:4096").rstrip("/")
|
OPENCODE_URL = os.environ.get("OPENCODE_URL", "http://sin:4096").rstrip("/")
|
||||||
OPENCODE_PASSWORD = os.environ.get("OPENCODE_PASSWORD", "")
|
OPENCODE_PASSWORD = os.environ.get("OPENCODE_PASSWORD", "")
|
||||||
|
|
||||||
# Sidecar: persona bind/unbind routes.
|
# Sidecar: persona bind/unbind routes.
|
||||||
SIDECAR_URL = os.environ.get("SIDECAR_URL", "http://sin:4098").rstrip("/")
|
SIDECAR_URL = os.environ.get("SIDECAR_URL", "http://sin:4098").rstrip("/")
|
||||||
|
|
||||||
# Kept for backward-compat / PREVIEW_MODE checks; not used for live calls.
|
# Model to use for opencode sessions.
|
||||||
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-5-20250929")
|
OPENCODE_MODEL = os.environ.get("OPENCODE_MODEL", "claude-sonnet-4-6")
|
||||||
|
OPENCODE_PROVIDER = os.environ.get("OPENCODE_PROVIDER", "anthropic")
|
||||||
|
|
||||||
if not PREVIEW_MODE and not OPENCODE_PASSWORD:
|
if not PREVIEW_MODE and not OPENCODE_PASSWORD:
|
||||||
raise RuntimeError("OPENCODE_PASSWORD not set (set PREVIEW_MODE=1 to bypass)")
|
raise RuntimeError("OPENCODE_PASSWORD not set (set PREVIEW_MODE=1 to bypass)")
|
||||||
@@ -250,61 +250,78 @@ def _opencode_auth() -> tuple[str, str]:
|
|||||||
return ("opencode", OPENCODE_PASSWORD)
|
return ("opencode", OPENCODE_PASSWORD)
|
||||||
|
|
||||||
|
|
||||||
async def _stream_opencode(
|
# Per-user opencode session cache: email → session_id.
|
||||||
messages: list[dict],
|
# Sessions are reused across WebSocket reconnects so conversation history persists.
|
||||||
|
_opencode_sessions: dict[str, str] = {}
|
||||||
|
|
||||||
|
|
||||||
|
async def _ensure_opencode_session(email: str) -> str:
|
||||||
|
"""Return an opencode session ID for this user, creating one if needed."""
|
||||||
|
if email in _opencode_sessions:
|
||||||
|
return _opencode_sessions[email]
|
||||||
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||||
|
resp = await client.post(
|
||||||
|
f"{OPENCODE_URL}/session",
|
||||||
|
auth=_opencode_auth(),
|
||||||
|
headers={"x-opencode-directory": "/home/madcat"},
|
||||||
|
json={"title": f"chat-saiden:{email}"},
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
sid = resp.json()["id"]
|
||||||
|
_opencode_sessions[email] = sid
|
||||||
|
log.info("created opencode session %s for %s", sid, email)
|
||||||
|
return sid
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_to_opencode(
|
||||||
|
user_text: str,
|
||||||
system_prompt: str,
|
system_prompt: str,
|
||||||
|
session_id: str,
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Stream a chat completion from opencode's OpenAI-compat endpoint.
|
"""Send a message to an opencode session and stream the response to the WebSocket.
|
||||||
|
|
||||||
Sends deltas to the WebSocket as they arrive.
|
Uses POST /session/:id/message (synchronous — waits for full LLM response).
|
||||||
Returns the full assembled response text.
|
Then streams the text word-by-word to the WebSocket for the typewriter effect.
|
||||||
Tool-use blocks embedded in the stream are executed and fed back as follow-up
|
Returns the full response text.
|
||||||
messages (single round of tool use, same as the old Anthropic path).
|
|
||||||
"""
|
"""
|
||||||
# opencode /v1/chat/completions expects the system message as the first message
|
|
||||||
oc_messages: list[dict] = [{"role": "system", "content": system_prompt}] + messages
|
|
||||||
|
|
||||||
full_response = ""
|
full_response = ""
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||||
async with client.stream(
|
resp = await client.post(
|
||||||
"POST",
|
f"{OPENCODE_URL}/session/{session_id}/message",
|
||||||
f"{OPENCODE_URL}/v1/chat/completions",
|
|
||||||
auth=_opencode_auth(),
|
auth=_opencode_auth(),
|
||||||
headers={"Accept": "text/event-stream"},
|
headers={"x-opencode-directory": "/home/madcat"},
|
||||||
json={
|
json={
|
||||||
"model": ANTHROPIC_MODEL,
|
"parts": [{"type": "text", "text": user_text}],
|
||||||
"messages": oc_messages,
|
"system": system_prompt,
|
||||||
"stream": True,
|
"model": {
|
||||||
"max_tokens": 4096,
|
"providerID": OPENCODE_PROVIDER,
|
||||||
|
"modelID": OPENCODE_MODEL,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
) as resp:
|
)
|
||||||
if resp.status_code != 200:
|
if resp.status_code != 200:
|
||||||
body = await resp.aread()
|
raise RuntimeError(
|
||||||
raise RuntimeError(
|
f"opencode HTTP {resp.status_code}: {resp.text[:200]}"
|
||||||
f"opencode HTTP {resp.status_code}: {body[:200].decode('utf-8', 'replace')}"
|
)
|
||||||
)
|
data = resp.json()
|
||||||
async for line in resp.aiter_lines():
|
# Extract text from response parts
|
||||||
if not line.startswith("data: "):
|
parts = data.get("parts", [])
|
||||||
continue
|
for part in parts:
|
||||||
payload = line[6:].strip()
|
if part.get("type") == "text":
|
||||||
if payload == "[DONE]":
|
full_response += part.get("text", "")
|
||||||
break
|
|
||||||
try:
|
# Stream text word-by-word to the WebSocket for typewriter effect
|
||||||
chunk = json.loads(payload)
|
if full_response:
|
||||||
except json.JSONDecodeError:
|
words = full_response.split(" ")
|
||||||
continue
|
for i, word in enumerate(words):
|
||||||
delta = (
|
delta = word if i == 0 else " " + word
|
||||||
chunk.get("choices", [{}])[0]
|
await ws.send_json({"role": "assistant", "delta": delta, "done": False})
|
||||||
.get("delta", {})
|
await asyncio.sleep(0.03) # ~30ms per word — fast typewriter
|
||||||
.get("content") or ""
|
|
||||||
)
|
|
||||||
if delta:
|
|
||||||
full_response += delta
|
|
||||||
await ws.send_json({"role": "assistant", "delta": delta, "done": False})
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error("opencode stream error: %s", e)
|
log.error("opencode error: %s", e)
|
||||||
await ws.send_json({
|
await ws.send_json({
|
||||||
"role": "system",
|
"role": "system",
|
||||||
"content": f"upstream error: {e} — try again",
|
"content": f"upstream error: {e} — try again",
|
||||||
@@ -807,8 +824,9 @@ async def chat_ws(ws: WebSocket) -> None:
|
|||||||
|
|
||||||
system_prompt = _pick_system_prompt(bound_slug, cart) + eems_context
|
system_prompt = _pick_system_prompt(bound_slug, cart) + eems_context
|
||||||
|
|
||||||
# Stream from opencode
|
# Send to opencode and stream response
|
||||||
response_text = await _stream_opencode(history, system_prompt, ws)
|
oc_session = await _ensure_opencode_session(user["email"])
|
||||||
|
response_text = await _send_to_opencode(user_msg, system_prompt, oc_session, ws)
|
||||||
|
|
||||||
if response_text:
|
if response_text:
|
||||||
history.append({"role": "assistant", "content": response_text})
|
history.append({"role": "assistant", "content": response_text})
|
||||||
|
|||||||
Reference in New Issue
Block a user