cd /news/developer-tools/streaming-a-langgraph-agent-as-opena… · home topics developer-tools article
[ARTICLE · art-37022] src=dev.to ↗ pub= topic=developer-tools verified=true sentiment=↑ positive

Streaming a LangGraph Agent as OpenAI-Compatible SSE (with a Thinking Panel)

A developer built an adapter that converts LangGraph agent event streams into OpenAI-compatible Server-Sent Events, enabling tools like Open WebUI to display a 'thinking' panel showing the agent's tool calls in real time. The 90-line solution handles the strict OpenAI chunk format, including role, content, finish reason, and the required [DONE] sentinel, while also wrapping tool activity in <think> tags for a collapsible reasoning display.

read5 min views6 publishedJun 24, 2026

In Part 1 I built a LangGraph ReAct agent behind an OpenAI-compatible API and waved at one line:

return StreamingResponse(graph_to_openai_sse(graph, inputs, model_name, config=config),
                         media_type="text/event-stream")

That graph_to_openai_sse

is where the real work hides. An OpenAI client like Open WebUI doesn't want "a LangGraph run" — it wants a very specific stream of chat.completion.chunk

JSON objects over Server-Sent Events, terminated by a [DONE]

sentinel. LangGraph, meanwhile, emits its own rich event stream. This post is the adapter between the two — about 90 lines that also give you a free "thinking" panel showing the agent's tool calls as they happen.

What the client expects — each token arrives as an SSE line: data: {json}\n\n

, where the JSON is an OpenAI chunk:

def make_chunk(delta, model_name, completion_id, finish_reason=None):
    return {
        "id": completion_id,                       # "chatcmpl-..."
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }

The stream has a strict shape:

delta = {"role": "assistant"}

,delta = {"content": "..."}

— one per token,finish_reason = "stop"

,data: [DONE]\n\n

.Miss the [DONE]

and the client spins forever. Skip the role chunk and some clients drop the first token. The contract is small but unforgiving.

What LangGraph emitsastream_events

is a single async stream of typed events for everything happening inside the graph: model tokens, tool calls, node transitions. We subscribe once and translate each event we care about into chunks.

async def graph_to_openai_sse(graph, inputs, model_name, config=None):
    completion_id = new_completion_id()
    yield _sse(make_chunk({"role": "assistant"}, model_name, completion_id))  # (1) role

    def emit(text):
        return _sse(make_chunk({"content": text}, model_name, completion_id))

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")

        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if isinstance(chunk, AIMessageChunk) and isinstance(chunk.content, str):
                yield emit(chunk.content)                                     # (2) tokens

    yield _sse(make_chunk({}, model_name, completion_id, finish_reason="stop"))  # (3) stop
    yield b"data: [DONE]\n\n"                                                     # (4) done

Three things to notice:

version="v2"

metadata.langgraph_node

and data.chunk

keys don't silently move under you.on_chat_model_stream

data.chunk

is an AIMessageChunk

— but only when the LLM is actually streaming. Guarding with isinstance(...)

avoids crashing on the non-streaming events that also flow through.completion_id

for the whole response._sse

is just the wire framing — and note ensure_ascii=False

, which matters the moment your tokens are Korean, Japanese, or emoji:

def _sse(payload):
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")

Streaming the final answer is table stakes. The interesting part of a ReAct agent is what it did before answering — which document it searched, what came back. Open WebUI renders any text wrapped in <think>...</think>

as a collapsible reasoning panel. So we narrate tool activity into that panel.

First, label the nodes worth announcing:

NODE_LABELS = {
    "tools": "🔍 Searching the docs…",
}

Then open a <think>

block, and on the relevant events, emit human-readable progress instead of raw tokens:

    show_thinking = bool(NODE_LABELS)
    think_open = False
    prev_node = None

    if show_thinking:
        yield emit("<think>\n")
        think_open = True

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")
        node = (event.get("metadata") or {}).get("langgraph_node", "")

        if node and node != prev_node and node in NODE_LABELS:
            yield emit(f"\n{NODE_LABELS[node]}\n")
            prev_node = node

        if kind == "on_tool_start":
            yield emit(f"  • `{event.get('name', 'tool')}` running…")
            continue

        if kind == "on_tool_end":
            output = event.get("data", {}).get("output")
            text = output.content if hasattr(output, "content") else str(output)
            snippet = " ".join(str(text).split())[:90]          # collapse whitespace, clip
            yield emit(f" ✓ `{snippet}…`\n" if snippet else " ✓\n")
            continue

The on_tool_end

output is a ToolMessage

, so its text lives on .content

— hence the hasattr(output, "content")

check before falling back to str()

. Collapsing whitespace and clipping to ~90 chars keeps the panel readable instead of dumping a wall of retrieved text.

Closing the panel has to happen no matter how the stream ends — success, exception, or early return — so it goes in a finally

:

    finally:
        if think_open:
            yield _sse(make_chunk({"content": "\n</think>\n"}, model_name, completion_id))

The result in the UI: a collapsible "🔍 Searching the docs… ✓" panel, then the streamed answer below it. The user sees the agent reach for RAG in real time.

1. Errors belong in the stream, not in a 500. Once you've started streaming, the HTTP status is already 200

and headers are flushed — you can't switch to an error response. So catch inside the generator and emit the error as content:

    except Exception as exc:
        log.exception("stream failed")
        yield _sse(make_chunk({"content": f"\n[error] {exc}"}, model_name, completion_id))

The user sees [error] ...

in the chat instead of a frozen, half-rendered message.

2. Not every model streams. Some gateways/models return a single batched response with no on_chat_model_stream

events at all. If you only ever forwarded tokens, those models would yield an empty answer. Track whether any token was seen, and if not, fall back to a plain ainvoke

:

    if not saw_token:
        result = await graph.ainvoke(inputs, config=config)
        final = extract_final_text(result.get("messages", []))
        yield emit(final)

extract_final_text

walks the message log backwards for the last non-empty AIMessage

— handling both plain-string content and the list-of-blocks shape some providers return. This one guard is the difference between "streaming works on my dev model" and "works on every model behind the gateway."

graph.astream_events(version="v2")
        │
        ├─ on_chat_model_stream → emit({"content": token})
        ├─ node entry           → emit("🔍 status line")   ┐
        ├─ on_tool_start        → emit("• tool running…")  ├─ inside <think>…</think>
        ├─ on_tool_end          → emit("✓ snippet…")       ┘
        └─ (exception)          → emit("[error] …")
        ▼
 first chunk {role}  →  …content chunks…  →  {finish_reason: stop}  →  data: [DONE]

The payoff from Part 1 compounds here: because the boundary is just OpenAI SSE, this thinking-panel UX shows up in any OpenAI-compatible client with zero client code. You wrote a translator, and every frontend in that ecosystem speaks it for free.

Next up: persisting conversation threads with a checkpointer so the agent remembers across requests — and what that does to the streaming loop.

Built with LangGraph, LangChain, and FastAPI. Part 2 of a series on running LangGraph in production — Part 1 here.

── more in #developer-tools 4 stories · sorted by recency
── more on @langgraph 3 stories trending now
sponsored brought to you by zahid.host 4,200+ EU-deployed projects
reading about agents? ship yours in a single git push.

Run your AI side-project on zahid.host

EU-based hosting, git-push deploys, automatic HTTPS, no cold starts. Free tier with a custom domain — perfect for shipping the agent you just read about.

$git push zahid main
Live at https://your-agent.zahid.host
Get free account → Pricing
from €0/mo · no card required
LIVE [news/streaming-a-langgrap…] indexed:0 read:5min 2026-06-24 ·