Your First Agent
In this tutorial, you’ll build a document processing agent that:
- Registers its capabilities with Discovery Hub
- Listens for processing requests
- Stores results in Artifact Hub
- Reports progress via Observability Hub
Prerequisites
- Completed the Quickstart
- An Acenta API key
Project Setup
Create a new directory for your project and install the SDK:
mkdir document-agent
cd document-agent
pip install acenta
The Agent Code
Create agent.py:
import asyncio
import json
from datetime import datetime, timezone

from acenta import AcentaClient
class DocumentAgent:
    """Document-processing agent built on the Acenta hubs.

    The agent registers its capabilities with Discovery Hub, listens for
    requests through the messaging API, stores processing results in
    Artifact Hub and wraps every handled message in an observability span.
    """

    def __init__(self, api_key: str):
        """Create the Acenta client and set the agent's identifier.

        Args:
            api_key: Acenta API key passed to ``AcentaClient``.
        """
        self.client = AcentaClient(api_key=api_key)
        self.agent_id = "document-processor-001"

    async def start(self):
        """Register capabilities, then handle incoming messages one by one.

        A failure while handling a single message is reported and the agent
        keeps listening, so one bad request cannot stop the whole agent.
        """
        # Register capabilities
        await self.register()

        print(f"Agent {self.agent_id} started. Listening for messages...")

        # Listen for messages
        subscription = self.client.messaging.subscribe(agent_id=self.agent_id)
        async for message in subscription:
            try:
                await self.handle_message(message)
            except Exception as exc:
                # handle_message has already recorded the failure on its
                # span. Cancellation is a BaseException and is deliberately
                # not caught here, so the agent can still be shut down.
                print(f"Failed to handle message {message.message_id}: {exc}")

    async def register(self):
        """Register agent capabilities with Discovery Hub."""
        await self.client.discovery.register_capability(
            agent_id=self.agent_id,
            capability="document.process",
            metadata={
                "supported_formats": ["pdf", "docx", "txt"],
                "max_file_size_mb": 100,
                "version": "1.0.0",
            },
        )

        await self.client.discovery.register_capability(
            agent_id=self.agent_id,
            capability="document.summarize",
            metadata={
                "max_length": 1000,
                "languages": ["en", "es", "fr"],
            },
        )

        print("Capabilities registered")

    async def handle_message(self, message):
        """Dispatch one incoming message to its handler inside a span.

        Args:
            message: Incoming message; its ``message_id``, ``event_name``,
                ``payload`` and ``source`` attributes are used.

        Raises:
            Exception: Any error raised while handling or acknowledging the
                message is recorded on the span and re-raised.
        """
        # Create a span for observability
        async with self.client.observability.span(
            name="handle_message",
            attributes={
                "message_id": message.message_id,
                "event_name": message.event_name,
            },
        ) as span:
            try:
                if message.event_name == "process_document":
                    await self.process_document(message, span)
                elif message.event_name == "summarize_document":
                    await self.summarize_document(message, span)
                else:
                    print(f"Unknown event: {message.event_name}")

                # Acknowledge successful processing (unknown events are
                # acknowledged as well, after being reported above).
                await self.client.messaging.acknowledge(message.message_id)

            except Exception as e:
                span.set_attribute("error", str(e))
                span.set_status("error")
                raise

    async def process_document(self, message, span):
        """Process a document, store the result and notify the requester.

        Args:
            message: Request whose payload carries ``document_id``.
            span: Observability span to annotate with the document and
                artifact identifiers.
        """
        doc_id = message.payload.get("document_id")
        span.set_attribute("document_id", doc_id)

        print(f"Processing document: {doc_id}")

        # Simulate document processing
        await asyncio.sleep(2)

        # Create result artifact. The counts and entities are placeholder
        # values standing in for real processing output. The timestamp is
        # timezone-aware UTC so it is unambiguous for whoever reads it.
        result = {
            "document_id": doc_id,
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "word_count": 1500,
            "page_count": 5,
            "extracted_entities": [
                {"type": "person", "value": "John Doe"},
                {"type": "organization", "value": "Acenta Inc"},
            ],
        }

        # Store result in Artifact Hub
        artifact = await self.client.artifact.create(
            name=f"processed-{doc_id}.json",
            content_type="application/json",
            data=json.dumps(result).encode(),
            metadata={
                "source_document": doc_id,
                "processor": self.agent_id,
            },
        )

        span.set_attribute("artifact_id", artifact.id)

        # Send response to requester
        await self.client.messaging.send(
            destination=message.source,
            event_name="document_processed",
            payload={
                "document_id": doc_id,
                "artifact_id": artifact.id,
                "status": "completed",
            },
        )

        print(f"Document processed: {doc_id} -> {artifact.id}")

    async def summarize_document(self, message, span):
        """Summarize a document and send the summary to the requester.

        Args:
            message: Request whose payload carries ``document_id`` and an
                optional ``max_length`` (defaults to 500).
            span: Observability span to annotate with the request details.
        """
        doc_id = message.payload.get("document_id")
        max_length = message.payload.get("max_length", 500)

        span.set_attribute("document_id", doc_id)
        span.set_attribute("max_length", max_length)

        print(f"Summarizing document: {doc_id}")

        # Simulate summarization
        await asyncio.sleep(1)

        # Placeholder text; max_length is only recorded on the span here.
        summary = "This is a sample summary of the document..."

        # Send response
        await self.client.messaging.send(
            destination=message.source,
            event_name="document_summarized",
            payload={
                "document_id": doc_id,
                "summary": summary,
                "word_count": len(summary.split()),
            },
        )

        print(f"Document summarized: {doc_id}")
