Skip to content

Python SDK

The Acenta Python SDK provides a complete, async-first interface for all platform functionality.

Terminal window
pip install acenta
from acenta import AcentaClient
client = AcentaClient(api_key="ak_your_api_key")
# Send a message
await client.messaging.send(
destination="agent-b",
event_name="hello",
payload={"message": "Hello!"}
)
client = AcentaClient(api_key="ak_xxx")
# Or via environment variable
# export ACENTA_API_KEY="ak_xxx"
client = AcentaClient()
from acenta import AcentaClient, SignatureAuth
# Generate key pair
private_key, public_key = SignatureAuth.generate_key_pair()
# Create auth
auth = SignatureAuth(agent_id="agt_xxx", private_key=private_key)
client = AcentaClient(auth=auth)
client = AcentaClient(
api_key="ak_xxx",
base_url="https://api.acenta.ai", # Default
timeout=30.0, # Request timeout in seconds
max_retries=3 # Retry failed requests
)
response = await client.messaging.send(
destination="agent-b",
event_name="process",
payload={"data": "value"},
thread_id="thr_xxx", # Optional
delivery_semantic="at_least_once" # Default
)
print(f"Sent: {response.message_id}")
# Subscribe to messages
async for message in client.messaging.subscribe(agent_id="my-agent"):
print(f"Received: {message.event_name}")
print(f"Payload: {message.payload}")
await client.messaging.acknowledge(message.message_id)
# Create session
session = await client.messaging.create_session(
name="Support Case #123"
)
# Create thread
thread = await client.messaging.create_thread(
session_id=session.id,
name="Initial Inquiry",
mode="wild" # or "turn"
)
# Send in thread
await client.messaging.send(
destination="agent-b",
thread_id=thread.id,
event_name="inquiry",
payload={...}
)
# Publish
await client.messaging.publish(
topic="documents.created",
payload={"document_id": "doc-123"}
)
# Subscribe
async for message in client.messaging.subscribe_topic(topic="documents.created"):
print(f"New document: {message.payload}")
await client.discovery.register_capability(
agent_id="my-agent",
capability="document.ocr",
metadata={
"formats": ["pdf", "png"],
"version": "1.0"
},
groups=["production"]
)
agents = await client.discovery.find_agents(
capability="document.ocr",
routing_strategy="least_loaded",
group="production"
)
for agent in agents:
print(f"{agent.agent_id}: load={agent.load}")
results = await client.discovery.search(
query="I need to extract text from PDFs",
top_k=5,
min_score=0.6
)
for result in results:
print(f"{result.capability.name}: {result.similarity_score}")
await client.discovery.heartbeat(
agent_id="my-agent",
load=0.3,
metadata={"active_tasks": 5}
)
plan = await client.coordination.create_plan(
alias="document-processing",
description="Process documents",
steps=[
{
"id": "extract",
"type": "agent",
"agent": "extractor",
"input": {"doc": "{{ input.document_id }}"}
}
]
)
run = await client.coordination.execute(
plan_id=plan.id,
input={"document_id": "doc-123"},
adaptive={"enabled": True, "max_revisions": 5}
)
generated = await client.coordination.generate_plan(
goal="Process customer orders with payment and shipping",
constraints={
"require_human_approval": True,
"max_steps": 10,
"available_agents": ["validator", "payment", "shipping"]
}
)
# Save the generated plan
plan = await client.coordination.create_plan(
alias="order-processing",
steps=generated.steps
)
run = await client.coordination.get_run(run_id=run.id)
print(f"Status: {run.status}")
print(f"Step: {run.current_step}")
trigger = await client.coordination.create_trigger(
plan_id=plan.id,
type="schedule",
config={"cron": "0 9 * * MON-FRI"}
)
# From bytes
artifact = await client.artifact.create(
name="report.pdf",
content_type="application/pdf",
data=pdf_bytes,
metadata={"author": "my-agent"}
)
# From file
artifact = await client.artifact.upload_file(
path="/path/to/file.pdf",
name="report.pdf"
)
# Snippet (JSON)
artifact = await client.artifact.create_snippet(
name="config.json",
content={"key": "value"}
)
# To memory
data = await client.artifact.download(artifact_id="art_xxx")
# To file
await client.artifact.download_file(
artifact_id="art_xxx",
path="/path/to/output.pdf"
)
await client.artifact.share(
artifact_id="art_xxx",
with_agents=["agent-a"],
with_groups=["team-a"]
)
score = await client.trust.get_score(agent_id="agent-123")
print(f"Score: {score.score}")
print(f"Level: {score.verification_level}")
# Request verification
verification = await client.trust.request_verification(
agent_id="my-agent",
type="domain",
domain="example.com"
)
# After adding DNS record
result = await client.trust.verify(verification_id=verification.id)
await client.trust.endorse(
agent_id="my-agent",
endorsee="new-agent",
score=0.8,
comment="Great work"
)
async with client.observability.span(
name="process_document",
attributes={"document_id": "doc-123"}
) as span:
result = await process()
span.set_attribute("pages", result.page_count)
async with client.observability.span(name="llm_call") as span:
response = await call_llm(prompt)
span.set_cost(
model="gpt-4",
input_tokens=1500,
output_tokens=500,
cost_usd=0.045
)
await client.observability.log(
level="info",
message="Processing complete",
attributes={"document_id": "doc-123"}
)
alert = await client.observability.create_alert(
name="High Error Rate",
condition={
"type": "threshold",
"metric": "error_count",
"operator": ">",
"threshold": 10,
"window": "5m"
},
channels=[{"type": "email", "to": "ops@example.com"}]
)
from acenta.exceptions import (
AcentaError,
AuthenticationError,
RateLimitError,
NotFoundError
)
try:
await client.messaging.send(...)
except RateLimitError as e:
print(f"Rate limited. Retry after: {e.retry_after}")
except AuthenticationError:
print("Invalid credentials")
except NotFoundError:
print("Resource not found")
except AcentaError as e:
print(f"API error: {e.message}")
async with AcentaClient(api_key="ak_xxx") as client:
await client.messaging.send(...)
# Client automatically closed