Messaging Patterns
This guide covers common messaging patterns for agent-to-agent communication.
Request-Response
Section titled “Request-Response”
The most common pattern: one agent sends a request, another responds.
# Requester
async def request_processing(client, document_id, *, agent_id="my-agent", timeout=30):
    """Ask the document processor to handle a document and wait for its reply.

    Sends a ``process_document`` event to ``document-processor`` and then
    listens on ``agent_id`` for the matching ``document_processed`` event.

    Args:
        client: Messaging client exposing ``messaging.send``,
            ``messaging.subscribe`` and ``messaging.acknowledge``.
        document_id: Identifier of the document to process.
        agent_id: Agent to receive the reply on; also sent as ``reply_to``.
        timeout: Value forwarded to ``messaging.subscribe``.

    Returns:
        The payload of the matching response, or ``None`` if the
        subscription ends without one.
    """
    await client.messaging.send(
        destination="document-processor",
        event_name="process_document",
        payload={
            "document_id": document_id,
            "reply_to": agent_id,  # Where to send response
        },
    )

    # Wait for response
    async for message in client.messaging.subscribe(agent_id=agent_id, timeout=timeout):
        if message.event_name != "document_processed":
            continue
        # A reply for a different document must not be returned as the
        # result of this request, so correlate on the document id.
        if message.payload.get("document_id") != document_id:
            continue
        # Acknowledge the reply we consume, as the other subscribers in this
        # guide do for the messages they handle.
        await client.messaging.acknowledge(message.message_id)
        return message.payload

    return None
# Processor
async def handle_requests(client):
    """Serve ``process_document`` requests arriving for ``document-processor``.

    For each request, runs ``process`` on the document id and sends a
    ``document_processed`` event to the agent named in ``reply_to``.
    """
    messaging = client.messaging
    async for message in messaging.subscribe(agent_id="document-processor"):
        if message.event_name == "process_document":
            request = message.payload
            doc_id = request["document_id"]
            result = await process(doc_id)

            # Send response
            response = {"document_id": doc_id, "result": result}
            await messaging.send(
                destination=request["reply_to"],
                event_name="document_processed",
                payload=response,
            )

        # NOTE(review): acknowledged at loop level, i.e. for every received
        # message; confirm this matches the intended nesting.
        await messaging.acknowledge(message.message_id)

Publish-Subscribe
Section titled “Publish-Subscribe”
One agent publishes events, multiple agents subscribe.
# Publisher
async def publish_document_created(client, document):
    """Publish a ``documents.created`` event carrying the document's id, type and size."""
    event = {
        "document_id": document.id,
        "type": document.type,
        "size": document.size,
    }
    await client.messaging.publish(topic="documents.created", payload=event)
# Subscriber 1: Indexer
async def index_new_documents(client):
    """Index every document announced on the ``documents.created`` topic.

    Each message is acknowledged after ``index_document`` returns.
    """
    messaging = client.messaging
    created_events = messaging.subscribe_topic(topic="documents.created")
    async for event in created_events:
        await index_document(event.payload["document_id"])
        await messaging.acknowledge(event.message_id)
# Subscriber 2: Notifier
async def notify_on_documents(client):
    """Send a notification for every document announced on ``documents.created``.

    Each message is acknowledged after ``send_notification`` returns.
    """
    messaging = client.messaging
    created_events = messaging.subscribe_topic(topic="documents.created")
    async for event in created_events:
        doc_id = event.payload["document_id"]
        notification = f"New document: {doc_id}"
        await send_notification(notification)
        await messaging.acknowledge(event.message_id)

Fan-Out
Section titled “Fan-Out”
One request triggers multiple parallel operations.
async def fan_out_analysis(client, document_id):
    """Request several analyses of one document in parallel and collect the results.

    Sends an ``analyze`` event to each specialized agent, then listens on
    ``coordinator`` for ``analysis_complete`` events until one result per
    requested analysis type has arrived or the subscription ends.

    Args:
        client: Messaging client exposing ``messaging.send``,
            ``messaging.subscribe`` and ``messaging.acknowledge``.
        document_id: Identifier of the document to analyze.

    Returns:
        A dict mapping each reported analysis ``type`` to its ``result``.
        It may hold fewer entries than were requested if the subscription
        ends first.
    """
    # Imported locally so this snippet is self-contained.
    import asyncio

    # (destination agent, analysis type) for every analysis to request.
    targets = (
        ("ocr-agent", "ocr"),
        ("sentiment-agent", "sentiment"),
        ("entity-agent", "entities"),
    )

    # Send to multiple specialized agents in parallel
    await asyncio.gather(
        *(
            client.messaging.send(
                destination=agent,
                event_name="analyze",
                payload={"document_id": document_id, "type": analysis_type},
            )
            for agent, analysis_type in targets
        )
    )

    # Collect results
    results = {}
    async for message in client.messaging.subscribe(agent_id="coordinator", timeout=30):
        if message.event_name == "analysis_complete":
            results[message.payload["type"]] = message.payload["result"]

        # Acknowledge before checking for completion; breaking first would
        # leave the final result message unacknowledged.
        await client.messaging.acknowledge(message.message_id)

        if len(results) == len(targets):
            break

    return results

