Most "streaming" LLM chatbots stream just the text. The model says "I'll search for that…" and then you wait 6 seconds while the tokens dribble in. The actual search? Hidden. The 3 scrapes it did to fact-check? Hidden. You're staring at a typing indicator that doesn't tell you anything about what's actually taking time.
I just built a chatbot where every tool call surfaces as a step in real time (🔍 `search_engine`, 📄 `scrape_as_markdown`, 📄 `scrape_as_markdown`) while the response streams token by token afterwards. The user sees the agent's chain of thought as it happens, not as a postmortem.
The trick is that you have to stream three different things, and each layer needs to know what to do with each kind of event. Here's the architecture.
The agent runner (in my case, `fi-runner` wrapping the Claude Agent SDK) emits events of three types as they happen:

```python
async for event in runner.run_stream(user_message, session_id=sid):
    # event["type"] is one of "tool_call", "text", "result"
    ...
```
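Three types because they mean three different things visually:

- `tool_call`: the model just invoked a tool; it renders as a new step.
- `text`: a streamed chunk of the response; it gets appended to the message.
- `result`: the settled turn, with the final text and the full list of tool calls. It replaces the accumulated `text` deltas instead of being appended to them, because post-turn guards (anti-drift, PHI redaction) may have rewritten the response.

That last point is a footgun the spec doesn't yell at you about. We'll come back to it.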
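To wire up and test the layers below without an LLM in the loop, a scripted stand-in for `run_stream` helps. This is a minimal sketch under assumptions: `FakeRunner` and the script are hypothetical, and the dict keys (`type`, `tool`, `text`, `result`) simply mirror the ones the endpoint reads further down. Real `tool` and `result` values are runner objects, not the plain dicts used here.

```python
import asyncio
from typing import Any, AsyncIterator


class FakeRunner:
    """Hypothetical stand-in for an agent runner: replays a fixed script of events."""

    def __init__(self, script: list[dict[str, Any]], delay: float = 0.0) -> None:
        self.script = script
        self.delay = delay  # simulated latency between events, in seconds

    async def run_stream(self, user_message: str, session_id: str) -> AsyncIterator[dict[str, Any]]:
        for event in self.script:
            await asyncio.sleep(self.delay)
            yield event


TOOLS = [{"name": "search_engine"}, {"name": "scrape_as_markdown"}]

SCRIPT = [
    {"type": "tool_call", "tool": TOOLS[0]},
    {"type": "tool_call", "tool": TOOLS[1]},
    {"type": "text", "text": "acme.com "},
    {"type": "text", "text": "sells anvils."},
    # The settled turn: final text plus every tool call, after guards ran.
    {"type": "result", "result": {"text": "acme.com sells anvils.", "tool_calls": TOOLS}},
]


async def main() -> list[str]:
    runner = FakeRunner(SCRIPT)
    kinds: list[str] = []
    async for event in runner.run_stream("roast acme.com", session_id="demo"):
        kinds.append(event["type"])
    return kinds


if __name__ == "__main__":
    print(asyncio.run(main()))
```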
Server-Sent Events (SSE) is the right transport here: unidirectional, text-based, proxy-friendly, and the browser's `EventSource` reconnects natively. FastAPI handles it with `StreamingResponse`:
```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# chat_stream, tool_call_to_wire and result_to_wire live elsewhere in the app.

app = FastAPI()


class ChatRequest(BaseModel):
    session_id: str
    message: str


def _sse(event: str, data: dict) -> str:
    # One SSE frame: event name, JSON payload, blank-line terminator.
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


@app.post("/chat/stream")
async def chat_stream_endpoint(req: ChatRequest) -> StreamingResponse:
    async def gen():
        yield _sse("open", {"session_id": req.session_id})
        try:
            async with asyncio.timeout(180):  # Python 3.11+
                async for event in chat_stream(req.message, session_id=req.session_id):
                    t = event.get("type")
                    if t == "tool_call":
                        yield _sse("tool_call", tool_call_to_wire(event["tool"]))
                    elif t == "text":
                        yield _sse("text", {"delta": event["text"]})
                    elif t == "result":
                        yield _sse("result", result_to_wire(event["result"]))
        except asyncio.CancelledError:
            raise  # client closed tab: propagate so the LLM call cancels
        except TimeoutError:
            yield _sse("error", {"kind": "TimeoutError", "message": "turn exceeded 180s"})
        except Exception as exc:
            yield _sse("error", {"kind": type(exc).__name__, "message": str(exc)})
        # Not in `finally`: on cancellation there is no client left to receive it,
        # and yielding there would hold up the CancelledError.
        yield _sse("done", {})

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # nginx: don't buffer
            "Connection": "keep-alive",
        },
    )
```
Three things in here are non-obvious:
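**The exception ladder.** `except asyncio.CancelledError: raise` has to come before any handler broad enough to catch it. Since Python 3.8, `CancelledError` subclasses `BaseException`, so `except Exception` alone won't swallow it, but a bare `except:` or `except BaseException` will, and the explicit clause makes the intent impossible to miss. When the user closes the tab, FastAPI propagates `CancelledError` into the generator. If you swallow it as a "normal error" and yield an error frame, you (a) write to a socket that's already closed, and (b) more importantly, the LLM call upstream may not actually cancel. It keeps running in the shadow, burning tokens.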
**`asyncio.timeout(180)` (Python 3.11+).** If your upstream tool (Bright Data MCP in my case) hangs, the SSE socket stays open forever. The user sees a typing indicator that never resolves. A hard ceiling per turn turns a wedge into a clean error event.
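**`X-Accel-Buffering: no`.** nginx buffers proxied responses by default. SSE through nginx without this header means the user sees nothing until nginx's buffer fills or the turn ends, and then everything arrives in one burst.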
The naïve approach is to `dict()` the `ToolCall` and send it. Don't. The `input` field on a tool call carries whatever the LLM passed in: for a search tool, the query verbatim; for Bright Data, URLs with auth tokens in query strings; for an internal medical tool, possibly PHI. None of that should leave the process over the SSE wire.
I keep the wire shape in its own module:
```python
from typing import Any, TypedDict


class ToolCallWire(TypedDict):
    # Allowlist of fields that may cross the wire. There is deliberately no `input`.
    name: str | None
    server: str | None
    id: str | None
    is_error: bool | None


def tool_call_to_wire(tc: Any) -> ToolCallWire:
    return {
        "name": getattr(tc, "name", None),
        "server": getattr(tc, "server", None),
        "id": getattr(tc, "id", None),
        "is_error": getattr(tc, "is_error", None),
    }
```
Two things to notice:

- **The wire type has no `input` field.** Unlike `dict(tool_call)`, you have to actively bypass the type to leak input. That makes PHI-safety the default.
- **`tool_call_to_wire` uses `getattr` with `None` defaults** because it sees incomplete objects: a `ToolUseBlock` arrives BEFORE its matching `ToolResultBlock`, so `is_error` is still `None`. Defensive `getattr` here is correct. The `result_to_wire` counterpart only ever sees complete objects, so it doesn't need the same defensiveness.

`EventSource` is the obvious choice for SSE… except it's GET-only, with no request body, and my chat endpoint is POST. So I drop `EventSource` and use `fetch` streaming:
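```typescript
// API_URL, session_id, message, abortController and patchAssistant come from the
// surrounding component; patchAssistant is assumed to accept an updater (prev) => next.
const res = await fetch(`${API_URL}/chat/stream`, {
  method: "POST",
  headers: { "Content-Type": "application/json", Accept: "text/event-stream" },
  body: JSON.stringify({ session_id, message }),
  signal: abortController.signal, // ← user can cancel mid-stream
});
if (!res.ok || !res.body) throw new Error(`stream failed: HTTP ${res.status}`);

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true }); // {stream: true} handles UTF-8 split between chunks
  const frames = buffer.split("\n\n");
  buffer = frames.pop() ?? ""; // last frame may be partial: save it for the next read
  for (const frame of frames) {
    const { event, data } = parseFrame(frame);
    if (event === "tool_call") {
      patchAssistant((prev) => ({ ...prev, steps: [...prev.steps, data], status: "streaming" }));
    } else if (event === "text") {
      patchAssistant((prev) => ({ ...prev, content: prev.content + data.delta }));
    } else if (event === "result") {
      // REPLACE, don't append: post-guard text may differ
      patchAssistant((prev) => ({ ...prev, content: data.text, steps: data.tool_calls, status: "done" }));
    }
  }
}

// Minimal SSE frame parser: an "event:" line plus "data:" line(s) holding JSON.
function parseFrame(frame: string): { event: string; data: any } {
  let event = "message";
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.startsWith("event:")) event = line.slice(6).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).trimStart());
  }
  return { event, data: dataLines.length ? JSON.parse(dataLines.join("\n")) : null };
}
```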
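The `{stream: true}` flag on `TextDecoder` is what makes this safe for UTF-8: without it, a multi-byte character split between two chunks decodes as U+FFFD replacement characters. (With `json.dumps`'s default `ensure_ascii=True`, the server above actually emits pure ASCII, so this is insurance; it starts to matter the moment someone switches to `ensure_ascii=False`.) The buffer-and-split-on-blank-line is just the SSE framing: every event ends with a blank line, and whatever follows the last one is an incomplete frame.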
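The same two moves, incremental decoding plus buffer-and-split on the blank line, carry over directly to Python, which is handy for a test client or a CLI that consumes the endpoint. A sketch using the standard library's `codecs` incremental decoder; `iter_events` and `parse_frame` are hypothetical names, and it assumes `\n` line endings as produced by `_sse` (the SSE format also allows `\r\n`).

```python
import codecs
import json
from typing import Any, Iterable, Iterator


def parse_frame(frame: str) -> tuple[str, Any]:
    # "event:" names the event (default "message"); multiple "data:" lines join with "\n".
    event, data_lines = "message", []
    for line in frame.split("\n"):
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].removeprefix(" "))
    data = json.loads("\n".join(data_lines)) if data_lines else None
    return event, data


def iter_events(chunks: Iterable[bytes]) -> Iterator[tuple[str, Any]]:
    # The incremental decoder plays the role of TextDecoder's {stream: true}:
    # it holds back the leading bytes of a multi-byte character until the rest arrive.
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        *complete, buffer = buffer.split("\n\n")  # last piece may be partial
        for frame in complete:
            if frame.strip():
                yield parse_frame(frame)
    # An unterminated trailing frame is dropped, as an SSE client would do.
```

Splitting a payload in the middle of an emoji and in the middle of the frame separator still yields clean events, while decoding the first chunk on its own produces a replacement character.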
The replace-not-append on the `result` event is the footgun I promised. The streamed `text` deltas are the LLM's raw output as it generates. `result.text` is what the post-turn guards left after running. If your anti-drift guard rewrites the response (mine does: it strips report-voice markdown headers), the streamed deltas and the final text don't match. If you append the result to the streamed content, you double-render. If you replace, you get a smooth "preview → settled" transition. The spec calls for replace.
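Framework aside, the rule fits in a small reducer. This Python sketch mirrors the `patchAssistant` calls above; `AssistantMessage` and `apply_event` are hypothetical names, and the example pretends an anti-drift guard stripped a report-voice markdown header from the streamed preview.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class AssistantMessage:
    content: str = ""
    steps: tuple[Any, ...] = ()
    status: str = "streaming"


def apply_event(msg: AssistantMessage, event: str, data: dict[str, Any]) -> AssistantMessage:
    if event == "tool_call":
        return replace(msg, steps=msg.steps + (data,), status="streaming")
    if event == "text":
        # Preview: raw model output, appended as it arrives.
        return replace(msg, content=msg.content + data["delta"])
    if event == "result":
        # Settled: REPLACE content and steps with what the guards left.
        return replace(msg, content=data["text"], steps=tuple(data["tool_calls"]), status="done")
    return msg  # open/error/done frames don't touch the message in this sketch


EVENTS = [
    ("tool_call", {"name": "search_engine"}),
    ("text", {"delta": "## Roast\n"}),
    ("text", {"delta": "acme.com is fine."}),
    ("result", {"text": "acme.com is fine.", "tool_calls": [{"name": "search_engine"}]}),
]
```

Folding `EVENTS` through `apply_event` leaves only the guarded text; an appending reducer would have ended with the header and the sentence twice.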
Naïve `useEffect(() => scrollIntoView(), [messages])` runs on every `text` delta. Result: ~30 scroll animations per second fighting each other, AND if the user scrolled up to re-read an earlier response, you yank them back to the tail mid-read. Both unusable.
The fix is the "sticky-bottom" pattern that ChatGPT and Claude.ai use:
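```typescript
import { useEffect, useRef } from "react";

// Inside the chat component:
const tailRef = useRef<HTMLDivElement>(null); // empty div rendered after the last message
const lastCountRef = useRef(0);

useEffect(() => {
  const doc = document.documentElement;
  const distanceFromBottom = doc.scrollHeight - (window.innerHeight + window.scrollY);
  const nearBottom = distanceFromBottom < 200;
  const newMessage = messages.length > lastCountRef.current;
  lastCountRef.current = messages.length;
  if (newMessage || nearBottom) {
    tailRef.current?.scrollIntoView({ behavior: "smooth", block: "end" });
  }
}, [messages]);
```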
Scroll on a new message always: it's a turn boundary, and the user wants to see the answer. Scroll on a delta only if the user is already near the bottom. The 200px threshold has been the sweet spot for me: strict enough to respect an intent to read, lax enough that a small scroll bump doesn't lose autoscroll.
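Stripped of React, the decision is a pure function, which makes the threshold easy to unit-test. A Python sketch; `should_autoscroll` and its parameter names are hypothetical stand-ins for `document.documentElement.scrollHeight`, `window.innerHeight`, `window.scrollY`, and the two message counts.

```python
def should_autoscroll(
    scroll_height: float,
    viewport_height: float,
    scroll_y: float,
    message_count: int,
    last_count: int,
    threshold: float = 200,
) -> bool:
    # Same rule as the effect: always follow a new turn, follow deltas only near the bottom.
    distance_from_bottom = scroll_height - (viewport_height + scroll_y)
    near_bottom = distance_from_bottom < threshold
    new_message = message_count > last_count
    return new_message or near_bottom
```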
When this all hangs together right, the user types `acme.com` and immediately sees:
🤖 pensando…
🔍 search_engine
📄 scrape_as_markdown
📄 scrape_as_markdown
⚙️ search_documents
…stepping in over ~4 seconds, with the roast text starting to type after ("pensando…" is the UI's "thinking…" label). That sequence used to be a black box. Now it's receipts.
Two gaps I was hitting in this setup:

- **No `duration_ms` per tool call.** When one of those scrape steps takes 8 seconds, you can't show it, and the Mermaid turn-flow can't colour slow steps. Shipped in 0.14: `ToolCall.duration_ms`, paired by `tool_use_id`.
- **No preflight on the MCP servers.** If Bright Data MCP fails to spawn at boot (bad token, missing `npx`), I only find out when the model tries the first tool: a generic `is_error=true`, mid-roast, in production. Also shipped in 0.14: `Runner.preflight()` does a JSON-RPC handshake (`initialize` → `tools/list`) against each MCP server at startup and returns `{name: alive, tools, error}`. Wire it into your lifespan event and the first bad demo dies at boot.
If you're building anything with agents that use external tools, the message of this post is: don't hide the tools. The chain-of-thought IS the product. Showing it turns "the AI is doing magic" into "the AI is making 4 specific API calls and here they are", which is the difference between users trusting it and not.