Real-Time Claude Token Monitoring Pipeline
Batch-processed cost reports tell you what happened yesterday. A real-time monitoring pipeline tells you what’s happening now – and that 60-second difference can save $237.50 per cache miss incident. When a system prompt cache expires and 1,000 Opus 4.7 requests suddenly process 50,000 uncached tokens each, you need detection in seconds, not hours. This guide builds a streaming pipeline that processes every API response’s usage data in real time and triggers alerts within one minute of an anomaly.
The Setup
Real-time monitoring requires three components: a collection layer (captures usage from every API response), a processing layer (calculates rolling metrics and detects anomalies), and an alerting layer (sends notifications when thresholds are breached). The Claude API doesn’t offer webhooks or streaming analytics – you build this from the response data your application already receives. The key architectural decision is whether to process in-process (low latency, coupled to application) or out-of-process (via a message queue, decoupled but adds infrastructure).
The Math
Time-to-detection directly correlates with cost exposure:
Without real-time monitoring (daily batch reports):
- Cache miss starts at 2:00 AM
- Batch report runs at 9:00 AM
- Engineer reviews at 10:00 AM
- Fix deployed at 11:00 AM
- Exposure window: 9 hours
- At 100 requests/hour with 50K uncached tokens at $5.00/MTok:
- Cost: 9 x 100 x 50K x $5/MTok = $225.00 wasted
With real-time monitoring (60-second detection):
- Cache miss starts at 2:00 AM
- Alert fires at 2:01 AM
- Auto-remediation triggers cache refresh at 2:02 AM
- Exposure window: 2 minutes
- Cost: (2/60) x 100 x 50K x $5/MTok = $0.83 wasted
Savings per incident: $224.17 (99.6% reduction)
The Technique
Build a lightweight in-process monitoring pipeline using rolling windows.
import anthropic
import time
import threading
from collections import deque
from dataclasses import dataclass, field
PRICING = {
"claude-opus-4-7": {"input": 5.00, "output": 25.00, "cache_read": 0.50},
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00, "cache_read": 0.30},
"claude-haiku-4-5": {"input": 1.00, "output": 5.00, "cache_read": 0.10},
}
@dataclass
class TokenEvent:
timestamp: float
model: str
input_tokens: int
output_tokens: int
cache_read_tokens: int
cache_write_tokens: int
cost: float
@dataclass
class StreamingMonitor:
"""Real-time token monitoring with rolling windows."""
window_seconds: int = 60
events: deque = field(default_factory=lambda: deque(maxlen=100000))
alert_callbacks: list = field(default_factory=list)
# Thresholds
cost_rate_limit: float = 2.00 # max $/minute
cache_miss_threshold: float = 0.50 # alert if >50% cache misses
token_spike_multiplier: float = 3.0 # alert if 3x normal rate
# Baseline (set from first hour of data)
baseline_cost_per_minute: float = 0.0
baseline_tokens_per_minute: int = 0
def on_alert(self, callback):
"""Register an alert callback."""
self.alert_callbacks.append(callback)
def ingest(self, model: str, usage):
"""Process a single API response's usage data."""
prices = PRICING.get(model, PRICING["claude-sonnet-4-6"])
cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
cache_write = getattr(usage, "cache_creation_input_tokens", 0) or 0
cost = (
usage.input_tokens * prices["input"] / 1_000_000
+ usage.output_tokens * prices["output"] / 1_000_000
+ cache_read * prices["cache_read"] / 1_000_000
)
event = TokenEvent(
timestamp=time.time(),
model=model,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
cache_read_tokens=cache_read,
cache_write_tokens=cache_write,
cost=cost,
)
self.events.append(event)
self._check_thresholds()
def _get_window(self) -> list[TokenEvent]:
"""Get events within the current window."""
cutoff = time.time() - self.window_seconds
return [e for e in self.events if e.timestamp > cutoff]
def _check_thresholds(self):
"""Check all thresholds against the current window."""
window = self._get_window()
if len(window) < 5:
return # need minimum data
# Check 1: Cost rate
window_cost = sum(e.cost for e in window)
cost_per_minute = window_cost / (self.window_seconds / 60)
if cost_per_minute > self.cost_rate_limit:
self._fire_alert(
"cost_rate_exceeded",
f"Cost rate ${cost_per_minute:.2f}/min exceeds "
f"${self.cost_rate_limit:.2f}/min limit",
"critical"
)
# Check 2: Cache miss rate
cache_eligible = [e for e in window if e.input_tokens > 5000]
if cache_eligible:
cache_hits = sum(
1 for e in cache_eligible if e.cache_read_tokens > 0
)
miss_rate = 1 - (cache_hits / len(cache_eligible))
if miss_rate > self.cache_miss_threshold:
self._fire_alert(
"high_cache_miss_rate",
f"Cache miss rate {miss_rate:.0%} in last "
f"{self.window_seconds}s "
f"({len(cache_eligible)} eligible requests)",
"warning"
)
# Check 3: Token spike
window_tokens = sum(
e.input_tokens + e.output_tokens for e in window
)
if (self.baseline_tokens_per_minute > 0 and
window_tokens > self.baseline_tokens_per_minute
* self.token_spike_multiplier):
self._fire_alert(
"token_spike",
f"Token rate {window_tokens:,}/min is "
f"{window_tokens / self.baseline_tokens_per_minute:.1f}x "
f"baseline",
"warning"
)
def _fire_alert(self, alert_type: str, message: str,
severity: str):
for cb in self.alert_callbacks:
cb(alert_type, message, severity)
def set_baseline(self):
"""Set baseline from current window data."""
window = self._get_window()
if window:
minutes = self.window_seconds / 60
self.baseline_cost_per_minute = sum(
e.cost for e in window
) / minutes
self.baseline_tokens_per_minute = sum(
e.input_tokens + e.output_tokens for e in window
) / minutes
def slack_alert(alert_type: str, message: str, severity: str):
"""Send alert to console (replace with Slack webhook)."""
icon = "!!" if severity == "critical" else "!"
print(f"[{icon} {severity.upper()}] {alert_type}: {message}")
# Usage
monitor = StreamingMonitor(
window_seconds=60,
cost_rate_limit=1.50,
cache_miss_threshold=0.40,
)
monitor.on_alert(slack_alert)
client = anthropic.Anthropic()
# Wrap your API calls
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Process this data"}]
)
monitor.ingest("claude-sonnet-4-6", response.usage)
The Tradeoffs
In-process monitoring adds CPU overhead to every API response handler. For most workloads this is negligible (microseconds), but at 10,000+ requests/second, consider offloading to a separate monitoring service via a message queue. The rolling window approach uses memory proportional to request volume – at 1,000 requests/minute with 60-second windows, you’re storing 60,000 events. The maxlen=100000 deque cap prevents unbounded growth. False positives from natural traffic spikes require baseline calibration; run for 24 hours before setting thresholds.
Implementation Checklist
- Deploy the StreamingMonitor alongside your API client
- Run for 24 hours to establish baselines before enabling alerts
- Set initial thresholds at 3x baseline to avoid false positives
- Connect alerts to Slack, PagerDuty, or your incident management system
- Build auto-remediation for common issues (cache refresh, circuit breaker)
- Monitor the monitor: alert if the monitoring pipeline itself goes down
- Review and tighten thresholds weekly based on false positive/negative rates
Measuring Impact
Measure mean-time-to-detection (MTTD) for cost anomalies before and after deploying the pipeline. Without monitoring, MTTD is typically 8-24 hours (next batch report or human observation). With real-time monitoring, MTTD drops to 1-5 minutes. Cost impact: multiply the MTTD reduction by your average anomaly cost rate. For a $2.00/minute cache miss incident, reducing detection from 1 hour to 1 minute saves $118.00 per incident.