diff --git a/app/main.py b/app/main.py index fa44084..90fd052 100644 --- a/app/main.py +++ b/app/main.py @@ -286,13 +286,16 @@ async def _stream_opencode( """Send a message to opencode and stream the response via SSE. 1. POST /session/:id/prompt_async — fire the prompt (returns 204 immediately) - 2. GET /event (SSE) — stream message.part.updated events with text deltas + 2. GET /event (SSE) — stream message.part.delta events with text tokens - The SSE stream emits events for ALL sessions. We filter by our session ID - and track the assistant message parts to extract text deltas. + The SSE stream emits events for ALL sessions. We filter by our session ID. + + Event types we care about: + message.part.delta — incremental text token (props.delta) + session.status — props.status.type == "idle" means agent finished + session.idle — agent finished (belt-and-suspenders) """ full_response = "" - prev_text = "" # track cumulative text to compute deltas try: async with httpx.AsyncClient(timeout=5.0) as fire_client: @@ -312,8 +315,8 @@ async def _stream_opencode( if resp.status_code not in (200, 204): raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}") - # Now stream SSE events until the assistant message is done - async with httpx.AsyncClient(timeout=180.0) as sse_client: + # Stream SSE events until the agent goes idle (max 120s) + async with httpx.AsyncClient(timeout=httpx.Timeout(connect=10.0, read=120.0, write=5.0, pool=5.0)) as sse_client: async with sse_client.stream( "GET", f"{OPENCODE_URL}/event", @@ -342,29 +345,25 @@ async def _stream_opencode( if props.get("sessionID") != session_id: continue - # message.part.updated — contains text deltas - if evt_type == "message.part.updated": - part = props.get("part", {}) - if part.get("type") == "text": - new_text = part.get("text", "") - if len(new_text) > len(prev_text): - delta = new_text[len(prev_text):] - prev_text = new_text - full_response = new_text + # message.part.delta — real-time text token + if evt_type == "message.part.delta": + if props.get("field") == "text": + delta = props.get("delta", "") + if delta: + full_response += delta await ws.send_json({"role": "assistant", "delta": delta, "done": False}) - # message.updated with role=assistant + completed status = done - if evt_type == "message.updated": - msg = props.get("message", props) - role = msg.get("role", "") - status = msg.get("status", "") - if role == "assistant" and status in ("completed", "done", "error"): + # session.status — status is an object {"type": "idle"|"busy"} + elif evt_type == "session.status": + status = props.get("status", {}) + if isinstance(status, dict) and status.get("type") == "idle": + break + elif status == "idle": break - # session status idle = agent finished - if evt_type == "session.status": - if props.get("status") == "idle": - break + # session.idle — direct idle signal + elif evt_type == "session.idle": + break except Exception as e: log.error("opencode stream error: %s", e)