Real-Time Claude Token Monitoring Pipeline

Written by Michael Lip · Solo founder of Zovo · $400K+ on Upwork · 100% JSS · More at zovo.one

Batch-processed cost reports tell you what happened yesterday. A real-time monitoring pipeline tells you what’s happening now, and catching a problem within 60 seconds instead of at the next daily report can save $224.17 per cache miss incident. When a system prompt cache expires and 1,000 Opus 4.7 requests suddenly process 50,000 uncached tokens each, you need detection in seconds, not hours. This guide builds a streaming pipeline that processes the usage data from every API response as it arrives and triggers alerts within one minute of an anomaly.

The Setup

Real-time monitoring requires three components: a collection layer (captures usage from every API response), a processing layer (calculates rolling metrics and detects anomalies), and an alerting layer (sends notifications when thresholds are breached). The Claude API doesn’t offer webhooks or streaming analytics, so you build this from the response data your application already receives. The key architectural decision is whether to process in-process (lowest latency, but coupled to the application) or out-of-process (via a message queue: decoupled, but it adds infrastructure).
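A minimal sketch of the out-of-process option, using only the standard library. Here `queue.Queue` and a worker thread stand in for a real message broker and a separate monitoring service, which is an assumption for illustration; `handle_response` and `consume` are hypothetical names. The point is the split: the request path only enqueues a record, and all aggregation happens elsewhere.

```python
import queue
import threading

# Stand-in for a message broker; a real deployment would use a queue
# shared across processes or machines.
usage_queue = queue.Queue()

def handle_response(model: str, input_tokens: int, output_tokens: int) -> None:
    """Collection layer: runs in the request path and does no math."""
    usage_queue.put({"model": model, "input": input_tokens, "output": output_tokens})

def consume(totals: dict) -> None:
    """Processing layer: runs on its own thread, off the request path."""
    while True:
        record = usage_queue.get()
        if record is None:  # sentinel value: shut down
            break
        per_model = totals.setdefault(record["model"], {"input": 0, "output": 0})
        per_model["input"] += record["input"]
        per_model["output"] += record["output"]

totals: dict = {}
worker = threading.Thread(target=consume, args=(totals,))
worker.start()

handle_response("claude-sonnet-4-6", 1200, 300)
handle_response("claude-sonnet-4-6", 800, 150)
usage_queue.put(None)  # tell the consumer to stop
worker.join()
print(totals)  # {'claude-sonnet-4-6': {'input': 2000, 'output': 450}}
```

The queue decouples request latency from monitoring work, at the cost of one more moving part to run and watch.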

The Math

Cost exposure scales with time-to-detection, because an anomaly keeps billing at its elevated rate until someone notices:

Without real-time monitoring (daily batch reports):

With real-time monitoring (60-second detection):

Savings per incident: $224.17 (99.6% reduction)
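The introduction’s scenario can be checked against the Opus 4.7 row of the PRICING table in The Technique. A minimal sketch of that arithmetic, assuming the 50,000 tokens are a shared prompt prefix that would otherwise be billed as a cache read, and ignoring the one-time cache write:

```python
# Opus 4.7 prices in USD per million tokens (from the PRICING table)
INPUT_PER_M = 5.00
CACHE_READ_PER_M = 0.50

REQUESTS = 1_000
PREFIX_TOKENS = 50_000  # shared system-prompt tokens per request

def prefix_cost(price_per_m: float) -> float:
    """Cost of sending the prefix on every request at the given rate."""
    return REQUESTS * PREFIX_TOKENS * price_per_m / 1_000_000

cached = prefix_cost(CACHE_READ_PER_M)   # prefix served from cache
uncached = prefix_cost(INPUT_PER_M)      # cache expired: full input price
premium = uncached - cached
print(f"Cache-miss premium: ${premium:.2f} per {REQUESTS:,} requests")
# Cache-miss premium: $225.00 per 1,000 requests
```

The full premium is $225.00; the $224.17 savings is 99.6% of it, matching the stated reduction.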

The Technique

Build a lightweight in-process monitoring pipeline using rolling windows.

import anthropic
import threading
import time
from collections import deque
from dataclasses import dataclass, field

# USD per million tokens. cache_read is 0.1x the input price; cache_write is
# the 5-minute cache write price (1.25x input).
PRICING = {
    "claude-opus-4-7": {"input": 5.00, "output": 25.00,
                        "cache_read": 0.50, "cache_write": 6.25},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00,
                          "cache_read": 0.30, "cache_write": 3.75},
    "claude-haiku-4-5": {"input": 1.00, "output": 5.00,
                         "cache_read": 0.10, "cache_write": 1.25},
}

@dataclass
class TokenEvent:
    timestamp: float
    model: str
    input_tokens: int  # uncached input only; excludes cache reads and writes
    output_tokens: int
    cache_read_tokens: int
    cache_write_tokens: int
    cost: float

@dataclass
class StreamingMonitor:
    """Real-time token monitoring with rolling windows."""
    window_seconds: int = 60
    # Events are evicted by age; maxlen is a hard memory cap on top of that
    events: deque = field(default_factory=lambda: deque(maxlen=100000))
    alert_callbacks: list = field(default_factory=list)

    # Thresholds
    cost_rate_limit: float = 2.00  # max $/minute
    cache_miss_threshold: float = 0.50  # alert if >50% cache misses
    token_spike_multiplier: float = 3.0  # alert if 3x normal rate
    alert_cooldown_seconds: float = 300.0  # min gap between repeats of one alert

    # Baseline (call set_baseline() during representative traffic)
    baseline_cost_per_minute: float = 0.0
    baseline_tokens_per_minute: float = 0.0

    # Internal state: ingest() may be called from several request threads
    _lock: threading.Lock = field(default_factory=threading.Lock,
                                  repr=False, compare=False)
    _last_alert: dict = field(default_factory=dict, repr=False, compare=False)

    def on_alert(self, callback):
        """Register an alert callback."""
        self.alert_callbacks.append(callback)

    def ingest(self, model: str, usage):
        """Process a single API response's usage data."""
        # Unknown models are priced as Sonnet 4.6
        prices = PRICING.get(model, PRICING["claude-sonnet-4-6"])

        cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
        cache_write = getattr(usage, "cache_creation_input_tokens", 0) or 0

        cost = (
            usage.input_tokens * prices["input"]
            + usage.output_tokens * prices["output"]
            + cache_read * prices["cache_read"]
            + cache_write * prices["cache_write"]
        ) / 1_000_000

        with self._lock:
            # Timestamp under the lock so events stay in time order
            now = time.time()
            self.events.append(TokenEvent(
                timestamp=now,
                model=model,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                cache_read_tokens=cache_read,
                cache_write_tokens=cache_write,
                cost=cost,
            ))
            alerts = self._check_thresholds(now)
        # Run callbacks outside the lock so a slow webhook can't block ingest
        for alert in alerts:
            self._fire_alert(*alert)

    def _get_window(self, now: float) -> list[TokenEvent]:
        """Evict expired events and return the rest (caller holds the lock)."""
        cutoff = now - self.window_seconds
        while self.events and self.events[0].timestamp <= cutoff:
            self.events.popleft()
        return list(self.events)

    def _check_thresholds(self, now: float) -> list[tuple[str, str, str]]:
        """Check all thresholds; return (type, message, severity) tuples."""
        window = self._get_window(now)
        if len(window) < 5:
            return []  # need minimum data
        minutes = self.window_seconds / 60
        alerts = []

        # Check 1: Cost rate
        cost_per_minute = sum(e.cost for e in window) / minutes
        if cost_per_minute > self.cost_rate_limit:
            alerts.append((
                "cost_rate_exceeded",
                f"Cost rate ${cost_per_minute:.2f}/min exceeds "
                f"${self.cost_rate_limit:.2f}/min limit",
                "critical",
            ))

        # Check 2: Cache miss rate. input_tokens excludes cached tokens, so
        # judge eligibility by full prompt size; otherwise hits get filtered out
        cache_eligible = [
            e for e in window
            if e.input_tokens + e.cache_read_tokens + e.cache_write_tokens > 5000
        ]
        if cache_eligible:
            cache_hits = sum(
                1 for e in cache_eligible if e.cache_read_tokens > 0
            )
            miss_rate = 1 - (cache_hits / len(cache_eligible))
            if miss_rate > self.cache_miss_threshold:
                alerts.append((
                    "high_cache_miss_rate",
                    f"Cache miss rate {miss_rate:.0%} in last "
                    f"{self.window_seconds}s "
                    f"({len(cache_eligible)} eligible requests)",
                    "warning",
                ))

        # Check 3: Token spike, normalized to tokens/minute like the baseline
        tokens_per_minute = sum(
            e.input_tokens + e.output_tokens for e in window
        ) / minutes
        if (self.baseline_tokens_per_minute > 0 and
                tokens_per_minute > self.baseline_tokens_per_minute
                * self.token_spike_multiplier):
            alerts.append((
                "token_spike",
                f"Token rate {tokens_per_minute:,.0f}/min is "
                f"{tokens_per_minute / self.baseline_tokens_per_minute:.1f}x "
                f"baseline",
                "warning",
            ))

        # Suppress alert types that already fired within the cooldown
        fresh = []
        for alert in alerts:
            last = self._last_alert.get(alert[0], float("-inf"))
            if now - last >= self.alert_cooldown_seconds:
                self._last_alert[alert[0]] = now
                fresh.append(alert)
        return fresh

    def _fire_alert(self, alert_type: str, message: str,
                    severity: str):
        for cb in self.alert_callbacks:
            cb(alert_type, message, severity)

    def set_baseline(self):
        """Set baseline from current window data."""
        with self._lock:
            window = self._get_window(time.time())
        if window:
            minutes = self.window_seconds / 60
            self.baseline_cost_per_minute = sum(
                e.cost for e in window
            ) / minutes
            self.baseline_tokens_per_minute = sum(
                e.input_tokens + e.output_tokens for e in window
            ) / minutes


def slack_alert(alert_type: str, message: str, severity: str):
    """Send alert to console (replace with Slack webhook)."""
    icon = "!!" if severity == "critical" else "!"
    print(f"[{icon} {severity.upper()}] {alert_type}: {message}")


# Usage
monitor = StreamingMonitor(
    window_seconds=60,
    cost_rate_limit=1.50,
    cache_miss_threshold=0.40,
)
monitor.on_alert(slack_alert)

client = anthropic.Anthropic()

# Wrap your API calls
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Process this data"}]
)
monitor.ingest("claude-sonnet-4-6", response.usage)
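To sanity-check the cost formula without making API calls, you can feed hand-built usage objects through it. This is a standalone sketch: `SimpleNamespace` stands in for `response.usage`, the token counts are made up, `request_cost` is a hypothetical helper, and the `cache_write` rate assumes the 5-minute write price of 1.25x input. Note that `input_tokens` counts only uncached input, so a cache hit reports a small `input_tokens` alongside a large `cache_read_input_tokens`.

```python
from types import SimpleNamespace

# Sonnet 4.6 rates in USD per million tokens; cache_write is an assumed
# 5-minute cache write rate (1.25x input).
PRICES = {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75}

def request_cost(usage, prices: dict) -> float:
    """Price every token bucket reported in a Messages API usage object."""
    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_write = getattr(usage, "cache_creation_input_tokens", 0) or 0
    return (
        usage.input_tokens * prices["input"]
        + usage.output_tokens * prices["output"]
        + cache_read * prices["cache_read"]
        + cache_write * prices["cache_write"]
    ) / 1_000_000

# Same 50,200-token prompt, once served mostly from cache and once uncached
hit = SimpleNamespace(input_tokens=200, output_tokens=500,
                      cache_read_input_tokens=50_000, cache_creation_input_tokens=0)
miss = SimpleNamespace(input_tokens=50_200, output_tokens=500,
                       cache_read_input_tokens=0, cache_creation_input_tokens=0)

print(f"hit:  ${request_cost(hit, PRICES):.4f}")   # hit:  $0.0231
print(f"miss: ${request_cost(miss, PRICES):.4f}")  # miss: $0.1581
```

The same fake objects can be passed to `monitor.ingest()` in a loop to exercise the cache-miss alert before wiring up real traffic.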

The Tradeoffs

In-process monitoring adds CPU overhead to every API response handler. For most workloads this is negligible, but each ingest() call rebuilds and re-sums the window, so the overhead grows with traffic; at 10,000+ requests/second, consider offloading to a separate monitoring service via a message queue. The rolling window approach uses memory proportional to request volume: at 1,000 requests/second with a 60-second window, the window holds 60,000 events. The maxlen=100000 deque cap prevents unbounded growth, but above roughly 1,667 requests/second (100,000 ÷ 60) it also drops events that are still inside a 60-second window, so the rolling metrics undercount. False positives from natural traffic spikes require baseline calibration; run the pipeline for 24 hours before setting thresholds.
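One way to keep per-request overhead flat as traffic grows is to maintain running totals instead of re-summing the window on every call. A minimal sketch of that swapped-in technique (a running-sum window, not what StreamingMonitor does); `RollingSum` is a hypothetical name, and you would keep one instance per metric, such as cost or tokens.

```python
import time
from collections import deque
from typing import Optional

class RollingSum:
    """Rolling-window total with O(1) amortized updates.

    Each value is added once on arrival and subtracted once when it
    expires, so no call ever re-scans the whole window.
    """

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._items: deque = deque()  # (timestamp, value), oldest first
        self.total = 0.0

    def _evict(self, now: float) -> None:
        cutoff = now - self.window_seconds
        while self._items and self._items[0][0] <= cutoff:
            _, value = self._items.popleft()
            self.total -= value

    def add(self, value: float, now: Optional[float] = None) -> float:
        """Record a value and return the total over the current window."""
        now = time.time() if now is None else now
        self._evict(now)
        self._items.append((now, value))
        self.total += value
        return self.total

cost_window = RollingSum(window_seconds=60)
cost_window.add(0.40, now=0.0)
cost_window.add(0.35, now=30.0)
print(cost_window.add(0.25, now=61.0))  # the 0.40 recorded at t=0 has expired
```

A long-lived running total can accumulate floating-point drift, so it is worth recomputing it from the stored items occasionally.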

Implementation Checklist

Measuring Impact

Measure mean-time-to-detection (MTTD) for cost anomalies before and after deploying the pipeline. Without real-time monitoring, MTTD is typically 8-24 hours (the next batch report or a human noticing). With real-time monitoring, MTTD drops to 1-5 minutes. To estimate cost impact, multiply the MTTD reduction by the anomaly’s cost rate: for a $2.00/minute cache miss incident, cutting detection from 1 hour to 1 minute saves $2.00 × 59 = $118.00 per incident.
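The same estimate as a small function, assuming the anomaly burns money at a constant rate until it is detected and that remediation time is the same before and after; `detection_savings` is a hypothetical helper.

```python
def detection_savings(cost_rate_per_min: float,
                      mttd_before_min: float,
                      mttd_after_min: float) -> float:
    """Spend avoided per incident by detecting it sooner.

    Assumes a constant anomaly cost rate until detection.
    """
    return cost_rate_per_min * (mttd_before_min - mttd_after_min)

# $2.00/minute incident, detection cut from 60 minutes to 1 minute
print(f"${detection_savings(2.00, 60, 1):.2f}")  # $118.00
```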