fix: SSE event parsing — use message.part.delta + correct session.status format

This commit is contained in:
marauder-actual
2026-05-29 18:26:31 +02:00
parent d2638e0650
commit fa018f380c
+23 -24
View File
@@ -286,13 +286,16 @@ async def _stream_opencode(
"""Send a message to opencode and stream the response via SSE. """Send a message to opencode and stream the response via SSE.
1. POST /session/:id/prompt_async — fire the prompt (returns 204 immediately) 1. POST /session/:id/prompt_async — fire the prompt (returns 204 immediately)
2. GET /event (SSE) — stream message.part.updated events with text deltas 2. GET /event (SSE) — stream message.part.delta events with text tokens
The SSE stream emits events for ALL sessions. We filter by our session ID The SSE stream emits events for ALL sessions. We filter by our session ID.
and track the assistant message parts to extract text deltas.
Event types we care about:
message.part.delta — incremental text token (props.delta)
session.status — props.status.type == "idle" means agent finished
session.idle — agent finished (belt-and-suspenders)
""" """
full_response = "" full_response = ""
prev_text = "" # track cumulative text to compute deltas
try: try:
async with httpx.AsyncClient(timeout=5.0) as fire_client: async with httpx.AsyncClient(timeout=5.0) as fire_client:
@@ -312,8 +315,8 @@ async def _stream_opencode(
if resp.status_code not in (200, 204): if resp.status_code not in (200, 204):
raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}") raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}")
# Now stream SSE events until the assistant message is done # Stream SSE events until the agent goes idle (max 120s)
async with httpx.AsyncClient(timeout=180.0) as sse_client: async with httpx.AsyncClient(timeout=httpx.Timeout(connect=10.0, read=120.0, write=5.0, pool=5.0)) as sse_client:
async with sse_client.stream( async with sse_client.stream(
"GET", "GET",
f"{OPENCODE_URL}/event", f"{OPENCODE_URL}/event",
@@ -342,28 +345,24 @@ async def _stream_opencode(
if props.get("sessionID") != session_id: if props.get("sessionID") != session_id:
continue continue
# message.part.updated — contains text deltas # message.part.delta — real-time text token
if evt_type == "message.part.updated": if evt_type == "message.part.delta":
part = props.get("part", {}) if props.get("field") == "text":
if part.get("type") == "text": delta = props.get("delta", "")
new_text = part.get("text", "") if delta:
if len(new_text) > len(prev_text): full_response += delta
delta = new_text[len(prev_text):]
prev_text = new_text
full_response = new_text
await ws.send_json({"role": "assistant", "delta": delta, "done": False}) await ws.send_json({"role": "assistant", "delta": delta, "done": False})
# message.updated with role=assistant + completed status = done # session.status — status is an object {"type": "idle"|"busy"}
if evt_type == "message.updated": elif evt_type == "session.status":
msg = props.get("message", props) status = props.get("status", {})
role = msg.get("role", "") if isinstance(status, dict) and status.get("type") == "idle":
status = msg.get("status", "") break
if role == "assistant" and status in ("completed", "done", "error"): elif status == "idle":
break break
# session status idle = agent finished # session.idle — direct idle signal
if evt_type == "session.status": elif evt_type == "session.idle":
if props.get("status") == "idle":
break break
except Exception as e: except Exception as e: