Claude Streaming API Guide
Streaming lets you receive Claude’s response incrementally via Server-Sent Events (SSE) instead of waiting for the full response. This is essential for real-time UX and for avoiding timeouts on long-running requests.
Quick Fix
Stream a response in Python:
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
What You Need
- Anthropic Python or TypeScript SDK
- Set stream=True, or use the SDK streaming helpers
Full Solution
Python: Stream Helper (Recommended)
The stream() context manager is the simplest way to stream:
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a short story about a robot"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get the complete message after streaming
    message = stream.get_final_message()
    print(f"\n\nTokens: {message.usage.input_tokens} in, {message.usage.output_tokens} out")
    print(f"Stop reason: {message.stop_reason}")
TypeScript: Stream Helper (Recommended)
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = client.messages.stream({
  model: "claude-sonnet-4-6",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Write a short story about a robot" }]
}).on("text", (text) => {
  process.stdout.write(text);
});

const message = await stream.finalMessage();
console.log(`\n\nTokens: ${message.usage.input_tokens} in, ${message.usage.output_tokens} out`);
Python: Raw SSE Events
For full control over individual events:
import anthropic

client = anthropic.Anthropic()

stream = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    stream=True,
    messages=[{"role": "user", "content": "Hello"}]
)

for event in stream:
    if event.type == "message_start":
        print(f"Model: {event.message.model}")
    elif event.type == "content_block_start":
        print(f"Content block type: {event.content_block.type}")
    elif event.type == "content_block_delta":
        if event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
    elif event.type == "message_delta":
        print(f"\nStop reason: {event.delta.stop_reason}")
        print(f"Output tokens: {event.usage.output_tokens}")
    elif event.type == "message_stop":
        print("[Stream complete]")
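For context, the events the SDK yields are parsed from SSE frames on the wire: an event line, a data line carrying JSON, and a blank line that ends the frame. The sketch below parses such frames from a list of text lines. It is a simplified illustration, not the SDK's implementation: parse_sse_lines is a hypothetical helper, it handles only the event and data fields, and the sample payloads are abbreviated stand-ins rather than captured API output.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def parse_sse_lines(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_name, payload) pairs from SSE text lines.

    Hypothetical helper for illustration; assumes every data payload
    is a JSON object.
    """
    event_name = "message"  # SSE default when no "event:" line is sent
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current frame.
            if data_parts:
                yield event_name, json.loads("\n".join(data_parts))
            event_name, data_parts = "message", []
        elif line.startswith(":"):
            continue  # comment line, ignored in the SSE format
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format drops one leading space after the colon.
            data_parts.append(value[1:] if value.startswith(" ") else value)


# Abbreviated sample frames shaped like the events in this guide.
sample = [
    "event: content_block_delta",
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hi"}}',
    "",
    "event: message_stop",
    'data: {"type": "message_stop"}',
    "",
]

frames = list(parse_sse_lines(sample))
for name, payload in frames:
    print(name, payload)
```

In practice the SDK does this parsing for you; a sketch like this is mainly useful when reading raw HTTP traces or calling the API without an SDK.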
TypeScript: Raw SSE Events
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.create({
  model: "claude-sonnet-4-6",
  max_tokens: 1024,
  stream: true,
  messages: [{ role: "user", content: "Hello" }]
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}
SSE Event Types Reference
| Event | Description |
|---|---|
| message_start | Stream opened; contains message metadata |
| content_block_start | New content block beginning |
| content_block_delta | Incremental content (text_delta, input_json_delta) |
| content_block_stop | Content block finished |
| message_delta | Message-level changes (stop_reason, usage) |
| message_stop | Stream complete |
| ping | Keep-alive signal |
| error | Error during streaming |
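In a typical stream these events arrive in a fixed order: message_start, then one or more groups of content_block_start, content_block_delta, and content_block_stop, then message_delta and message_stop, with ping events possible at any point. A minimal sketch of folding that sequence into a final result follows. The events are plain dictionaries standing in for SDK event objects (an assumption that keeps the example runnable offline), and collect is a hypothetical helper.

```python
from typing import Iterable


def collect(events: Iterable[dict]) -> dict:
    """Fold a stream of event dicts into text, stop reason, and token count."""
    result = {"text": "", "stop_reason": None, "output_tokens": None, "done": False}
    for event in events:
        kind = event["type"]
        if kind == "content_block_delta":
            delta = event["delta"]
            if delta["type"] == "text_delta":
                result["text"] += delta["text"]
        elif kind == "message_delta":
            result["stop_reason"] = event["delta"].get("stop_reason")
            result["output_tokens"] = event["usage"]["output_tokens"]
        elif kind == "message_stop":
            result["done"] = True
        elif kind == "error":
            raise RuntimeError(f"stream error: {event['error']}")
        # message_start, content_block_start/stop, and ping need no action here
    return result


# Simulated events (simplified stand-ins, not captured API output).
events = [
    {"type": "message_start", "message": {"model": "claude-sonnet-4-6"}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}},
    {"type": "ping"},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": ", world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"},
     "usage": {"output_tokens": 4}},
    {"type": "message_stop"},
]

summary = collect(events)
print(summary)
```

Checking the done flag is one way to tell a complete response from a stream that was cut off before message_stop.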
Async Streaming (Python)
import asyncio

from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def stream_response():
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        messages=[{"role": "user", "content": "Write a poem"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

        # get_final_message() is a coroutine on the async stream, so await it
        message = await stream.get_final_message()
        return message

result = asyncio.run(stream_response())
Streaming with Extended Thinking
With extended thinking enabled, thinking arrives as its own content blocks in the same stream as the text blocks:
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve: What is 127 * 389?"}]
) as stream:
    for event in stream:
        pass  # Process events

    message = stream.get_final_message()
    for block in message.content:
        if block.type == "thinking":
            print(f"Thinking: {block.thinking[:100]}...")
        elif block.type == "text":
            print(f"Answer: {block.text}")
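To show thinking as it arrives rather than only after get_final_message(), the content_block_delta events can be routed by delta type. The sketch below assumes thinking is delivered as thinking_delta deltas carrying a thinking field, and it uses plain dictionaries in place of SDK event objects so that it runs without an API call. split_thinking is a hypothetical helper, and the simulated events are illustrative, not real model output.

```python
from typing import Iterable, Tuple


def split_thinking(events: Iterable[dict]) -> Tuple[str, str]:
    """Return (thinking, answer) accumulated from content_block_delta events."""
    thinking_parts, text_parts = [], []
    for event in events:
        if event.get("type") != "content_block_delta":
            continue
        delta = event["delta"]
        if delta["type"] == "thinking_delta":
            thinking_parts.append(delta["thinking"])
        elif delta["type"] == "text_delta":
            text_parts.append(delta["text"])
        # Other delta types (for example signature_delta) are skipped here.
    return "".join(thinking_parts), "".join(text_parts)


# Simulated events: one thinking block followed by one text block.
simulated = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "thinking_delta", "thinking": "127 * 389 = 127 * 400 - 127 * 11"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "thinking_delta", "thinking": " = 50800 - 1397"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1, "content_block": {"type": "text"}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "text_delta", "text": "49403"}},
    {"type": "content_block_stop", "index": 1},
]

thinking, answer = split_thinking(simulated)
print(f"Thinking: {thinking}")
print(f"Answer: {answer}")
```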
Streaming with Tool Use
When streaming tool use, you receive content_block_delta events whose delta type is input_json_delta as Claude builds the tool input:
import anthropic

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get weather for a location",
        "input_schema": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"]
        }
    }
]

stream = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    stream=True,
    tools=tools,
    messages=[{"role": "user", "content": "Weather in NYC?"}]
)

for event in stream:
    if event.type == "content_block_delta":
        if event.delta.type == "text_delta":
            print(event.delta.text, end="")
        elif event.delta.type == "input_json_delta":
            print(f"[Tool input: {event.delta.partial_json}]", end="")
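Each partial_json fragment is a piece of a JSON string and is generally not valid JSON by itself, so a common approach is to concatenate the fragments per content block and parse only when the block ends. The sketch below illustrates this with simulated events: plain dictionaries stand in for SDK event objects, the fragment boundaries are chosen arbitrarily, and collect_tool_inputs is a hypothetical helper rather than an SDK function.

```python
import json
from typing import Dict, Iterable, List


def collect_tool_inputs(events: Iterable[dict]) -> List[dict]:
    """Assemble complete tool calls from streamed input_json_delta fragments."""
    pending: Dict[int, dict] = {}  # block index -> {"name": ..., "json": ...}
    finished: List[dict] = []
    for event in events:
        kind = event["type"]
        if kind == "content_block_start" and event["content_block"]["type"] == "tool_use":
            pending[event["index"]] = {"name": event["content_block"]["name"], "json": ""}
        elif kind == "content_block_delta" and event["delta"]["type"] == "input_json_delta":
            pending[event["index"]]["json"] += event["delta"]["partial_json"]
        elif kind == "content_block_stop" and event["index"] in pending:
            call = pending.pop(event["index"])
            # Parse only now: earlier fragments are incomplete JSON.
            # An empty string is treated as a call with no arguments.
            finished.append({"name": call["name"], "input": json.loads(call["json"] or "{}")})
    return finished


# Simulated events for a single get_weather call (illustrative values).
simulated = [
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "tool_use", "id": "toolu_example", "name": "get_weather"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": '{"loca'}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": 'tion": "NYC"}'}},
    {"type": "content_block_stop", "index": 0},
]

calls = collect_tool_inputs(simulated)
print(calls)
```

Keying the buffer by block index keeps fragments separate if a response contains more than one tool call.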
Avoid Timeout with get_final_message
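For long requests where you do not need real-time output, open the request with stream() and call get_final_message() right away. The response still travels over a streaming connection, which avoids the SDK’s 10-minute non-streaming timeout, but your code receives a single complete Message: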
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-opus-4-6",
    max_tokens=128000,
    messages=[{"role": "user", "content": "Write a comprehensive analysis"}]
) as stream:
    message = stream.get_final_message()

# message is a complete Message object
print(message.content[0].text)
CLI Streaming
ant messages create --stream --format jsonl \
  --model claude-sonnet-4-6 \
  --max-tokens 1024 \
  --message '{role: user, content: "Hello"}'
Prevention
- Use streaming for any request with max_tokens > 4096: This prevents timeout issues and gives users immediate feedback.
- Always use the SDK helpers: stream()/text_stream in Python and .stream()/.on("text") in TypeScript handle all event parsing for you.
- Handle stream errors: Wrap streaming code in try/except because errors can arrive mid-stream after a 200 response.
- Use get_final_message() for batch-style workloads: When you need the complete response without implementing event handling.
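The error-handling advice above can be sketched as follows. Because an error can surface after some text has already been delivered, the function keeps whatever arrived before the failure. To stay runnable without network access, it consumes any iterator of text chunks and catches a generic Exception; with the SDK you would pass stream.text_stream and would likely narrow the except clause to anthropic.APIError. The names consume_stream and flaky_chunks are hypothetical.

```python
from typing import Iterable, Iterator, Optional, Tuple


def consume_stream(chunks: Iterable[str]) -> Tuple[str, Optional[Exception]]:
    """Read text chunks until the stream ends or fails.

    Returns (text_received_so_far, error_or_None).
    """
    received = []
    try:
        for chunk in chunks:
            received.append(chunk)
    except Exception as exc:  # with the SDK, consider narrowing to anthropic.APIError
        return "".join(received), exc
    return "".join(received), None


def flaky_chunks() -> Iterator[str]:
    """Simulated stream that fails after two chunks (stand-in for a mid-stream error)."""
    yield "Once upon "
    yield "a time"
    raise ConnectionError("simulated mid-stream failure")


partial, error = consume_stream(flaky_chunks())
print(repr(partial), error)
```

Returning the partial text alongside the error lets the caller decide whether to show what arrived, retry, or discard it.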
Related Guides
- Claude Streaming Not Working – troubleshoot streaming failures and mid-stream errors.
- Claude SDK Timeout Configuration – configure timeouts for streaming connections.
- Claude Extended Thinking API Guide – stream extended thinking output in real time.
- Claude Tool Use Not Working – handle streamed tool use events.
- Claude Python SDK Getting Started – basic SDK setup before implementing streaming.