# Streaming a LangGraph Agent as OpenAI-Compatible SSE (with a Thinking Panel)

> Source: <https://dev.to/javaking1129/streaming-a-langgraph-agent-as-openai-compatible-sse-with-a-thinking-panel-2928>
> Published: 2026-06-24 01:00:27+00:00

In [Part 1](https://dev.to/javaking1129/running-a-langgraph-react-agent-in-production-openai-compatible-api-multi-model-gateway--emi) I built a LangGraph ReAct agent behind an OpenAI-compatible API and waved at one line:

```
return StreamingResponse(graph_to_openai_sse(graph, inputs, model_name, config=config),
                         media_type="text/event-stream")
```

That `graph_to_openai_sse`

is where the real work hides. An OpenAI client like Open WebUI doesn't want "a LangGraph run" — it wants a very specific stream of `chat.completion.chunk`

JSON objects over Server-Sent Events, terminated by a `[DONE]`

sentinel. LangGraph, meanwhile, emits its *own* rich event stream. This post is the adapter between the two — about 90 lines that also give you a free "thinking" panel showing the agent's tool calls as they happen.

**What the client expects** — each token arrives as an SSE line: `data: {json}\n\n`

, where the JSON is an OpenAI chunk:

``` python
# app/api/openai_compat.py
def make_chunk(delta, model_name, completion_id, finish_reason=None):
    return {
        "id": completion_id,                       # "chatcmpl-..."
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
```

The stream has a strict shape:

`delta = {"role": "assistant"}`

,`delta = {"content": "..."}`

— one per token,`finish_reason = "stop"`

,`data: [DONE]\n\n`

.Miss the `[DONE]`

and the client spins forever. Skip the role chunk and some clients drop the first token. The contract is small but unforgiving.

**What LangGraph emits** — `astream_events`

is a single async stream of *typed* events for everything happening inside the graph: model tokens, tool calls, node transitions. We subscribe once and translate each event we care about into chunks.

``` python
# app/api/streaming.py
async def graph_to_openai_sse(graph, inputs, model_name, config=None):
    completion_id = new_completion_id()
    yield _sse(make_chunk({"role": "assistant"}, model_name, completion_id))  # (1) role

    def emit(text):
        return _sse(make_chunk({"content": text}, model_name, completion_id))

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")

        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if isinstance(chunk, AIMessageChunk) and isinstance(chunk.content, str):
                yield emit(chunk.content)                                     # (2) tokens

    yield _sse(make_chunk({}, model_name, completion_id, finish_reason="stop"))  # (3) stop
    yield b"data: [DONE]\n\n"                                                     # (4) done
```

Three things to notice:

`version="v2"`

`metadata.langgraph_node`

and `data.chunk`

keys don't silently move under you.`on_chat_model_stream`

`data.chunk`

is an `AIMessageChunk`

— but only when the LLM is actually streaming. Guarding with `isinstance(...)`

avoids crashing on the non-streaming events that also flow through.`completion_id`

for the whole response.`_sse`

is just the wire framing — and note `ensure_ascii=False`

, which matters the moment your tokens are Korean, Japanese, or emoji:

``` python
def _sse(payload):
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")
```

Streaming the final answer is table stakes. The interesting part of a ReAct agent is *what it did before answering* — which document it searched, what came back. Open WebUI renders any text wrapped in `<think>...</think>`

as a collapsible reasoning panel. So we narrate tool activity into that panel.

First, label the nodes worth announcing:

```
NODE_LABELS = {
    "tools": "🔍 Searching the docs…",
}
```

Then open a `<think>`

block, and on the relevant events, emit human-readable progress instead of raw tokens:

```
    show_thinking = bool(NODE_LABELS)
    think_open = False
    prev_node = None

    if show_thinking:
        yield emit("<think>\n")
        think_open = True

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")
        node = (event.get("metadata") or {}).get("langgraph_node", "")

        # node entry → status line
        if node and node != prev_node and node in NODE_LABELS:
            yield emit(f"\n{NODE_LABELS[node]}\n")
            prev_node = node

        if kind == "on_tool_start":
            yield emit(f"  • `{event.get('name', 'tool')}` running…")
            continue

        if kind == "on_tool_end":
            output = event.get("data", {}).get("output")
            text = output.content if hasattr(output, "content") else str(output)
            snippet = " ".join(str(text).split())[:90]          # collapse whitespace, clip
            yield emit(f" ✓ `{snippet}…`\n" if snippet else " ✓\n")
            continue
        # ... on_chat_model_stream handled as before
```

The `on_tool_end`

output is a `ToolMessage`

, so its text lives on `.content`

— hence the `hasattr(output, "content")`

check before falling back to `str()`

. Collapsing whitespace and clipping to ~90 chars keeps the panel readable instead of dumping a wall of retrieved text.

Closing the panel has to happen no matter how the stream ends — success, exception, or early return — so it goes in a `finally`

:

```
    finally:
        if think_open:
            yield _sse(make_chunk({"content": "\n</think>\n"}, model_name, completion_id))
```

The result in the UI: a collapsible **"🔍 Searching the docs… ✓"** panel, then the streamed answer below it. The user sees the agent reach for RAG in real time.

**1. Errors belong in the stream, not in a 500.** Once you've started streaming, the HTTP status is already `200`

and headers are flushed — you can't switch to an error response. So catch inside the generator and emit the error as content:

```
    except Exception as exc:
        log.exception("stream failed")
        yield _sse(make_chunk({"content": f"\n[error] {exc}"}, model_name, completion_id))
```

The user sees `[error] ...`

in the chat instead of a frozen, half-rendered message.

**2. Not every model streams.** Some gateways/models return a single batched response with no `on_chat_model_stream`

events at all. If you only ever forwarded tokens, those models would yield an *empty* answer. Track whether any token was seen, and if not, fall back to a plain `ainvoke`

:

```
    if not saw_token:
        result = await graph.ainvoke(inputs, config=config)
        final = extract_final_text(result.get("messages", []))
        yield emit(final)
```

`extract_final_text`

walks the message log backwards for the last non-empty `AIMessage`

— handling both plain-string content and the list-of-blocks shape some providers return. This one guard is the difference between "streaming works on my dev model" and "works on every model behind the gateway."

```
graph.astream_events(version="v2")
        │
        ├─ on_chat_model_stream → emit({"content": token})
        ├─ node entry           → emit("🔍 status line")   ┐
        ├─ on_tool_start        → emit("• tool running…")  ├─ inside <think>…</think>
        ├─ on_tool_end          → emit("✓ snippet…")       ┘
        └─ (exception)          → emit("[error] …")
        ▼
 first chunk {role}  →  …content chunks…  →  {finish_reason: stop}  →  data: [DONE]
```

The payoff from Part 1 compounds here: because the boundary is *just* OpenAI SSE, this thinking-panel UX shows up in **any** OpenAI-compatible client with zero client code. You wrote a translator, and every frontend in that ecosystem speaks it for free.

Next up: persisting conversation threads with a checkpointer so the agent remembers across requests — and what that does to the streaming loop.

*Built with LangGraph, LangChain, and FastAPI. Part 2 of a series on running LangGraph in production — Part 1 here.*
