Skip to content

Building Workflows

This guide shows you how to build and execute multi-agent workflows using Coordination Hub.

Let’s create a simple document-processing workflow with three agent steps (extract, analyze, and store), each of which depends on the one before it:

from acenta import AcentaClient
client = AcentaClient(api_key="...")
# Define the plan
plan = await client.coordination.create_plan(
alias="document-processing",
description="Extract, analyze, and store document data",
steps=[
{
"id": "extract",
"type": "agent",
"agent": "document-extractor",
"input": {
"document_id": "{{ input.document_id }}"
}
},
{
"id": "analyze",
"type": "agent",
"agent": "text-analyzer",
"depends_on": ["extract"],
"input": {
"text": "{{ steps.extract.output.text }}"
}
},
{
"id": "store",
"type": "agent",
"agent": "storage-service",
"depends_on": ["analyze"],
"input": {
"document_id": "{{ input.document_id }}",
"analysis": "{{ steps.analyze.output }}"
}
}
]
)
# Execute the plan
run = await client.coordination.execute(
plan_id=plan.id,
input={"document_id": "doc-12345"}
)
print(f"Run started: {run.id}")

Execute a task via an agent:

{
"id": "process",
"type": "agent",
"agent": "document-processor", # Agent ID or capability
"input": {"document": "{{ input.doc }}"},
"timeout": 300, # 5 minutes
"retry": {
"max_attempts": 3,
"backoff": "exponential"
}
}

Wait for human approval:

{
"id": "approval",
"type": "human_approval",
"prompt": "Please review the analysis results",
"assignee": "{{ input.reviewer_email }}",
"timeout": 86400, # 24 hours
"depends_on": ["analyze"]
}

Collect data from a human:

{
"id": "collect_info",
"type": "human_input",
"prompt": "Please provide additional details",
"form": {
"fields": [
{"name": "priority", "type": "select", "options": ["low", "medium", "high"]},
{"name": "notes", "type": "text", "required": False}
]
},
"assignee": "{{ input.owner }}"
}

Branch based on a condition:

{
"id": "check_size",
"type": "condition",
"expression": "{{ steps.extract.output.page_count > 10 }}",
"depends_on": ["extract"],
"then": ["detailed_analysis"],
"else": ["quick_analysis"]
}

Run steps concurrently:

{
"id": "parallel_analysis",
"type": "parallel",
"depends_on": ["extract"],
"branches": [
{"id": "sentiment", "type": "agent", "agent": "sentiment-analyzer"},
{"id": "entities", "type": "agent", "agent": "entity-extractor"},
{"id": "keywords", "type": "agent", "agent": "keyword-extractor"}
],
"concurrency": 3,
"on_failure": "continue" # or "fail_fast", "wait_all"
}

Wait for a duration or event:

# Wait for duration
{
"id": "cooldown",
"type": "wait",
"duration": "5m"
}
# Wait for event
{
"id": "wait_approval",
"type": "wait",
"event": "document.approved",
"timeout": "24h"
}

Execute another plan:

{
"id": "process_order",
"type": "sub_plan",
"plan": "order-processing",
"input": {
"order_id": "{{ input.order_id }}"
}
}

Transform data:

{
"id": "combine_results",
"type": "transform",
"depends_on": ["step_a", "step_b"],
"input": {
"combined": {
"a_result": "{{ steps.step_a.output.value }}",
"b_result": "{{ steps.step_b.output.value }}",
"timestamp": "{{ now() }}"
}
}
}

Let AI generate your workflow:

generated = await client.coordination.generate_plan(
goal="Create a workflow that processes customer orders: validate the order, check inventory, charge payment, and ship the package. If payment fails, cancel the order.",
constraints={
"require_human_approval": False,
"max_steps": 10,
"available_agents": [
"order-validator",
"inventory-service",
"payment-processor",
"shipping-service"
]
}
)
# Review the generated plan
print(f"Generated {len(generated.steps)} steps:")
for step in generated.steps:
print(f" {step['id']}: {step['type']}")
# Save if satisfied
plan = await client.coordination.create_plan(
alias="order-processing",
steps=generated.steps,
description=generated.description
)

Enable self-revising workflows:

run = await client.coordination.execute(
plan_id=plan.id,
input={"order_id": "ord-123"},
adaptive={
"enabled": True,
"max_revisions": 5,
"max_added_steps": 10,
"triggers": [
{"condition": "step.status == 'failed'"},
{"condition": "step.output.confidence < 0.7"}
],
"escalation": {
"notify": "ops@example.com",
"after_revisions": 3
}
}
)

Track execution progress:

# Get run status
run = await client.coordination.get_run(run_id=run.id)
print(f"Status: {run.status}")
print(f"Current step: {run.current_step}")
print(f"Progress: {run.completed_steps}/{run.total_steps}")
# Get step details
for step in run.steps:
print(f"{step.id}: {step.status}")
if step.error:
print(f" Error: {step.error}")

Start a run manually:

run = await client.coordination.execute(
plan_id=plan.id,
input={...}
)

Run a plan on a schedule:

trigger = await client.coordination.create_trigger(
plan_id=plan.id,
type="schedule",
config={"cron": "0 9 * * MON-FRI"}, # Weekdays at 9 AM
input={"type": "daily-report"}
)

Run a plan in response to an event:

trigger = await client.coordination.create_trigger(
plan_id=plan.id,
type="event",
config={
"topic": "documents.uploaded",
"filter": {"type": "invoice"}
},
input_mapping={
"document_id": "{{ event.document_id }}"
}
)

Run a plan from a webhook:

trigger = await client.coordination.create_trigger(
plan_id=plan.id,
type="webhook",
config={
"path": "/webhooks/process-order",
"method": "POST",
"auth": "bearer"
}
)
# Returns webhook URL: https://api.acenta.ai/hooks/xxx

Configure error handling per step:

{
"id": "risky_operation",
"type": "agent",
"agent": "external-api",
"on_failure": "human_review", # or "continue", "fail"
"retry": {
"max_attempts": 3,
"backoff": "exponential",
"initial_delay": "1s",
"max_delay": "30s"
},
"timeout": 60
}