Source: dev.to · Topic: large-language-models

Streaming Responses with Claude API in Python (2026)

A developer has published a guide demonstrating how to stream responses from Anthropic's Claude API token by token in Python, cutting the wait before the first text appears in a chat interface from several seconds to a few hundred milliseconds. The tutorial covers raw event streams, async streaming, error handling, and a complete FastAPI endpoint that streams Claude's output to a browser using server-sent events. The implementation uses the Anthropic Python SDK's `stream` method, which provides access to individual event types including `content_block_delta` for incremental text and `message_delta` for stop reasons and token usage.

3 min read · Published Jun 12, 2026

Originally published at kalyna.pro

Streaming sends Claude's response token by token as it's generated, instead of waiting for the full completion before showing anything. For a chat UI this is the difference between a user staring at a spinner for several seconds and seeing the first words appear within a few hundred milliseconds. The Claude API Tutorial introduces the basic `stream.text_stream` helper; this guide covers the full picture: the raw event stream, async streaming, error handling, and a complete FastAPI endpoint that streams Claude's output to a browser.

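Install the SDK, plus FastAPI and uvicorn for the endpoint example:

pip install anthropic
pip install fastapi uvicorn

The basic pattern uses the `stream.text_stream` helper:
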
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    final_message = stream.get_final_message()

print(f"\n\nstop_reason: {final_message.stop_reason}")
print(f"output tokens: {final_message.usage.output_tokens}")

`stream.get_final_message()` returns the same `Message` object you'd get from a non-streaming call (complete `content`, `stop_reason`, and `usage`) without manually reassembling it from chunks.

Iterating over the stream itself, rather than over `text_stream`, exposes the raw events:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        print(event.type)

Event types, in order:

- `message_start`: initial `Message` shell with `usage.input_tokens`
- `content_block_start`: a new content block begins (`text`, `tool_use`, etc.)
- `content_block_delta`: incremental content, one of `text_delta` (`.text`), `input_json_delta` (`.partial_json`, for tool inputs), or `thinking_delta`
- `content_block_stop`: the block is complete
- `message_delta`: `stop_reason` and updated `usage.output_tokens`
- `message_stop`: stream finished

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            print(f"\n[tokens so far: {event.usage.output_tokens}]", end="")
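
To make concrete what `get_final_message()` spares you, here is a minimal sketch of reassembling a response from raw events by hand. The `accumulate` helper and the `SimpleNamespace` stand-in events are hypothetical; the stand-ins are shaped like the event types listed above so the block runs without an API key. The assumption is that a real `stream` object could be passed in their place, and that any event type the helper does not recognize can safely be ignored.

```python
from types import SimpleNamespace as NS

def accumulate(events):
    """Rebuild text, stop_reason and token counts from raw stream events."""
    result = {"text": "", "stop_reason": None, "input_tokens": None, "output_tokens": None}
    for event in events:
        if event.type == "message_start":
            # The initial Message shell carries the input token count.
            result["input_tokens"] = event.message.usage.input_tokens
        elif event.type == "content_block_delta" and event.delta.type == "text_delta":
            result["text"] += event.delta.text
        elif event.type == "message_delta":
            # stop_reason and the updated output token count arrive near the end.
            result["stop_reason"] = event.delta.stop_reason
            result["output_tokens"] = event.usage.output_tokens
    return result

# Stand-in events (assumed shapes, mirroring the event list above).
fake_events = [
    NS(type="message_start", message=NS(usage=NS(input_tokens=12))),
    NS(type="content_block_start", index=0),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Bugs hide ")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="in plain sight")),
    NS(type="content_block_stop", index=0),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"), usage=NS(output_tokens=7)),
    NS(type="message_stop"),
]

summary = accumulate(fake_events)
print(summary)
```

In practice `get_final_message()` does this bookkeeping for you; hand-rolled accumulation is mainly useful when you need to react to individual events as they arrive.
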

For async code, use `AsyncAnthropic` with `async with` and `async for`:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def main():
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())

A complete FastAPI endpoint that relays the stream to the browser as server-sent events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic

app = FastAPI()
client = AsyncAnthropic()

@app.get("/chat")
async def chat(message: str):
    async def event_stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
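            async for text in stream.text_stream:
                # A chunk may contain newlines, which would break SSE framing,
                # so send each line of the chunk as its own "data:" field.
                for line in text.split("\n"):
                    yield f"data: {line}\n"
                yield "\n"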

        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

`X-Accel-Buffering: no` stops nginx from buffering the whole response; without it, "streaming" arrives in one burst at the end. On the frontend, read with `fetch` plus a `ReadableStream` reader, or with `EventSource` for GET endpoints.
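
Real browser code would be JavaScript, but the wire format a client has to parse is easy to show in Python. The sketch below is a deliberately simplified parser: `parse_sse` is a hypothetical helper, it ignores the `id` and `retry` fields and `\r\n` line endings, and it assumes the whole payload is already in memory rather than arriving in pieces.

```python
def parse_sse(raw: str):
    """Split an SSE payload into (event_name, data) tuples."""
    events = []
    for frame in raw.split("\n\n"):  # a blank line terminates each event
        if not frame.strip():
            continue
        name, data_lines = "message", []  # "message" is the default event name
        for line in frame.split("\n"):
            field, _, value = line.partition(":")
            # The SSE format strips one optional space after the colon.
            if value.startswith(" "):
                value = value[1:]
            if field == "event":
                name = value
            elif field == "data":
                data_lines.append(value)
        # Multiple data lines in one event are joined with newlines.
        events.append((name, "\n".join(data_lines)))
    return events

raw = "data: Bugs\n\ndata:  hide\ndata: here\n\nevent: done\ndata: {}\n\n"
parsed = parse_sse(raw)
print(parsed)
```

Note how a second space after `data:` survives as a leading space in the text, and how the named `done` event can be told apart from ordinary text chunks.
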

import anthropic

try:
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\n[connection lost — showing partial response]")
except anthropic.RateLimitError:
    print("\n[rate limited — retry shortly]")
except anthropic.APIStatusError as e:
    print(f"\n[API error {e.status_code}]")
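
The "showing partial response" message only helps if the text received so far was kept somewhere. One possible way to do that, sketched with a hypothetical `collect_text` helper and a fake stream that fails midway (standing in for `stream.text_stream`), is to buffer chunks as they arrive:

```python
def collect_text(chunks):
    """Consume a text stream, returning (text_so_far, error_or_None)."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except Exception as exc:  # in real code, narrow this to anthropic.APIConnectionError
        return "".join(parts), exc
    return "".join(parts), None

def flaky_stream():
    # Stand-in for stream.text_stream that drops the connection midway.
    yield "Null pointer, "
    yield "my old friend"
    raise ConnectionError("simulated drop")

text, error = collect_text(flaky_stream())
print(repr(text), "| error:", error)
```
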

If the client disconnects mid-response, exit the generator early so the SDK closes the stream; this stops billing for output tokens generated into the void. For long generations, check `await request.is_disconnected()` periodically and break if true.
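
The disconnect check can be factored into a small wrapper around any async iterator. This is a sketch under assumptions: `stop_on_disconnect` is a hypothetical helper, and the fake chunk source and fake disconnect check below stand in for `stream.text_stream` and `request.is_disconnected` so the block runs on its own.

```python
import asyncio

async def stop_on_disconnect(source, is_disconnected):
    """Relay chunks from an async iterator until the client goes away."""
    async for chunk in source:
        if await is_disconnected():
            break  # stop relaying; the caller can then leave its stream context
        yield chunk

async def demo():
    sent = []

    async def fake_chunks():
        for word in ["one ", "two ", "three ", "four "]:
            yield word

    async def fake_is_disconnected():
        # Pretend the browser vanished after two chunks were delivered.
        return len(sent) >= 2

    async for chunk in stop_on_disconnect(fake_chunks(), fake_is_disconnected):
        sent.append(chunk)
    return sent

delivered = asyncio.run(demo())
print(delivered)
```

In the FastAPI endpoint, this would presumably mean adding a `request: Request` parameter (imported from `fastapi`) and looping over `stop_on_disconnect(stream.text_stream, request.is_disconnected)` inside the `async with` block, so that breaking out unwinds to the context manager that closes the stream.
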

When tools are enabled, text still arrives via `text_delta`, tool arguments arrive incrementally via `input_json_delta`, and `stream.get_final_message()` gives fully-parsed `tool_use` blocks once the stream ends. See Claude API Function Calling for the complete tool-use loop; it works unchanged whether calls are streamed or not.
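
If you do want tool arguments before the stream ends, the `partial_json` fragments have to be joined per content block and parsed only once the block is complete. A minimal sketch, assuming event shapes like those listed earlier (each block event carrying an `index`); `collect_tool_inputs`, the `get_weather` tool name, and the stand-in events are all hypothetical:

```python
import json
from types import SimpleNamespace as NS

def collect_tool_inputs(events):
    """Join input_json_delta fragments per content block and parse them on block stop."""
    buffers, parsed = {}, {}
    for event in events:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            buffers[event.index] = ""
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            buffers[event.index] += event.delta.partial_json
        elif event.type == "content_block_stop" and event.index in buffers:
            # Fragments are only guaranteed to be valid JSON once the block is complete.
            parsed[event.index] = json.loads(buffers.pop(event.index) or "{}")
    return parsed

# Stand-in events for one streamed tool call (assumed shapes).
fake_events = [
    NS(type="content_block_start", index=1, content_block=NS(type="tool_use", name="get_weather")),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='{"city": "Ky')),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='iv", "unit": "C"}')),
    NS(type="content_block_stop", index=1),
]

tool_inputs = collect_tool_inputs(fake_events)
print(tool_inputs)
```

For most applications the parsed `tool_use` blocks from `get_final_message()` are enough, and this kind of manual buffering is unnecessary.
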

Recommendations:

- Use `get_final_message()` for `stop_reason`/`usage` instead of accumulating `message_delta` manually
- Use `AsyncAnthropic` in web backends; a sync stream blocks the event loop
- Set `Cache-Control: no-cache` and `X-Accel-Buffering: no` for SSE behind a proxy
- Handle `APIConnectionError`, `RateLimitError`, and `APIStatusError` explicitly

In summary:

- `stream.text_stream` yields plain text chunks for display
- Raw events arrive as `message_start`, `content_block_start`, `content_block_delta`, `content_block_stop`, `message_delta`, `message_stop`
- `get_final_message()` returns the complete `Message` after streaming
- `AsyncAnthropic` plus `async with`/`async for` for non-blocking backends
- `StreamingResponse` plus an async generator sends SSE to the browser
- `input_json_delta` carries tool arguments
