Skip to content

Messaging Patterns

This guide covers common messaging patterns for agent-to-agent communication.

Request-response is the most common pattern: one agent sends a request, and another agent sends back a response.

# Requester
async def request_processing(client, document_id):
    """Ask ``document-processor`` to process a document and wait for the reply.

    Sends a ``process_document`` request whose payload names ``my-agent`` as
    the reply address, then listens on ``my-agent`` for a
    ``document_processed`` message.

    Args:
        client: Messaging client exposing ``messaging.send``,
            ``messaging.subscribe`` and ``messaging.acknowledge``.
        document_id: Identifier of the document to process.

    Returns:
        The payload of the matching ``document_processed`` message, or
        ``None`` if the subscription ends without delivering one.
    """
    # Send request
    await client.messaging.send(
        destination="document-processor",
        event_name="process_document",
        payload={
            "document_id": document_id,
            "reply_to": "my-agent",  # Where to send response
        },
    )

    # Wait for response.
    # NOTE(review): timeout=30 is presumably seconds -- confirm against the
    # client API. The subscription is opened only after the request is sent;
    # whether a very fast reply can be missed depends on the broker's queueing
    # semantics, which are not visible here.
    async for message in client.messaging.subscribe(
        agent_id="my-agent",
        timeout=30,
    ):
        if message.event_name != "document_processed":
            continue

        # The processor echoes the document_id in its reply; skip replies that
        # explicitly belong to a different request. Replies that carry no
        # document_id are still accepted, as before.
        if message.payload.get("document_id", document_id) != document_id:
            continue

        # Acknowledge the reply so it is not left pending, in line with the
        # other consumers in this guide.
        await client.messaging.acknowledge(message.message_id)
        return message.payload

    # Subscription ended (e.g. timed out) without a matching reply.
    return None
# Processor
async def handle_requests(client):
    """Serve ``process_document`` requests addressed to ``document-processor``.

    For each such request the document is processed and a
    ``document_processed`` reply is sent to the agent named in the request's
    ``reply_to`` field. Every received message is acknowledged afterwards,
    including messages with other event names.

    Args:
        client: Messaging client exposing ``messaging.subscribe``,
            ``messaging.send`` and ``messaging.acknowledge``.
    """
    inbox = client.messaging.subscribe(agent_id="document-processor")
    async for request in inbox:
        if request.event_name == "process_document":
            document_id = request.payload["document_id"]
            # `process` is defined outside this snippet.
            outcome = await process(document_id)

            # Reply to whichever agent the requester nominated.
            reply = {"document_id": document_id, "result": outcome}
            await client.messaging.send(
                destination=request.payload["reply_to"],
                event_name="document_processed",
                payload=reply,
            )

        await client.messaging.acknowledge(request.message_id)

Publish-subscribe: one agent publishes events to a topic, and multiple agents subscribe to that topic.

# Publisher
async def publish_document_created(client, document):
    """Publish a ``documents.created`` event describing *document*.

    Args:
        client: Messaging client exposing ``messaging.publish``.
        document: Object with ``id``, ``type`` and ``size`` attributes; these
            become the event's ``document_id``, ``type`` and ``size`` fields.
    """
    event = {
        "document_id": document.id,
        "type": document.type,
        "size": document.size,
    }
    await client.messaging.publish(topic="documents.created", payload=event)
# Subscriber 1: Indexer
async def index_new_documents(client):
    """Index every document announced on the ``documents.created`` topic.

    Each event is acknowledged once ``index_document`` has finished for it.

    Args:
        client: Messaging client exposing ``messaging.subscribe_topic`` and
            ``messaging.acknowledge``.
    """
    created_events = client.messaging.subscribe_topic(topic="documents.created")
    async for event in created_events:
        # `index_document` is defined outside this snippet.
        await index_document(event.payload["document_id"])
        await client.messaging.acknowledge(event.message_id)
# Subscriber 2: Notifier
async def notify_on_documents(client):
    """Send a notification for every ``documents.created`` event.

    Each event is acknowledged once its notification has been sent.

    Args:
        client: Messaging client exposing ``messaging.subscribe_topic`` and
            ``messaging.acknowledge``.
    """
    created_events = client.messaging.subscribe_topic(topic="documents.created")
    async for event in created_events:
        doc_id = event.payload["document_id"]
        notice = f"New document: {doc_id}"
        # `send_notification` is defined outside this snippet.
        await send_notification(notice)
        await client.messaging.acknowledge(event.message_id)