async def main():
    """Run the document agent using the API key from the environment.

    Reads ``ACENTA_API_KEY``; when it is unset or empty, prints a hint and
    returns without starting the agent.
    """
    import os

    key = os.environ.get("ACENTA_API_KEY")
    if key:
        await DocumentAgent(key).start()
    else:
        print("Please set ACENTA_API_KEY environment variable")
if __name__ == "__main__":
    asyncio.run(main())
Running the Agent
Set your API key and run:
export ACENTA_API_KEY="ak_your_api_key"
python agent.py
Testing the Agent
Create a test script test_agent.py:
import asyncio
from acenta import AcentaClient
import os
async def main():
    """Find a document processor, send it a request and print the reply.

    Reads the API key from ``ACENTA_API_KEY`` and returns early with a hint
    when it is missing, instead of building a client with no key. Also
    reports when the subscription ends without a ``document_processed``
    event, so a missing reply is not silent.
    """
    api_key = os.getenv("ACENTA_API_KEY")
    if not api_key:
        print("Please set ACENTA_API_KEY environment variable")
        return

    client = AcentaClient(api_key=api_key)

    # Find agents that can process documents
    agents = await client.discovery.find_agents(
        capability="document.process",
        routing_strategy="least_loaded",
    )

    if not agents:
        print("No document processors found")
        return

    target_agent = agents[0].agent_id
    print(f"Found agent: {target_agent}")

    # Send a processing request
    response = await client.messaging.send(
        destination=target_agent,
        event_name="process_document",
        payload={"document_id": "doc-12345"},
    )

    print(f"Request sent: {response.message_id}")

    # Wait for response
    async for message in client.messaging.subscribe(
        agent_id="test-client",
        timeout=30,
    ):
        if message.event_name == "document_processed":
            print(f"Result: {message.payload}")
            break
    else:
        # Runs only when the loop finished without hitting `break`.
        print("No document_processed response received")
if __name__ == "__main__":
    asyncio.run(main())
Run the test:
python test_agent.py
What You Built
Your agent now:
- Registers capabilities - Other agents can discover it via Discovery Hub
- Handles messages - Processes requests asynchronously
- Stores artifacts - Saves results to Artifact Hub
- Reports traces - Creates spans for observability
Next Steps
- Messaging Patterns - Learn about different messaging patterns
- Building Workflows - Coordinate multiple agents
- Monitoring - Set up alerts and dashboards