diff --git a/app/main.py b/app/main.py index 546e707..f7540f9 100644 --- a/app/main.py +++ b/app/main.py @@ -273,23 +273,27 @@ async def _ensure_opencode_session(email: str) -> str: return sid -async def _send_to_opencode( +async def _stream_opencode( user_text: str, system_prompt: str, session_id: str, ws: WebSocket, ) -> str: - """Send a message to an opencode session and stream the response to the WebSocket. + """Send a message to opencode and stream the response via SSE. - Uses POST /session/:id/message (synchronous — waits for full LLM response). - Then streams the text word-by-word to the WebSocket for the typewriter effect. - Returns the full response text. + 1. POST /session/:id/prompt_async — fire the prompt (returns 204 immediately) + 2. GET /event (SSE) — stream message.part.updated events with text deltas + + The SSE stream emits events for ALL sessions. We filter by our session ID + and track the assistant message parts to extract text deltas. """ full_response = "" + prev_text = "" # track cumulative text to compute deltas + try: - async with httpx.AsyncClient(timeout=120.0) as client: - resp = await client.post( - f"{OPENCODE_URL}/session/{session_id}/message", + async with httpx.AsyncClient(timeout=5.0) as fire_client: + resp = await fire_client.post( + f"{OPENCODE_URL}/session/{session_id}/prompt_async", auth=_opencode_auth(), headers={"x-opencode-directory": "/home/madcat"}, json={ @@ -301,27 +305,65 @@ async def _send_to_opencode( }, }, ) - if resp.status_code != 200: - raise RuntimeError( - f"opencode HTTP {resp.status_code}: {resp.text[:200]}" - ) - data = resp.json() - # Extract text from response parts - parts = data.get("parts", []) - for part in parts: - if part.get("type") == "text": - full_response += part.get("text", "") + if resp.status_code not in (200, 204): + raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}") - # Stream text word-by-word to the WebSocket for typewriter effect - if full_response: - words = full_response.split(" ") - for i, word in enumerate(words): - delta = word if i == 0 else " " + word - await ws.send_json({"role": "assistant", "delta": delta, "done": False}) - await asyncio.sleep(0.03) # ~30ms per word — fast typewriter + # Now stream SSE events until the assistant message is done + async with httpx.AsyncClient(timeout=180.0) as sse_client: + async with sse_client.stream( + "GET", + f"{OPENCODE_URL}/event", + auth=_opencode_auth(), + headers={"Accept": "text/event-stream", "x-opencode-directory": "/home/madcat"}, + ) as sse_resp: + if sse_resp.status_code != 200: + body = await sse_resp.aread() + raise RuntimeError(f"SSE HTTP {sse_resp.status_code}: {body[:200]}") + + async for line in sse_resp.aiter_lines(): + if not line.startswith("data: "): + continue + payload = line[6:].strip() + if not payload: + continue + try: + evt = json.loads(payload) + except json.JSONDecodeError: + continue + + evt_type = evt.get("type", "") + props = evt.get("properties", {}) + + # Filter to our session + if props.get("sessionID") != session_id: + continue + + # message.part.updated — contains text deltas + if evt_type == "message.part.updated": + part = props.get("part", {}) + if part.get("type") == "text": + new_text = part.get("text", "") + if len(new_text) > len(prev_text): + delta = new_text[len(prev_text):] + prev_text = new_text + full_response = new_text + await ws.send_json({"role": "assistant", "delta": delta, "done": False}) + + # message.updated with role=assistant + completed status = done + if evt_type == "message.updated": + msg = props.get("message", props) + role = msg.get("role", "") + status = msg.get("status", "") + if role == "assistant" and status in ("completed", "done", "error"): + break + + # session status idle = agent finished + if evt_type == "session.status": + if props.get("status") == "idle": + break except Exception as e: - log.error("opencode error: %s", e) + log.error("opencode stream error: %s", e) await ws.send_json({ "role": "system", "content": f"upstream error: {e} — try again", @@ -824,9 +866,9 @@ async def chat_ws(ws: WebSocket) -> None: system_prompt = _pick_system_prompt(bound_slug, cart) + eems_context - # Send to opencode and stream response + # Send to opencode and stream response via SSE oc_session = await _ensure_opencode_session(user["email"]) - response_text = await _send_to_opencode(user_msg, system_prompt, oc_session, ws) + response_text = await _stream_opencode(user_msg, system_prompt, oc_session, ws) if response_text: history.append({"role": "assistant", "content": response_text})