Fan-out: one request triggers multiple operations that run in parallel, and a coordinator collects their results.

async def fan_out_analysis(client, document_id):
    """Request several analyses of one document in parallel and gather results.

    An ``analyze`` request is sent concurrently to each specialized agent.
    The function then listens on ``coordinator`` for ``analysis_complete``
    messages and stores each one's ``result`` under its ``type``.

    Args:
        client: Messaging client exposing ``messaging.send``,
            ``messaging.subscribe`` and ``messaging.acknowledge``.
        document_id: Identifier of the document to analyze.

    Returns:
        Dict mapping analysis type to result. It may hold fewer entries than
        the number of requested analyses if the subscription ends first.
    """
    # Destination agent -> analysis type requested from it.
    analyses = {
        "ocr-agent": "ocr",
        "sentiment-agent": "sentiment",
        "entity-agent": "entities",
    }

    # Send to multiple specialized agents in parallel
    await asyncio.gather(*(
        client.messaging.send(
            destination=agent,
            event_name="analyze",
            payload={"document_id": document_id, "type": analysis_type},
        )
        for agent, analysis_type in analyses.items()
    ))

    # Collect results.
    # NOTE(review): timeout=30 is presumably seconds -- confirm against the
    # client API.
    results = {}
    async for message in client.messaging.subscribe(agent_id="coordinator", timeout=30):
        if message.event_name == "analysis_complete":
            results[message.payload["type"]] = message.payload["result"]

        # Acknowledge BEFORE deciding to stop; breaking first would leave the
        # final result message unacknowledged.
        await client.messaging.acknowledge(message.message_id)

        if len(results) == len(analyses):
            break

    return results

Pipeline: chain multiple agents in sequence, with each stage forwarding its output to the next.

async def document_pipeline(client, document_id):
    """Start the pipeline by sending an ``extract`` request to ``extractor``.

    The payload's ``next`` field is set to ``translator``.

    Args:
        client: Messaging client exposing ``messaging.send``.
        document_id: Identifier of the document to push through the pipeline.
    """
    # Stage 1: Extract text
    first_stage = {"document_id": document_id, "next": "translator"}
    await client.messaging.send(
        destination="extractor",
        event_name="extract",
        payload=first_stage,
    )
# Extractor agent
async def handle_extract(client, message):
    """Extract a document's text and forward it to the next pipeline stage.

    Args:
        client: Messaging client exposing ``messaging.send``.
        message: Incoming request whose payload carries ``document_id`` and
            ``next`` (the destination for the ``translate`` message).
    """
    request = message.payload
    # `extract_text` is defined outside this snippet.
    extracted = await extract_text(request["document_id"])

    # Pass to next stage, telling it where to forward its own output.
    await client.messaging.send(
        destination=request["next"],
        event_name="translate",
        payload={"text": extracted, "next": "summarizer"},
    )
# Translator agent
async def handle_translate(client, message):
    """Translate the incoming text and forward it to the next pipeline stage.

    Args:
        client: Messaging client exposing ``messaging.send``.
        message: Incoming request whose payload carries ``text`` and ``next``
            (the destination for the ``summarize`` message).
    """
    request = message.payload
    # `translate` is defined outside this snippet.
    translated_text = await translate(request["text"])

    forwarded = {"text": translated_text, "next": "publisher"}
    await client.messaging.send(
        destination=request["next"],
        event_name="summarize",
        payload=forwarded,
    )
# ... continues through pipeline

Saga: coordinate multiple operations across services, running compensating actions to undo earlier steps when a later step fails.

async def order_saga(client, order):
    """Run the reserve -> charge -> ship saga for *order*.

    Each step sends a request and waits for its confirmation event. If any
    step fails, compensating messages are sent, most recent first, for the
    steps already requested, and the original exception is re-raised.

    Args:
        client: Messaging client exposing ``messaging.send``.
        order: Object with ``id``, ``items``, ``total`` and ``address``
            attributes.

    Returns:
        ``{"status": "completed", "tracking": <tracking number>}`` on success.

    Raises:
        Exception: Whatever the failing step raised, after compensation has
            been sent.
    """
    # (destination, event_name) of the undo message for each step requested
    # so far. A step is registered as soon as its request has been sent, not
    # when it is confirmed: if only the confirmation is lost, the remote side
    # may still have acted.
    # NOTE(review): this assumes "release"/"refund" are safe to send for a
    # step that never completed -- confirm with the service owners.
    compensations = []

    try:
        # Step 1: Reserve inventory
        await client.messaging.send(
            destination="inventory-service",
            event_name="reserve",
            payload={"order_id": order.id, "items": order.items},
        )
        compensations.append(("inventory-service", "release"))
        # `wait_for_response` is defined outside this snippet.
        await wait_for_response(client, "inventory_reserved")

        # Step 2: Charge payment
        await client.messaging.send(
            destination="payment-service",
            event_name="charge",
            payload={"order_id": order.id, "amount": order.total},
        )
        compensations.append(("payment-service", "refund"))
        await wait_for_response(client, "payment_charged")

        # Step 3: Ship order
        await client.messaging.send(
            destination="shipping-service",
            event_name="ship",
            payload={"order_id": order.id, "address": order.address},
        )
        shipment = await wait_for_response(client, "order_shipped")

        return {"status": "completed", "tracking": shipment["tracking_number"]}
    except Exception:
        # Compensate: undo only the steps that were actually requested, in
        # reverse order. A failure at the reserve step therefore no longer
        # triggers a refund for a charge that was never made.
        for destination, event_name in reversed(compensations):
            await client.messaging.send(
                destination=destination,
                event_name=event_name,
                payload={"order_id": order.id},
            )
        raise

Dead-letter handling: route messages that repeatedly fail processing to a dead-letter destination for manual review.

async def process_with_dlq(client, max_retries=3):
    """Consume ``processor`` messages, dead-lettering those that keep failing.

    A message that is processed successfully is acknowledged. On failure the
    message is rejected while its ``retry_count`` is below *max_retries*.
    Once the limit is reached, a ``dead_letter`` report is sent to
    ``dlq-monitor`` and the message is acknowledged.

    Args:
        client: Messaging client exposing ``messaging.subscribe``,
            ``messaging.send``, ``messaging.acknowledge`` and
            ``messaging.reject``.
        max_retries: ``retry_count`` at which a failing message is
            dead-lettered instead of rejected. Defaults to 3.
    """
    async for message in client.messaging.subscribe(agent_id="processor"):
        try:
            # `process_message` is defined outside this snippet.
            await process_message(message)
            await client.messaging.acknowledge(message.message_id)
        except Exception as e:
            # The broad catch is deliberate: any failure must end in either a
            # retry or the dead-letter path rather than stopping the consumer.
            if message.retry_count < max_retries:
                # Will be retried automatically (per the original guide;
                # redelivery on reject is broker behavior not shown here).
                await client.messaging.reject(message.message_id)
                continue

            # After max retries, hand the message over for manual review.
            await client.messaging.send(
                destination="dlq-monitor",
                event_name="dead_letter",
                payload={
                    "original": message.payload,
                    "error": str(e),
                    "retries": message.retry_count,
                },
            )
            # Acknowledge so the failed message is not delivered again.
            await client.messaging.acknowledge(message.message_id)

Rate limiting: control the processing rate to avoid overwhelming downstream systems.

import asyncio
import time
from datetime import datetime, timedelta
class RateLimitedProcessor:
def __init__(self, client, max_per_second=10):
self.client = client
self.max_per_second = max_per_second
self.last_process_time = datetime.min
self.semaphore = asyncio.Semaphore(max_per_second)
async def process(self):
async for message in self.client.messaging.subscribe(agent_id="processor"):
async with self.semaphore:
# Ensure we don't exceed rate limit
elapsed = (datetime.now() - self.last_process_time).total_seconds()
if elapsed < 1 / self.max_per_second:
await asyncio.sleep(1 / self.max_per_second - elapsed)
await self.handle(message)
self.last_process_time = datetime.now()
await self.client.messaging.acknowledge(message.message_id)