In Part 1 I built a LangGraph ReAct agent behind an OpenAI-compatible API and waved at one line:
```python
return StreamingResponse(
    graph_to_openai_sse(graph, inputs, model_name, config=config),
    media_type="text/event-stream",
)
```
That `graph_to_openai_sse` is where the real work hides. An OpenAI client like Open WebUI doesn't want "a LangGraph run". It wants a very specific stream of `chat.completion.chunk` JSON objects over Server-Sent Events, terminated by a `[DONE]` sentinel. LangGraph, meanwhile, emits its own rich event stream. This post is the adapter between the two: about 90 lines that also give you a free "thinking" panel showing the agent's tool calls as they happen.
What the client expects. Each token arrives as an SSE line of the form `data: {json}\n\n`, where the JSON is an OpenAI chunk:
```python
import time


def make_chunk(delta, model_name, completion_id, finish_reason=None):
    """Build one OpenAI `chat.completion.chunk` payload."""
    return {
        "id": completion_id,  # "chatcmpl-...", identical for every chunk in one response
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
```
The stream has a strict shape:

1. A first chunk with `delta = {"role": "assistant"}`.
2. Content chunks with `delta = {"content": "..."}`, one per token.
3. A final chunk with `finish_reason = "stop"`.
4. The terminator line `data: [DONE]\n\n`.

Miss the `[DONE]` and the client spins forever. Skip the role chunk and some clients drop the first token. The contract is small but unforgiving.
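The contract is easy to check mechanically. Below is a minimal sketch of a test helper. It assumes you capture the raw response body as bytes, for example from a test client. `check_openai_stream` and `frame` are hypothetical names, not part of any library. The helper parses the SSE frames and asserts the four-part shape listed above.

```python
import json


def check_openai_stream(raw: bytes) -> str:
    """Check an SSE body against the chunk contract; return the streamed text.

    Raises AssertionError on the first violation. Assumes every frame is a single
    `data: ...` line followed by a blank line (no comments or keep-alives).
    """
    frames = [f for f in raw.decode("utf-8").split("\n\n") if f.strip()]
    assert all(f.startswith("data: ") for f in frames), "non-data frame in stream"
    payloads = [f[len("data: "):] for f in frames]

    assert payloads and payloads[-1] == "[DONE]", "stream must end with data: [DONE]"
    chunks = [json.loads(p) for p in payloads[:-1]]
    assert len(chunks) >= 2, "need at least a role chunk and a stop chunk"
    assert len({c["id"] for c in chunks}) == 1, "id must not change mid-stream"

    first, last = chunks[0]["choices"][0], chunks[-1]["choices"][0]
    assert first["delta"].get("role") == "assistant", "first chunk must set the role"
    assert last["finish_reason"] == "stop", "last chunk must set finish_reason"
    middle = chunks[1:-1]
    assert all(c["choices"][0]["finish_reason"] is None for c in middle), "early finish_reason"
    return "".join(c["choices"][0]["delta"].get("content", "") for c in middle)


def frame(delta, finish_reason=None):
    # Minimal chunk for the usage example; "created" and "model" omitted for brevity.
    chunk = {"id": "chatcmpl-test", "object": "chat.completion.chunk",
             "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]}
    return f"data: {json.dumps(chunk)}\n\n".encode("utf-8")


body = (frame({"role": "assistant"}) + frame({"content": "Hel"})
        + frame({"content": "lo"}) + frame({}, "stop") + b"data: [DONE]\n\n")
print(check_openai_stream(body))  # Hello
```

Using plain `assert` keeps it pytest-friendly. For validation outside tests you would raise a proper exception instead.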
What LangGraph emits. `astream_events` is a single async stream of typed events for everything happening inside the graph: model tokens, tool calls, node transitions. We subscribe once and translate each event we care about into chunks.
```python
from langchain_core.messages import AIMessageChunk


async def graph_to_openai_sse(graph, inputs, model_name, config=None):
    completion_id = new_completion_id()
    yield _sse(make_chunk({"role": "assistant"}, model_name, completion_id))  # (1) role

    def emit(text):
        return _sse(make_chunk({"content": text}, model_name, completion_id))

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if (
                isinstance(chunk, AIMessageChunk)
                and isinstance(chunk.content, str)
                and chunk.content  # skip empty deltas, e.g. the tool-calling turn
            ):
                yield emit(chunk.content)  # (2) tokens

    yield _sse(make_chunk({}, model_name, completion_id, finish_reason="stop"))  # (3) stop
    yield b"data: [DONE]\n\n"  # (4) done
```
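Three things to notice:

1. `version="v2"` pins the event schema, so the `metadata.langgraph_node` and `data.chunk` keys don't silently move under you.
2. For `on_chat_model_stream` events, `data.chunk` is an `AIMessageChunk`, but only when the LLM is actually streaming. Guarding with `isinstance(...)` avoids crashing on the non-streaming events that also flow through.
3. One `completion_id` for the whole response: every chunk carries the same id.

`_sse` is just the wire framing. Note `ensure_ascii=False`: it keeps Korean, Japanese, or emoji tokens as raw UTF-8 instead of `\uXXXX` escapes. That makes the payload smaller and readable when you `curl` the stream. Any JSON parser decodes both forms to the same text.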
```python
import json

def _sse(payload):
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")
```
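Two small pieces are worth seeing in isolation. First, `new_completion_id` is called above but never shown, so here is a hypothetical version. It assumes any unique string prefixed with `chatcmpl-` is acceptable to your clients. Second, the comparison shows what `ensure_ascii` changes. With the default `ensure_ascii=True`, non-ASCII text becomes `\uXXXX` sequences. The result is still valid JSON that parses to the same string, just larger on the wire.

```python
import json
import uuid


def new_completion_id() -> str:
    # Hypothetical: the post calls new_completion_id() without showing it.
    # OpenAI-style ids look like "chatcmpl-<random>"; any unique string should do.
    return f"chatcmpl-{uuid.uuid4().hex}"


token = {"content": "안녕 👋"}
raw = json.dumps(token, ensure_ascii=False)  # keeps the characters as-is
escaped = json.dumps(token)  # default ensure_ascii=True: non-ASCII becomes \uXXXX

print(raw)
print(escaped)
print(len(raw.encode("utf-8")), len(escaped.encode("utf-8")))  # escaped form is larger
assert json.loads(raw) == json.loads(escaped) == token  # identical after parsing
```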
Streaming the final answer is table stakes. The interesting part of a ReAct agent is what it did before answering: which document it searched, what came back. Open WebUI renders any text wrapped in `<think>...</think>` as a collapsible reasoning panel, so we narrate tool activity into that panel.
First, label the nodes worth announcing:
```python
NODE_LABELS = {
    "tools": "🔍 Searching the docs…",
}
```
Then open a `<think>` block, and on the relevant events, emit human-readable progress instead of raw tokens:
```python
# Inside graph_to_openai_sse, after the role chunk and emit() are set up.
show_thinking = bool(NODE_LABELS)
think_open = False
prev_node = None

if show_thinking:
    yield emit("<think>\n")
    think_open = True

async for event in graph.astream_events(inputs, config=config, version="v2"):
    kind = event.get("event")
    node = (event.get("metadata") or {}).get("langgraph_node", "")

    # Announce a labeled node each time the graph enters it.
    if node and node != prev_node:
        if node in NODE_LABELS:
            yield emit(f"\n{NODE_LABELS[node]}\n")
        prev_node = node  # track every node, so a second "tools" round is announced too

    if kind == "on_tool_start":
        yield emit(f" • `{event.get('name', 'tool')}` running…")
        continue

    if kind == "on_tool_end":
        output = event.get("data", {}).get("output")
        text = output.content if hasattr(output, "content") else str(output)
        snippet = " ".join(str(text).split())[:90]  # collapse whitespace, clip
        yield emit(f" ✓ `{snippet}…`\n" if snippet else " ✓\n")
        continue
```
The `on_tool_end` output is a `ToolMessage`, so its text lives on `.content`. That is why the code checks `hasattr(output, "content")` before falling back to `str()`. Collapsing whitespace and clipping to ~90 characters keeps the panel readable instead of dumping a wall of retrieved text.
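The clipping logic is easy to pull into a small helper you can unit test without running a real tool. This is a sketch: `tool_snippet` and `FakeToolMessage` are hypothetical names, and the stand-in class only mimics the `.content` attribute of a `ToolMessage`. One small deviation from the loop above: it appends the ellipsis only when the text was actually clipped.

```python
from dataclasses import dataclass


@dataclass
class FakeToolMessage:
    # Stand-in for langchain_core's ToolMessage; only .content matters here.
    content: str


def tool_snippet(output, limit: int = 90) -> str:
    """Render an on_tool_end output as one short status line for the panel."""
    text = output.content if hasattr(output, "content") else str(output)
    flat = " ".join(str(text).split())  # collapse newlines, tabs, and runs of spaces
    if not flat:
        return " ✓\n"
    # Only add an ellipsis when something was actually cut off.
    suffix = "…" if len(flat) > limit else ""
    return f" ✓ `{flat[:limit]}{suffix}`\n"


print(tool_snippet(FakeToolMessage("Result 1:\n\n   LangGraph   docs")))
print(tool_snippet("x" * 200))  # 90 x's followed by an ellipsis
```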
Closing the panel has to happen no matter how the stream ends (success, exception, or early return), so it goes in a `finally`:
```python
finally:
    if think_open:
        yield _sse(make_chunk({"content": "\n</think>\n"}, model_name, completion_id))
```
The result in the UI: a collapsible "🔍 Searching the docs… ✓" panel, then the streamed answer below it. The user sees the agent reach for RAG in real time.
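The excerpts above never show where `</think>` is emitted once the answer starts. The flow diagram at the end implies it lands before the first answer token. Here is a runnable sketch of that state machine, under stated assumptions:

- Events are plain dicts shaped like LangGraph's v2 events.
- `SimpleNamespace` stands in for `AIMessageChunk`.
- The panel closes on the first non-empty answer token.
- `narrate`, `stream_event`, `fake_events`, and `collect` are hypothetical names.
- It yields plain strings rather than SSE bytes; in the real generator each piece would go through `emit`.

```python
import asyncio
from types import SimpleNamespace


async def narrate(events, node_labels):
    """Turn astream_events-style dicts into content strings with a <think> panel."""
    think_open = False
    prev_node = None
    if node_labels:
        yield "<think>\n"
        think_open = True
    try:
        async for event in events:
            kind = event.get("event")
            node = (event.get("metadata") or {}).get("langgraph_node", "")
            if node and node != prev_node:
                if think_open and node in node_labels:
                    yield f"\n{node_labels[node]}\n"
                prev_node = node
            if kind == "on_tool_start" and think_open:
                yield f" • `{event.get('name', 'tool')}` running…"
            elif kind == "on_tool_end" and think_open:
                yield " ✓\n"
            elif kind == "on_chat_model_stream":
                text = getattr(event["data"]["chunk"], "content", None)
                if isinstance(text, str) and text:
                    if think_open:  # first answer token: close the panel before it
                        yield "\n</think>\n"
                        think_open = False
                    yield text
    finally:
        if think_open:  # ended (or failed) before any answer token arrived
            yield "\n</think>\n"


def stream_event(node, text):
    # Fake on_chat_model_stream event; SimpleNamespace stands in for AIMessageChunk.
    return {"event": "on_chat_model_stream", "metadata": {"langgraph_node": node},
            "data": {"chunk": SimpleNamespace(content=text)}}


async def fake_events():
    yield stream_event("agent", "")  # tool-calling turn: no text content
    yield {"event": "on_tool_start", "name": "search_docs",
           "metadata": {"langgraph_node": "tools"}}
    yield {"event": "on_tool_end", "name": "search_docs",
           "metadata": {"langgraph_node": "tools"}}
    yield stream_event("agent", "Hel")
    yield stream_event("agent", "lo")


async def collect(events):
    labels = {"tools": "🔍 Searching the docs…"}
    return "".join([piece async for piece in narrate(events, labels)])


print(asyncio.run(collect(fake_events())))
```

This sketch has a known limitation. If your model writes text before calling a tool, the panel closes at that point and later tool lines are dropped, so adjust the close condition to how your model behaves.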
1. Errors belong in the stream, not in a 500. Once you've started streaming, the HTTP status is already `200` and the headers are flushed, so you can't switch to an error response. Catch the exception inside the generator and emit the error as content:
```python
except Exception as exc:
    log.exception("stream failed")  # assumes a module-level logger named `log`
    yield _sse(make_chunk({"content": f"\n[error] {exc}"}, model_name, completion_id))
```
The user sees `[error] ...` in the chat instead of a frozen, half-rendered message.
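The excerpts don't show where the `except` sits relative to the stop chunk and `[DONE]`. One arrangement is to keep both terminators after the `try`/`except`. That way an error still produces a well-formed, terminated stream, and the client doesn't hang waiting for `[DONE]`. Here is a sketch of that assumption:

- `guarded_stream`, `chunk`, and `sse` are hypothetical trimmed-down stand-ins for the post's generator, `make_chunk`, and `_sse`.
- A plain async iterator of tokens replaces the LangGraph event stream.

```python
import asyncio
import json


def chunk(delta, finish_reason=None):
    # Trimmed-down make_chunk: fixed id and model, no "created" field.
    return {"id": "chatcmpl-demo", "object": "chat.completion.chunk", "model": "demo",
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]}


def sse(payload):
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")


async def guarded_stream(tokens):
    """Forward tokens as chunks; report a mid-stream failure as content.

    The stop chunk and [DONE] sit after the try/except, so the client still gets
    a properly terminated stream when something goes wrong.
    """
    yield sse(chunk({"role": "assistant"}))
    try:
        async for tok in tokens:
            yield sse(chunk({"content": tok}))
    except Exception as exc:  # the 200 is already sent; report the error in-band
        yield sse(chunk({"content": f"\n[error] {exc}"}))
    yield sse(chunk({}, finish_reason="stop"))
    yield b"data: [DONE]\n\n"


async def flaky_tokens():
    yield "Partial answ"
    raise TimeoutError("gateway timed out")


async def main():
    return b"".join([piece async for piece in guarded_stream(flaky_tokens())])


body = asyncio.run(main())
print(body.decode("utf-8"))
```

`except Exception` deliberately does not catch `asyncio.CancelledError`, which is a `BaseException` subclass in Python 3.8+. A cancelled request therefore still unwinds normally.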
2. Not every model streams. Some gateways and models return a single batched response with no `on_chat_model_stream` events at all. If you only ever forwarded tokens, those models would yield an empty answer. Track whether any token was seen, and if not, fall back to a plain `ainvoke`. Note that this runs the graph a second time, tool calls included.
```python
if not saw_token:  # saw_token is set to True whenever a content token is emitted
    result = await graph.ainvoke(inputs, config=config)
    final = extract_final_text(result.get("messages", []))
    yield emit(final)
```
`extract_final_text` walks the message log backwards for the last non-empty `AIMessage`. It handles both plain-string content and the list-of-blocks shape some providers return. This one guard is the difference between "streaming works on my dev model" and "works on every model behind the gateway."
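`extract_final_text` itself isn't shown, so here is a minimal sketch. It assumes two LangChain conventions: `AIMessage.type` is `"ai"`, and text blocks in list-shaped content look like `{"type": "text", "text": ...}`. `Msg` and `content_to_text` are hypothetical names. The stand-in class exists so the sketch runs without LangChain installed; real `AIMessage` objects expose the same two attributes.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Msg:
    # Stand-in for a LangChain message. AIMessage.type is "ai"; content is either
    # a plain string or a list of blocks.
    type: str
    content: Any


def content_to_text(content: Any) -> str:
    """Flatten string or list-of-blocks content into plain text."""
    if isinstance(content, str):
        return content
    parts = []
    for block in content or []:
        if isinstance(block, str):
            parts.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            parts.append(block.get("text", ""))
        # Other block types (tool calls, images, ...) carry no answer text.
    return "".join(parts)


def extract_final_text(messages) -> str:
    """Walk the log backwards; return the last AI message with non-empty text."""
    for msg in reversed(messages):
        if getattr(msg, "type", None) != "ai":
            continue
        text = content_to_text(msg.content)
        if text.strip():
            return text
    return ""


history = [
    Msg("human", "Where is the retry setting documented?"),
    Msg("ai", ""),  # tool-calling turn: no text
    Msg("tool", "...retrieved chunk..."),
    Msg("ai", [{"type": "text", "text": "Found it "}, {"type": "text", "text": "in the docs."}]),
]
print(extract_final_text(history))  # Found it in the docs.
```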
```
graph.astream_events(version="v2")
│
├─ on_chat_model_stream → emit({"content": token})
├─ node entry           → emit("🔍 status line")    ┐
├─ on_tool_start        → emit("• tool running…")   ├─ inside <think>…</think>
├─ on_tool_end          → emit("✓ snippet…")        ┘
└─ (exception)          → emit("[error] …")
        ▼
first chunk {role} → …content chunks… → {finish_reason: stop} → data: [DONE]
```
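The payoff from Part 1 compounds here. Because the boundary is just OpenAI SSE, the streamed answer works in any OpenAI-compatible client with zero client code. The thinking panel appears in any client that renders `<think>` blocks the way Open WebUI does; others will typically show the status lines inline as plain text. You wrote one translator, and every frontend in that ecosystem speaks it for free.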
Next up: persisting conversation threads with a checkpointer so the agent remembers across requests — and what that does to the streaming loop.
Built with LangGraph, LangChain, and FastAPI. Part 2 of a series on running LangGraph in production — Part 1 here.