Observability Architecture
CitadelMesh implements comprehensive observability using OpenTelemetry for traces, metrics, and logs. This document describes the observability stack, instrumentation patterns, and monitoring dashboards.
Why this matters: Observability is not an afterthought bolted onto alarms; it is how we prove the agent mesh acted within policy. Metrics and traces power compliance attestations just as much as SRE dashboards.
Observability Stack
OpenTelemetry Instrumentation
Traces
Distributed tracing across agents, adapters, and cloud services.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Initialize the tracer provider and export spans to the collector over OTLP/gRPC
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(
    endpoint="otel-collector.citadel.svc:4317"
))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("citadel.agent.energy")

# Trace agent execution; attributes are recorded on the parent "optimize_hvac" span
async def optimize_hvac_setpoint(zone_id: str):
    with tracer.start_as_current_span("optimize_hvac") as span:
        span.set_attribute("zone.id", zone_id)
        span.set_attribute("agent.type", "energy")

        # Read current state
        with tracer.start_as_current_span("read_zone_state"):
            temp = await read_temperature(zone_id)
            span.set_attribute("temp.current", temp)

        # Calculate optimal setpoint
        with tracer.start_as_current_span("calculate_setpoint"):
            target = await calculate_optimal_temp(zone_id)
            span.set_attribute("temp.target", target)

        # Validate with OPA
        with tracer.start_as_current_span("opa_validate"):
            decision = await opa_client.evaluate("hvac.setpoint", {
                "value": target
            })
            span.set_attribute("opa.allow", decision["allow"])

        # Execute the command only if policy allows it
        if not decision["allow"]:
            return None  # denied by policy: nothing is written
        with tracer.start_as_current_span("write_setpoint"):
            result = await write_setpoint(zone_id, target)
            span.set_attribute("result.success", result.success)
        return result
Trace Visualization (Grafana Tempo):
optimize_hvac (42ms)
├─ read_zone_state (8ms)
│  └─ ebo_adapter.read_point (6ms)
├─ calculate_setpoint (12ms)
├─ opa_validate (5ms)
│  └─ opa.evaluate (4ms)
└─ write_setpoint (15ms)
   └─ ebo_adapter.write_point (12ms)
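The gap between a parent span and its children is often where untraced work hides. The following is a minimal standard-library sketch that computes a span's self time (its duration minus the sum of its direct children) using the durations from the tree above. The `Span` dataclass and `self_time_ms` helper are hypothetical names for illustration, not part of OpenTelemetry or Tempo, and the sketch assumes the children run sequentially.

```python
from dataclasses import dataclass, field


@dataclass
class Span:
    """Hypothetical, simplified span record: a name, a duration, and child spans."""
    name: str
    duration_ms: float
    children: list["Span"] = field(default_factory=list)


def self_time_ms(span: Span) -> float:
    """Time spent in the span itself, assuming its children ran one after another."""
    return span.duration_ms - sum(child.duration_ms for child in span.children)


# Durations copied from the optimize_hvac trace shown above
root = Span("optimize_hvac", 42, [
    Span("read_zone_state", 8, [Span("ebo_adapter.read_point", 6)]),
    Span("calculate_setpoint", 12),
    Span("opa_validate", 5, [Span("opa.evaluate", 4)]),
    Span("write_setpoint", 15, [Span("ebo_adapter.write_point", 12)]),
])

print(self_time_ms(root))              # 42 - (8 + 12 + 5 + 15)
print(self_time_ms(root.children[0]))  # 8 - 6
```

A large self time on a parent span usually means a step is missing its own child span.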
Metrics
Prometheus-compatible metrics for SLIs/SLOs.
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

# Serve the Prometheus scrape endpoint. PrometheusMetricReader registers with
# prometheus_client's default registry and does not open a port itself.
start_http_server(port=9090)

# Initialize meter
provider = MeterProvider(metric_readers=[
    PrometheusMetricReader(),
    PeriodicExportingMetricReader(OTLPMetricExporter())
])
metrics.set_meter_provider(provider)
meter = metrics.get_meter("citadel.agent.security")

# Create metrics
command_counter = meter.create_counter(
    "citadel.commands.total",
    description="Total commands executed",
    unit="1"
)
command_duration = meter.create_histogram(
    "citadel.commands.duration_ms",
    description="Command execution duration",
    unit="ms"
)
policy_evaluations = meter.create_counter(
    "citadel.policy.evaluations.total",
    description="OPA policy evaluations",
    unit="1"
)
policy_denials = meter.create_counter(
    "citadel.policy.denials.total",
    description="OPA policy denials",
    unit="1"
)

# Record metrics
async def execute_command(command: Command):
    start = time.perf_counter()  # monotonic clock, safe for measuring durations
    command_counter.add(1, {
        "agent": config.agent_type,
        "action": command.action,
        "target": command.target_id
    })
    try:
        result = await adapter.execute(command)
        duration_ms = (time.perf_counter() - start) * 1000
        command_duration.record(duration_ms, {
            "agent": config.agent_type,
            "success": str(result.success)
        })
        return result
    except PolicyViolation as e:
        policy_denials.add(1, {
            "policy": e.policy_name,
            "reason": e.reason
        })
        raise
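The instruments above are created with dotted names, while the PromQL queries later in this document use underscore names such as `citadel_commands_total`. The following is a simplified sketch of that mapping and covers only the character-replacement step. The real exporter may also append unit or type suffixes depending on its version, so confirm the final names against the scraped output. `to_prometheus_name` is a hypothetical helper, not an OpenTelemetry API.

```python
import re


def to_prometheus_name(otel_name: str) -> str:
    """Replace characters that are not valid in a Prometheus metric name with '_'.

    Simplified illustration only: suffix handling done by real exporters is omitted.
    """
    name = re.sub(r"[^a-zA-Z0-9_:]", "_", otel_name)
    # Prometheus metric names must not start with a digit
    if name and name[0].isdigit():
        name = "_" + name
    return name


for otel_name in ("citadel.commands.total", "citadel.policy.denials.total"):
    print(otel_name, "->", to_prometheus_name(otel_name))
```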
Logs
Structured logging with OpenTelemetry correlation.
import structlog
from opentelemetry import trace

# Configure structlog with OTel context
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger("citadel.agent.security")

# Log with trace context
async def handle_security_incident(incident: Incident):
    span = trace.get_current_span()
    logger.info(
        "security_incident_detected",
        incident_id=incident.id,
        severity=incident.severity,
        trace_id=format(span.get_span_context().trace_id, '032x'),
        span_id=format(span.get_span_context().span_id, '016x')
    )
    # Logs automatically correlated with traces in Grafana
Log Output:
{
  "event": "security_incident_detected",
  "incident_id": "inc-001",
  "severity": "high",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "timestamp": "2025-09-30T15:30:00.123Z",
  "level": "info"
}
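The `trace_id` and `span_id` fields are integers inside the SDK; the `'032x'` and `'016x'` format specs zero-pad them to the fixed-width lowercase hex that trace backends expect. The following is a small standard-library sketch of that formatting, plus the W3C `traceparent` header built from the same values. The helper names are hypothetical and chosen for illustration.

```python
def format_trace_id(trace_id: int) -> str:
    """128-bit trace ID as 32 lowercase hex characters, zero-padded."""
    return format(trace_id, "032x")


def format_span_id(span_id: int) -> str:
    """64-bit span ID as 16 lowercase hex characters, zero-padded."""
    return format(span_id, "016x")


def traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """W3C Trace Context header value: version-traceid-parentid-flags."""
    flags = "01" if sampled else "00"
    return f"00-{format_trace_id(trace_id)}-{format_span_id(span_id)}-{flags}"


# IDs taken from the log output above
trace_id = 0x4BF92F3577B34DA6A3CE929D0E0E4736
span_id = 0x00F067AA0BA902B7

print(format_trace_id(trace_id))
print(format_span_id(span_id))   # leading zeros are preserved by the padding
print(traceparent(trace_id, span_id))
```

Without the zero padding, a span ID with leading zero bytes would render shorter than 16 characters and fail to match the ID stored in the trace backend.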
Key Metrics
Agent Metrics
# Agent execution rate
rate(citadel_commands_total[5m])
# Command latency (P99), computed from per-bucket rates aggregated by le
histogram_quantile(0.99, sum(rate(citadel_commands_duration_ms_bucket[5m])) by (le))
# Policy denial rate (both sides summed so their label sets match)
sum(rate(citadel_policy_denials_total[5m])) /
sum(rate(citadel_policy_evaluations_total[5m]))
# Agent health
citadel_agent_healthy == 1
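To make the P99 query less opaque, here is a minimal sketch of the linear interpolation that `histogram_quantile` applies to classic cumulative buckets. It is a simplified illustration under stated assumptions, not Prometheus code: buckets are `(upper_bound, cumulative_count)` pairs sorted by bound, the last bucket is `+Inf`, and the lowest bucket starts at 0. The sample bucket counts are invented for the example.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets."""
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for upper, count in buckets:
        if count >= rank:
            if math.isinf(upper) or count == prev_count:
                # Falls in the +Inf bucket (or an empty one): report the last finite bound
                return prev_bound
            # Assume observations are spread evenly inside the bucket
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (upper - prev_bound) * fraction
        prev_bound, prev_count = upper, count
    return math.nan


# Hypothetical duration buckets in milliseconds: 90 commands took <= 100ms, and so on
sample = [(100.0, 90.0), (500.0, 98.0), (1000.0, 100.0), (math.inf, 100.0)]
print(histogram_quantile(0.99, sample))
```

Because the result is interpolated within a bucket, its accuracy depends on bucket boundaries; a P99 that matters for alerting deserves a bucket edge near the alert threshold.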
System Metrics
# Event bus throughput
rate(nats_in_msgs[5m])
# Event bus latency
nats_msg_latency_ms
# TimescaleDB write rate
rate(timescaledb_rows_inserted[5m])
# SPIRE SVID rotation
spire_svid_rotations_total
Business Metrics
# Energy savings (vs baseline)
(hvac_energy_baseline_kwh - hvac_energy_actual_kwh) /
hvac_energy_baseline_kwh * 100
# Security response time
histogram_quantile(0.95,
  sum(rate(citadel_security_incident_response_time_seconds_bucket[5m])) by (le)
)
# Comfort compliance rate
avg_over_time(citadel_comfort_score[1h])
Dashboards
Agent Dashboard (Grafana)
{
  "dashboard": {
    "title": "CitadelMesh Agents",
    "panels": [
      {
        "title": "Command Execution Rate",
        "targets": [
          {
            "expr": "sum(rate(citadel_commands_total[5m])) by (agent)",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Command Latency (P99)",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(citadel_commands_duration_ms_bucket[5m])) by (agent, le))",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Policy Denials",
        "targets": [
          {
            "expr": "sum(rate(citadel_policy_denials_total[5m])) by (policy, reason)"
          }
        ]
      }
    ]
  }
}
.NET Aspire Dashboard
Built-in Aspire dashboard for .NET services:
# Access Aspire dashboard
http://localhost:18888
Shows:
- Service topology
- Dependency graph
- Resource utilization
- Distributed traces
- Logs with filtering
Building Operations Dashboard
# Custom dashboard queries (the wrapper name get_building_state is illustrative)
async def get_building_state():
    return {
        "building_state": {
            "hvac_zones": await query_hvac_status(),
            "security_status": await query_security_incidents(),
            "energy_kpis": await query_energy_metrics(),
            "comfort_score": await query_comfort_score()
        }
    }
Alerting
Prometheus Alerting Rules
groups:
  - name: citadel_agents
    interval: 30s
    rules:
      - alert: AgentDown
        expr: citadel_agent_healthy == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Agent {{$labels.agent_id}} is down"
      - alert: HighPolicyDenialRate
        expr: |
          sum(rate(citadel_policy_denials_total[5m])) /
          sum(rate(citadel_policy_evaluations_total[5m])) > 0.10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High policy denial rate: {{ $value | humanizePercentage }}"
      - alert: CommandLatencyHigh
        expr: |
          histogram_quantile(0.99,
            sum(rate(citadel_commands_duration_ms_bucket[5m])) by (le)
          ) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 command latency > 1s"
      - alert: SecurityIncidentCritical
        expr: citadel_security_incidents{severity="critical"} > 0
        for: 0s
        labels:
          severity: page
        annotations:
          summary: "Critical security incident detected"
Alert Routing (Alertmanager)
route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 5m
  repeat_interval: 3h
  receiver: 'ops-team'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: page
      receiver: 'pagerduty-immediate'
receivers:
  - name: 'ops-team'
    slack_configs:
      - api_url: 'https://hooks.slack.com/...'
        channel: '#citadel-alerts'
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: '<pd-integration-key>'
  # Every receiver named in a route must be defined; the key below is a placeholder
  - name: 'pagerduty-immediate'
    pagerduty_configs:
      - service_key: '<pd-immediate-integration-key>'
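The routing tree above is evaluated top-down: an alert goes to the first child route whose `match` labels all equal the alert's labels, and falls back to the top-level receiver otherwise. The following is a minimal sketch of that selection logic, assuming flat child routes with exact-match labels and no `continue` flag. `pick_receiver` is a hypothetical helper, not an Alertmanager API.

```python
def pick_receiver(labels: dict[str, str], routes: list[dict], default: str) -> str:
    """Return the receiver of the first route whose match labels all equal the alert's."""
    for route in routes:
        if all(labels.get(key) == value for key, value in route["match"].items()):
            return route["receiver"]
    return default  # no child route matched: use the top-level receiver


# Child routes mirroring the configuration above
routes = [
    {"match": {"severity": "critical"}, "receiver": "pagerduty"},
    {"match": {"severity": "page"}, "receiver": "pagerduty-immediate"},
]

print(pick_receiver({"alertname": "AgentDown", "severity": "critical"}, routes, "ops-team"))
print(pick_receiver({"alertname": "CommandLatencyHigh", "severity": "warning"}, routes, "ops-team"))
```

Under this logic the `warning` alerts defined earlier land in the default `ops-team` receiver, since no child route matches them.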
Distributed Tracing Examples
Cross-Service Trace
Energy optimization triggering multiple services:
TRACE: energy_optimization (152ms)
├─ energy_agent.optimize (150ms) [edge]
│  ├─ ebo_adapter.read_point (12ms)
│  ├─ twin_service.get_entity (18ms) [cloud gRPC]
│  ├─ weather_api.get_forecast (45ms) [external API]
│  ├─ opa.evaluate_policy (8ms)
│  └─ ebo_adapter.write_setpoint (22ms)
└─ telemetry.publish_result (2ms)
Error Trace with Context
Failed command with full context:
ERROR TRACE: door_unlock_failed (534ms)
├─ security_agent.handle_incident (532ms) [edge]
│  ├─ avigilon.get_camera_feed (89ms) ✓
│  ├─ opa.evaluate_policy (6ms) ✓
│  ├─ security_expert.unlock_door (425ms) ✗
│  │  └─ http_error: 503 Service Unavailable
│  └─ incident.escalate_to_ops (12ms) ✓
└─ audit.log_failure (2ms) ✓
Error Details:
- Service: security-expert-adapter
- Error: Connection timeout after 30s
- Retry Count: 3
- Last Error Time: 2025-09-30T15:30:25Z
Performance Monitoring
SLI/SLO Tracking
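# SLI: Command Success Rate (assumes citadel_commands_total carries a result label)
sum(rate(citadel_commands_total{result="success"}[7d])) /
sum(rate(citadel_commands_total[7d]))
# SLO: 99.9% success rate
# Error Budget: 0.1% = 43.2 minutes per 30-day month (604.8 seconds per 7-day window)
# Burn Rate Alert (pseudocode: sli is the measured success ratio, slo = 0.999)
(1 - sli) > 14.4 * (1 - slo) # Fast burn (1 hour window)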
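The error-budget and burn-rate numbers follow from simple arithmetic, sketched below with the standard library. One assumption is not stated in the source: the 14.4 factor is read here as the common convention of spending 2% of a 30-day budget within one hour. The function names are hypothetical.

```python
def error_budget_minutes(slo: float, window_days: float) -> float:
    """Allowed 'bad' minutes in the window for a given SLO (e.g. 0.999)."""
    return (1 - slo) * window_days * 24 * 60


def burn_rate(error_ratio: float, slo: float) -> float:
    """How many times faster than sustainable the budget is being consumed."""
    return error_ratio / (1 - slo)


def burn_rate_threshold(budget_fraction: float, window_days: float,
                        alert_window_hours: float) -> float:
    """Burn rate at which budget_fraction of the budget is spent within the alert window."""
    return budget_fraction * window_days * 24 / alert_window_hours


print(round(error_budget_minutes(0.999, 30), 1))   # minutes per 30-day month
print(round(error_budget_minutes(0.999, 7), 2))    # minutes per 7-day window
print(round(burn_rate_threshold(0.02, 30, 1), 1))  # fast-burn factor for a 1 hour window
print(round(burn_rate(0.0144, 0.999), 1))          # a 1.44% error ratio against a 99.9% SLO
```

With a 99.9% SLO, the fast-burn condition therefore fires when the error ratio over the last hour exceeds roughly 1.44%.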
Resource Utilization
# CPU usage by agent
rate(container_cpu_usage_seconds_total{
namespace="citadel-agents"
}[5m])
# Memory usage by agent
container_memory_working_set_bytes{
namespace="citadel-agents"
}
# Disk I/O
rate(container_fs_writes_bytes_total{
namespace="citadel-agents"
}[5m])
Incident Investigation
Query Recent Traces
# Tempo TraceQL query (attributes need a scope: resource. or span.)
{
  resource.service.name = "energy-agent" &&
  status = error &&
  duration > 1s
}
| select(span.zone.id, span.error_message)
Correlate Logs and Traces
# Loki query with trace correlation (structlog writes the message under the "event" key)
{namespace="citadel-agents"} |= "error"
  | json
  | trace_id != ""
  | line_format "{{.timestamp}} [{{.level}}] {{.event}} trace={{.trace_id}}"
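The same correlation can be done offline on exported log lines, for example when attaching evidence to an incident report. The following is a minimal standard-library sketch, assuming one JSON object per line in the structlog format shown earlier. `logs_for_trace` is a hypothetical helper and the sample lines are invented for illustration.

```python
import json


def logs_for_trace(lines: list[str], trace_id: str) -> list[dict]:
    """Return the parsed JSON log records whose trace_id field equals trace_id."""
    matches = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines such as stack traces or startup banners
        if isinstance(record, dict) and record.get("trace_id") == trace_id:
            matches.append(record)
    return matches


# Invented sample lines: two JSON records and one plain-text line
sample_lines = [
    '{"event": "security_incident_detected", "level": "info", '
    '"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736"}',
    '{"event": "heartbeat", "level": "debug", "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}',
    "plain text line that is not JSON",
]

for record in logs_for_trace(sample_lines, "4bf92f3577b34da6a3ce929d0e0e4736"):
    print(record["level"], record["event"])
```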
Related Documentation
- Protocol Strategy - Instrumentation patterns
- Edge Architecture - Edge observability stack
- Cloud Integration - Cloud observability
- Agent Topology - Agent instrumentation