feat(transport): async prompt + SSE streaming from opencode events
- POST /session/:id/prompt_async fires the prompt (returns 204 immediately)
- GET /event SSE stream picks up message.part.updated events with real text deltas
- Filters events by session ID and computes each delta from the cumulative text
- Breaks on message completed or session idle
This commit is contained in:
+69
-27
@@ -273,23 +273,27 @@ async def _ensure_opencode_session(email: str) -> str:
|
||||
return sid
|
||||
|
||||
|
||||
async def _send_to_opencode(
|
||||
async def _stream_opencode(
|
||||
user_text: str,
|
||||
system_prompt: str,
|
||||
session_id: str,
|
||||
ws: WebSocket,
|
||||
) -> str:
|
||||
"""Send a message to an opencode session and stream the response to the WebSocket.
|
||||
"""Send a message to opencode and stream the response via SSE.
|
||||
|
||||
Uses POST /session/:id/message (synchronous — waits for full LLM response).
|
||||
Then streams the text word-by-word to the WebSocket for the typewriter effect.
|
||||
Returns the full response text.
|
||||
1. POST /session/:id/prompt_async — fire the prompt (returns 204 immediately)
|
||||
2. GET /event (SSE) — stream message.part.updated events with text deltas
|
||||
|
||||
The SSE stream emits events for ALL sessions. We filter by our session ID
|
||||
and track the assistant message parts to extract text deltas.
|
||||
"""
|
||||
full_response = ""
|
||||
prev_text = "" # track cumulative text to compute deltas
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
resp = await client.post(
|
||||
f"{OPENCODE_URL}/session/{session_id}/message",
|
||||
async with httpx.AsyncClient(timeout=5.0) as fire_client:
|
||||
resp = await fire_client.post(
|
||||
f"{OPENCODE_URL}/session/{session_id}/prompt_async",
|
||||
auth=_opencode_auth(),
|
||||
headers={"x-opencode-directory": "/home/madcat"},
|
||||
json={
|
||||
@@ -301,27 +305,65 @@ async def _send_to_opencode(
|
||||
},
|
||||
},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(
|
||||
f"opencode HTTP {resp.status_code}: {resp.text[:200]}"
|
||||
)
|
||||
data = resp.json()
|
||||
# Extract text from response parts
|
||||
parts = data.get("parts", [])
|
||||
for part in parts:
|
||||
if part.get("type") == "text":
|
||||
full_response += part.get("text", "")
|
||||
if resp.status_code not in (200, 204):
|
||||
raise RuntimeError(f"prompt_async HTTP {resp.status_code}: {resp.text[:200]}")
|
||||
|
||||
# Stream text word-by-word to the WebSocket for typewriter effect
|
||||
if full_response:
|
||||
words = full_response.split(" ")
|
||||
for i, word in enumerate(words):
|
||||
delta = word if i == 0 else " " + word
|
||||
# Now stream SSE events until the assistant message is done
|
||||
async with httpx.AsyncClient(timeout=180.0) as sse_client:
|
||||
async with sse_client.stream(
|
||||
"GET",
|
||||
f"{OPENCODE_URL}/event",
|
||||
auth=_opencode_auth(),
|
||||
headers={"Accept": "text/event-stream", "x-opencode-directory": "/home/madcat"},
|
||||
) as sse_resp:
|
||||
if sse_resp.status_code != 200:
|
||||
body = await sse_resp.aread()
|
||||
raise RuntimeError(f"SSE HTTP {sse_resp.status_code}: {body[:200]}")
|
||||
|
||||
async for line in sse_resp.aiter_lines():
|
||||
if not line.startswith("data: "):
|
||||
continue
|
||||
payload = line[6:].strip()
|
||||
if not payload:
|
||||
continue
|
||||
try:
|
||||
evt = json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
evt_type = evt.get("type", "")
|
||||
props = evt.get("properties", {})
|
||||
|
||||
# Filter to our session
|
||||
if props.get("sessionID") != session_id:
|
||||
continue
|
||||
|
||||
# message.part.updated — contains text deltas
|
||||
if evt_type == "message.part.updated":
|
||||
part = props.get("part", {})
|
||||
if part.get("type") == "text":
|
||||
new_text = part.get("text", "")
|
||||
if len(new_text) > len(prev_text):
|
||||
delta = new_text[len(prev_text):]
|
||||
prev_text = new_text
|
||||
full_response = new_text
|
||||
await ws.send_json({"role": "assistant", "delta": delta, "done": False})
|
||||
await asyncio.sleep(0.03) # ~30ms per word — fast typewriter
|
||||
|
||||
# message.updated with role=assistant + completed status = done
|
||||
if evt_type == "message.updated":
|
||||
msg = props.get("message", props)
|
||||
role = msg.get("role", "")
|
||||
status = msg.get("status", "")
|
||||
if role == "assistant" and status in ("completed", "done", "error"):
|
||||
break
|
||||
|
||||
# session status idle = agent finished
|
||||
if evt_type == "session.status":
|
||||
if props.get("status") == "idle":
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
log.error("opencode error: %s", e)
|
||||
log.error("opencode stream error: %s", e)
|
||||
await ws.send_json({
|
||||
"role": "system",
|
||||
"content": f"upstream error: {e} — try again",
|
||||
@@ -824,9 +866,9 @@ async def chat_ws(ws: WebSocket) -> None:
|
||||
|
||||
system_prompt = _pick_system_prompt(bound_slug, cart) + eems_context
|
||||
|
||||
# Send to opencode and stream response
|
||||
# Send to opencode and stream response via SSE
|
||||
oc_session = await _ensure_opencode_session(user["email"])
|
||||
response_text = await _send_to_opencode(user_msg, system_prompt, oc_session, ws)
|
||||
response_text = await _stream_opencode(user_msg, system_prompt, oc_session, ws)
|
||||
|
||||
if response_text:
|
||||
history.append({"role": "assistant", "content": response_text})
|
||||
|
||||
Reference in New Issue
Block a user