Skip to content

Your First Agent

In this tutorial, you’ll build a document processing agent that:

  1. Registers its capabilities with Discovery Hub
  2. Listens for processing requests
  3. Stores results in Artifact Hub
  4. Reports progress via Observability Hub

Create a new directory for your project and install the Acenta SDK:

Terminal window
mkdir document-agent
cd document-agent
pip install acenta

Create agent.py:

import asyncio
import json
from datetime import datetime
from acenta import AcentaClient
class DocumentAgent:
    """Document-processing agent built on the Acenta client.

    The agent registers two capabilities (``document.process`` and
    ``document.summarize``), subscribes to the message stream for its own
    ``agent_id``, and dispatches each incoming message by ``event_name``.
    """

    def __init__(self, api_key: str):
        """Create the client used for every hub call.

        Args:
            api_key: Acenta API key handed to ``AcentaClient``.
        """
        self.client = AcentaClient(api_key=api_key)
        self.agent_id = "document-processor-001"

    async def start(self):
        """Register capabilities, then handle messages until the stream ends.

        A failure while handling one message is reported and the loop moves
        on to the next message, so a single bad request cannot stop the agent.
        """
        await self.register()
        print(f"Agent {self.agent_id} started. Listening for messages...")

        async for message in self.client.messaging.subscribe(agent_id=self.agent_id):
            try:
                await self.handle_message(message)
            except Exception as e:
                # handle_message has already recorded the error on its span
                # and skipped the acknowledgement. Cancellation is not an
                # Exception subclass, so shutdown still propagates.
                # NOTE(review): whether an unacknowledged message is
                # redelivered depends on the hub -- confirm.
                print(f"Failed to handle message {message.message_id}: {e}")

    async def register(self):
        """Register agent capabilities with Discovery Hub."""
        await self.client.discovery.register_capability(
            agent_id=self.agent_id,
            capability="document.process",
            metadata={
                "supported_formats": ["pdf", "docx", "txt"],
                "max_file_size_mb": 100,
                "version": "1.0.0",
            },
        )
        await self.client.discovery.register_capability(
            agent_id=self.agent_id,
            capability="document.summarize",
            metadata={
                "max_length": 1000,
                "languages": ["en", "es", "fr"],
            },
        )
        print("Capabilities registered")

    async def handle_message(self, message):
        """Dispatch one incoming message inside an observability span.

        Known events are routed to their handler; unknown events are only
        reported. The message is acknowledged when no exception occurred.

        Args:
            message: Incoming message; ``message_id``, ``event_name``,
                ``payload`` and ``source`` are read from it.

        Raises:
            Exception: Whatever the handler or the acknowledgement raised,
                re-raised after being recorded on the span.
        """
        async with self.client.observability.span(
            name="handle_message",
            attributes={
                "message_id": message.message_id,
                "event_name": message.event_name,
            },
        ) as span:
            try:
                if message.event_name == "process_document":
                    await self.process_document(message, span)
                elif message.event_name == "summarize_document":
                    await self.summarize_document(message, span)
                else:
                    print(f"Unknown event: {message.event_name}")

                # Acknowledge successful processing
                await self.client.messaging.acknowledge(message.message_id)
            except Exception as e:
                span.set_attribute("error", str(e))
                span.set_status("error")
                raise

    @staticmethod
    def _require_document_id(message):
        """Return the payload's ``document_id`` or raise if it is absent."""
        doc_id = message.payload.get("document_id")
        if doc_id is None or doc_id == "":
            raise ValueError("Request payload is missing 'document_id'")
        return doc_id

    async def process_document(self, message, span):
        """Process a document, store the result, and notify the requester.

        Args:
            message: Request whose payload carries ``document_id``.
            span: Active span; document and artifact ids are attached to it.

        Raises:
            ValueError: If the payload has no ``document_id``.
        """
        # Validate before doing any work so no "processed-None.json"
        # artifact is ever created for a malformed request.
        doc_id = self._require_document_id(message)
        span.set_attribute("document_id", doc_id)
        print(f"Processing document: {doc_id}")

        # Simulate document processing
        await asyncio.sleep(2)

        result = {
            "document_id": doc_id,
            # Timezone-aware timestamp: the ISO string carries its UTC
            # offset, so consumers in other zones can interpret it.
            "processed_at": datetime.now().astimezone().isoformat(),
            "word_count": 1500,
            "page_count": 5,
            "extracted_entities": [
                {"type": "person", "value": "John Doe"},
                {"type": "organization", "value": "Acenta Inc"},
            ],
        }

        # Store result in Artifact Hub
        artifact = await self.client.artifact.create(
            name=f"processed-{doc_id}.json",
            content_type="application/json",
            data=json.dumps(result).encode(),
            metadata={
                "source_document": doc_id,
                "processor": self.agent_id,
            },
        )
        span.set_attribute("artifact_id", artifact.id)

        # Send response to requester
        await self.client.messaging.send(
            destination=message.source,
            event_name="document_processed",
            payload={
                "document_id": doc_id,
                "artifact_id": artifact.id,
                "status": "completed",
            },
        )
        print(f"Document processed: {doc_id} -> {artifact.id}")

    async def summarize_document(self, message, span):
        """Summarize a document and send the summary to the requester.

        Args:
            message: Request whose payload carries ``document_id`` and an
                optional ``max_length`` (defaults to 500).
            span: Active span; ``document_id`` and ``max_length`` are
                attached to it.

        Raises:
            ValueError: If the payload has no ``document_id``.
        """
        doc_id = self._require_document_id(message)
        max_length = message.payload.get("max_length", 500)
        span.set_attribute("document_id", doc_id)
        span.set_attribute("max_length", max_length)
        print(f"Summarizing document: {doc_id}")

        # Simulate summarization
        await asyncio.sleep(1)
        summary = "This is a sample summary of the document..."

        # Send response
        await self.client.messaging.send(
            destination=message.source,
            event_name="document_summarized",
            payload={
                "document_id": doc_id,
                "summary": summary,
                "word_count": len(summary.split()),
            },
        )
        print(f"Document summarized: {doc_id}")
