
Monitoring & Observability

This guide shows you how to monitor your agent systems with Observability Hub: tracing requests with spans, tracking costs, structured logging, alerting, dashboards and metrics, and exporting telemetry to external systems.

from acenta import AcentaClient

client = AcentaClient(api_key="...")

async def process_document(document_id):
    async with client.observability.span(
        name="process_document",
        attributes={"document_id": document_id}
    ) as span:
        # Step 1: Extract
        async with client.observability.span(name="extract") as extract_span:
            text = await extract_text(document_id)
            extract_span.set_attribute("chars", len(text))

        # Step 2: Analyze
        async with client.observability.span(name="analyze") as analyze_span:
            result = await analyze(text)
            analyze_span.set_attribute("entities_found", len(result.entities))

        span.set_attribute("status", "success")
        return result
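
process_document is an ordinary coroutine, so exercising the traced pipeline is just a normal call; a minimal sketch, assuming extract_text and analyze are your own helpers as above:

import asyncio

# Run the traced pipeline once for a single document
result = asyncio.run(process_document("doc-123"))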
from datetime import datetime, timedelta

# Recent traces
traces = await client.observability.query_traces(
    start_time=datetime.now() - timedelta(hours=1),
    end_time=datetime.now(),
    limit=100
)

# Filter by agent
traces = await client.observability.query_traces(
    filters={"agent_id": "document-processor"}
)

# Filter by status
traces = await client.observability.query_traces(
    filters={"status": "error"}
)
# Inspect a single trace and its spans
trace = await client.observability.get_trace(trace_id="tr_xxx")
print(f"Duration: {trace.duration_ms}ms")
for span in trace.spans:
    print(f"  {span.name}: {span.duration_ms}ms")
# Record per-call LLM cost on a span
async with client.observability.span(name="llm_call") as span:
    response = await call_openai(prompt)
    span.set_cost(
        model="gpt-4",
        input_tokens=count_tokens(prompt),
        output_tokens=count_tokens(response),
        cost_usd=calculate_cost(...)
    )
# Daily costs by agent
costs = await client.observability.get_costs(
    start_time=datetime.now() - timedelta(days=7),
    group_by=["agent_id", "date"]
)
for item in costs:
    print(f"{item.date} - {item.agent_id}: ${item.total_cost:.2f}")

# 30-day cost summary, broken down by model
summary = await client.observability.get_cost_summary(
    start_time=datetime.now() - timedelta(days=30)
)
print(f"Total: ${summary.total:.2f}")
print("By model:")
for model, cost in summary.by_model.items():
    print(f"  {model}: ${cost:.2f}")
# Structured log entry with attributes
await client.observability.log(
    level="info",
    message="Document processed successfully",
    attributes={
        "document_id": "doc-123",
        "duration_ms": 1500
    }
)

# Levels: debug, info, warn, error
await client.observability.log(
    level="error",
    message="Processing failed",
    attributes={
        "document_id": "doc-456",
        "error": "File not found"
    }
)

Logs inside spans are automatically correlated:

async with client.observability.span(name="process") as span:
    # This log is linked to the span
    await client.observability.log(
        level="info",
        message="Starting processing"
    )
    result = await process()
    await client.observability.log(
        level="info",
        message="Processing complete",
        attributes={"result_size": len(result)}
    )
# Search recent error logs
logs = await client.observability.query_logs(
    start_time=datetime.now() - timedelta(hours=1),
    level="error",
    search="connection"
)
for log in logs:
    print(f"[{log.level}] {log.timestamp}: {log.message}")
# Alert on high error rate
alert = await client.observability.create_alert(
    name="High Error Rate",
    condition={
        "type": "threshold",
        "metric": "error_count",
        "operator": ">",
        "threshold": 10,
        "window": "5m"
    },
    channels=[
        {"type": "email", "to": "ops@example.com"}
    ]
)

# Alert on unusual latency
alert = await client.observability.create_alert(
    name="Latency Anomaly",
    condition={
        "type": "anomaly",
        "metric": "request_latency_ms",
        "sensitivity": 2.0,  # Standard deviations
        "baseline_window": "24h"
    },
    channels=[
        {"type": "slack", "webhook": "https://hooks.slack.com/..."}
    ]
)

# Alert when no data is received
alert = await client.observability.create_alert(
    name="Missing Heartbeats",
    condition={
        "type": "absence",
        "metric": "heartbeat_count",
        "missing_for": "5m"
    },
    channels=[...]
)
# Alert channels

# Email
{"type": "email", "to": "team@example.com"}

# Slack
{"type": "slack", "webhook": "https://hooks.slack.com/..."}

# Webhook
{"type": "webhook", "url": "https://api.example.com/alerts"}
# High-level dashboard for the last hour
dashboard = await client.observability.get_dashboard(
    time_range="1h"
)
print(f"Requests: {dashboard.total_requests}")
print(f"Errors: {dashboard.error_count} ({dashboard.error_rate:.2%})")
print(f"P95 Latency: {dashboard.p95_latency_ms}ms")
print(f"Cost: ${dashboard.total_cost:.2f}")

# Query specific metrics
metrics = await client.observability.query_metrics(
    metric="request_latency_ms",
    start_time=datetime.now() - timedelta(hours=24),
    aggregation="p95",
    interval="1h",
    group_by=["agent_id"]
)
for point in metrics:
    print(f"{point.timestamp}: {point.value}ms ({point.agent_id})")

Export to external systems:

# Configure export to an external collector
await client.observability.configure_export(
    endpoint="https://otel-collector.example.com:4318",
    headers={"Authorization": "Bearer xxx"},
    export_traces=True,
    export_logs=True,
    sample_rate=0.1  # 10%
)
# Good: descriptive, action-oriented span name
async with client.observability.span(name="process_invoice"):
    ...

# Bad: vague span name
async with client.observability.span(name="do_stuff"):
    ...

# Attach attributes that make traces searchable
span.set_attribute("document_id", doc_id)
span.set_attribute("document_type", "invoice")
span.set_attribute("page_count", 5)

# Record errors on the active span before re-raising
try:
    result = await process()
except Exception as e:
    span.set_attribute("error.type", type(e).__name__)
    span.set_attribute("error.message", str(e))
    span.set_status("error")
    raise
Set up alerts for the conditions that matter most:

  • High error rate
  • Unusual latency
  • Missing heartbeats
  • Budget exceeded (see the sketch below)
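
The first three map directly to the threshold, anomaly, and absence alerts shown above. A budget alert can be sketched the same way with a threshold condition on a cost metric; the metric name total_cost_usd below is an assumption, not a documented value:

# Sketch only: assumes a daily cost metric is exposed to threshold alerts;
# "total_cost_usd" is a placeholder name, check the metrics reference.
alert = await client.observability.create_alert(
    name="Daily Budget Exceeded",
    condition={
        "type": "threshold",
        "metric": "total_cost_usd",  # assumed metric name
        "operator": ">",
        "threshold": 50,  # alert when daily spend exceeds $50
        "window": "24h"
    },
    channels=[
        {"type": "email", "to": "ops@example.com"}
    ]
)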