Master patterns for integrating with LLM APIs reliably at scale. Learn error handling, rate limiting, caching, cost optimization, and production-ready architectures for OpenAI, Anthropic, and other providers.
Integrating with LLM APIs seems straightforward: send a request, get a response. But production systems face rate limits, network failures, cost overruns, and latency spikes. The difference between a prototype that works in a demo and a production system that reliably serves thousands of users comes down to robust API integration patterns.
This guide covers battle-tested patterns for production LLM applications: exponential backoff for transient failures, circuit breakers to prevent cascade failures, rate limiting to stay within quotas, intelligent caching to reduce costs and latency, fallback strategies when primary providers fail, and comprehensive monitoring to detect issues early.
Whether you're building with OpenAI, Anthropic Claude, Google Gemini, or multiple providers, these patterns apply universally. You'll learn not just how to make API calls, but how to build resilient systems that gracefully handle the inevitable failures and scale from hundreds to millions of requests per day.
API calls fail. Networks have hiccups, services have outages, and rate limits are hit. Robust error handling is non-negotiable.
| Error Type | HTTP Code | Retry? | Strategy |
|---|---|---|---|
| Rate Limit | 429 | Yes | Exponential backoff with jitter |
| Server Error | 500, 502, 503 | Yes | Retry 2-3 times |
| Timeout | - | Yes | Retry with longer timeout |
| Invalid Request | 400 | No | Log and return error to user |
| Auth Error | 401, 403 | No | Check API key, alert team |
| Not Found | 404 | No | Invalid endpoint/model |
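Only the first three rows should be retried; the others call for explicit handling. A minimal sketch of the non-retryable cases, assuming the openai-python v1 SDK (`notify_oncall` is a hypothetical alerting hook), before moving on to the retry logic below:

```python
import logging

from openai import OpenAI, AuthenticationError, PermissionDeniedError, BadRequestError

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def call_once(messages):
    """Handle the non-retryable rows: log, alert, and re-raise."""
    try:
        return client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    except (AuthenticationError, PermissionDeniedError) as e:  # 401/403: bad or revoked key
        logging.critical(f"Auth failure - check API key: {e}")
        notify_oncall("LLM API auth failure", str(e))  # hypothetical alerting hook
        raise
    except BadRequestError as e:  # 400: malformed request, retrying won't help
        logging.error(f"Invalid request: {e}")
        raise
```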
```python
import logging
import random
import time
from typing import Any, Callable

# Map the SDK's exception types onto the generic names used below (openai-python v1)
from openai import OpenAI, RateLimitError, APITimeoutError
from openai import InternalServerError as ServerError  # 5xx server errors
from openai import BadRequestError as ClientError      # 4xx client errors

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

class ExponentialBackoff:
    def __init__(self, max_retries=3, base_delay=1, max_delay=60):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except RateLimitError:
                if attempt == self.max_retries - 1:
                    raise  # Final attempt, re-raise
                # Calculate delay with exponential backoff
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )
                # Add jitter (randomness) to prevent thundering herd
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter
                logging.warning(f"Rate limited. Retrying in {total_delay:.1f}s...")
                time.sleep(total_delay)
            except (ServerError, APITimeoutError, TimeoutError):
                if attempt == self.max_retries - 1:
                    raise
                delay = self.base_delay * (2 ** attempt)
                logging.warning(f"Transient error. Retrying in {delay}s...")
                time.sleep(delay)
            except ClientError as e:
                # Don't retry client errors (4xx)
                logging.error(f"Client error: {e}")
                raise

# Usage
backoff = ExponentialBackoff(max_retries=3, base_delay=1, max_delay=60)

def call_openai():
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )

response = backoff.execute(call_openai)
```

The tenacity library provides the same retry behaviour declaratively:
```python
import logging

from openai import OpenAI, RateLimitError, APIError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

@retry(
    retry=retry_if_exception_type((RateLimitError, APIError)),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    stop=stop_after_attempt(3),
    before_sleep=lambda retry_state: logging.info(f"Retrying after {retry_state.next_action.sleep}s...")
)
def call_llm_with_retry(prompt):
    """Call LLM with automatic retry logic."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30
    )
    return response.choices[0].message.content

# Automatically handles retries
result = call_llm_with_retry("What is AI?")
```
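The error table also recommends retrying timeouts with a longer timeout, which neither example above does. A minimal sketch of that strategy, assuming the openai-python v1 client (which accepts a per-request `timeout`); the escalation schedule is illustrative:

```python
import logging

from openai import OpenAI, APITimeoutError

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def call_with_growing_timeout(prompt, timeouts=(10, 30, 60)):
    """Retry timeouts with progressively longer per-request timeouts."""
    last_error = None
    for timeout in timeouts:
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=timeout,  # per-request override, in seconds
            )
            return response.choices[0].message.content
        except APITimeoutError as e:
            logging.warning(f"Timed out after {timeout}s, retrying with a longer timeout...")
            last_error = e
    raise last_error
```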
Prevent cascading failures when an API is consistently failing:

```python
import logging
from datetime import datetime, timedelta
from enum import Enum

class CircuitBreakerOpenError(Exception):
    """Raised when the circuit breaker is open and rejecting calls."""
    pass

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds
        self.failures = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
                logging.info("Circuit breaker: Entering HALF_OPEN state")
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            # Success - reset failures
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                logging.info("Circuit breaker: Recovered, entering CLOSED state")
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            self.last_failure_time = datetime.now()
            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logging.error(f"Circuit breaker: OPEN after {self.failures} failures")
            raise

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def make_api_call():
    try:
        return circuit_breaker.call(call_openai)  # call_openai from the earlier example
    except CircuitBreakerOpenError:
        # Fallback behavior (use_cached_response / return_generic_error are app-specific helpers)
        return use_cached_response() or return_generic_error()
```

API providers impose rate limits. Exceeding them causes errors and potential bans, so proactive rate limiting is essential.
Limits vary by provider, model, and account tier: as of early 2025, OpenAI and Anthropic both publish per-model requests-per-minute and tokens-per-minute quotas that grow with your usage tier, so check your account dashboard or the provider documentation for current numbers rather than hard-coding them.
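You can also read how much headroom you actually have from the response headers. A minimal sketch, assuming the openai-python v1 SDK (`with_raw_response` exposes the HTTP headers; the header names below are the ones OpenAI documents for its rate limits):

```python
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def remaining_quota():
    """Make a tiny request and read OpenAI's rate-limit headers from the raw response."""
    raw = client.chat.completions.with_raw_response.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "ping"}],
        max_tokens=1,
    )
    headers = raw.headers  # httpx-style headers
    return {
        "remaining_requests": headers.get("x-ratelimit-remaining-requests"),
        "remaining_tokens": headers.get("x-ratelimit-remaining-tokens"),
        "requests_reset_in": headers.get("x-ratelimit-reset-requests"),
    }

print(remaining_quota())
```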
A token bucket limiter smooths your request rate so you stay under those limits:

```python
import threading
import time

class TokenBucketRateLimiter:
    def __init__(self, rate, capacity):
        """
        rate: tokens added per second
        capacity: maximum tokens in bucket
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        """Acquire tokens, block if not available."""
        with self.lock:
            while True:
                now = time.time()
                elapsed = now - self.last_update
                # Add new tokens based on elapsed time
                self.tokens = min(
                    self.capacity,
                    self.tokens + elapsed * self.rate
                )
                self.last_update = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Not enough tokens: sleep until enough have accrued
                # (sleeping while holding the lock serialises callers, which is
                # fine for a single worker but not for high-concurrency use)
                sleep_time = (tokens - self.tokens) / self.rate
                time.sleep(sleep_time)

# Usage: 60 requests per minute = 1 request per second
rate_limiter = TokenBucketRateLimiter(rate=1, capacity=60)

def call_api():
    rate_limiter.acquire()  # Blocks until a token is available
    return client.chat.completions.create(...)  # client from the earlier setup
```

When you call more than one provider, keep a separate limiter per provider:
```python
class MultiProviderRateLimiter:
    def __init__(self):
        self.limiters = {
            "openai": TokenBucketRateLimiter(rate=10000/60, capacity=10000),  # 10K RPM
            "anthropic": TokenBucketRateLimiter(rate=50/60, capacity=50),     # 50 RPM
            "google": TokenBucketRateLimiter(rate=60/60, capacity=60)         # 60 RPM
        }

    def call_with_limit(self, provider, func, *args, **kwargs):
        """Call API respecting provider-specific rate limits."""
        self.limiters[provider].acquire()
        return func(*args, **kwargs)

# Usage
limiter = MultiProviderRateLimiter()

# OpenAI call
response = limiter.call_with_limit(
    "openai",
    lambda: client.chat.completions.create(...)
)

# Claude call
response = limiter.call_with_limit(
    "anthropic",
    lambda: anthropic_client.messages.create(...)
)
```

If you don't know your exact limits, or they change over time, adapt the request rate dynamically based on how the API responds:
```python
import logging
import time

class AdaptiveRateLimiter:
    def __init__(self, initial_rate):
        self.current_rate = initial_rate
        self.min_rate = initial_rate * 0.1
        self.max_rate = initial_rate * 2
        self.consecutive_errors = 0

    def on_success(self):
        """Increase rate after successful calls."""
        self.consecutive_errors = 0
        # Gradually increase rate
        self.current_rate = min(
            self.max_rate,
            self.current_rate * 1.1
        )

    def on_rate_limit(self):
        """Decrease rate when hitting limits."""
        self.consecutive_errors += 1
        # Aggressively decrease rate
        self.current_rate = max(
            self.min_rate,
            self.current_rate * 0.5
        )
        logging.warning(f"Rate limit hit. Reducing to {self.current_rate:.1f} req/s")

    def get_delay(self):
        """Get current delay between requests."""
        return 1.0 / self.current_rate

# Usage
limiter = AdaptiveRateLimiter(initial_rate=100)  # 100 req/s

while True:
    try:
        time.sleep(limiter.get_delay())
        response = call_api()
        limiter.on_success()
    except RateLimitError:
        limiter.on_rate_limit()
```

Caching can reduce API costs by 30-70% on workloads with repeated queries and improve latency dramatically. Let's implement smart caching strategies.
```python
import hashlib
import json
import logging
from datetime import datetime, timedelta
from typing import Optional

class ResponseCache:
    def __init__(self, ttl_seconds=3600):
        self.cache = {}  # In production: use Redis
        self.ttl = ttl_seconds

    def _generate_key(self, model, messages, **kwargs):
        """Generate cache key from request parameters."""
        # Create deterministic hash
        cache_data = {
            "model": model,
            "messages": messages,
            "temperature": kwargs.get("temperature", 0),
            "max_tokens": kwargs.get("max_tokens")
        }
        key_string = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

    def get(self, model, messages, **kwargs) -> Optional[str]:
        """Get cached response if available and fresh."""
        key = self._generate_key(model, messages, **kwargs)
        cached = self.cache.get(key)
        if not cached:
            return None
        # Check if expired
        if datetime.now() > cached["expires_at"]:
            del self.cache[key]
            return None
        return cached["response"]

    def set(self, model, messages, response, **kwargs):
        """Cache a response."""
        key = self._generate_key(model, messages, **kwargs)
        self.cache[key] = {
            "response": response,
            "cached_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(seconds=self.ttl)
        }

# Usage
cache = ResponseCache(ttl_seconds=3600)  # 1 hour TTL

def cached_llm_call(model, messages, **kwargs):
    """LLM call with caching."""
    # Check cache
    cached_response = cache.get(model, messages, **kwargs)
    if cached_response:
        logging.info("Cache hit!")
        return cached_response
    # Cache miss - make API call
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        **kwargs
    )
    result = response.choices[0].message.content
    # Cache response
    cache.set(model, messages, result, **kwargs)
    return result
```
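The in-memory dict above is per-process and vanishes on restart; as the comment notes, production deployments usually back the cache with Redis. A minimal sketch of the same cache on top of redis-py, assuming a Redis instance at localhost:6379 (the key scheme mirrors `_generate_key` above):

```python
import hashlib
import json
from typing import Optional

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

class RedisResponseCache:
    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds

    def _key(self, model, messages, **kwargs):
        payload = json.dumps(
            {"model": model, "messages": messages,
             "temperature": kwargs.get("temperature", 0),
             "max_tokens": kwargs.get("max_tokens")},
            sort_keys=True,
        )
        return "llmcache:" + hashlib.md5(payload.encode()).hexdigest()

    def get(self, model, messages, **kwargs) -> Optional[str]:
        return r.get(self._key(model, messages, **kwargs))  # None on miss

    def set(self, model, messages, response: str, **kwargs):
        # SETEX stores the value with a TTL, so Redis handles expiry for us
        r.setex(self._key(model, messages, **kwargs), self.ttl, response)
```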
You can also cache based on semantic similarity, not exact match:

```python
import logging
from typing import Optional

from sklearn.metrics.pairwise import cosine_similarity

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.cache = []  # List of (embedding, response) tuples
        self.threshold = similarity_threshold

    def _embed(self, text):
        """Generate embedding for text."""
        # Use a cheap, fast embedding model (client is the OpenAI client from earlier)
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def get(self, query: str) -> Optional[str]:
        """Get cached response if a semantically similar query exists."""
        if not self.cache:
            return None
        # Embed query
        query_embedding = self._embed(query)
        # Find most similar cached query
        similarities = []
        for cached_embedding, _ in self.cache:
            sim = cosine_similarity(
                [query_embedding],
                [cached_embedding]
            )[0][0]
            similarities.append(sim)
        max_similarity = max(similarities)
        # Return if above threshold
        if max_similarity >= self.threshold:
            idx = similarities.index(max_similarity)
            logging.info(f"Semantic cache hit! Similarity: {max_similarity:.3f}")
            return self.cache[idx][1]
        return None

    def set(self, query: str, response: str):
        """Cache a query-response pair."""
        query_embedding = self._embed(query)
        self.cache.append((query_embedding, response))
        # Limit cache size
        if len(self.cache) > 1000:
            self.cache.pop(0)  # Remove oldest

# Usage
semantic_cache = SemanticCache(similarity_threshold=0.95)

def cached_query(query):
    # Check semantic cache
    cached = semantic_cache.get(query)
    if cached:
        return cached
    # Make API call
    response = call_llm(query)
    # Cache result
    semantic_cache.set(query, response)
    return response

# These will hit the cache despite different wording:
result1 = cached_query("What is machine learning?")
result2 = cached_query("Can you explain machine learning?")  # Cache hit!
```

You can also pre-populate the cache with answers to common queries during off-peak hours:
"""Pre-populate cache with common queries."""
def __init__(self, cache, llm_client):
self.cache = cache
self.client = llm_client
def warm_cache(self, common_queries):
"""Warm cache with common queries."""
logging.info(f"Warming cache with {len(common_queries)} queries...")
for query in common_queries:
# Check if already cached
if self.cache.get(query):
continue
# Generate and cache response
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}]
)
self.cache.set(query, response.choices[0].message.content)
time.sleep(0.1) # Rate limit
# Usage: Warm cache during off-peak hours
common_queries = [
"How do I reset my password?",
"What are your business hours?",
"How do I contact support?",
# ... more common questions
]
warmer = CacheWarmer(cache, client)
warmer.warm_cache(common_queries)When your primary LLM provider fails, having fallbacks prevents downtime.
```python
import logging

class AllProvidersFailedError(Exception):
    """Raised when every configured provider has failed."""
    pass

class MultiProviderClient:
    def __init__(self, providers):
        """
        providers: List of (name, client_func, model) tuples
        """
        self.providers = providers

    def call_with_fallback(self, messages, **kwargs):
        """Try providers in order until one succeeds."""
        errors = []
        for name, client_func, model in self.providers:
            try:
                logging.info(f"Trying provider: {name}")
                response = client_func(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                logging.info(f"Success with provider: {name}")
                return {
                    "content": response,
                    "provider": name,
                    "model": model
                }
            except Exception as e:
                logging.warning(f"Provider {name} failed: {e}")
                errors.append((name, str(e)))
                continue
        # All providers failed
        raise AllProvidersFailedError(f"All providers failed: {errors}")

# Set up providers in order of preference. In practice each lambda should adapt
# the arguments and response shape to its provider's SDK (e.g. Anthropic requires
# max_tokens; Gemini takes contents rather than messages).
providers = [
    ("openai-primary", lambda **kw: openai_client.chat.completions.create(**kw), "gpt-4o-mini"),
    ("anthropic-fallback", lambda **kw: anthropic_client.messages.create(**kw), "claude-3-5-haiku-20241022"),
    ("google-fallback", lambda **kw: google_client.generate_content(**kw), "gemini-1.5-flash")
]

multi_client = MultiProviderClient(providers)

# Automatically falls back if OpenAI is down
response = multi_client.call_with_fallback(
    messages=[{"role": "user", "content": "Hello"}]
)
```

When every provider fails, you can still provide degraded service with simple rule-based responses:
"""Provide degraded service when primary LLM fails."""
def __init__(self, llm_client, fallback_responses):
self.client = llm_client
self.fallback_responses = fallback_responses # Simple rule-based responses
def get_response(self, query):
"""Get response with graceful degradation."""
try:
# Try LLM first
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
timeout=5
)
return {
"content": response.choices[0].message.content,
"source": "llm"
}
except Exception as e:
logging.warning(f"LLM failed, using fallback: {e}")
# Try rule-based fallback
for pattern, response in self.fallback_responses.items():
if pattern.lower() in query.lower():
return {
"content": response,
"source": "rule-based",
"degraded": True
}
# Ultimate fallback
return {
"content": "I'm experiencing technical difficulties. Please try again or contact support at support@example.com",
"source": "generic-fallback",
"degraded": True
}
# Define simple fallback responses
fallback_responses = {
"password": "To reset your password, visit https://example.com/reset or contact support.",
"hours": "Our support hours are Monday-Friday 9 AM - 5 PM AEST.",
"contact": "Contact us at support@example.com or call 1-800-EXAMPLE."
}
degradation = GracefulDegradation(client, fallback_responses)
response = degradation.get_response("How do I reset my password?")LLM API costs can spiral quickly. Let's implement cost controls.
```python
import logging
from datetime import datetime, timedelta

class CostTracker:
    def __init__(self):
        self.costs = []
        # Illustrative per-1M-token rates; verify against current provider price lists
        self.pricing = {
            "gpt-4o": {"input": 5.00, "output": 15.00},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
            "claude-3-5-haiku": {"input": 0.25, "output": 1.25}
        }

    def track_request(self, model, input_tokens, output_tokens):
        """Track cost of a request."""
        if model not in self.pricing:
            logging.warning(f"Unknown model for pricing: {model}")
            return
        rates = self.pricing[model]
        cost = (
            (input_tokens / 1_000_000) * rates["input"] +
            (output_tokens / 1_000_000) * rates["output"]
        )
        self.costs.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost
        })
        return cost

    def get_total_cost(self, since=None):
        """Get total cost, optionally since a timestamp."""
        if since:
            relevant = [c for c in self.costs if c["timestamp"] >= since]
        else:
            relevant = self.costs
        return sum(c["cost_usd"] for c in relevant)

    def get_cost_by_model(self):
        """Get costs broken down by model."""
        by_model = {}
        for cost_entry in self.costs:
            model = cost_entry["model"]
            by_model[model] = by_model.get(model, 0) + cost_entry["cost_usd"]
        return by_model

# Usage
tracker = CostTracker()

response = client.chat.completions.create(...)

# Track cost
cost = tracker.track_request(
    model="gpt-4o-mini",
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens
)

# Check daily spending
daily_cost = tracker.get_total_cost(since=datetime.now() - timedelta(days=1))
if daily_cost > 100:  # $100 daily budget
    alert("Daily budget exceeded!", {"cost": daily_cost})  # alert() is your notification hook
```

Beyond tracking spend, route each request to the cheapest model that meets its quality requirements:
"""Route requests to cheapest model that meets requirements."""
def route_request(self, query, required_quality="medium"):
"""Route to appropriate model based on requirements."""
# Classify query complexity
complexity = self._estimate_complexity(query)
# Route based on complexity and required quality
if required_quality == "highest" or complexity > 0.8:
return {"provider": "openai", "model": "gpt-4o"}
elif complexity > 0.5 or required_quality == "high":
return {"provider": "anthropic", "model": "claude-3-5-sonnet"}
else: # Simple query
return {"provider": "openai", "model": "gpt-4o-mini"}
def _estimate_complexity(self, query):
"""Estimate query complexity (0-1)."""
# Simple heuristics
indicators = {
"length": len(query.split()) > 50,
"technical": any(term in query.lower() for term in ["code", "algorithm", "calculate"]),
"multi_part": any(word in query.lower() for word in ["first", "then", "also", "additionally"])
}
return sum(indicators.values()) / len(indicators)
# Usage
router = CostOptimizedRouter()
routing = router.route_request("What's 2+2?") # → gpt-4o-mini (cheap)
routing = router.route_request("Explain quantum computing and write code to simulate a qubit") # → gpt-4o (complex)class BudgetEnforcer:
```python
from datetime import datetime, timedelta

class BudgetExceededError(Exception):
    """Raised when the daily budget has been spent."""
    pass

class BudgetEnforcer:
    def __init__(self, daily_budget_usd):
        self.daily_budget = daily_budget_usd
        self.tracker = CostTracker()

    def check_budget(self):
        """Check if budget allows more requests."""
        daily_cost = self.tracker.get_total_cost(
            since=datetime.now() - timedelta(days=1)
        )
        if daily_cost >= self.daily_budget:
            return False, f"Daily budget exceeded: ${daily_cost:.2f} / ${self.daily_budget:.2f}"
        return True, f"Budget OK: ${daily_cost:.2f} / ${self.daily_budget:.2f}"

    def call_with_budget(self, func, *args, **kwargs):
        """Execute function only if budget allows."""
        allowed, message = self.check_budget()
        if not allowed:
            raise BudgetExceededError(message)
        # Execute and track
        response = func(*args, **kwargs)
        self.tracker.track_request(
            model=kwargs.get("model"),
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )
        return response

# Usage
enforcer = BudgetEnforcer(daily_budget_usd=100)

def answer_user(messages):
    try:
        return enforcer.call_with_budget(
            client.chat.completions.create,
            model="gpt-4o-mini",
            messages=messages
        )
    except BudgetExceededError:
        # Use cached response or return error
        return "Service temporarily limited. Please try again later."
```

You can't fix what you can't see. Comprehensive monitoring is essential.
```python
import logging
import time
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class RequestMetrics:
    request_id: str
    timestamp: datetime
    provider: str
    model: str
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost_usd: float
    success: bool
    error: Optional[str] = None

class MetricsCollector:
    def __init__(self):
        self.metrics = []

    def record_request(self, metrics: RequestMetrics):
        """Record request metrics."""
        self.metrics.append(metrics)
        # Send to monitoring service (DataDog, CloudWatch, etc.)
        self._send_to_datadog(metrics)
        # Alert on anomalies
        self._check_for_anomalies(metrics)

    def _send_to_datadog(self, metrics):
        """Send metrics to DataDog."""
        # Use the datadog library
        from datadog import statsd
        statsd.increment('llm.requests',
                         tags=[f"provider:{metrics.provider}", f"model:{metrics.model}"])
        statsd.histogram('llm.latency', metrics.latency_ms,
                         tags=[f"provider:{metrics.provider}"])
        statsd.gauge('llm.cost', metrics.cost_usd,
                     tags=[f"model:{metrics.model}"])
        if not metrics.success:
            statsd.increment('llm.errors',
                             tags=[f"provider:{metrics.provider}", f"error:{metrics.error}"])

    def _check_for_anomalies(self, metrics):
        """Check for anomalous behavior."""
        # High latency
        if metrics.latency_ms > 10000:  # 10 seconds
            alert("High latency detected", {"latency": metrics.latency_ms})  # alert(): your notification hook
        # Unexpected cost
        expected_cost = (metrics.input_tokens + metrics.output_tokens) / 1_000_000 * 0.60  # rough estimate
        if metrics.cost_usd > expected_cost * 10:
            alert("Unexpected high cost", {"cost": metrics.cost_usd, "expected": expected_cost})

metrics_collector = MetricsCollector()

# Usage with request tracking
def tracked_llm_call(prompt, model="gpt-4o-mini"):
    """LLM call with full metrics tracking."""
    request_id = str(uuid.uuid4())
    start_time = time.time()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        latency = (time.time() - start_time) * 1000
        metrics = RequestMetrics(
            request_id=request_id,
            timestamp=datetime.now(),
            provider="openai",
            model=model,
            latency_ms=latency,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            cost_usd=calculate_cost(model, response.usage),  # calculate_cost: your pricing helper
            success=True
        )
        metrics_collector.record_request(metrics)
        return response.choices[0].message.content
    except Exception as e:
        latency = (time.time() - start_time) * 1000
        metrics = RequestMetrics(
            request_id=request_id,
            timestamp=datetime.now(),
            provider="openai",
            model=model,
            latency_ms=latency,
            input_tokens=0,
            output_tokens=0,
            cost_usd=0,
            success=False,
            error=str(e)
        )
        metrics_collector.record_request(metrics)
        raise
```

Pair per-request metrics with periodic health checks against each provider:
"""Check health of LLM integrations."""
def check_all_providers(self):
"""Check health of all providers."""
results = {}
for provider in ["openai", "anthropic", "google"]:
results[provider] = self.check_provider(provider)
return results
def check_provider(self, provider):
"""Check if provider is healthy."""
start_time = time.time()
try:
# Make simple test request
if provider == "openai":
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "test"}],
max_tokens=5,
timeout=5
)
latency = (time.time() - start_time) * 1000
return {
"healthy": True,
"latency_ms": latency,
"error": None
}
except Exception as e:
return {
"healthy": False,
"latency_ms": None,
"error": str(e)
}
# Run health checks every minute
def run_health_checks():
checker = HealthChecker()
while True:
results = checker.check_all_providers()
for provider, health in results.items():
if not health["healthy"]:
alert(f"Provider {provider} is unhealthy", health)
time.sleep(60)
# Run in background thread
threading.Thread(target=run_health_checks, daemon=True).start()Building reliable LLM API integrations requires more than just calling an endpoint. Production systems need comprehensive error handling with exponential backoff, proactive rate limiting to avoid hitting quotas, intelligent caching to reduce costs and latency, multi-provider fallbacks for resilience, budget enforcement to prevent cost overruns, and continuous monitoring to detect issues early.
The patterns in this guide—circuit breakers, semantic caching, graceful degradation, cost tracking, and health checks—transform fragile prototypes into robust production systems. Start with the basics (retry logic, rate limiting, simple caching) and incrementally add sophistication (semantic caching, multi-provider fallbacks, predictive budgeting) as your application scales.
Remember that API integration is not "set and forget." Monitor continuously, alert on anomalies, and be prepared to adjust strategies as your usage patterns evolve, providers change their limits, and costs fluctuate. The investment in robust integration patterns pays dividends in reliability, cost savings, and peace of mind.