Coordination Hub
Coordination Hub enables you to build and execute multi-step workflows that involve multiple agents. It supports declarative plans, AI-powered plan generation, and adaptive execution.
Key Features
- Declarative plans - Define workflows in YAML or via API
- AI-powered generation - Generate plans from natural language goals
- Adaptive execution - Plans that revise themselves based on feedback
- Human-in-the-loop - Approval gates and input collection
- Parallel execution - Run steps concurrently for efficiency
A plan defines a workflow as a directed acyclic graph (DAG) of steps:

    name: document-processing
    description: Process and summarize a document

    steps:
      - id: extract
        type: agent
        agent: document-ocr
        input:
          document_id: "{{ input.document_id }}"

      - id: summarize
        type: agent
        agent: summarizer
        depends_on: [extract]
        input:
          text: "{{ steps.extract.output.text }}"

      - id: review
        type: human_approval
        depends_on: [summarize]
        prompt: "Please review the summary"
        assignee: "{{ input.reviewer }}"

      - id: publish
        type: agent
        agent: publisher
        depends_on: [review]
        input:
          summary: "{{ steps.summarize.output.summary }}"

Step Types
| Type | Description |
|---|---|
| agent | Execute a task via an agent |
| human_approval | Wait for human yes/no approval |
| human_input | Collect data from a human |
| condition | Branch based on an expression |
| parallel | Run multiple steps concurrently |
| wait | Wait for a duration or an event |
| sub_plan | Execute another plan |
| transform | Transform data with expressions |
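
Because steps are linked through depends_on, a valid execution order can be derived by topologically sorting the graph. The sketch below is illustrative only: it uses Python's standard graphlib module on a hand-written list that mirrors the dependencies of the document-processing plan above, and it does not show how Coordination Hub schedules steps internally. The names PLAN_STEPS and execution_order are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical in-memory copy of the document-processing plan's dependencies.
PLAN_STEPS = [
    {"id": "extract", "depends_on": []},
    {"id": "summarize", "depends_on": ["extract"]},
    {"id": "review", "depends_on": ["summarize"]},
    {"id": "publish", "depends_on": ["review"]},
]


def execution_order(steps):
    """Return step ids in an order that respects depends_on.

    Raises ValueError if a step depends on an unknown id or the graph has a cycle.
    """
    known = {step["id"] for step in steps}
    graph = {}
    for step in steps:
        deps = step.get("depends_on", [])
        missing = [d for d in deps if d not in known]
        if missing:
            raise ValueError(f"step {step['id']!r} depends on unknown steps: {missing}")
        # TopologicalSorter expects a mapping of node -> set of predecessors.
        graph[step["id"]] = set(deps)
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"plan is not acyclic: {exc.args[1]}") from exc


if __name__ == "__main__":
    print(execution_order(PLAN_STEPS))
```

Running a check like this before submitting a plan catches misspelled step ids and accidental cycles early, under the assumption that the service would reject such plans anyway.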
AI-Powered Plan Generation
Generate plans from natural language:

    generated = await client.coordination.generate_plan(
        goal="Create a customer onboarding workflow that verifies identity, sets up accounts, and sends a welcome email",
        constraints={
            "require_human_approval": True,
            "max_steps": 10,
            "available_agents": ["kyc-agent", "account-service", "email-service"],
        },
    )

    # Review and save the generated plan
    plan = await client.coordination.create_plan(
        alias="customer-onboarding",
        steps=generated.steps,
        description=generated.description,
    )

Adaptive Execution
Enable plans to revise themselves when a step fails or meets another trigger condition:

    run = await client.coordination.execute(
        plan_id=plan.id,
        input={"customer_id": "cust-123"},
        adaptive={
            "enabled": True,
            "max_revisions": 5,
            "triggers": [
                {"condition": "step.status == 'failed'"},
                {"condition": "step.output.confidence < 0.7"},
            ],
        },
    )

When a trigger condition is met, the system:
- Analyzes the failure context
- Uses an LLM to generate a revised plan
- Continues execution with the new plan
- Escalates to human review if revision limits are exceeded
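
The behavior in the list above can be sketched as a bounded revision loop. This is a local simulation under stated assumptions: run_plan and revise_plan are hypothetical stand-ins for the executor and the LLM-backed planner, the plan is reduced to a list of step names, and the loop reruns the whole plan after each revision for simplicity. It is not the Coordination Hub implementation and calls no real API.

```python
from dataclasses import dataclass


@dataclass
class StepResult:
    step_id: str
    status: str  # "completed" or "failed"
    confidence: float = 1.0


def needs_revision(result, min_confidence=0.7):
    """Mirror the two trigger conditions from the example above."""
    return result.status == "failed" or result.confidence < min_confidence


def run_adaptive(plan, run_plan, revise_plan, max_revisions=5):
    """Execute `plan`, revising on a trigger and escalating past the limit.

    run_plan(plan) -> list[StepResult]      (hypothetical executor)
    revise_plan(plan, result) -> new plan   (hypothetical LLM-backed planner)
    """
    revisions = 0
    while True:
        results = run_plan(plan)
        triggered = next((r for r in results if needs_revision(r)), None)
        if triggered is None:
            return {"status": "completed", "revisions": revisions, "plan": plan}
        if revisions >= max_revisions:
            # Revision budget exhausted: hand over to human review.
            return {"status": "escalated", "revisions": revisions, "plan": plan}
        plan = revise_plan(plan, triggered)
        revisions += 1


# Fake executor and planner used only to exercise the loop.
def fake_run(plan):
    return [StepResult(s, "failed" if s == "flaky-ocr" else "completed") for s in plan]


def fake_revise(plan, failed):
    return ["backup-ocr" if s == failed.step_id else s for s in plan]


if __name__ == "__main__":
    print(run_adaptive(["flaky-ocr", "summarizer"], fake_run, fake_revise))
```

The explicit max_revisions check is what keeps a plan that can never succeed from looping forever; in this sketch the escalated result is simply returned to the caller.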
Parallel Execution
Run steps concurrently for efficiency:

    steps = [
        {
            "id": "parallel_tasks",
            "type": "parallel",
            "branches": [
                {"id": "task_a", "type": "agent", "agent": "agent-a"},
                {"id": "task_b", "type": "agent", "agent": "agent-b"},
                {"id": "task_c", "type": "agent", "agent": "agent-c"},
            ],
            "concurrency": 3,
            "on_failure": "fail_fast",  # or "continue", "wait_all"
        }
    ]

Expression Syntax
Use {{ }} expressions to reference data:
| Expression | Description |
|---|---|
| {{ input.field }} | Access plan input |
| {{ steps.step_id.output.field }} | Access step output |
| {{ steps.step_id.status }} | Access step status |
| {{ now() }} | Current timestamp |
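
To make the lookup rules concrete, here is a minimal resolver for the dotted-path expressions in the table. It is an illustration under assumptions: the real expression engine is not described here, function calls such as now() are deliberately left untouched, and the resolve function and the layout of the context dictionary are hypothetical.

```python
import re

# Matches {{ dotted.path }} with optional surrounding whitespace.
# Function calls such as {{ now() }} do not match and are left as-is.
_EXPR = re.compile(r"\{\{\s*([A-Za-z_][\w-]*(?:\.[A-Za-z_][\w-]*)*)\s*\}\}")


def _lookup(path, context):
    """Walk a dotted path through nested dicts; raises KeyError if absent."""
    value = context
    for key in path.split("."):
        value = value[key]
    return value


def resolve(template, context):
    """Substitute {{ path }} expressions in `template` from `context`.

    If the template is exactly one expression, the raw value is returned so
    numbers and dicts keep their type; otherwise values are converted to str.
    """
    whole = _EXPR.fullmatch(template.strip())
    if whole:
        return _lookup(whole.group(1), context)
    return _EXPR.sub(lambda m: str(_lookup(m.group(1), context)), template)


if __name__ == "__main__":
    # Assumed context shape: plan input plus per-step status and output.
    context = {
        "input": {"document_id": "doc-42"},
        "steps": {"extract": {"status": "completed", "output": {"text": "hello"}}},
    }
    print(resolve("{{ input.document_id }}", context))
    print(resolve("Extract is {{ steps.extract.status }}", context))
```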
Triggers
Plans can be triggered in multiple ways:
| Trigger | Description |
|---|---|
| Manual | Execute via API call |
| Schedule | Cron-based scheduling |
| Event | Trigger on message event |
| Webhook | HTTP webhook trigger |

    # Create a scheduled trigger
    await client.coordination.create_trigger(
        plan_id=plan.id,
        type="schedule",
        config={"cron": "0 9 * * MON"},  # Every Monday at 9 AM
    )

Monitoring Runs
Track plan execution:

    # Get run status
    run = await client.coordination.get_run(run_id=run.id)
    print(f"Status: {run.status}")
    print(f"Current step: {run.current_step}")

    # List all runs for a plan
    runs = await client.coordination.list_runs(plan_id=plan.id)

Next Steps
- Building Workflows Guide - Step-by-step workflow examples
- API Reference - Complete API documentation