intermediate
13 min read
20 January 2025

API Integration Patterns: Building Reliable, Scalable LLM Applications

Master patterns for integrating with LLM APIs reliably at scale. Learn error handling, rate limiting, caching, cost optimization, and production-ready architectures for OpenAI, Anthropic, and other providers.

Clever Ops AI Team

Integrating with LLM APIs seems straightforward—send a request, get a response. But production systems face rate limits, network failures, cost overruns, and latency spikes. The difference between a prototype that works in demo and a production system that reliably serves thousands of users is robust API integration patterns.

This guide covers battle-tested patterns for production LLM applications: exponential backoff for transient failures, circuit breakers to prevent cascade failures, rate limiting to stay within quotas, intelligent caching to reduce costs and latency, fallback strategies when primary providers fail, and comprehensive monitoring to detect issues early.

Whether you're building with OpenAI, Anthropic Claude, Google Gemini, or multiple providers, these patterns apply universally. You'll learn not just how to make API calls, but how to build resilient systems that gracefully handle the inevitable failures and scale from hundreds to millions of requests per day.

Key Takeaways

  • Use exponential backoff with jitter for retry logic—prevents thundering herd and respects rate limits. Libraries like tenacity make implementation trivial.
  • Implement circuit breakers to prevent cascade failures—after 5 consecutive errors, stop making requests for 60 seconds to allow recovery time
  • Rate limiting is mandatory: use a token bucket algorithm that respects provider limits (OpenAI: roughly 3 RPM on the lowest tier up to 10,000 RPM, Anthropic: roughly 5 RPM up to 4,000 RPM, depending on tier)
  • Caching reduces costs 30-70%: exact match caching with 1-hour TTL for deterministic queries, semantic caching for similar questions, cache warming for common queries
  • Multi-provider fallback prevents downtime: configure OpenAI → Anthropic → Google fallback chain, with graceful degradation to rule-based responses as last resort
  • Track costs per request and enforce daily budgets—alert at 80% budget utilization, block requests at 100% to prevent surprise bills
  • Monitor latency (p50, p95, p99), error rate, cost per request, and provider health—alert when metrics deviate >50% from baseline

Error Handling and Retry Strategies

API calls fail. Networks have hiccups, services have outages, and rate limits are hit. Robust error handling is non-negotiable.

Understanding Error Types

| Error Type | HTTP Code | Retry? | Strategy |
| --- | --- | --- | --- |
| Rate Limit | 429 | Yes | Exponential backoff with jitter |
| Server Error | 500, 502, 503 | Yes | Retry 2-3 times |
| Timeout | - | Yes | Retry with longer timeout |
| Invalid Request | 400 | No | Log and return error to user |
| Auth Error | 401, 403 | No | Check API key, alert team |
| Not Found | 404 | No | Invalid endpoint/model |
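
The table above boils down to a simple rule: retry transient failures (429, 5xx, timeouts) and fail fast on client errors. A minimal sketch of that decision logic (the helper name and signature are ours, not from any SDK):

Retry Decision Helper (Python)
# Status codes from the table above that warrant a retry
TRANSIENT_STATUS_CODES = {429, 500, 502, 503}

def should_retry(status_code=None, is_timeout=False):
    """Decide whether a failed request is worth retrying."""
    if is_timeout:
        return True  # Timeouts: retry, ideally with a longer timeout
    # 400/401/403/404 and other client errors: log and surface instead of retrying
    return status_code in TRANSIENT_STATUS_CODES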

Exponential Backoff with Jitter

Exponential Backoff Implementation (Python)
import time
import random
import logging
from typing import Callable, Any

# RateLimitError, ServerError, and ClientError are assumed to be defined elsewhere
# (e.g., mapped from your provider SDK's exception classes).

class ExponentialBackoff:
    def __init__(self, max_retries=3, base_delay=1, max_delay=60):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)

            except RateLimitError as e:
                if attempt == self.max_retries - 1:
                    raise  # Final attempt, re-raise

                # Calculate delay with exponential backoff
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )

                # Add jitter (randomness) to prevent thundering herd
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter

                logging.warning(f"Rate limited. Retrying in {total_delay:.1f}s...")
                time.sleep(total_delay)

            except (ServerError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    raise

                delay = self.base_delay * (2 ** attempt)
                logging.warning(f"Transient error. Retrying in {delay}s...")
                time.sleep(delay)

            except ClientError as e:
                # Don't retry client errors (4xx)
                logging.error(f"Client error: {e}")
                raise

# Usage
backoff = ExponentialBackoff(max_retries=3, base_delay=1, max_delay=60)

def call_openai():
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )

response = backoff.execute(call_openai)

Using Tenacity Library (Recommended)

Using Tenacity for Retries (Python)
import logging

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError, APIError

@retry(
    retry=retry_if_exception_type((RateLimitError, APIError)),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    stop=stop_after_attempt(3),
    before_sleep=lambda retry_state: logging.info(f"Retrying after {retry_state.next_action.sleep}s...")
)
def call_llm_with_retry(prompt):
    """Call LLM with automatic retry logic."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30
    )
    return response.choices[0].message.content

# Automatically handles retries
result = call_llm_with_retry("What is AI?")

Circuit Breaker Pattern

Prevent cascading failures when an API is consistently failing:

Circuit Breaker Implementation (Python)
import logging
from enum import Enum
from datetime import datetime, timedelta

class CircuitBreakerOpenError(Exception):
    """Raised while the circuit is open and requests are being rejected."""
    pass

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds
        self.failures = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
                logging.info("Circuit breaker: Entering HALF_OPEN state")
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)

            # Success - reset failures
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                logging.info("Circuit breaker: Recovered, entering CLOSED state")

            self.failures = 0
            return result

        except Exception as e:
            self.failures += 1
            self.last_failure_time = datetime.now()

            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logging.error(f"Circuit breaker: OPEN after {self.failures} failures")

            raise

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def make_api_call():
    try:
        return circuit_breaker.call(call_openai)
    except CircuitBreakerOpenError:
        # Fallback behavior
        return use_cached_response() or return_generic_error()

Rate Limiting and Quota Management

API providers impose rate limits. Exceeding them causes errors and potential bans. Proactive rate limiting is essential.

Understanding Rate Limits

OpenAI (as of early 2025):

  • Tier 1 (Free): 3 RPM (requests per minute), 200 RPD (requests per day)
  • Tier 2 ($5+ spent): 3,500 RPM, 10,000 RPD
  • Tier 3 ($50+ spent): 5,000 RPM, 10,000 RPD
  • Tier 4 ($1,000+ spent): 10,000 RPM, no daily limit

Anthropic Claude:

  • Free tier: 5 RPM
  • Paid tiers: 50-4,000 RPM depending on spend
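
These published numbers change frequently and vary by account, so rather than hard-coding them it helps to read your actual limits at runtime. The sketch below assumes the OpenAI Python SDK's with_raw_response accessor and the x-ratelimit-* response headers; verify both against current documentation before relying on them.

Reading Rate Limit Headers (Python)
from openai import OpenAI

client = OpenAI()

# with_raw_response exposes the HTTP response (and its headers) alongside the parsed result
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
    max_tokens=1
)

headers = raw.headers
print("Requests remaining this window:", headers.get("x-ratelimit-remaining-requests"))
print("Tokens remaining this window:", headers.get("x-ratelimit-remaining-tokens"))
print("Request window resets in:", headers.get("x-ratelimit-reset-requests"))

response = raw.parse()  # the usual ChatCompletion object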

Token Bucket Rate Limiter

Token Bucket Rate Limiter (Python)
import time
import threading

class TokenBucketRateLimiter:
    def __init__(self, rate, capacity):
        """
        rate: tokens added per second
        capacity: maximum tokens in bucket
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        """Acquire tokens, blocking until enough are available."""
        while True:
            with self.lock:
                now = time.time()
                elapsed = now - self.last_update

                # Add new tokens based on elapsed time
                self.tokens = min(
                    self.capacity,
                    self.tokens + elapsed * self.rate
                )
                self.last_update = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return

                # Not enough tokens yet: compute how long to wait
                sleep_time = (tokens - self.tokens) / self.rate

            # Sleep outside the lock so other threads are not blocked
            time.sleep(sleep_time)

# Usage: 60 requests per minute = 1 request per second
rate_limiter = TokenBucketRateLimiter(rate=1, capacity=60)

def call_api():
    rate_limiter.acquire()  # Blocks until token available
    return client.chat.completions.create(...)
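
Providers enforce token-per-minute (TPM) limits alongside request limits, and the tokens parameter on acquire() lets the same bucket class cover both. A small usage sketch (the 200,000 TPM figure is illustrative, and the token estimate is a rough character-count heuristic):

Token-Based (TPM) Limiting (Python)
# Separate buckets for requests-per-minute and tokens-per-minute
rpm_limiter = TokenBucketRateLimiter(rate=60 / 60, capacity=60)            # 60 RPM
tpm_limiter = TokenBucketRateLimiter(rate=200_000 / 60, capacity=200_000)  # 200K TPM (assumed)

def call_api_with_tpm(messages, max_tokens=500):
    # Rough token estimate: ~4 characters per token, plus the completion budget
    estimated_tokens = sum(len(m["content"]) for m in messages) // 4 + max_tokens

    rpm_limiter.acquire(1)
    tpm_limiter.acquire(estimated_tokens)

    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        max_tokens=max_tokens
    )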

Multi-Provider Rate Limiting

Multi-Provider Rate Limiter (Python)
class MultiProviderRateLimiter:
    def __init__(self):
        self.limiters = {
            "openai": TokenBucketRateLimiter(rate=10000/60, capacity=10000),  # 10K RPM
            "anthropic": TokenBucketRateLimiter(rate=50/60, capacity=50),  # 50 RPM
            "google": TokenBucketRateLimiter(rate=60/60, capacity=60)  # 60 RPM
        }

    def call_with_limit(self, provider, func, *args, **kwargs):
        """Call API respecting provider-specific rate limits."""
        self.limiters[provider].acquire()
        return func(*args, **kwargs)

# Usage
limiter = MultiProviderRateLimiter()

# OpenAI call
response = limiter.call_with_limit(
    "openai",
    lambda: client.chat.completions.create(...)
)

# Claude call
response = limiter.call_with_limit(
    "anthropic",
    lambda: anthropic_client.messages.create(...)
)

Dynamic Rate Adjustment

Adaptive Rate Limiter (Python)
class AdaptiveRateLimiter:
    def __init__(self, initial_rate):
        self.current_rate = initial_rate
        self.min_rate = initial_rate * 0.1
        self.max_rate = initial_rate * 2
        self.consecutive_errors = 0

    def on_success(self):
        """Increase rate after successful calls."""
        self.consecutive_errors = 0

        # Gradually increase rate
        self.current_rate = min(
            self.max_rate,
            self.current_rate * 1.1
        )

    def on_rate_limit(self):
        """Decrease rate when hitting limits."""
        self.consecutive_errors += 1

        # Aggressively decrease rate
        self.current_rate = max(
            self.min_rate,
            self.current_rate * 0.5
        )

        logging.warning(f"Rate limit hit. Reducing to {self.current_rate:.1f} req/s")

    def get_delay(self):
        """Get current delay between requests."""
        return 1.0 / self.current_rate

# Usage
limiter = AdaptiveRateLimiter(initial_rate=100)  # 100 req/s

while True:
    try:
        time.sleep(limiter.get_delay())
        response = call_api()
        limiter.on_success()
    except RateLimitError:
        limiter.on_rate_limit()

Intelligent Caching for Cost and Performance

Caching can reduce API costs by 30-70% and improve latency dramatically. Let's implement smart caching strategies.
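
To see where the 30-70% figure comes from: every cache hit avoids one paid API call, so savings scale roughly with your hit rate. A back-of-envelope sketch with purely illustrative numbers:

Estimating Cache Savings (Python)
requests_per_day = 100_000        # assumed traffic
avg_cost_per_request = 0.002      # USD, assumed blended cost
cache_hit_rate = 0.50             # assumed: half of queries repeat

baseline_cost = requests_per_day * avg_cost_per_request
cost_with_cache = requests_per_day * (1 - cache_hit_rate) * avg_cost_per_request

print(f"Baseline: ${baseline_cost:.2f}/day")
print(f"With caching: ${cost_with_cache:.2f}/day "
      f"({1 - cost_with_cache / baseline_cost:.0%} saved)")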

Response Caching with TTL

Response Cache with TTL (Python)
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Optional

class ResponseCache:
    def __init__(self, ttl_seconds=3600):
        self.cache = {}  # In production: use Redis
        self.ttl = ttl_seconds

    def _generate_key(self, model, messages, **kwargs):
        """Generate cache key from request parameters."""
        # Create deterministic hash
        cache_data = {
            "model": model,
            "messages": messages,
            "temperature": kwargs.get("temperature", 0),
            "max_tokens": kwargs.get("max_tokens")
        }

        key_string = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

    def get(self, model, messages, **kwargs) -> Optional[str]:
        """Get cached response if available and fresh."""
        key = self._generate_key(model, messages, **kwargs)
        cached = self.cache.get(key)

        if not cached:
            return None

        # Check if expired
        if datetime.now() > cached["expires_at"]:
            del self.cache[key]
            return None

        return cached["response"]

    def set(self, model, messages, response, **kwargs):
        """Cache a response."""
        key = self._generate_key(model, messages, **kwargs)

        self.cache[key] = {
            "response": response,
            "cached_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(seconds=self.ttl)
        }

# Usage
cache = ResponseCache(ttl_seconds=3600)  # 1 hour TTL

def cached_llm_call(model, messages, **kwargs):
    """LLM call with caching."""
    # Check cache
    cached_response = cache.get(model, messages, **kwargs)
    if cached_response:
        logging.info("Cache hit!")
        return cached_response

    # Cache miss - make API call
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        **kwargs
    )

    result = response.choices[0].message.content

    # Cache response
    cache.set(model, messages, result, **kwargs)

    return result
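
The in-memory dictionary above only works for a single process; the comment's suggestion to use Redis in production gives you shared storage across workers plus native TTL expiry. A minimal sketch using the redis-py client (host and port are placeholders):

Redis-Backed Response Cache (Python)
import redis

class RedisResponseCache:
    def __init__(self, ttl_seconds=3600, host="localhost", port=6379):
        self.redis = redis.Redis(host=host, port=port, decode_responses=True)
        self.ttl = ttl_seconds

    def get(self, key):
        """Return the cached response, or None if missing or expired."""
        return self.redis.get(f"llm:{key}")

    def set(self, key, response):
        """Store a response; Redis handles expiry via SETEX."""
        self.redis.setex(f"llm:{key}", self.ttl, response)

# Reuse ResponseCache._generate_key for the key and swap in this backend:
# redis_cache = RedisResponseCache(ttl_seconds=3600)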

Semantic Caching

Cache based on semantic similarity, not exact match:

Semantic Cache Implementation (Python)
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.cache = []  # List of (embedding, response) tuples
        self.threshold = similarity_threshold

    def _embed(self, text):
        """Generate embedding for text."""
        # Use cheap, fast embedding model
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def get(self, query: str) -> Optional[str]:
        """Get cached response if semantically similar query exists."""
        if not self.cache:
            return None

        # Embed query
        query_embedding = self._embed(query)

        # Find most similar cached query
        similarities = []
        for cached_embedding, _ in self.cache:
            sim = cosine_similarity(
                [query_embedding],
                [cached_embedding]
            )[0][0]
            similarities.append(sim)

        max_similarity = max(similarities)

        # Return if above threshold
        if max_similarity >= self.threshold:
            idx = similarities.index(max_similarity)
            logging.info(f"Semantic cache hit! Similarity: {max_similarity:.3f}")
            return self.cache[idx][1]

        return None

    def set(self, query: str, response: str):
        """Cache a query-response pair."""
        query_embedding = self._embed(query)
        self.cache.append((query_embedding, response))

        # Limit cache size
        if len(self.cache) > 1000:
            self.cache.pop(0)  # Remove oldest

# Usage
semantic_cache = SemanticCache(similarity_threshold=0.95)

def cached_query(query):
    # Check semantic cache
    cached = semantic_cache.get(query)
    if cached:
        return cached

    # Make API call
    response = call_llm(query)

    # Cache result
    semantic_cache.set(query, response)

    return response

# These will hit cache despite different wording:
result1 = cached_query("What is machine learning?")
result2 = cached_query("Can you explain machine learning?")  # Cache hit!

Cache Warming

Cache Warming Strategy (Python)
class CacheWarmer:
    """Pre-populate a query-keyed cache (e.g., the SemanticCache above) with common queries."""

    def __init__(self, cache, llm_client):
        self.cache = cache
        self.client = llm_client

    def warm_cache(self, common_queries):
        """Warm cache with common queries."""
        logging.info(f"Warming cache with {len(common_queries)} queries...")

        for query in common_queries:
            # Check if already cached
            if self.cache.get(query):
                continue

            # Generate and cache response
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": query}]
            )

            self.cache.set(query, response.choices[0].message.content)

            time.sleep(0.1)  # Rate limit

# Usage: Warm cache during off-peak hours
common_queries = [
    "How do I reset my password?",
    "What are your business hours?",
    "How do I contact support?",
    # ... more common questions
]

warmer = CacheWarmer(semantic_cache, client)
warmer.warm_cache(common_queries)

Fallback Strategies and Multi-Provider Resilience

When your primary LLM provider fails, having fallbacks prevents downtime.

Multi-Provider Fallback

Multi-Provider Client with Fallback (Python)
import logging
from typing import List, Callable

class AllProvidersFailedError(Exception):
    """Raised when every configured provider fails."""
    pass

class MultiProviderClient:
    def __init__(self, providers):
        """
        providers: List of (name, client_func, model) tuples
        """
        self.providers = providers

    def call_with_fallback(self, messages, **kwargs):
        """Try providers in order until one succeeds."""
        errors = []

        for name, client_func, model in self.providers:
            try:
                logging.info(f"Trying provider: {name}")

                response = client_func(
                    model=model,
                    messages=messages,
                    **kwargs
                )

                logging.info(f"Success with provider: {name}")
                return {
                    "content": response,
                    "provider": name,
                    "model": model
                }

            except Exception as e:
                logging.warning(f"Provider {name} failed: {e}")
                errors.append((name, str(e)))
                continue

        # All providers failed
        raise AllProvidersFailedError(f"All providers failed: {errors}")

# Set up providers in order of preference. In practice each lambda should adapt
# the arguments to its provider's API (see the Anthropic adapter sketch below).
providers = [
    ("openai-primary", lambda **kw: openai_client.chat.completions.create(**kw), "gpt-4o-mini"),
    ("anthropic-fallback", lambda **kw: anthropic_client.messages.create(**kw), "claude-3-5-haiku-20241022"),
    ("google-fallback", lambda **kw: google_client.generate_content(**kw), "gemini-1.5-flash")
]

multi_client = MultiProviderClient(providers)

# Automatically falls back if OpenAI is down
response = multi_client.call_with_fallback(
    messages=[{"role": "user", "content": "Hello"}]
)
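
One practical wrinkle: each provider expects a different request shape and returns a different response object, so each client_func should translate the shared message format. A hedged sketch of an Anthropic adapter, assuming the anthropic SDK's Messages API (which requires max_tokens and keeps system prompts separate):

Anthropic Adapter for the Fallback Chain (Python)
def call_anthropic(model, messages, **kwargs):
    """Adapt OpenAI-style messages to Anthropic's Messages API and return plain text."""
    # Anthropic keeps system prompts separate from the user/assistant turns
    system_parts = [m["content"] for m in messages if m["role"] == "system"]
    chat = [m for m in messages if m["role"] != "system"]

    params = {
        "model": model,
        "max_tokens": kwargs.get("max_tokens", 1024),  # required by the API
        "messages": chat,
    }
    if system_parts:
        params["system"] = " ".join(system_parts)

    response = anthropic_client.messages.create(**params)
    return response.content[0].text

# Register it in the chain: ("anthropic-fallback", call_anthropic, "claude-3-5-haiku-20241022")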

Graceful Degradation

Graceful Degradation Strategy (Python)
class GracefulDegradation:
    """Provide degraded service when primary LLM fails."""

    def __init__(self, llm_client, fallback_responses):
        self.client = llm_client
        self.fallback_responses = fallback_responses  # Simple rule-based responses

    def get_response(self, query):
        """Get response with graceful degradation."""
        try:
            # Try LLM first
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": query}],
                timeout=5
            )
            return {
                "content": response.choices[0].message.content,
                "source": "llm"
            }

        except Exception as e:
            logging.warning(f"LLM failed, using fallback: {e}")

            # Try rule-based fallback
            for pattern, response in self.fallback_responses.items():
                if pattern.lower() in query.lower():
                    return {
                        "content": response,
                        "source": "rule-based",
                        "degraded": True
                    }

            # Ultimate fallback
            return {
                "content": "I'm experiencing technical difficulties. Please try again or contact support at support@example.com",
                "source": "generic-fallback",
                "degraded": True
            }

# Define simple fallback responses
fallback_responses = {
    "password": "To reset your password, visit https://example.com/reset or contact support.",
    "hours": "Our support hours are Monday-Friday 9 AM - 5 PM AEST.",
    "contact": "Contact us at support@example.com or call 1-800-EXAMPLE."
}

degradation = GracefulDegradation(client, fallback_responses)
response = degradation.get_response("How do I reset my password?")

Cost Optimization Strategies

LLM API costs can spiral quickly. Let's implement cost controls.

Request-Level Cost Tracking

Cost Tracker (Python)
class CostTracker:
    def __init__(self):
        self.costs = []
        self.pricing = {
            "gpt-4o": {"input": 5.00, "output": 15.00},  # per 1M tokens
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
            "claude-3-5-haiku": {"input": 0.25, "output": 1.25}
        }

    def track_request(self, model, input_tokens, output_tokens):
        """Track cost of a request."""
        if model not in self.pricing:
            logging.warning(f"Unknown model for pricing: {model}")
            return

        rates = self.pricing[model]
        cost = (
            (input_tokens / 1_000_000) * rates["input"] +
            (output_tokens / 1_000_000) * rates["output"]
        )

        self.costs.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost
        })

        return cost

    def get_total_cost(self, since=None):
        """Get total cost, optionally since a timestamp."""
        if since:
            relevant = [c for c in self.costs if c["timestamp"] >= since]
        else:
            relevant = self.costs

        return sum(c["cost_usd"] for c in relevant)

    def get_cost_by_model(self):
        """Get costs broken down by model."""
        by_model = {}
        for cost_entry in self.costs:
            model = cost_entry["model"]
            by_model[model] = by_model.get(model, 0) + cost_entry["cost_usd"]
        return by_model

# Usage
tracker = CostTracker()

response = client.chat.completions.create(...)

# Track cost
cost = tracker.track_request(
    model="gpt-4o-mini",
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens
)

# Check daily spending
daily_cost = tracker.get_total_cost(since=datetime.now() - timedelta(days=1))
if daily_cost > 100:  # $100 daily budget
    alert("Daily budget exceeded!", {"cost": daily_cost})

Model Routing for Cost Optimization

Cost-Optimized Model Router (Python)
class CostOptimizedRouter:
    """Route requests to cheapest model that meets requirements."""

    def route_request(self, query, required_quality="medium"):
        """Route to appropriate model based on requirements."""
        # Classify query complexity
        complexity = self._estimate_complexity(query)

        # Route based on complexity and required quality
        if required_quality == "highest" or complexity > 0.8:
            return {"provider": "openai", "model": "gpt-4o"}

        elif complexity > 0.5 or required_quality == "high":
            return {"provider": "anthropic", "model": "claude-3-5-sonnet"}

        else:  # Simple query
            return {"provider": "openai", "model": "gpt-4o-mini"}

    def _estimate_complexity(self, query):
        """Estimate query complexity (0-1)."""
        # Simple heuristics
        indicators = {
            "length": len(query.split()) > 50,
            "technical": any(term in query.lower() for term in ["code", "algorithm", "calculate"]),
            "multi_part": any(word in query.lower() for word in ["first", "then", "also", "additionally"])
        }

        return sum(indicators.values()) / len(indicators)

# Usage
router = CostOptimizedRouter()
routing = router.route_request("What's 2+2?")  # → gpt-4o-mini (cheap)
routing = router.route_request("Explain quantum computing and write code to simulate a qubit")  # → gpt-4o (complex)

Budget Enforcement

Budget Enforcer (Python)
class BudgetExceededError(Exception):
    """Raised when the daily budget is exhausted."""
    pass

class BudgetEnforcer:
    def __init__(self, daily_budget_usd):
        self.daily_budget = daily_budget_usd
        self.tracker = CostTracker()

    def check_budget(self):
        """Check if budget allows more requests."""
        daily_cost = self.tracker.get_total_cost(
            since=datetime.now() - timedelta(days=1)
        )

        if daily_cost >= self.daily_budget:
            return False, f"Daily budget exceeded: ${daily_cost:.2f} / ${self.daily_budget:.2f}"

        return True, f"Budget OK: ${daily_cost:.2f} / ${self.daily_budget:.2f}"

    def call_with_budget(self, func, *args, **kwargs):
        """Execute function only if budget allows."""
        allowed, message = self.check_budget()

        if not allowed:
            raise BudgetExceededError(message)

        # Execute and track
        response = func(*args, **kwargs)

        self.tracker.track_request(
            model=kwargs.get("model"),
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )

        return response

# Usage
enforcer = BudgetEnforcer(daily_budget_usd=100)

try:
    response = enforcer.call_with_budget(
        client.chat.completions.create,
        model="gpt-4o-mini",
        messages=[...]
    )
except BudgetExceededError as e:
    # Fall back to a cached response or a friendly error message
    response = "Service temporarily limited. Please try again later."

Monitoring and Observability

You can't fix what you can't see. Comprehensive monitoring is essential.

Request Metrics

Request Metrics Tracking (Python)
import time
import uuid
from dataclasses import dataclass
from datetime import datetime

# alert() and calculate_cost() are assumed to be application-level helpers
# defined elsewhere (an alerting hook and a per-model pricing lookup).

@dataclass
class RequestMetrics:
    request_id: str
    timestamp: datetime
    provider: str
    model: str
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost_usd: float
    success: bool
    error: str = None

class MetricsCollector:
    def __init__(self):
        self.metrics = []

    def record_request(self, metrics: RequestMetrics):
        """Record request metrics."""
        self.metrics.append(metrics)

        # Send to monitoring service (DataDog, CloudWatch, etc.)
        self._send_to_datadog(metrics)

        # Alert on anomalies
        self._check_for_anomalies(metrics)

    def _send_to_datadog(self, metrics):
        """Send metrics to DataDog."""
        # Use datadog library
        from datadog import statsd

        statsd.increment('llm.requests',
            tags=[f"provider:{metrics.provider}", f"model:{metrics.model}"])

        statsd.histogram('llm.latency', metrics.latency_ms,
            tags=[f"provider:{metrics.provider}"])

        statsd.gauge('llm.cost', metrics.cost_usd,
            tags=[f"model:{metrics.model}"])

        if not metrics.success:
            statsd.increment('llm.errors',
                tags=[f"provider:{metrics.provider}", f"error:{metrics.error}"])

    def _check_for_anomalies(self, metrics):
        """Check for anomalous behavior."""
        # High latency
        if metrics.latency_ms > 10000:  # 10 seconds
            alert("High latency detected", {"latency": metrics.latency_ms})

        # Unexpected cost
        expected_cost = (metrics.input_tokens + metrics.output_tokens) / 1_000_000 * 0.60  # rough estimate
        if metrics.cost_usd > expected_cost * 10:
            alert("Unexpected high cost", {"cost": metrics.cost_usd, "expected": expected_cost})

# Usage with request tracking
metrics_collector = MetricsCollector()

def tracked_llm_call(prompt, model="gpt-4o-mini"):
    """LLM call with full metrics tracking."""
    request_id = str(uuid.uuid4())
    start_time = time.time()

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        latency = (time.time() - start_time) * 1000

        metrics = RequestMetrics(
            request_id=request_id,
            timestamp=datetime.now(),
            provider="openai",
            model=model,
            latency_ms=latency,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            cost_usd=calculate_cost(model, response.usage),
            success=True
        )

        metrics_collector.record_request(metrics)

        return response.choices[0].message.content

    except Exception as e:
        latency = (time.time() - start_time) * 1000

        metrics = RequestMetrics(
            request_id=request_id,
            timestamp=datetime.now(),
            provider="openai",
            model=model,
            latency_ms=latency,
            input_tokens=0,
            output_tokens=0,
            cost_usd=0,
            success=False,
            error=str(e)
        )

        metrics_collector.record_request(metrics)
        raise
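
The key takeaways call for tracking p50, p95, and p99 latency, and those percentiles are easy to derive from the recorded metrics. A small sketch using the standard library's statistics.quantiles (the 50% deviation threshold is the one suggested earlier and is an assumption to tune):

Latency Percentiles from Collected Metrics (Python)
import statistics

def latency_percentiles(metrics_list):
    """Compute p50/p95/p99 latency from a list of RequestMetrics."""
    latencies = sorted(m.latency_ms for m in metrics_list if m.success)
    if len(latencies) < 2:
        return None

    # quantiles(n=100) returns the 99 cut points between percentiles
    cuts = statistics.quantiles(latencies, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

def check_latency_regression(current, baseline, threshold=0.5):
    """Alert if any percentile deviates more than 50% from its baseline."""
    for key in ("p50", "p95", "p99"):
        if baseline[key] and (current[key] - baseline[key]) / baseline[key] > threshold:
            alert(f"Latency regression on {key}",
                  {"current": current[key], "baseline": baseline[key]})

# Usage: percentiles = latency_percentiles(metrics_collector.metrics)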

Health Checks

Provider Health Checks (Python)
class HealthChecker:
    """Check health of LLM integrations."""

    def check_all_providers(self):
        """Check health of all providers."""
        results = {}

        for provider in ["openai", "anthropic", "google"]:
            results[provider] = self.check_provider(provider)

        return results

    def check_provider(self, provider):
        """Check if provider is healthy."""
        start_time = time.time()

        try:
            # Make a minimal test request (only OpenAI shown; add equivalent
            # branches for the other providers' clients)
            if provider == "openai":
                openai_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": "test"}],
                    max_tokens=5,
                    timeout=5
                )
            else:
                raise NotImplementedError(f"No health check implemented for {provider}")

            latency = (time.time() - start_time) * 1000

            return {
                "healthy": True,
                "latency_ms": latency,
                "error": None
            }

        except Exception as e:
            return {
                "healthy": False,
                "latency_ms": None,
                "error": str(e)
            }

# Run health checks every minute
def run_health_checks():
    checker = HealthChecker()
    while True:
        results = checker.check_all_providers()

        for provider, health in results.items():
            if not health["healthy"]:
                alert(f"Provider {provider} is unhealthy", health)

        time.sleep(60)

# Run in background thread
threading.Thread(target=run_health_checks, daemon=True).start()

Conclusion

Building reliable LLM API integrations requires more than just calling an endpoint. Production systems need comprehensive error handling with exponential backoff, proactive rate limiting to avoid hitting quotas, intelligent caching to reduce costs and latency, multi-provider fallbacks for resilience, budget enforcement to prevent cost overruns, and continuous monitoring to detect issues early.

The patterns in this guide—circuit breakers, semantic caching, graceful degradation, cost tracking, and health checks—transform fragile prototypes into robust production systems. Start with the basics (retry logic, rate limiting, simple caching) and incrementally add sophistication (semantic caching, multi-provider fallbacks, predictive budgeting) as your application scales.

Remember that API integration is not "set and forget." Monitor continuously, alert on anomalies, and be prepared to adjust strategies as your usage patterns evolve, providers change their limits, and costs fluctuate. The investment in robust integration patterns pays dividends in reliability, cost savings, and peace of mind.

Frequently Asked Questions

  • What is the most important API integration pattern to implement first?
  • How much can caching really save on API costs?
  • Should I implement multi-provider fallback or stick with one provider?
  • How do I prevent API cost overruns?
  • What monitoring metrics are most important for LLM APIs?
  • How do I handle rate limits from multiple concurrent requests?
  • Should I cache API responses in Redis or a database?
  • How do I test API integration code before production?
  • What should I do when all providers fail simultaneously?
  • How often do I need to update API integration code?

Ready to Implement?

This guide provides the knowledge, but implementation requires expertise. Our team has done this 500+ times and can get you production-ready in weeks.
