Deep dive into multi-agent system architecture for AI applications. Learn communication protocols, orchestration patterns, and implementation strategies with production-ready code examples.
Single-agent AI systems hit their limits quickly when facing complex, multi-step tasks. Multi-agent systems solve this by coordinating multiple specialised agents—each with distinct capabilities—to tackle problems no single agent could handle alone. From customer service platforms routing queries to specialists, to autonomous research systems that plan, execute, and synthesise, multi-agent architectures are becoming the foundation of sophisticated AI applications.
This guide covers the architecture, patterns, and implementation details you need to build production multi-agent systems. We'll explore communication protocols, orchestration strategies, state management, and error handling—with working code examples in Python and TypeScript that you can adapt for your own systems.
Before diving into implementation, let's understand when multi-agent systems provide value over single-agent approaches:
Multi-agent architecture is appropriate when your application has:
Multi-agent systems add architectural complexity. For simple tasks, a well-prompted single agent often outperforms a poorly-designed multi-agent system. Start with the simplest architecture that meets your requirements, then refactor to multi-agent when you hit clear limitations.
Three primary patterns dominate multi-agent system design. Your choice depends on task structure, coordination requirements, and failure tolerance needs.
A central supervisor agent coordinates worker agents, delegating tasks and aggregating results. It is the most common pattern because it is simple to reason about and keeps control in one place.
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class AgentRole(Enum):
    SUPERVISOR = "supervisor"
    RESEARCHER = "researcher"
    WRITER = "writer"
    REVIEWER = "reviewer"


@dataclass
class AgentMessage:
    from_agent: str
    to_agent: str
    content: str
    metadata: Optional[Dict[str, Any]] = None


@dataclass
class PlanStep:
    agent_role: str  # Key into SupervisorAgent.workers
    instruction: str


@dataclass
class ExecutionPlan:
    steps: List[PlanStep] = field(default_factory=list)


class SupervisorAgent:
    def __init__(self, worker_agents: Dict[str, "BaseAgent"]):
        self.workers = worker_agents
        self.task_history: List[str] = []

    async def process_task(self, task: str) -> str:
        # 1. Analyze task and create execution plan
        plan = await self._create_plan(task)

        # 2. Execute plan steps with appropriate workers
        results = []
        pending = list(plan.steps)
        while pending:
            step = pending.pop(0)
            worker = self.workers[step.agent_role]
            result = await worker.execute(step.instruction)
            results.append(result)

            # Check if we need to adjust plan based on result.
            # Assumes _replan returns a plan holding only the remaining steps.
            if getattr(result, "requires_replanning", False):
                plan = await self._replan(plan, results)
                pending = list(plan.steps)

        # 3. Synthesize final response
        return await self._synthesize(task, results)

    async def _create_plan(self, task: str) -> ExecutionPlan:
        # Use LLM to decompose task into steps
        planning_prompt = f"""
        Analyze this task and create an execution plan:
        Task: {task}

        Available agents: {list(self.workers.keys())}

        Return a structured plan with steps.
        """
        # ... LLM call to create plan from planning_prompt
        raise NotImplementedError("Call your LLM and parse its reply into an ExecutionPlan")

    async def _replan(self, plan: ExecutionPlan, results: List[Any]) -> ExecutionPlan:
        # ... LLM call to revise the remaining steps given the results so far
        raise NotImplementedError

    async def _synthesize(self, task: str, results: List[Any]) -> str:
        # ... LLM call to combine worker results into a final answer
        raise NotImplementedError
```

Multiple layers of supervisors create a tree structure. This is useful for complex domains where subtasks themselves need coordination.
```text
                    [Executive Agent]
                            │
        ┌───────────────────┼───────────────────┐
        │                   │                   │
 [Research Lead]     [Content Lead]         [QA Lead]
        │                   │                   │
   ┌────┴────┐         ┌────┴────┐         ┌────┴────┐
   │         │         │         │         │         │
 [Web]  [Database] [Writer]  [Editor]   [Fact]    [Style]
```
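One way to sketch this tree in code is to give every node the same interface: leaves do the work, while leads fan a task out to their children and aggregate what comes back. The `Node` class, the `build_tree` helper, and the string-based results below are illustrative assumptions, not a prescribed design; a real worker would call an LLM or a tool where the leaf simply formats a string.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """A node in the supervisor tree: a lead if it has children, else a worker."""
    name: str
    children: List["Node"] = field(default_factory=list)

    async def handle(self, task: str) -> str:
        if not self.children:
            # Leaf worker: a real system would call an LLM or tool here
            return f"{self.name}({task})"
        # Lead: delegate to its own subtree in parallel, then aggregate
        results = await asyncio.gather(*(c.handle(task) for c in self.children))
        return f"{self.name}[" + ", ".join(results) + "]"


def build_tree() -> Node:
    """Mirror the three-lead hierarchy shown in the diagram."""
    return Node("Executive", [
        Node("ResearchLead", [Node("Web"), Node("Database")]),
        Node("ContentLead", [Node("Writer"), Node("Editor")]),
        Node("QALead", [Node("Fact"), Node("Style")]),
    ])


if __name__ == "__main__":
    print(asyncio.run(build_tree().handle("draft report")))
```

Because each lead only knows about its direct children, a subtree can be replaced or extended without touching the executive level.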
Agents communicate directly without central coordination. Best for collaborative tasks where agents build on each other's work.
```typescript
import { randomUUID } from 'crypto';

type MessageType = 'request' | 'response' | 'notification';

interface AgentMessage {
  fromAgent: string;
  toAgent: string | 'broadcast';
  messageType: MessageType;
  content: string;
  correlationId: string;
}

class PeerAgent {
  private messageQueue: AgentMessage[] = [];
  private peers: Map<string, PeerAgent> = new Map();

  constructor(
    public readonly id: string,
    public readonly capabilities: string[]
  ) {}

  // Register a peer so messages can be routed to it
  connect(peer: PeerAgent): void {
    this.peers.set(peer.id, peer);
  }

  async sendMessage(
    to: string,
    content: string,
    messageType: MessageType = 'request',
    correlationId: string = randomUUID()
  ): Promise<void> {
    const message: AgentMessage = {
      fromAgent: this.id,
      toAgent: to,
      messageType,
      content,
      correlationId
    };

    if (to === 'broadcast') {
      // Send to all peers
      for (const peer of this.peers.values()) {
        await peer.receiveMessage(message);
      }
    } else {
      const peer = this.peers.get(to);
      if (peer) await peer.receiveMessage(message);
    }
  }

  async receiveMessage(message: AgentMessage): Promise<void> {
    this.messageQueue.push(message);
    await this.processMessage(message);
  }

  // Agent-specific processing logic: override in subclasses.
  // Returning null means no reply is sent.
  protected async generateResponse(_message: AgentMessage): Promise<string | null> {
    return null;
  }

  private async processMessage(message: AgentMessage): Promise<void> {
    // Only requests get a reply; replying to responses would loop forever
    if (message.messageType !== 'request') return;

    const response = await this.generateResponse(message);
    if (response) {
      // Reuse the correlation ID so the sender can match the reply to its request
      await this.sendMessage(message.fromAgent, response, 'response', message.correlationId);
    }
  }
}
```

Reliable communication between agents is fundamental to system stability. Here are the key patterns and their implementations.
Asynchronous message passing decouples agents, allowing independent scaling and failure isolation:
```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class Message:
    id: str
    sender: str
    recipient: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    ttl_seconds: int = 300  # Message expires after 5 minutes


class MessageBroker:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.subscribers: Dict[str, List[Callable[[Message], Awaitable[None]]]] = {}
        self.dead_letter_queue: asyncio.Queue = asyncio.Queue()

    def register_agent(self, agent_id: str) -> None:
        """Create a message queue for an agent."""
        self.queues[agent_id] = asyncio.Queue()
        self.subscribers[agent_id] = []

    async def publish(self, message: Message) -> None:
        """Send message to recipient's queue."""
        if message.recipient in self.queues:
            await self.queues[message.recipient].put(message)
            # Notify subscribers
            for callback in self.subscribers.get(message.recipient, []):
                asyncio.create_task(callback(message))
        else:
            # Recipient not found - dead letter
            await self.dead_letter_queue.put(message)

    async def consume(self, agent_id: str, timeout: float = 30) -> Optional[Message]:
        """Consume the next unexpired message, or return None on timeout."""
        while True:
            try:
                message = await asyncio.wait_for(
                    self.queues[agent_id].get(),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                return None
            # Check TTL: expired messages go to the dead-letter queue
            if self._is_expired(message):
                await self.dead_letter_queue.put(message)
                continue
            return message

    def _is_expired(self, message: Message) -> bool:
        # total_seconds() covers the whole interval; .seconds ignores whole days
        age = (datetime.now() - message.timestamp).total_seconds()
        return age > message.ttl_seconds
```

For synchronous interactions where an agent needs a response before proceeding:
```typescript
import { randomUUID } from 'crypto';

interface Message {
  id: string;
  type: 'request' | 'response';
  to: string;
  payload: any;
  replyTo?: string;
  correlationId: string;
  error?: string;
}

// Minimal broker contract the handler relies on
interface MessageBroker {
  publish(message: Message): Promise<void>;
}

class RequestResponseHandler {
  private pendingRequests: Map<string, {
    resolve: (value: any) => void;
    reject: (error: Error) => void;
    timeout: ReturnType<typeof setTimeout>;
  }> = new Map();

  constructor(private readonly agentId: string) {}

  async sendRequest(
    broker: MessageBroker,
    to: string,
    payload: any,
    timeoutMs: number = 30000
  ): Promise<any> {
    const correlationId = randomUUID();

    return new Promise((resolve, reject) => {
      // Set up timeout
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(correlationId);
        reject(new Error(`Request to ${to} timed out`));
      }, timeoutMs);

      // Store pending request
      this.pendingRequests.set(correlationId, { resolve, reject, timeout });

      // Send request; if publishing fails, reject now instead of waiting for the timeout
      broker.publish({
        id: correlationId,
        type: 'request',
        to,
        payload,
        replyTo: this.agentId,
        correlationId
      }).catch((error: Error) => {
        clearTimeout(timeout);
        this.pendingRequests.delete(correlationId);
        reject(error);
      });
    });
  }

  handleResponse(message: Message): void {
    const pending = this.pendingRequests.get(message.correlationId);
    if (pending) {
      clearTimeout(pending.timeout);
      this.pendingRequests.delete(message.correlationId);

      if (message.error) {
        pending.reject(new Error(message.error));
      } else {
        pending.resolve(message.payload);
      }
    }
  }
}
```

Publish-subscribe patterns enable loose coupling and reactive architectures:
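As a rough illustration of the pattern, the Python sketch below implements an in-process, topic-based event bus. The `EventBus` class, the `demo` function, the topic names, and the `(topic, event)` handler signature are all assumptions made for this example; a production system would more likely sit on top of a dedicated message broker.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[str, Any], Awaitable[None]]


class EventBus:
    """In-process, topic-based publish-subscribe (illustrative helper)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers.setdefault(topic, []).append(handler)

    def unsubscribe(self, topic: str, handler: Handler) -> None:
        handlers = self._subscribers.get(topic, [])
        if handler in handlers:
            handlers.remove(handler)

    async def publish(self, topic: str, event: Any) -> int:
        """Deliver event to every subscriber of topic; return how many were invoked."""
        # Copy the list so handlers may subscribe or unsubscribe during delivery
        handlers = list(self._subscribers.get(topic, []))
        outcomes = await asyncio.gather(
            *(handler(topic, event) for handler in handlers),
            return_exceptions=True,
        )
        # A failing subscriber is logged; it does not affect the publisher
        # or the other subscribers
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                logging.warning("Subscriber failed on %s: %r", topic, outcome)
        return len(handlers)


async def demo() -> List[Tuple[str, int]]:
    bus = EventBus()
    seen: List[Tuple[str, int]] = []

    async def indexer(topic: str, event: Any) -> None:
        seen.append(("indexer", event["doc_id"]))

    async def notifier(topic: str, event: Any) -> None:
        seen.append(("notifier", event["doc_id"]))

    bus.subscribe("research.completed", indexer)
    bus.subscribe("research.completed", notifier)
    await bus.publish("research.completed", {"doc_id": 42})
    await bus.publish("draft.completed", {"doc_id": 7})  # No subscribers: dropped
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The publisher never names its consumers, so new agents can react to `research.completed` by subscribing, without any change to the agent that emits the event.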
Multi-agent systems need careful state management to maintain consistency and enable recovery. Here are proven approaches:
A central state store provides consistency but requires careful concurrency handling:
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional


@dataclass
class StateEntry:
    value: Any
    version: int
    updated_by: str
    updated_at: datetime


class SharedStateStore:
    def __init__(self):
        self._state: Dict[str, StateEntry] = {}
        self._lock = asyncio.Lock()
        self._watchers: Dict[str, List[Callable]] = {}

    async def get(self, key: str) -> Optional[Any]:
        async with self._lock:
            entry = self._state.get(key)
            return entry.value if entry else None

    async def get_entry(self, key: str) -> Optional[StateEntry]:
        """Return the full entry so callers can read the version to pass to set()."""
        async with self._lock:
            return self._state.get(key)

    async def set(
        self,
        key: str,
        value: Any,
        agent_id: str,
        expected_version: Optional[int] = None,
    ) -> bool:
        """
        Set value with optimistic locking.
        Returns False if version mismatch (concurrent modification).
        """
        async with self._lock:
            current = self._state.get(key)
            # A key that does not exist yet counts as version 0
            current_version = current.version if current else 0

            # Version check for optimistic locking
            if expected_version is not None and current_version != expected_version:
                return False  # Concurrent modification detected

            self._state[key] = StateEntry(
                value=value,
                version=current_version + 1,
                updated_by=agent_id,
                updated_at=datetime.now(),
            )

            # Notify watchers
            await self._notify_watchers(key, value)
            return True

    async def watch(self, key: str, callback: Callable):
        """Register callback for state changes."""
        if key not in self._watchers:
            self._watchers[key] = []
        self._watchers[key].append(callback)

    async def _notify_watchers(self, key: str, value: Any):
        # Callbacks run as separate tasks, so they never hold the store lock
        for callback in self._watchers.get(key, []):
            asyncio.create_task(callback(key, value))
```

Recording all state changes as events provides full auditability and enables replay:
```typescript
import { randomUUID } from 'crypto';

interface StateEvent {
  id: string;
  timestamp: Date;
  agentId: string;
  eventType: string;
  payload: any;
  previousState?: any;
}

class EventSourcedState {
  private events: StateEvent[] = [];
  private currentState: Map<string, any> = new Map();
  private snapshots: Map<number, Map<string, any>> = new Map();

  appendEvent(event: Omit<StateEvent, 'id' | 'timestamp'>): void {
    const fullEvent: StateEvent = {
      ...event,
      id: randomUUID(),
      timestamp: new Date(),
      previousState: this.currentState.get(event.payload.key)
    };

    this.events.push(fullEvent);
    this.applyEvent(fullEvent, this.currentState);

    // Create snapshot every 100 events
    if (this.events.length % 100 === 0) {
      this.snapshots.set(
        this.events.length,
        new Map(this.currentState)
      );
    }
  }

  // Apply one event to the given state map (the live state or a rebuild in progress)
  private applyEvent(event: StateEvent, state: Map<string, any>): void {
    switch (event.eventType) {
      case 'SET':
        state.set(event.payload.key, event.payload.value);
        break;
      case 'DELETE':
        state.delete(event.payload.key);
        break;
    }
  }

  rebuildState(toEventIndex?: number): Map<string, any> {
    // Find nearest snapshot at or before the target
    const targetIndex = Math.min(
      toEventIndex ?? this.events.length,
      this.events.length
    );
    let snapshotIndex = 0;
    let state = new Map<string, any>();

    // Snapshots are stored in ascending order, so the last match is the nearest
    for (const [idx, snapshot] of this.snapshots) {
      if (idx <= targetIndex) {
        snapshotIndex = idx;
        state = new Map(snapshot);
      }
    }

    // Apply events from snapshot to target
    for (let i = snapshotIndex; i < targetIndex; i++) {
      this.applyEvent(this.events[i], state);
    }

    return state;
  }

  getAuditTrail(key: string): StateEvent[] {
    return this.events.filter(e => e.payload.key === key);
  }
}
```

Let's build a complete orchestration system that ties together our patterns. This example implements a research assistant with multiple specialised agents.
```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    id: str
    description: str
    assigned_agent: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    dependencies: List[str] = field(default_factory=list)  # Task IDs this depends on


class BaseAgent(ABC):
    def __init__(self, agent_id: str, llm_client):
        self.id = agent_id
        self.llm = llm_client

    @abstractmethod
    async def execute(self, task: Task, context: Dict) -> Any:
        pass

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        pass


def dependency_results(task: Task, context: Dict) -> str:
    """Join the stored results of a task's dependencies for use in a prompt."""
    found = [
        str(context[f"{dep_id}_result"])
        for dep_id in task.dependencies
        if f"{dep_id}_result" in context
    ]
    return "\n\n".join(found) if found else "None"


class ResearchAgent(BaseAgent):
    async def execute(self, task: Task, context: Dict) -> Any:
        prompt = f"""
        Research task: {task.description}

        Previous context: {dependency_results(task, context)}

        Provide comprehensive research findings.
        """
        return await self.llm.complete(prompt)

    def can_handle(self, task: Task) -> bool:
        # Match on the leading verb: "Write ... based on research" is a writing task
        return task.description.lower().startswith("research")


class WriterAgent(BaseAgent):
    async def execute(self, task: Task, context: Dict) -> Any:
        prompt = f"""
        Writing task: {task.description}

        Research to incorporate: {dependency_results(task, context)}

        Write clear, well-structured content.
        """
        return await self.llm.complete(prompt)

    def can_handle(self, task: Task) -> bool:
        return task.description.lower().startswith("write")


class Orchestrator:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = {agent.id: agent for agent in agents}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.completed_tasks: Dict[str, Task] = {}
        # Plain dict for brevity; the SharedStateStore shown earlier could be
        # used instead when writes need versioning
        self.context: Dict[str, Any] = {}

    async def submit_workflow(self, tasks: List[Task]) -> Dict[str, Any]:
        """Execute a workflow of interdependent tasks."""
        # Build dependency graph
        task_map = {t.id: t for t in tasks}

        # Process tasks respecting dependencies
        while any(t.status == TaskStatus.PENDING for t in tasks):
            # A task whose dependency failed can never run, so fail it too
            blocked = [
                t for t in tasks
                if t.status == TaskStatus.PENDING
                and self._dependency_failed(t, task_map)
            ]
            for task in blocked:
                task.status = TaskStatus.FAILED
                task.error = "Upstream dependency failed"
            if blocked:
                continue

            ready_tasks = [
                t for t in tasks
                if t.status == TaskStatus.PENDING
                and self._dependencies_met(t, task_map)
            ]

            if not ready_tasks:
                # Pending tasks remain but none can start: circular dependency
                raise RuntimeError("Workflow deadlock detected")

            # Execute ready tasks in parallel
            results = await asyncio.gather(
                *[self._execute_task(t) for t in ready_tasks],
                return_exceptions=True,
            )

            # Process results
            for task, result in zip(ready_tasks, results):
                if isinstance(result, Exception):
                    task.status = TaskStatus.FAILED
                    task.error = str(result)
                else:
                    task.status = TaskStatus.COMPLETED
                    task.result = result
                    self.completed_tasks[task.id] = task
                    self._update_context(task, result)

        return self.context

    async def _execute_task(self, task: Task) -> Any:
        # Find capable agent
        agent = self._select_agent(task)
        if not agent:
            raise ValueError(f"No agent available for task: {task.description}")

        task.assigned_agent = agent.id
        task.status = TaskStatus.IN_PROGRESS

        return await agent.execute(task, self.context)

    def _select_agent(self, task: Task) -> Optional[BaseAgent]:
        for agent in self.agents.values():
            if agent.can_handle(task):
                return agent
        return None

    def _dependencies_met(self, task: Task, task_map: Dict) -> bool:
        return all(
            task_map[dep_id].status == TaskStatus.COMPLETED
            for dep_id in task.dependencies
        )

    def _dependency_failed(self, task: Task, task_map: Dict) -> bool:
        return any(
            task_map[dep_id].status == TaskStatus.FAILED
            for dep_id in task.dependencies
        )

    def _update_context(self, task: Task, result: Any):
        # Store results in context for dependent tasks
        self.context[f"{task.id}_result"] = result
```

```python
# Continues from the orchestrator code above
class StubLLMClient:
    """Stand-in for your real LLM client; it only needs an async complete(prompt)."""

    async def complete(self, prompt: str) -> str:
        return f"[LLM response to a {len(prompt)}-character prompt]"


async def main():
    # Initialize agents
    llm_client = StubLLMClient()  # Swap in your LLM client
    agents = [
        ResearchAgent("researcher", llm_client),
        WriterAgent("writer", llm_client),
    ]

    orchestrator = Orchestrator(agents)

    # Define workflow
    tasks = [
        Task(
            id="research_topic",
            description="Research the benefits of multi-agent AI systems",
        ),
        Task(
            id="write_intro",
            description="Write an introduction based on research",
            dependencies=["research_topic"],
        ),
        Task(
            id="write_conclusion",
            description="Write a conclusion summarizing key points",
            dependencies=["research_topic", "write_intro"],
        ),
    ]

    # Execute workflow
    results = await orchestrator.submit_workflow(tasks)
    print(results)


asyncio.run(main())
```

Production multi-agent systems must handle failures gracefully. Here are essential patterns:
```python
import asyncio
import logging
import random
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, Tuple, Type, TypeVar

T = TypeVar("T")


class APIError(Exception):
    """Placeholder for your LLM client's API error type."""


def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    retryable_exceptions: Tuple[Type[BaseException], ...] = (Exception,),
):
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_attempts - 1:
                        raise  # Out of attempts: surface the last error

                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay,
                    )
                    # Add jitter to prevent thundering herd
                    delay *= 0.5 + random.random()

                    logging.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.2f}s"
                    )
                    await asyncio.sleep(delay)

            # Only reachable when the loop never ran
            raise ValueError("max_attempts must be at least 1")

        return wrapper
    return decorator


# BaseAgent and Task are defined in the orchestration example
class ResilientAgent(BaseAgent):
    @with_retry(
        max_attempts=3,
        retryable_exceptions=(TimeoutError, asyncio.TimeoutError, APIError),
    )
    async def execute(self, task: Task, context: Dict) -> Any:
        return await self._do_execute(task, context)

    async def _do_execute(self, task: Task, context: Dict) -> Any:
        # Agent-specific work (LLM call, tool use) goes here
        raise NotImplementedError
```

Prevent cascade failures by temporarily disabling failing agents:
```typescript
enum CircuitState {
  CLOSED = 'closed',       // Normal operation
  OPEN = 'open',           // Failing, reject requests
  HALF_OPEN = 'half_open'  // Testing if recovered
}

class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private failureCount: number = 0;
  private lastFailureTime: Date | null = null;
  private successCount: number = 0;

  constructor(
    private readonly failureThreshold: number = 5,
    private readonly resetTimeoutMs: number = 30000,
    private readonly halfOpenSuccessThreshold: number = 3
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (this.shouldAttemptReset()) {
        this.state = CircuitState.HALF_OPEN;
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === CircuitState.HALF_OPEN) {
      this.successCount++;
      if (this.successCount >= this.halfOpenSuccessThreshold) {
        this.reset();
      }
    } else {
      this.failureCount = 0;
    }
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = new Date();
    this.successCount = 0;

    // Any failure while HALF_OPEN re-opens the circuit immediately
    if (
      this.state === CircuitState.HALF_OPEN ||
      this.failureCount >= this.failureThreshold
    ) {
      this.state = CircuitState.OPEN;
    }
  }

  private shouldAttemptReset(): boolean {
    if (!this.lastFailureTime) return true;
    const elapsed = Date.now() - this.lastFailureTime.getTime();
    return elapsed >= this.resetTimeoutMs;
  }

  private reset(): void {
    this.state = CircuitState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
  }
}
```

Multi-agent systems can be resource-intensive. These optimisations ensure efficient operation at scale.
Maximise throughput by executing independent tasks concurrently:
```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class ParallelExecutor:
    def __init__(self, max_concurrency: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.active_tasks: Dict[str, asyncio.Task] = {}

    async def execute_parallel(
        self,
        tasks: List["Task"],  # Task comes from the orchestration example
        executor: Callable[["Task"], Awaitable[Any]],
    ) -> List[Any]:
        async def bounded_execute(task: "Task") -> Any:
            # Track the task before it waits on the semaphore so that
            # cancel_all() also reaches tasks that are still queued
            self.active_tasks[task.id] = asyncio.current_task()
            try:
                async with self.semaphore:
                    return await executor(task)
            finally:
                self.active_tasks.pop(task.id, None)

        return await asyncio.gather(
            *[bounded_execute(t) for t in tasks],
            return_exceptions=True,
        )

    async def cancel_all(self):
        # Snapshot first: tasks remove themselves from the dict as they finish
        running = list(self.active_tasks.values())
        for task in running:
            task.cancel()
        await asyncio.gather(*running, return_exceptions=True)
```

Cache LLM responses for repeated queries to reduce latency and cost:
```typescript
import { createHash } from 'crypto';

interface CacheEntry {
  response: string;
  timestamp: Date;
  hitCount: number;
}

class LLMCache {
  private cache: Map<string, CacheEntry> = new Map();
  private readonly maxSize: number;
  private readonly ttlMs: number;

  constructor(maxSize: number = 1000, ttlMs: number = 3600000) {
    this.maxSize = maxSize;
    this.ttlMs = ttlMs;
  }

  private hashPrompt(prompt: string, model: string): string {
    return createHash('sha256')
      .update(`${model}:${prompt}`)
      .digest('hex');
  }

  get(prompt: string, model: string): string | null {
    const key = this.hashPrompt(prompt, model);
    const entry = this.cache.get(key);

    if (!entry) return null;

    // Check TTL
    if (Date.now() - entry.timestamp.getTime() > this.ttlMs) {
      this.cache.delete(key);
      return null;
    }

    entry.hitCount++;
    return entry.response;
  }

  set(prompt: string, model: string, response: string): void {
    const key = this.hashPrompt(prompt, model);

    // Evict the least-frequently-used entry if adding a new key at capacity
    if (!this.cache.has(key) && this.cache.size >= this.maxSize) {
      this.evictLeastUsed();
    }

    this.cache.set(key, {
      response,
      timestamp: new Date(),
      hitCount: 1
    });
  }

  private evictLeastUsed(): void {
    let minHits = Infinity;
    let evictKey: string | null = null;

    for (const [key, entry] of this.cache) {
      if (entry.hitCount < minHits) {
        minHits = entry.hitCount;
        evictKey = key;
      }
    }

    if (evictKey) this.cache.delete(evictKey);
  }
}
```

| Metric | Target | How to Measure |
|---|---|---|
| Task Latency (P95) | < 5 seconds | Time from task submission to completion |
| Throughput | 100+ tasks/minute | Tasks processed per time unit |
| Cache Hit Rate | > 30% | Cached responses / total requests |
| Error Rate | < 1% | Failed tasks / total tasks |
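
The four metrics in the table can be derived from a simple log of per-task records. The sketch below is one possible way to compute them; the `TaskRecord` fields, the nearest-rank percentile method, and the simplification that each task issues exactly one LLM request (so cache hit rate is counted per task) are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TaskRecord:
    latency_s: float  # Time from task submission to completion, in seconds
    failed: bool
    cache_hit: bool   # Assumes one LLM request per task


def p95(values: List[float]) -> float:
    """95th percentile using the nearest-rank method."""
    if not values:
        raise ValueError("p95 requires at least one sample")
    ordered = sorted(values)
    # Integer form of ceil(0.95 * n), which avoids floating-point rounding
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def summarise(records: List[TaskRecord], window_minutes: float) -> Dict[str, float]:
    """Compute the table's metrics for tasks completed within one time window."""
    total = len(records)
    return {
        "p95_latency_s": p95([r.latency_s for r in records]),
        "throughput_per_min": total / window_minutes,
        "cache_hit_rate": sum(r.cache_hit for r in records) / total,
        "error_rate": sum(r.failed for r in records) / total,
    }


if __name__ == "__main__":
    sample = [
        TaskRecord(latency_s=float(i), failed=(i == 20), cache_hit=(i <= 8))
        for i in range(1, 21)
    ]
    print(summarise(sample, window_minutes=2.0))
```

Comparing these values against the targets on a rolling window, rather than over the whole run, makes regressions visible sooner.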
Multi-agent systems represent a significant step forward in AI application architecture. By decomposing complex tasks across specialised agents and coordinating their efforts through well-designed communication and orchestration patterns, you can build systems that handle complexity no single agent could manage.
The patterns we've covered—supervisor hierarchies, message-based communication, event sourcing, and resilient error handling—form the foundation of production-grade multi-agent systems. Start simple with a supervisor pattern, add complexity only as requirements demand, and always prioritise observability and error handling.
Remember that multi-agent systems are a means to an end, not the end itself. The goal is solving complex problems reliably and efficiently. Sometimes the right answer is a well-designed single agent. When you do need multiple agents, the patterns in this guide will help you build systems that are maintainable, scalable, and robust.