async def main():
    """Run the document agent with the key from ``ACENTA_API_KEY``.

    Returns:
        ``1`` when the environment variable is unset or empty, ``0`` when
        the agent's message loop ends normally.
    """
    import os
    import sys

    api_key = os.getenv("ACENTA_API_KEY")
    if not api_key:
        # Report on stderr and signal failure so shells and process
        # supervisors can tell the agent never started.
        print("Please set ACENTA_API_KEY environment variable", file=sys.stderr)
        return 1

    agent = DocumentAgent(api_key)
    await agent.start()
    return 0


if __name__ == "__main__":
    # Propagate main()'s result as the process exit status.
    raise SystemExit(asyncio.run(main()))

Set your API key and run:

Terminal window
export ACENTA_API_KEY="ak_your_api_key"
python agent.py

Create a test script test_agent.py:

import asyncio
from acenta import AcentaClient
import os
async def main():
    """Send one ``process_document`` request and print the agent's reply.

    Looks up an agent advertising ``document.process``, sends it a request
    for ``doc-12345``, then reads the ``test-client`` message stream until a
    ``document_processed`` message arrives.

    Returns:
        ``0`` when a result was received; ``1`` when the API key is missing,
        no agent was found, or the stream ended without a result.
    """
    import sys

    api_key = os.getenv("ACENTA_API_KEY")
    if not api_key:
        # Same check as agent.py: fail early instead of handing None to
        # the client.
        print("Please set ACENTA_API_KEY environment variable", file=sys.stderr)
        return 1

    client = AcentaClient(api_key=api_key)

    # Find agents that can process documents
    agents = await client.discovery.find_agents(
        capability="document.process",
        routing_strategy="least_loaded",
    )
    if not agents:
        print("No document processors found")
        return 1

    target_agent = agents[0].agent_id
    print(f"Found agent: {target_agent}")

    # Send a processing request
    response = await client.messaging.send(
        destination=target_agent,
        event_name="process_document",
        payload={"document_id": "doc-12345"},
    )
    print(f"Request sent: {response.message_id}")

    # Wait for response
    async for message in client.messaging.subscribe(
        agent_id="test-client",
        timeout=30,
    ):
        if message.event_name == "document_processed":
            print(f"Result: {message.payload}")
            break
    else:
        # Reached only when the stream ends without a matching message.
        # NOTE(review): assumes the timeout ends iteration rather than
        # raising -- confirm against the SDK.
        print("No document_processed response received")
        return 1

    return 0


if __name__ == "__main__":
    # Propagate main()'s result as the process exit status.
    raise SystemExit(asyncio.run(main()))

With the agent still running, open a second terminal (with ACENTA_API_KEY set there as well) and run the test:

Terminal window
python test_agent.py

Your agent now:

  1. Registers capabilities - Other agents can discover it via Discovery Hub
  2. Handles messages - Processes requests asynchronously
  3. Stores artifacts - Saves results to Artifact Hub
  4. Reports traces - Creates spans for observability