# Streaming Responses with Claude API in Python (2026)

> Source: <https://dev.to/kalyna_pro/streaming-responses-with-claude-api-in-python-2026-44la>
> Published: 2026-06-12 13:09:50+00:00

Originally published at [kalyna.pro]

Streaming sends Claude's response token by token as it's generated, instead of waiting for the full completion before showing anything. For a chat UI this is the difference between a user staring at a spinner for several seconds and seeing the first words appear within a few hundred milliseconds. The [Claude API Tutorial](https://kalyna.pro/claude-api-tutorial/) introduces the basic `stream.text_stream` helper. This guide covers the full picture: the raw event stream, async streaming, error handling, and a complete FastAPI endpoint that streams Claude's output to a browser.

```bash
pip install anthropic
# for the API endpoint example later:
pip install fastapi uvicorn
```

```python
from anthropic import Anthropic

# Reads the API key from the ANTHROPIC_API_KEY environment variable.
client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    # text_stream yields plain text chunks as they arrive.
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Call this inside the `with` block, after the text has been consumed.
    final_message = stream.get_final_message()

print(f"\n\nstop_reason: {final_message.stop_reason}")
print(f"output tokens: {final_message.usage.output_tokens}")
```

`stream.get_final_message()` returns the same `Message` object you'd get from a non-streaming call, with complete `content`, `stop_reason`, and `usage`, so you don't have to reassemble it from chunks yourself.

```
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        print(event.type)
```

Event types, in order:

- `message_start`: initial `Message` shell with `usage.input_tokens`
- `content_block_start`: a new content block begins (`text`, `tool_use`, etc.)
- `content_block_delta`: incremental content, either `text_delta` (`.text`), `input_json_delta` (`.partial_json`, for tool inputs), or `thinking_delta`
- `content_block_stop`: the block is complete
- `message_delta`: `stop_reason` and updated `usage.output_tokens`
- `message_stop`: stream finished
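To make the block lifecycle concrete, here is a minimal sketch that groups `text_delta` payloads by content block. It runs offline against stand-in objects built with `SimpleNamespace`, not against the SDK's real event classes. The helper names `assemble_text_blocks` and `fake_event` are made up for this illustration, and the sketch assumes each event carries the `type`, `index`, and `delta` attributes described in the list above.

```python
from types import SimpleNamespace


def assemble_text_blocks(events):
    """Return the text of each content block, ordered by block index."""
    blocks = {}
    for event in events:
        if event.type == "content_block_start":
            # A new block opens: start an empty buffer for its index.
            blocks.setdefault(event.index, [])
        elif event.type == "content_block_delta" and event.delta.type == "text_delta":
            # Append the incremental text to the buffer of its block.
            blocks.setdefault(event.index, []).append(event.delta.text)
    return ["".join(blocks[i]) for i in sorted(blocks)]


def fake_event(type_, **fields):
    """Hypothetical stand-in for an SDK event (illustration only)."""
    return SimpleNamespace(type=type_, **fields)


def text_delta(text):
    return SimpleNamespace(type="text_delta", text=text)


sample_events = [
    fake_event("message_start"),
    fake_event("content_block_start", index=0),
    fake_event("content_block_delta", index=0, delta=text_delta("Null pointer ")),
    fake_event("content_block_delta", index=0, delta=text_delta("at dawn")),
    fake_event("content_block_stop", index=0),
    fake_event("message_delta"),
    fake_event("message_stop"),
]

blocks = assemble_text_blocks(sample_events)
print(blocks)
```

The same function should work on a real stream as long as the events expose those attributes, but in most cases `stream.text_stream` or `get_final_message()` is the simpler route.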

```python
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        # Text arrives as text_delta payloads inside content_block_delta events.
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
        # message_delta carries the running output-token count.
        elif event.type == "message_delta":
            print(f"\n[tokens so far: {event.usage.output_tokens}]", end="")
```

```python
import asyncio
from anthropic import AsyncAnthropic

# Async variant: same call shape, with `async with` and `async for`.
client = AsyncAnthropic()

async def main():
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())
```

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic

app = FastAPI()
client = AsyncAnthropic()

@app.get("/chat")
async def chat(message: str):
    async def event_stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                # JSON-encode each chunk: a raw newline in the text would
                # otherwise break the SSE "data:" framing.
                yield f"data: {json.dumps({'text': text})}\n\n"

        # Tell the client the stream finished cleanly.
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```

`X-Accel-Buffering: no` tells nginx not to buffer the response. Without it, the "streamed" output can arrive in one burst at the end. On the frontend, read the stream with `fetch` and a `ReadableStream` reader, or with `EventSource`, which only works for GET endpoints.
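For testing the endpoint without a browser, it can help to see how a client splits the response into events. The following is a minimal sketch of an SSE frame parser in plain Python. `parse_sse` is a hypothetical helper written for this example, and it handles only the `event:` and `data:` fields, assuming frames shaped like the ones the endpoint above emits.

```python
def parse_sse(lines):
    """Yield (event_name, data) pairs from an iterable of SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current frame; dispatch it if it has data.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            # Drop the single optional space after the colon, keep the rest.
            data.append(line[len("data:"):].removeprefix(" "))


# A sample response body in the same shape as the endpoint's output.
body = "data: Bugs\n\ndata:  hide\n\nevent: done\ndata: {}\n\n"
frames = list(parse_sse(body.splitlines()))
print(frames)
```

Frames without an `event:` line get the default name `message`, which is also what `EventSource` delivers to its `onmessage` handler. The sketch needs Python 3.9 or later for `str.removeprefix`.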

``` python
import anthropic

client = anthropic.Anthropic()

try:
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    # Network failure; anything already printed is a partial response.
    print("\n[connection lost: showing partial response]")
except anthropic.RateLimitError:
    # HTTP 429. Listed before APIStatusError because it is a subclass of it.
    print("\n[rate limited: retry shortly]")
except anthropic.APIStatusError as e:
    # Any other non-2xx response from the API.
    print(f"\n[API error {e.status_code}]")
```
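If the partial text needs to be kept rather than just printed, one option is to buffer chunks as they arrive and return whatever was collected when the stream fails. This is a sketch under stated assumptions: `read_until_error` and `flaky_stream` are hypothetical names, and the built-in `ConnectionError` stands in for `anthropic.APIConnectionError` so the example runs without the SDK or a network connection.

```python
def read_until_error(chunks, recoverable=(ConnectionError,)):
    """Consume chunks; return (text_so_far, error_or_None)."""
    received = []
    try:
        for chunk in chunks:
            received.append(chunk)
    except recoverable as exc:
        # Keep whatever arrived before the failure.
        return "".join(received), exc
    return "".join(received), None


def flaky_stream():
    """Stand-in for a text stream that drops mid-response."""
    yield "Stack traces "
    yield "at midnight"
    raise ConnectionError("socket closed")


text, error = read_until_error(flaky_stream())
print(repr(text), type(error).__name__)
```

With the real SDK, the call would look like `read_until_error(stream.text_stream, recoverable=(anthropic.APIConnectionError,))` inside the `with` block. The caller can then decide whether to show the partial text, retry, or both.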

If the client disconnects mid-response, exit the generator early so the SDK closes the stream instead of generating output tokens nobody will read, which you would otherwise still be billed for. For long generations, check `await request.is_disconnected()` periodically and break if it returns true.
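The periodic check can be sketched as a small wrapper around any async chunk source. Everything here is illustrative: `relay_until_disconnect`, `check_every`, and `fake_chunks` are made-up names, and `is_disconnected` is any async callable with the same shape as Starlette's `request.is_disconnected`.

```python
import asyncio


async def relay_until_disconnect(chunks, is_disconnected, check_every=5):
    """Yield chunks, stopping once is_disconnected() reports True."""
    try:
        sent = 0
        async for chunk in chunks:
            # Poll only every `check_every` chunks to keep overhead low.
            if sent % check_every == 0 and await is_disconnected():
                break
            yield chunk
            sent += 1
    finally:
        # Close the upstream generator so its cleanup code runs promptly.
        aclose = getattr(chunks, "aclose", None)
        if aclose is not None:
            await aclose()


async def fake_chunks(closed):
    """Stand-in for a long model response; records when it gets closed."""
    try:
        for i in range(100):
            yield f"chunk-{i} "
    finally:
        closed.append(True)


async def demo():
    state = {"delivered": 0}
    closed = []

    async def is_disconnected():
        # Pretend the browser goes away after ten chunks were delivered.
        return state["delivered"] >= 10

    received = []
    async for chunk in relay_until_disconnect(fake_chunks(closed), is_disconnected):
        received.append(chunk)
        state["delivered"] += 1
    return received, closed


received, closed = asyncio.run(demo())
print(len(received), closed)
```

In a FastAPI handler, the equivalent is to accept a `Request` parameter and run the same check inside the `async for` loop over `stream.text_stream`. Leaving the loop exits the `async with` block, which closes the stream.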

When the request includes tools, text still arrives via `text_delta`, tool arguments arrive incrementally via `input_json_delta`, and `stream.get_final_message()` gives fully parsed `tool_use` blocks once the stream ends. See [Claude API Function Calling](https://kalyna.pro/claude-api-function-calling/) for the complete tool-use loop, which works unchanged whether calls are streamed or not.
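If you do need the arguments before the stream ends, the `partial_json` fragments have to be concatenated per content block and parsed only once the block is complete, because an individual fragment is usually not valid JSON. Here is a minimal offline sketch using stand-in event objects. `collect_tool_inputs` is a hypothetical helper, and the `city` and `units` arguments are invented sample data.

```python
import json
from types import SimpleNamespace


def collect_tool_inputs(events):
    """Map content-block index to the parsed tool input for that block."""
    buffers, parsed = {}, {}
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            # Fragments are not valid JSON on their own, so just buffer them.
            buffers.setdefault(event.index, []).append(event.delta.partial_json)
        elif event.type == "content_block_stop" and event.index in buffers:
            # The block is complete: the joined fragments now form one document.
            raw = "".join(buffers.pop(event.index))
            parsed[event.index] = json.loads(raw or "{}")
    return parsed


def json_delta(index, fragment):
    """Hypothetical stand-in for an input_json_delta event."""
    delta = SimpleNamespace(type="input_json_delta", partial_json=fragment)
    return SimpleNamespace(type="content_block_delta", index=index, delta=delta)


sample_events = [
    json_delta(1, '{"city": "Ky'),
    json_delta(1, 'iv", "units"'),
    json_delta(1, ': "metric"}'),
    SimpleNamespace(type="content_block_stop", index=1),
]

tool_inputs = collect_tool_inputs(sample_events)
print(tool_inputs)
```

For most applications `get_final_message()` remains the simpler choice, since it returns the same inputs already parsed.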

Recommended practices:

- Use `get_final_message()` for `stop_reason`/`usage` instead of accumulating `message_delta` manually
- Use `AsyncAnthropic` in web backends, since a sync stream blocks the event loop
- Set `Cache-Control: no-cache` and `X-Accel-Buffering: no` for SSE behind a proxy
- Handle `APIConnectionError`, `RateLimitError`, and `APIStatusError` explicitly

Summary:

- `stream.text_stream` yields plain text chunks for display
- The raw event stream runs `message_start`, `content_block_start`, `content_block_delta`, `content_block_stop`, `message_delta`, `message_stop`
- `get_final_message()` returns the complete `Message` after streaming
- `AsyncAnthropic` + `async with`/`async for` for non-blocking backends
- `StreamingResponse` + async generator sends SSE to the browser
- `input_json_delta` carries tool arguments

Further reading:
