Migrating Real-Time Claude Calls to Batch API
Moving non-urgent Claude API calls from real-time to batch saves 50% on every migrated token. A typical SaaS application spending $4,200/month on Claude can often move 60-70% of that spend to batch, saving $1,260-$1,470 per month without touching any user-facing functionality.
The Setup
You have a production application with Claude Sonnet 4.6 handling five use cases: live chat (real-time), content generation (daily), document summarization (on-demand), classification (hourly), and report generation (weekly).
Your current code makes standard API calls everywhere. Three of these five use cases can tolerate batch latency. The migration requires changes to request format, submission flow, and result handling – but zero changes to your prompts or model selection.
Expected savings: $967.50/month by migrating content generation, classification, and report generation to batch.
The Math
Monthly spend by use case, Sonnet 4.6:
| Use Case | Requests/mo | Input MTok | Output MTok | Standard Cost | Batch Cost |
|---|---|---|---|---|---|
| Live chat | 200K | 600 | 200 | $4,800 | N/A (real-time) |
| Content gen | 15K | 75 | 45 | $900 | $450 |
| Doc summary | 5K | 100 | 15 | $525 | $262.50 |
| Classification | 50K | 150 | 25 | $825 | $412.50 |
| Reports | 1K | 20 | 10 | $210 | $105 |
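Total savings from the three migrated use cases: $967.50/month ($450 + $412.50 + $105)
(Document summarization stays on real-time because users trigger it on demand; its batch cost is shown for comparison only. Migrating it as well would bring the total to $1,230/month.)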
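The table's arithmetic can be reproduced in a few lines. This is a minimal sketch that assumes the $3.00 input and $15.00 output per-MTok prices implied by the table and the 50% batch discount; the constant and function names are illustrative. The three migrated rows sum to $967.50, and adding document summarization would bring the figure to $1,230.

```python
# Prices implied by the table above (USD per million tokens).
INPUT_PRICE = 3.00
OUTPUT_PRICE = 15.00
BATCH_DISCOUNT = 0.50

# (input MTok, output MTok, migrating to batch?) per use case, from the table.
USE_CASES = {
    "Live chat": (600, 200, False),
    "Content gen": (75, 45, True),
    "Doc summary": (100, 15, False),  # stays real-time: user-triggered
    "Classification": (150, 25, True),
    "Reports": (20, 10, True),
}


def standard_cost(input_mtok: float, output_mtok: float) -> float:
    """Monthly cost at standard (non-batch) pricing."""
    return input_mtok * INPUT_PRICE + output_mtok * OUTPUT_PRICE


def monthly_savings(use_cases: dict) -> float:
    """Savings from moving only the flagged use cases to batch."""
    return sum(
        standard_cost(inp, out) * BATCH_DISCOUNT
        for inp, out, migrate in use_cases.values()
        if migrate
    )


total_standard = sum(standard_cost(i, o) for i, o, _ in USE_CASES.values())
savings = monthly_savings(USE_CASES)
print(f"Standard total: ${total_standard:,.2f}")
print(f"Batch savings:  ${savings:,.2f}")
```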
The Technique
The migration has three phases: audit, adapt, and verify.
Phase 1: Audit your API calls.
import json
from collections import defaultdict


def audit_api_calls(log_file: str, log_days: float = 1.0) -> dict:
    """Analyze API call patterns to identify batch candidates.

    Expects one JSON object per line with "endpoint", "input_tokens",
    and "output_tokens" keys. log_days is the number of days the log
    covers; it is used to scale the totals to a 30-day month.
    """
    endpoints = defaultdict(lambda: {
        "count": 0,
        "total_input_tokens": 0,
        "total_output_tokens": 0,
        "max_acceptable_latency": None,  # fill in by hand per endpoint
    })
    with open(log_file) as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            entry = json.loads(line)
            ep = entry["endpoint"]
            endpoints[ep]["count"] += 1
            endpoints[ep]["total_input_tokens"] += entry["input_tokens"]
            endpoints[ep]["total_output_tokens"] += entry["output_tokens"]

    # Estimate monthly cost and batch savings for each endpoint
    scale = 30 / log_days
    for ep, data in endpoints.items():
        avg_in = data["total_input_tokens"] / data["count"]
        avg_out = data["total_output_tokens"] / data["count"]
        # Standard Sonnet 4.6 pricing: $3.00 input / $15.00 output per MTok
        monthly_input = data["total_input_tokens"] * scale * 3.00 / 1e6
        monthly_output = data["total_output_tokens"] * scale * 15.00 / 1e6
        monthly_total = monthly_input + monthly_output
        batch_savings = monthly_total * 0.5
        data["monthly_cost_standard"] = round(monthly_total, 2)
        data["monthly_batch_savings"] = round(batch_savings, 2)
        data["avg_input_tokens"] = round(avg_in)
        data["avg_output_tokens"] = round(avg_out)
    return dict(endpoints)


# The checklist below audits 7 days of logs, so scale from 7 days to a month
result = audit_api_calls("claude_api.log", log_days=7)
for ep, data in sorted(result.items(),
                       key=lambda x: x[1]["monthly_batch_savings"],
                       reverse=True):
    print(f"{ep}: ${data['monthly_cost_standard']}/mo "
          f"(save ${data['monthly_batch_savings']})")
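The audit ranks endpoints by potential savings but does not decide which ones can tolerate batch latency. A minimal sketch of that decision follows. The `EndpointProfile` type, the `user_facing` flag, the per-endpoint tolerances, and the 1.5-hour default threshold (borrowed from the 90-minute p95 target in the metrics section) are all illustrative assumptions, not measured values. Batches can take up to 24 hours in the worst case, so a stricter policy would raise the threshold.

```python
from typing import Dict, NamedTuple


class EndpointProfile(NamedTuple):
    """Hypothetical per-endpoint facts you would fill in by hand."""
    user_facing: bool                    # does a user wait on the response?
    max_acceptable_latency_hours: float  # how stale a result may be


def is_batch_eligible(profile: EndpointProfile,
                      min_tolerance_hours: float = 1.5) -> bool:
    """An endpoint qualifies if nobody is waiting and it tolerates the delay."""
    if profile.user_facing:
        return False
    return profile.max_acceptable_latency_hours >= min_tolerance_hours


# Assumed tolerances for the five use cases in this article.
PROFILES: Dict[str, EndpointProfile] = {
    "live_chat": EndpointProfile(True, 0.0),
    "content_gen": EndpointProfile(False, 24.0),
    "doc_summary": EndpointProfile(True, 0.0),
    "classification": EndpointProfile(False, 2.0),
    "reports": EndpointProfile(False, 168.0),
}

eligible = sorted(name for name, p in PROFILES.items() if is_batch_eligible(p))
print(eligible)
```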
Phase 2: Create a batch wrapper.
import time
from typing import Callable

import anthropic

client = anthropic.Anthropic()


class BatchMigrator:
    """Drop-in replacement that batches real-time calls."""

    def __init__(self, flush_interval: int = 300, max_batch: int = 1000):
        self.pending = []
        # Intended seconds between flushes. Nothing in this class schedules
        # a timed flush; call flush() from your own scheduler at this interval.
        self.flush_interval = flush_interval
        self.max_batch = max_batch
        self.callbacks = {}

    def submit(
        self,
        custom_id: str,
        params: dict,
        callback: Callable
    ) -> None:
        """Queue a request for batch processing."""
        self.pending.append({
            "custom_id": custom_id,
            "params": params
        })
        self.callbacks[custom_id] = callback
        if len(self.pending) >= self.max_batch:
            self.flush()

    def flush(self) -> None:
        """Submit accumulated requests as a batch."""
        if not self.pending:
            return
        # The Message Batches endpoints live under client.messages.batches
        batch = client.messages.batches.create(requests=self.pending)
        print(f"Flushed batch {batch.id}: {len(self.pending)} requests")
        self.pending = []
        # Simplified: this blocks until the batch ends. In production,
        # run the polling in a worker thread or a separate job.
        self._wait_and_process(batch.id)

    def _wait_and_process(self, batch_id: str) -> None:
        while True:
            status = client.messages.batches.retrieve(batch_id)
            if status.processing_status == "ended":
                break
            time.sleep(30)
        for result in client.messages.batches.results(batch_id):
            # Results arrive in any order; custom_id ties each to its callback
            callback = self.callbacks.pop(result.custom_id, None)
            if callback is None:
                continue
            if result.result.type == "succeeded":
                callback(result.result.message)
            else:
                callback(None)  # errored, canceled, or expired


# Usage: replace real-time calls with batch submissions
migrator = BatchMigrator(flush_interval=300, max_batch=500)


def generate_content(article_spec: dict) -> None:
    migrator.submit(
        custom_id=f"article-{article_spec['id']}",
        params={
            # Model ID as given in this article; confirm it against the
            # current model list before deploying.
            "model": "claude-sonnet-4-6-20250929",
            "max_tokens": 4096,
            "messages": [
                {"role": "user", "content": article_spec["prompt"]}
            ]
        },
        # save_article is your application's own persistence function (not shown)
        callback=lambda msg: save_article(article_spec["id"], msg)
    )
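The wrapper stores `flush_interval` but only flushes when the queue reaches `max_batch`, so a slow trickle of requests could sit unsent. One way to add the time-based trigger is a small policy object that the wrapper consults on each submit or from a periodic job. This is a sketch only: `FlushPolicy` and its method names are hypothetical and not part of the Anthropic SDK, and the `now` arguments exist so the logic can be exercised without waiting on a real clock.

```python
import time
from typing import Optional


class FlushPolicy:
    """Hypothetical helper: decides when queued requests should be flushed."""

    def __init__(self, flush_interval: float = 300.0, max_batch: int = 1000):
        self.flush_interval = flush_interval  # seconds the oldest item may wait
        self.max_batch = max_batch
        self.oldest_queued_at: Optional[float] = None

    def record_enqueue(self, now: Optional[float] = None) -> None:
        """Remember when the first item of the current queue arrived."""
        if self.oldest_queued_at is None:
            self.oldest_queued_at = time.monotonic() if now is None else now

    def should_flush(self, pending_count: int,
                     now: Optional[float] = None) -> bool:
        """Flush when the queue is full or its oldest item has waited too long."""
        if pending_count == 0 or self.oldest_queued_at is None:
            return False
        if pending_count >= self.max_batch:
            return True
        current = time.monotonic() if now is None else now
        return current - self.oldest_queued_at >= self.flush_interval

    def record_flush(self) -> None:
        """Reset the age clock after the queue has been submitted."""
        self.oldest_queued_at = None
```

In the wrapper, `submit` would call `record_enqueue()` and then `flush()` whenever `should_flush(len(self.pending))` is true; a timer or cron-style job would make the same check so that an idle queue still drains.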
Phase 3: Verify savings.
# Compare costs before and after migration
python3 -c "
# Week 1: all real-time (baseline)
baseline_daily = 140.00 # dollars
# Week 2: content gen + classification on batch
migrated_daily = 98.00 # dollars (60% of spend moved to batch at 50% discount)
savings = baseline_daily - migrated_daily
monthly = savings * 30
print(f'Daily savings: \${savings:.2f}')
print(f'Monthly savings: \${monthly:.2f}')
print(f'Reduction: {savings/baseline_daily*100:.1f}%')
"
The Tradeoffs
Migration introduces new failure modes:
- Delayed error discovery: A malformed prompt fails within seconds in real-time mode, but a batch may not surface the error until it finishes, which usually takes under an hour and can take up to 24 hours. Always test prompts in real-time mode first.
- Result ordering: Batch results may arrive in any order. Match each result to its request by custom_id rather than by position.
- Increased system complexity: You now have two API interaction patterns (real-time and batch) with different error handling, monitoring, and retry logic.
- Rollback risk: If batch processing has issues, rolling back to real-time immediately doubles the cost of the affected workloads. Maintain real-time fallback paths for critical workloads.
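The ordering and fallback points above can be handled together: index results by `custom_id`, rebuild the submission order, and collect anything missing or failed for a real-time retry. The sketch below uses plain `(custom_id, payload)` tuples as a simplified stand-in for the SDK's result objects, with `None` meaning the request did not succeed; the function name is illustrative.

```python
from typing import Iterable, List, Optional, Tuple


def reassemble(
    submitted_ids: List[str],
    results: Iterable[Tuple[str, Optional[str]]],
) -> Tuple[List[Tuple[str, Optional[str]]], List[str]]:
    """Restore submission order and list the IDs that need a fallback.

    results may arrive in any order and may omit IDs entirely.
    """
    by_id = {cid: payload for cid, payload in results}
    ordered = [(cid, by_id.get(cid)) for cid in submitted_ids]
    needs_fallback = [cid for cid, payload in ordered if payload is None]
    return ordered, needs_fallback


submitted = ["a-1", "a-2", "a-3", "a-4"]
arrived = [("a-3", "third"), ("a-1", "first"), ("a-2", None)]  # a-4 missing
ordered, needs_fallback = reassemble(submitted, arrived)
print(ordered)
print(needs_fallback)
```

The IDs in `needs_fallback` are the ones to resend through the real-time path or to queue into the next batch, depending on how urgent the workload is.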
Implementation Checklist
- Run the audit script on 7 days of API logs
- Classify each endpoint: real-time required or batch-eligible
- Start with the lowest-risk, highest-savings endpoint
- Implement the batch wrapper with automatic flushing
- Deploy to staging and verify results match real-time output quality
- Roll out to production for one endpoint, monitor for 48 hours
- Migrate remaining batch-eligible endpoints one at a time
- Compare monthly Anthropic invoice against pre-migration baseline
Measuring Impact
Track migration health with these metrics:
- Batch migration ratio: Percentage of total requests flowing through batch. Target: 50-70%.
- Cost per request by mode: Real-time vs batch, same model. For comparable token counts, real-time should cost 2x batch.
- Batch failure rate: Failed requests / total batch requests. Alert above 0.5%.
- End-to-end latency: Time from request queuing to result delivery. Should stay under 90 minutes for 95th percentile.
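These metrics are straightforward to compute from per-request records. The sketch below assumes you log a mode, a success flag, and a queue-to-result latency for each request; the `RequestRecord` shape and function names are hypothetical, and the alert thresholds are the 0.5% failure rate and 90-minute p95 stated above.

```python
import math
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RequestRecord:
    mode: str         # "batch" or "realtime"
    succeeded: bool
    latency_s: float  # queue-to-result time in seconds


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def migration_metrics(records: List[RequestRecord]) -> Dict[str, float]:
    """Batch share of traffic, batch failure rate, and batch p95 latency."""
    batch = [r for r in records if r.mode == "batch"]
    if not batch:
        raise ValueError("need at least one batch request to report on")
    return {
        "batch_ratio": len(batch) / len(records),
        "batch_failure_rate": sum(not r.succeeded for r in batch) / len(batch),
        "batch_p95_latency_s": percentile([r.latency_s for r in batch], 95),
    }


def alerts(metrics: Dict[str, float]) -> List[str]:
    """Names of the thresholds from this section that were exceeded."""
    triggered = []
    if metrics["batch_failure_rate"] > 0.005:
        triggered.append("failure_rate")
    if metrics["batch_p95_latency_s"] > 90 * 60:
        triggered.append("p95_latency")
    return triggered
```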