advanced
18 min read
25 January 2025

Multi-Agent Systems Architecture: Building Coordinated AI

Deep dive into multi-agent system architecture for AI applications. Learn communication protocols, orchestration patterns, and implementation strategies with production-ready code examples.

Clever Ops Team

Single-agent AI systems hit their limits quickly when facing complex, multi-step tasks. Multi-agent systems solve this by coordinating multiple specialised agents—each with distinct capabilities—to tackle problems no single agent could handle alone. From customer service platforms routing queries to specialists, to autonomous research systems that plan, execute, and synthesise, multi-agent architectures are becoming the foundation of sophisticated AI applications.

This guide covers the architecture, patterns, and implementation details you need to build production multi-agent systems. We'll explore communication protocols, orchestration strategies, state management, and error handling—with working code examples in Python and TypeScript that you can adapt for your own systems.

Key Takeaways

  • Use multi-agent systems when you need specialisation, parallel processing, or fault isolation
  • The Supervisor pattern is the most common and easiest to implement—start there
  • Message-based communication decouples agents and enables independent scaling
  • Event sourcing provides complete audit trails and enables state recovery
  • Implement retry logic, circuit breakers, and fallbacks for production reliability
  • Cache LLM responses and execute independent tasks in parallel for performance
  • Monitor task latency, throughput, and error rates to maintain system health

Why Multi-Agent Architecture?

Before diving into implementation, let's understand when multi-agent systems provide value over single-agent approaches:

Single Agent Limitations

  • Context window constraints on complex tasks
  • No specialisation—jack of all trades
  • Difficult to maintain and debug
  • Single point of failure
  • Hard to scale specific capabilities

Multi-Agent Advantages

  • Specialised agents for specific tasks
  • Parallel processing of subtasks
  • Modular, testable components
  • Graceful degradation on failures
  • Independent scaling per capability

When to Use Multi-Agent Systems

Multi-agent architecture is appropriate when your application has:

  • Diverse Task Types: Different subtasks benefit from different prompts, models, or tools
  • Complex Workflows: Tasks that require planning, execution, review, and iteration
  • Quality Requirements: Separate reviewer agents can catch errors specialist agents miss
  • Scale Requirements: High-volume systems where different capabilities need independent scaling
  • Tool Diversity: Different subtasks require access to different external tools or APIs

Complexity Trade-off

Multi-agent systems add architectural complexity. For simple tasks, a well-prompted single agent often outperforms a poorly designed multi-agent system. Start with the simplest architecture that meets your requirements, then refactor to multi-agent when you hit clear limitations.

Core Architecture Patterns

Three primary patterns dominate multi-agent system design. Your choice depends on task structure, coordination requirements, and failure tolerance needs.

1. Supervisor Pattern

A central supervisor agent coordinates worker agents, delegating tasks and aggregating results. This is the most common pattern for its simplicity and control.

Supervisor Pattern Implementation (Python)

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

class AgentRole(Enum):
    SUPERVISOR = "supervisor"
    RESEARCHER = "researcher"
    WRITER = "writer"
    REVIEWER = "reviewer"

@dataclass
class AgentMessage:
    from_agent: str
    to_agent: str
    content: str
    metadata: Optional[Dict[str, Any]] = None

class SupervisorAgent:
    def __init__(self, worker_agents: Dict[str, 'BaseAgent']):
        self.workers = worker_agents
        self.task_history = []

    async def process_task(self, task: str) -> str:
        # 1. Analyze task and create execution plan
        plan = await self._create_plan(task)

        # 2. Execute plan steps with appropriate workers
        results = []
        for step in plan.steps:
            worker = self.workers[step.agent_role]
            result = await worker.execute(step.instruction)
            results.append(result)

            # Check if we need to adjust plan based on result
            if result.requires_replanning:
                plan = await self._replan(plan, results)

        # 3. Synthesize final response
        return await self._synthesize(task, results)

    async def _create_plan(self, task: str) -> 'ExecutionPlan':
        # Use LLM to decompose task into steps
        planning_prompt = f"""
        Analyze this task and create an execution plan:
        Task: {task}

        Available agents: {list(self.workers.keys())}

        Return a structured plan with steps.
        """
        # ... LLM call to create plan
        pass

2. Hierarchical Pattern

Multiple layers of supervisors create a tree structure. Useful for complex domains where subtasks themselves need coordination.

Hierarchical Architecture

                    [Executive Agent]
                          │
            ┌─────────────┼─────────────┐
            │             │             │
    [Research Lead]  [Content Lead]  [QA Lead]
         │                │              │
    ┌────┴────┐      ┌────┴────┐    ┌───┴───┐
    │         │      │         │    │       │
[Web]  [Database] [Writer] [Editor] [Fact] [Style]
            
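
Because every lead is itself a supervisor, the tree above can be expressed as a straightforward composition of supervisors and leaf agents. The sketch below is illustrative only; the `TeamSupervisor` and `LeafAgent` names are hypothetical, and the fan-out delegation stands in for real planning logic.

Hierarchical Composition Sketch (Python)

import asyncio

class LeafAgent:
    def __init__(self, name: str):
        self.name = name

    async def execute(self, instruction: str) -> str:
        # In practice this would call an LLM or an external tool
        return f"[{self.name}] handled: {instruction}"

class TeamSupervisor:
    """A supervisor whose members may be leaf agents or other supervisors."""
    def __init__(self, name: str, members: dict):
        self.name = name
        self.members = members  # role -> LeafAgent | TeamSupervisor

    async def execute(self, instruction: str) -> str:
        # Naive delegation: fan the instruction out to every member and
        # merge the results. A real supervisor would plan and route first.
        results = await asyncio.gather(
            *(member.execute(instruction) for member in self.members.values())
        )
        return f"[{self.name}] " + " | ".join(results)

# Executive -> leads -> leaf agents mirrors the tree above
research_lead = TeamSupervisor("research_lead", {
    "web": LeafAgent("web"), "database": LeafAgent("database")})
content_lead = TeamSupervisor("content_lead", {
    "writer": LeafAgent("writer"), "editor": LeafAgent("editor")})
executive = TeamSupervisor("executive", {
    "research": research_lead, "content": content_lead})

# asyncio.run(executive.execute("Summarise Q3 findings"))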

3. Peer-to-Peer Pattern

Agents communicate directly without central coordination. Best for collaborative tasks where agents build on each other's work.

Peer-to-Peer Agent Communication (TypeScript)

interface AgentMessage {
  fromAgent: string;
  toAgent: string | 'broadcast';
  messageType: 'request' | 'response' | 'notification';
  content: string;
  correlationId: string;
}

class PeerAgent {
  private messageQueue: AgentMessage[] = [];
  private peers: Map<string, PeerAgent> = new Map();

  constructor(
    public readonly id: string,
    public readonly capabilities: string[]
  ) {}

  registerPeer(peer: PeerAgent): void {
    this.peers.set(peer.id, peer);
  }

  async sendMessage(to: string, content: string): Promise<void> {
    const message: AgentMessage = {
      fromAgent: this.id,
      toAgent: to,
      messageType: 'request',
      content,
      correlationId: crypto.randomUUID()
    };

    if (to === 'broadcast') {
      // Send to all peers
      for (const peer of this.peers.values()) {
        await peer.receiveMessage(message);
      }
    } else {
      const peer = this.peers.get(to);
      if (peer) await peer.receiveMessage(message);
    }
  }

  async receiveMessage(message: AgentMessage): Promise<void> {
    this.messageQueue.push(message);
    await this.processMessage(message);
  }

  private async processMessage(message: AgentMessage): Promise<void> {
    // Agent-specific processing logic
    const response = await this.generateResponse(message);
    if (response) {
      await this.sendMessage(message.fromAgent, response);
    }
  }

  protected async generateResponse(message: AgentMessage): Promise<string | null> {
    // Override in concrete agents (e.g. call an LLM with the message content)
    return null;
  }
}

Agent Communication Protocols

Reliable communication between agents is fundamental to system stability. Here are the key patterns and their implementations.

Message Passing Architecture

Asynchronous message passing decouples agents, allowing independent scaling and failure isolation:

Message Queue Implementation (Python)

import asyncio
from typing import Callable, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Message:
    id: str
    sender: str
    recipient: str
    payload: Dict
    timestamp: datetime = field(default_factory=datetime.now)
    ttl_seconds: int = 300  # Message expires after 5 minutes

class MessageBroker:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.subscribers: Dict[str, List[Callable]] = {}
        self.dead_letter_queue: asyncio.Queue = asyncio.Queue()

    def register_agent(self, agent_id: str):
        """Create a message queue for an agent."""
        self.queues[agent_id] = asyncio.Queue()
        self.subscribers[agent_id] = []

    async def publish(self, message: Message):
        """Send message to recipient's queue."""
        if message.recipient in self.queues:
            await self.queues[message.recipient].put(message)
            # Notify subscribers
            for callback in self.subscribers.get(message.recipient, []):
                asyncio.create_task(callback(message))
        else:
            # Recipient not found - dead letter
            await self.dead_letter_queue.put(message)

    async def consume(self, agent_id: str, timeout: float = 30) -> Optional[Message]:
        """Consume next message from agent's queue."""
        try:
            message = await asyncio.wait_for(
                self.queues[agent_id].get(),
                timeout=timeout
            )
            # Check TTL
            if self._is_expired(message):
                await self.dead_letter_queue.put(message)
                return await self.consume(agent_id, timeout)
            return message
        except asyncio.TimeoutError:
            return None

    def _is_expired(self, message: Message) -> bool:
        age = (datetime.now() - message.timestamp).total_seconds()
        return age > message.ttl_seconds

Request-Response Pattern

For synchronous interactions where an agent needs a response before proceeding:

Request-Response with Correlation (TypeScript)

class RequestResponseHandler {
  private pendingRequests: Map<string, {
    resolve: (value: any) => void;
    reject: (error: Error) => void;
    timeout: NodeJS.Timeout;
  }> = new Map();

  constructor(private readonly agentId: string) {}

  async sendRequest(
    broker: MessageBroker,
    to: string,
    payload: any,
    timeoutMs: number = 30000
  ): Promise<any> {
    const correlationId = crypto.randomUUID();

    return new Promise((resolve, reject) => {
      // Set up timeout
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(correlationId);
        reject(new Error(`Request to ${to} timed out`));
      }, timeoutMs);

      // Store pending request
      this.pendingRequests.set(correlationId, { resolve, reject, timeout });

      // Send request
      broker.publish({
        id: correlationId,
        type: 'request',
        to,
        payload,
        replyTo: this.agentId,
        correlationId
      });
    });
  }

  handleResponse(message: Message): void {
    const pending = this.pendingRequests.get(message.correlationId);
    if (pending) {
      clearTimeout(pending.timeout);
      this.pendingRequests.delete(message.correlationId);

      if (message.error) {
        pending.reject(new Error(message.error));
      } else {
        pending.resolve(message.payload);
      }
    }
  }
}

Event-Driven Communication

Publish-subscribe patterns enable loose coupling and reactive architectures:

Common Event Types

  • task.created: New task available for processing
  • task.completed: Agent finished processing a task
  • task.failed: Agent encountered an error
  • agent.available: Agent ready for new work
  • context.updated: Shared context has changed
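
These events can be routed through a simple topic-based event bus. The following is a minimal in-process sketch; the `EventBus` class is illustrative only, and a production system would typically sit on a broker such as Redis, RabbitMQ, or Kafka.

Event Bus Sketch (Python)

import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[str, Dict[str, Any]], Awaitable[None]]

class EventBus:
    """Minimal in-process publish-subscribe bus (illustrative sketch)."""
    def __init__(self):
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        # Handlers run concurrently; one failing handler does not block others
        await asyncio.gather(
            *(handler(event_type, payload) for handler in self._handlers[event_type]),
            return_exceptions=True,
        )

async def on_task_completed(event_type: str, payload: Dict[str, Any]) -> None:
    print(f"{event_type}: {payload}")

async def main():
    bus = EventBus()
    bus.subscribe("task.completed", on_task_completed)
    await bus.publish("task.completed", {"task_id": "research_topic"})

# asyncio.run(main())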

State Management Strategies

Multi-agent systems need careful state management to maintain consistency and enable recovery. Here are proven approaches:

Shared State Store

A central state store provides consistency but requires careful concurrency handling:

Thread-Safe Shared State (Python)

import asyncio
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class StateEntry:
    value: Any
    version: int
    updated_by: str
    updated_at: datetime

class SharedStateStore:
    def __init__(self):
        self._state: Dict[str, StateEntry] = {}
        self._lock = asyncio.Lock()
        self._watchers: Dict[str, List[Callable]] = {}

    async def get(self, key: str) -> Optional[Any]:
        async with self._lock:
            entry = self._state.get(key)
            return entry.value if entry else None

    async def set(
        self,
        key: str,
        value: Any,
        agent_id: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """
        Set value with optimistic locking.
        Returns False if version mismatch (concurrent modification).
        """
        async with self._lock:
            current = self._state.get(key)

            # Version check for optimistic locking
            if expected_version is not None:
                if current and current.version != expected_version:
                    return False  # Concurrent modification detected

            new_version = (current.version + 1) if current else 1
            self._state[key] = StateEntry(
                value=value,
                version=new_version,
                updated_by=agent_id,
                updated_at=datetime.now()
            )

            # Notify watchers
            await self._notify_watchers(key, value)
            return True

    async def watch(self, key: str, callback: Callable):
        """Register callback for state changes."""
        if key not in self._watchers:
            self._watchers[key] = []
        self._watchers[key].append(callback)

    async def _notify_watchers(self, key: str, value: Any):
        for callback in self._watchers.get(key, []):
            asyncio.create_task(callback(key, value))

Event Sourcing for Audit Trails

Recording all state changes as events provides full auditability and enables replay:

Event Sourcing Implementation (TypeScript)

interface StateEvent {
  id: string;
  timestamp: Date;
  agentId: string;
  eventType: string;
  payload: any;
  previousState?: any;
}

class EventSourcedState {
  private events: StateEvent[] = [];
  private currentState: Map<string, any> = new Map();
  private snapshots: Map<number, Map<string, any>> = new Map();

  appendEvent(event: Omit<StateEvent, 'id' | 'timestamp'>): void {
    const fullEvent: StateEvent = {
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      previousState: this.currentState.get(event.payload.key)
    };

    this.events.push(fullEvent);
    this.applyEventToState(fullEvent, this.currentState);

    // Create snapshot every 100 events
    if (this.events.length % 100 === 0) {
      this.snapshots.set(
        this.events.length,
        new Map(this.currentState)
      );
    }
  }

  private applyEventToState(event: StateEvent, state: Map<string, any>): void {
    switch (event.eventType) {
      case 'SET':
        state.set(event.payload.key, event.payload.value);
        break;
      case 'DELETE':
        state.delete(event.payload.key);
        break;
    }
  }

  rebuildState(toEventIndex?: number): Map<string, any> {
    // Find the nearest snapshot at or before the target index
    const targetIndex = toEventIndex ?? this.events.length;
    let snapshotIndex = 0;
    let state = new Map<string, any>();

    for (const [idx, snapshot] of this.snapshots) {
      if (idx <= targetIndex) {
        snapshotIndex = idx;
        state = new Map(snapshot);
      }
    }

    // Apply events from snapshot to target
    for (let i = snapshotIndex; i < targetIndex; i++) {
      this.applyEventToState(this.events[i], state);
    }

    return state;
  }

  getAuditTrail(key: string): StateEvent[] {
    return this.events.filter(e => e.payload.key === key);
  }
}

State Management Best Practices

  • Minimise shared state: Prefer message passing over shared memory
  • Use immutable updates: Create new state objects rather than mutating
  • Version everything: Enable conflict detection and resolution
  • Plan for recovery: Persist state to enable system restart (see the sketch below)
  • Scope carefully: Not all agents need access to all state
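
To illustrate the recovery point, the sketch below extends the SharedStateStore from earlier with hypothetical checkpoint and restore methods that persist entries to disk. These methods are not part of the original class and assume all stored values are JSON-serialisable.

State Checkpointing Sketch (Python)

import json

class CheckpointingStateStore(SharedStateStore):
    """Hypothetical extension that persists entries and reloads them
    after a restart. Assumes values are JSON-serialisable."""

    async def checkpoint(self, path: str) -> None:
        async with self._lock:
            data = {
                key: {"value": entry.value, "updated_by": entry.updated_by}
                for key, entry in self._state.items()
            }
        with open(path, "w") as f:
            json.dump(data, f)

    async def restore(self, path: str) -> None:
        with open(path) as f:
            data = json.load(f)
        for key, entry in data.items():
            # Re-applying through set() keeps versions and watcher
            # notifications consistent with normal writes
            await self.set(key, entry["value"], agent_id=entry["updated_by"])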

Orchestration Implementation

Let's build a complete orchestration system that ties together our patterns. This example implements a research assistant with multiple specialised agents.

Complete Multi-Agent Orchestrator (Python)

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
from abc import ABC, abstractmethod

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    assigned_agent: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    dependencies: Optional[List[str]] = None  # Task IDs this depends on

class BaseAgent(ABC):
    def __init__(self, agent_id: str, llm_client):
        self.id = agent_id
        self.llm = llm_client

    @abstractmethod
    async def execute(self, task: Task, context: Dict) -> Any:
        pass

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        pass

class ResearchAgent(BaseAgent):
    async def execute(self, task: Task, context: Dict) -> Any:
        prompt = f"""
        Research task: {task.description}

        Previous context: {context.get('research_notes', 'None')}

        Provide comprehensive research findings.
        """
        return await self.llm.complete(prompt)

    def can_handle(self, task: Task) -> bool:
        return 'research' in task.description.lower()

class WriterAgent(BaseAgent):
    async def execute(self, task: Task, context: Dict) -> Any:
        prompt = f"""
        Writing task: {task.description}

        Research to incorporate: {context.get('research_findings', '')}

        Write clear, well-structured content.
        """
        return await self.llm.complete(prompt)

    def can_handle(self, task: Task) -> bool:
        return 'write' in task.description.lower()

class Orchestrator:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = {agent.id: agent for agent in agents}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.completed_tasks: Dict[str, Task] = {}
        self.context: Dict[str, Any] = {}
        self.state_store = SharedStateStore()

    async def submit_workflow(self, tasks: List[Task]) -> Dict[str, Any]:
        """Execute a workflow of interdependent tasks."""
        # Build dependency graph
        task_map = {t.id: t for t in tasks}

        # Process tasks respecting dependencies
        while not all(t.status == TaskStatus.COMPLETED for t in tasks):
            ready_tasks = [
                t for t in tasks
                if t.status == TaskStatus.PENDING
                and self._dependencies_met(t, task_map)
            ]

            if not ready_tasks:
                # No task can run: remaining tasks are blocked by failed
                # or circular dependencies
                pending = [t for t in tasks if t.status == TaskStatus.PENDING]
                if pending:
                    raise RuntimeError("Workflow blocked: unmet or failed dependencies")
                break

            # Execute ready tasks in parallel
            results = await asyncio.gather(
                *[self._execute_task(t) for t in ready_tasks],
                return_exceptions=True
            )

            # Process results
            for task, result in zip(ready_tasks, results):
                if isinstance(result, Exception):
                    task.status = TaskStatus.FAILED
                    task.error = str(result)
                else:
                    task.status = TaskStatus.COMPLETED
                    task.result = result
                    self._update_context(task, result)

        return self.context

    async def _execute_task(self, task: Task) -> Any:
        # Find capable agent
        agent = self._select_agent(task)
        if not agent:
            raise ValueError(f"No agent available for task: {task.description}")

        task.assigned_agent = agent.id
        task.status = TaskStatus.IN_PROGRESS

        return await agent.execute(task, self.context)

    def _select_agent(self, task: Task) -> Optional[BaseAgent]:
        for agent in self.agents.values():
            if agent.can_handle(task):
                return agent
        return None

    def _dependencies_met(self, task: Task, task_map: Dict) -> bool:
        if not task.dependencies:
            return True
        return all(
            task_map[dep_id].status == TaskStatus.COMPLETED
            for dep_id in task.dependencies
        )

    def _update_context(self, task: Task, result: Any):
        # Store results in context for dependent tasks
        self.context[f"{task.id}_result"] = result
Usage Example

Running the Orchestrator (Python)

async def main():
    # Initialize agents
    llm_client = OpenAIClient()  # Your LLM client
    agents = [
        ResearchAgent("researcher", llm_client),
        WriterAgent("writer", llm_client),
    ]

    orchestrator = Orchestrator(agents)

    # Define workflow
    tasks = [
        Task(
            id="research_topic",
            description="Research the benefits of multi-agent AI systems"
        ),
        Task(
            id="write_intro",
            description="Write an introduction based on research",
            dependencies=["research_topic"]
        ),
        Task(
            id="write_conclusion",
            description="Write a conclusion summarizing key points",
            dependencies=["research_topic", "write_intro"]
        ),
    ]

    # Execute workflow
    results = await orchestrator.submit_workflow(tasks)
    print(results)

asyncio.run(main())


Error Handling & Recovery

Production multi-agent systems must handle failures gracefully. Here are essential patterns:

Retry with Exponential Backoff

Resilient Task Execution (Python)

import asyncio
import logging
import random
from functools import wraps
from typing import TypeVar, Callable

T = TypeVar('T')

def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (Exception,)
):
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e

                    if attempt < max_attempts - 1:
                        delay = min(
                            base_delay * (exponential_base ** attempt),
                            max_delay
                        )
                        # Add jitter to prevent thundering herd
                        delay *= (0.5 + random.random())

                        logging.warning(
                            f"Attempt {attempt + 1} failed: {e}. "
                            f"Retrying in {delay:.2f}s"
                        )
                        await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

class ResilientAgent(BaseAgent):
    # APIError stands in for whichever exception your LLM client raises
    @with_retry(max_attempts=3, retryable_exceptions=(TimeoutError, APIError))
    async def execute(self, task: Task, context: Dict) -> Any:
        return await self._do_execute(task, context)

Circuit Breaker Pattern

Prevent cascade failures by temporarily disabling failing agents:

Circuit Breaker Implementation (TypeScript)

enum CircuitState {
  CLOSED = 'closed',      // Normal operation
  OPEN = 'open',          // Failing, reject requests
  HALF_OPEN = 'half_open' // Testing if recovered
}

class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private failureCount: number = 0;
  private lastFailureTime: Date | null = null;
  private successCount: number = 0;

  constructor(
    private readonly failureThreshold: number = 5,
    private readonly resetTimeoutMs: number = 30000,
    private readonly halfOpenSuccessThreshold: number = 3
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (this.shouldAttemptReset()) {
        this.state = CircuitState.HALF_OPEN;
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === CircuitState.HALF_OPEN) {
      this.successCount++;
      if (this.successCount >= this.halfOpenSuccessThreshold) {
        this.reset();
      }
    } else {
      this.failureCount = 0;
    }
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = new Date();
    this.successCount = 0;

    if (this.failureCount >= this.failureThreshold) {
      this.state = CircuitState.OPEN;
    }
  }

  private shouldAttemptReset(): boolean {
    if (!this.lastFailureTime) return true;
    const elapsed = Date.now() - this.lastFailureTime.getTime();
    return elapsed >= this.resetTimeoutMs;
  }

  private reset(): void {
    this.state = CircuitState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
  }
}

Fallback Strategies

When an Agent Fails

  • Retry with different agent: Route to backup agent with similar capabilities
  • Graceful degradation: Return partial results or cached data
  • Human escalation: Flag for human review when automated handling fails
  • Skip and continue: For non-critical tasks, mark as skipped and proceed
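
These strategies compose naturally into a fallback chain around task execution. The sketch below assumes the Task and BaseAgent types from the orchestrator example; execute_with_fallbacks is illustrative, and escalate_to_human is a placeholder hook for whatever review queue or alerting your system uses.

Fallback Chain Sketch (Python)

from typing import Any, Dict, List, Optional

async def escalate_to_human(task: "Task", error: Optional[Exception]) -> None:
    # Placeholder: push to a review queue, page an operator, etc.
    print(f"Escalating task {task.id} after failure: {error}")

async def execute_with_fallbacks(
    task: "Task",
    agents: List["BaseAgent"],  # ordered: primary first, backups after
    context: Dict[str, Any],
    cached_result: Optional[Any] = None,
    critical: bool = True,
) -> Any:
    """Try each capable agent in order, then degrade gracefully."""
    last_error: Optional[Exception] = None

    for agent in agents:
        if not agent.can_handle(task):
            continue
        try:
            return await agent.execute(task, context)  # retry with a different agent
        except Exception as e:
            last_error = e

    if cached_result is not None:
        return cached_result  # graceful degradation: serve cached or partial data

    if not critical:
        return {"status": "skipped", "task_id": task.id}  # skip and continue

    await escalate_to_human(task, last_error)  # human escalation
    raise last_error or RuntimeError(f"No agent could handle task {task.id}")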

Performance Optimisation

Multi-agent systems can be resource-intensive. These optimisations ensure efficient operation at scale.

Parallel Execution

Maximise throughput by executing independent tasks concurrently:

Parallel Task Execution with Limits (Python)

import asyncio
from asyncio import Semaphore
from typing import Any, Callable, Dict, List

class ParallelExecutor:
    def __init__(self, max_concurrency: int = 10):
        self.semaphore = Semaphore(max_concurrency)
        self.active_tasks: Dict[str, asyncio.Task] = {}

    async def execute_parallel(
        self,
        tasks: List[Task],
        executor: Callable
    ) -> List[Any]:
        async def bounded_execute(task: Task) -> Any:
            async with self.semaphore:
                self.active_tasks[task.id] = asyncio.current_task()
                try:
                    return await executor(task)
                finally:
                    del self.active_tasks[task.id]

        return await asyncio.gather(
            *[bounded_execute(t) for t in tasks],
            return_exceptions=True
        )

    async def cancel_all(self):
        for task in self.active_tasks.values():
            task.cancel()
        await asyncio.gather(*self.active_tasks.values(), return_exceptions=True)

Response Caching

Cache LLM responses for repeated queries to reduce latency and cost:

LLM Response Cache (TypeScript)

import { createHash } from 'crypto';

interface CacheEntry {
  response: string;
  timestamp: Date;
  hitCount: number;
}

class LLMCache {
  private cache: Map<string, CacheEntry> = new Map();
  private readonly maxSize: number;
  private readonly ttlMs: number;

  constructor(maxSize: number = 1000, ttlMs: number = 3600000) {
    this.maxSize = maxSize;
    this.ttlMs = ttlMs;
  }

  private hashPrompt(prompt: string, model: string): string {
    return createHash('sha256')
      .update(`${model}:${prompt}`)
      .digest('hex');
  }

  get(prompt: string, model: string): string | null {
    const key = this.hashPrompt(prompt, model);
    const entry = this.cache.get(key);

    if (!entry) return null;

    // Check TTL
    if (Date.now() - entry.timestamp.getTime() > this.ttlMs) {
      this.cache.delete(key);
      return null;
    }

    entry.hitCount++;
    return entry.response;
  }

  set(prompt: string, model: string, response: string): void {
    // Evict if at capacity (LRU-style)
    if (this.cache.size >= this.maxSize) {
      this.evictLeastUsed();
    }

    const key = this.hashPrompt(prompt, model);
    this.cache.set(key, {
      response,
      timestamp: new Date(),
      hitCount: 1
    });
  }

  private evictLeastUsed(): void {
    let minHits = Infinity;
    let evictKey: string | null = null;

    for (const [key, entry] of this.cache) {
      if (entry.hitCount < minHits) {
        minHits = entry.hitCount;
        evictKey = key;
      }
    }

    if (evictKey) this.cache.delete(evictKey);
  }
}

Performance Metrics

Metric             | Target            | How to Measure
Task Latency (P95) | < 5 seconds       | Time from task submission to completion
Throughput         | 100+ tasks/minute | Tasks processed per time unit
Cache Hit Rate     | > 30%             | Cached responses / total requests
Error Rate         | < 1%              | Failed tasks / total tasks
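
A lightweight in-memory tracker is enough to compute these metrics during development. The sketch below is illustrative; in production you would typically export the same measurements to a monitoring system such as Prometheus.

Metrics Tracking Sketch (Python)

import math
import time
from dataclasses import dataclass, field
from typing import List

@dataclass
class MetricsTracker:
    """Minimal in-memory metrics for task latency, throughput, and errors."""
    latencies: List[float] = field(default_factory=list)
    errors: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def record(self, duration_seconds: float, success: bool) -> None:
        self.latencies.append(duration_seconds)
        if not success:
            self.errors += 1

    def p95_latency(self) -> float:
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        index = max(0, math.ceil(0.95 * len(ordered)) - 1)  # nearest-rank P95
        return ordered[index]

    def throughput_per_minute(self) -> float:
        elapsed_minutes = (time.monotonic() - self.started_at) / 60
        return len(self.latencies) / max(elapsed_minutes, 1e-9)

    def error_rate(self) -> float:
        total = len(self.latencies)
        return self.errors / total if total else 0.0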


Conclusion

Multi-agent systems represent a significant step forward in AI application architecture. By decomposing complex tasks across specialised agents and coordinating their efforts through well-designed communication and orchestration patterns, you can build systems that handle complexity no single agent could manage.

The patterns we've covered—supervisor hierarchies, message-based communication, event sourcing, and resilient error handling—form the foundation of production-grade multi-agent systems. Start simple with a supervisor pattern, add complexity only as requirements demand, and always prioritise observability and error handling.

Remember that multi-agent systems are a means to an end, not the end itself. The goal is solving complex problems reliably and efficiently. Sometimes the right answer is a well-designed single agent. When you do need multiple agents, the patterns in this guide will help you build systems that are maintainable, scalable, and robust.

Frequently Asked Questions

When should I use multi-agent systems vs a single agent?

How do I prevent agents from entering infinite loops?

What's the best way to share context between agents?

How do I test multi-agent systems?

How do I handle rate limits with multiple agents calling LLMs?

What frameworks exist for building multi-agent systems?

How do I debug multi-agent systems?

How do I handle different agents needing different LLMs?

Ready to Implement?

This guide provides the knowledge, but implementation requires expertise. Our team has done this 500+ times and can get you production-ready in weeks.