Pipeline
Section titled “Pipeline”
Chain multiple agents in sequence.
async def document_pipeline(client, document_id):
    """Start the pipeline by sending the document to the ``extractor`` agent."""
    # Stage 1: Extract text
    first_stage = {"document_id": document_id, "next": "translator"}
    await client.messaging.send(
        destination="extractor",
        event_name="extract",
        payload=first_stage,
    )
# Extractor agent
async def handle_extract(client, message):
    """Extract the document's text and forward it to the stage named in ``next``."""
    request = message.payload
    text = await extract_text(request["document_id"])

    # Pass to next stage
    forwarded = {"text": text, "next": "summarizer"}
    await client.messaging.send(
        destination=request["next"],
        event_name="translate",
        payload=forwarded,
    )
# Translator agent
async def handle_translate(client, message):
    """Translate the incoming text and forward it to the stage named in ``next``."""
    request = message.payload
    translated = await translate(request["text"])

    forwarded = {"text": translated, "next": "publisher"}
    await client.messaging.send(
        destination=request["next"],
        event_name="summarize",
        payload=forwarded,
    )
# ... continues through pipeline

Saga Pattern
Section titled “Saga Pattern”
Coordinate multiple operations with compensation on failure.
async def order_saga(client, order): try: # Step 1: Reserve inventory await client.messaging.send( destination="inventory-service", event_name="reserve", payload={"order_id": order.id, "items": order.items} ) reservation = await wait_for_response(client, "inventory_reserved")
# Step 2: Charge payment await client.messaging.send( destination="payment-service", event_name="charge", payload={"order_id": order.id, "amount": order.total} ) payment = await wait_for_response(client, "payment_charged")
# Step 3: Ship order await client.messaging.send( destination="shipping-service", event_name="ship", payload={"order_id": order.id, "address": order.address} ) shipment = await wait_for_response(client, "order_shipped")
return {"status": "completed", "tracking": shipment["tracking_number"]}
except Exception as e: # Compensate: undo previous steps await client.messaging.send( destination="payment-service", event_name="refund", payload={"order_id": order.id} ) await client.messaging.send( destination="inventory-service", event_name="release", payload={"order_id": order.id} ) raiseDead Letter Handling
Section titled “Dead Letter Handling”
Handle messages that fail processing.
async def process_with_dlq(client): async for message in client.messaging.subscribe(agent_id="processor"): try: await process_message(message) await client.messaging.acknowledge(message.message_id)
except Exception as e: # After max retries, message goes to dead letter queue if message.retry_count >= 3: # Log for manual review await client.messaging.send( destination="dlq-monitor", event_name="dead_letter", payload={ "original": message.payload, "error": str(e), "retries": message.retry_count } ) await client.messaging.acknowledge(message.message_id) else: # Will be retried automatically await client.messaging.reject(message.message_id)Rate-Limited Processing
Section titled “Rate-Limited Processing”
Control processing rate to avoid overwhelming downstream systems.
import asyncio
from datetime import datetime, timedelta
class RateLimitedProcessor: def __init__(self, client, max_per_second=10): self.client = client self.max_per_second = max_per_second self.last_process_time = datetime.min self.semaphore = asyncio.Semaphore(max_per_second)
async def process(self): async for message in self.client.messaging.subscribe(agent_id="processor"): async with self.semaphore: # Ensure we don't exceed rate limit elapsed = (datetime.now() - self.last_process_time).total_seconds() if elapsed < 1 / self.max_per_second: await asyncio.sleep(1 / self.max_per_second - elapsed)
await self.handle(message) self.last_process_time = datetime.now() await self.client.messaging.acknowledge(message.message_id)Next Steps
Section titled “Next Steps”
- Building Workflows - Coordinate multiple agents
- API Reference - Complete messaging API