Skip to content

Coordination Hub

Coordination Hub enables you to build and execute multi-step workflows that involve multiple agents. It supports declarative plans, AI-powered plan generation, and adaptive execution.

  • Declarative plans - Define workflows in YAML or via API
  • AI-powered generation - Generate plans from natural language goals
  • Adaptive execution - Plans that revise themselves based on feedback
  • Human-in-the-loop - Approval gates and input collection
  • Parallel execution - Run steps concurrently for efficiency

A plan defines a workflow as a directed acyclic graph (DAG) of steps:

name: document-processing
description: Process and summarize a document
steps:
- id: extract
type: agent
agent: document-ocr
input:
document_id: "{{ input.document_id }}"
- id: summarize
type: agent
agent: summarizer
depends_on: [extract]
input:
text: "{{ steps.extract.output.text }}"
- id: review
type: human_approval
depends_on: [summarize]
prompt: "Please review the summary"
assignee: "{{ input.reviewer }}"
- id: publish
type: agent
agent: publisher
depends_on: [review]
input:
summary: "{{ steps.summarize.output.summary }}"
TypeDescription
agentExecute a task via an agent
human_approvalWait for human yes/no approval
human_inputCollect data from a human
conditionBranch based on an expression
parallelRun multiple steps concurrently
waitWait for duration or event
sub_planExecute another plan
transformTransform data with expressions

Generate plans from natural language:

generated = await client.coordination.generate_plan(
goal="Create a customer onboarding workflow that verifies identity, sets up accounts, and sends a welcome email",
constraints={
"require_human_approval": True,
"max_steps": 10,
"available_agents": ["kyc-agent", "account-service", "email-service"]
}
)
# Review and save the generated plan
plan = await client.coordination.create_plan(
alias="customer-onboarding",
steps=generated.steps,
description=generated.description
)

Enable plans to revise themselves when steps fail:

run = await client.coordination.execute(
plan_id=plan.id,
input={"customer_id": "cust-123"},
adaptive={
"enabled": True,
"max_revisions": 5,
"triggers": [
{"condition": "step.status == 'failed'"},
{"condition": "step.output.confidence < 0.7"}
]
}
)

When a trigger condition is met, the system:

  1. Analyzes the failure context
  2. Uses an LLM to generate a revised plan
  3. Continues execution with the new plan
  4. Escalates to human review if revision limits are exceeded

Run steps concurrently for efficiency:

steps = [
{
"id": "parallel_tasks",
"type": "parallel",
"branches": [
{"id": "task_a", "type": "agent", "agent": "agent-a"},
{"id": "task_b", "type": "agent", "agent": "agent-b"},
{"id": "task_c", "type": "agent", "agent": "agent-c"}
],
"concurrency": 3,
"on_failure": "fail_fast" # or "continue", "wait_all"
}
]

Use {{ }} expressions to reference data:

ExpressionDescription
{{ input.field }}Access plan input
{{ steps.step_id.output.field }}Access step output
{{ steps.step_id.status }}Access step status
{{ now() }}Current timestamp

Plans can be triggered in multiple ways:

TriggerDescription
ManualExecute via API call
ScheduleCron-based scheduling
EventTrigger on message event
WebhookHTTP webhook trigger
# Create a scheduled trigger
await client.coordination.create_trigger(
plan_id=plan.id,
type="schedule",
config={"cron": "0 9 * * MON"} # Every Monday at 9 AM
)

Track plan execution:

# Get run status
run = await client.coordination.get_run(run_id=run.id)
print(f"Status: {run.status}")
print(f"Current step: {run.current_step}")
# List all runs for a plan
runs = await client.coordination.list_runs(plan_id=plan.id)