Observability Architecture
CitadelMesh implements comprehensive observability using OpenTelemetry for traces, metrics, and logs. This document describes the observability stack, instrumentation patterns, and monitoring dashboards.
Observability Stack
graph LR
    subgraph "Edge"
        Agent[Agents]
        Adapter[Adapters]
        OTelEdge[OTel Collector]
        Agent --> OTelEdge
        Adapter --> OTelEdge
    end
    subgraph "Cloud"
        OTelCloud[OTel Collector]
        Prometheus[Prometheus]
        Tempo[Tempo]
        Loki[Loki]
        Grafana[Grafana]
        OTelEdge --> OTelCloud
        OTelCloud --> Prometheus
        OTelCloud --> Tempo
        OTelCloud --> Loki
        Prometheus --> Grafana
        Tempo --> Grafana
        Loki --> Grafana
    end
OpenTelemetry Instrumentation
Traces
Distributed tracing across agents, adapters, and cloud services.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Initialize tracer: batch spans and ship them over OTLP/gRPC to the collector
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(
    endpoint="otel-collector.citadel.svc:4317"
))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("citadel.agent.energy")

# Trace agent execution. read_temperature, calculate_optimal_temp,
# opa_client and write_setpoint are agent helpers defined elsewhere.
async def optimize_hvac_setpoint(zone_id: str):
    with tracer.start_as_current_span("optimize_hvac") as span:
        span.set_attribute("zone.id", zone_id)
        span.set_attribute("agent.type", "energy")

        # Read current state (child span; attribute recorded on the parent)
        with tracer.start_as_current_span("read_zone_state"):
            temp = await read_temperature(zone_id)
            span.set_attribute("temp.current", temp)

        # Calculate optimal setpoint
        with tracer.start_as_current_span("calculate_setpoint"):
            target = await calculate_optimal_temp(zone_id)
            span.set_attribute("temp.target", target)

        # Validate with OPA
        with tracer.start_as_current_span("opa_validate"):
            decision = await opa_client.evaluate("hvac.setpoint", {
                "value": target
            })
            span.set_attribute("opa.allow", decision["allow"])

        # Execute command only if the policy allows it
        if not decision["allow"]:
            return None  # denied: no write is attempted
        with tracer.start_as_current_span("write_setpoint"):
            result = await write_setpoint(zone_id, target)
            span.set_attribute("result.success", result.success)
        return result
Trace Visualization (Grafana Tempo):
optimize_hvac (42ms)
├─ read_zone_state (8ms)
│  └─ ebo_adapter.read_point (6ms)
├─ calculate_setpoint (12ms)
├─ opa_validate (5ms)
│  └─ opa.evaluate (4ms)
└─ write_setpoint (15ms)
   └─ ebo_adapter.write_point (12ms)
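A parent span's duration includes its children's, so the waterfall is easiest to read as self time: the parent's duration minus what its children account for. A minimal sketch with a hypothetical `SpanNode` model (not a Tempo or OpenTelemetry type), assuming sequential, non-overlapping children as in the trace above:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SpanNode:
    """Hypothetical in-memory span; durations in milliseconds."""
    name: str
    duration_ms: float
    children: List["SpanNode"] = field(default_factory=list)

    def self_time_ms(self) -> float:
        # Valid only if children run one after another without overlapping.
        return self.duration_ms - sum(child.duration_ms for child in self.children)


# The optimize_hvac trace from the waterfall above
optimize = SpanNode("optimize_hvac", 42, [
    SpanNode("read_zone_state", 8, [SpanNode("ebo_adapter.read_point", 6)]),
    SpanNode("calculate_setpoint", 12),
    SpanNode("opa_validate", 5, [SpanNode("opa.evaluate", 4)]),
    SpanNode("write_setpoint", 15, [SpanNode("ebo_adapter.write_point", 12)]),
])

for span in [optimize, *optimize.children]:
    print(f"{span.name}: {span.self_time_ms()} ms self time")
```

Here optimize_hvac spends only 2 ms outside its four child spans, and most of write_setpoint's 15 ms is the 12 ms adapter write. Concurrent children would overlap, and this subtraction would then undercount self time.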
Metrics
Prometheus-compatible metrics for SLIs/SLOs.
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

# Initialize meter: serve a Prometheus scrape endpoint and also push via OTLP.
# PrometheusMetricReader takes no port; prometheus_client serves the endpoint.
start_http_server(port=9090)
provider = MeterProvider(metric_readers=[
    PrometheusMetricReader(),
    PeriodicExportingMetricReader(OTLPMetricExporter())
])
metrics.set_meter_provider(provider)
meter = metrics.get_meter("citadel.agent.security")

# Create metrics
command_counter = meter.create_counter(
    "citadel.commands.total",
    description="Total commands executed",
    unit="1"
)
command_duration = meter.create_histogram(
    "citadel.commands.duration_ms",
    description="Command execution duration",
    unit="ms"
)
policy_evaluations = meter.create_counter(
    "citadel.policy.evaluations.total",
    description="OPA policy evaluations",
    unit="1"
)
policy_denials = meter.create_counter(
    "citadel.policy.denials.total",
    description="OPA policy denials",
    unit="1"
)

# Record metrics. config, adapter, Command and PolicyViolation come from the
# agent; policy_evaluations should be incremented wherever OPA is queried.
async def execute_command(command: Command):
    start = time.perf_counter()
    outcome = "error"
    try:
        result = await adapter.execute(command)
        outcome = "success" if result.success else "failure"
        return result
    except PolicyViolation as e:
        outcome = "denied"
        policy_denials.add(1, {
            "policy": e.policy_name,
            "reason": e.reason
        })
        raise
    finally:
        # Count and time every attempt, including failures and denials;
        # the "result" label feeds the success-rate SLI.
        duration_ms = (time.perf_counter() - start) * 1000
        command_counter.add(1, {
            "agent": config.agent_type,
            "action": command.action,
            "target": command.target_id,
            "result": outcome
        })
        command_duration.record(duration_ms, {
            "agent": config.agent_type,
            "success": str(outcome == "success")
        })
Logs
Structured logging with OpenTelemetry correlation.
import structlog
from opentelemetry import trace

# Configure structlog to render JSON lines with level and ISO timestamp
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger("citadel.agent.security")

# Log with the trace context of the active span (Incident is the agent's model)
async def handle_security_incident(incident: Incident):
    ctx = trace.get_current_span().get_span_context()
    logger.info(
        "security_incident_detected",
        incident_id=incident.id,
        severity=incident.severity,
        trace_id=format(ctx.trace_id, "032x"),
        span_id=format(ctx.span_id, "016x")
    )
    # The trace_id field lets Grafana link this line in Loki to the trace in
    # Tempo (for example via a Loki derived field)
Log Output:
{
  "event": "security_incident_detected",
  "incident_id": "inc-001",
  "severity": "high",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "timestamp": "2025-09-30T15:30:00.123Z",
  "level": "info"
}
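Passing trace_id and span_id by hand at every call site is easy to forget. A minimal sketch of a structlog-style processor that adds them to every event, assuming a hypothetical `lookup` callable that returns the active span's integer IDs (in an agent it could wrap `trace.get_current_span().get_span_context()` and return `None` when `is_valid` is false). Structlog processors are plain callables taking `(logger, method_name, event_dict)`, so the sketch runs without structlog installed:

```python
from typing import Any, Callable, MutableMapping, Optional, Tuple

# Hypothetical hook returning the active span's (trace_id, span_id) as ints,
# or None when there is no span.
SpanIdLookup = Callable[[], Optional[Tuple[int, int]]]


def make_trace_context_processor(lookup: SpanIdLookup):
    """Build a structlog-style processor that adds trace_id/span_id to each event."""

    def add_trace_context(
        logger: Any, method_name: str, event_dict: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        ids = lookup()
        # OpenTelemetry reports "no active span" as all-zero IDs; skip those too.
        if ids is not None and ids[0] != 0 and ids[1] != 0:
            trace_id, span_id = ids
            event_dict["trace_id"] = format(trace_id, "032x")
            event_dict["span_id"] = format(span_id, "016x")
        return event_dict

    return add_trace_context


# Usage with a fixed lookup standing in for the OpenTelemetry context
processor = make_trace_context_processor(
    lambda: (0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
)
print(processor(None, "info", {"event": "security_incident_detected"}))
```

Placed before `JSONRenderer` in the `structlog.configure` processors list, a processor like this would make the explicit `trace_id` and `span_id` arguments in `handle_security_incident` unnecessary.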
Key Metrics
Agent Metrics
# Agent execution rate
rate(citadel_commands_total[5m])
# Command latency (P99)
histogram_quantile(0.99, sum by (le) (rate(citadel_commands_duration_ms_bucket[5m])))
# Policy denial rate
sum(rate(citadel_policy_denials_total[5m])) /
sum(rate(citadel_policy_evaluations_total[5m]))
# Agent health
citadel_agent_healthy == 1
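`histogram_quantile` never sees individual observations: it estimates the quantile by linear interpolation inside the bucket that contains the requested rank, which is why it needs per-`le` bucket rates rather than raw cumulative counters. A simplified sketch of that interpolation, assuming buckets arrive as hypothetical cumulative `(upper_bound, count)` pairs ending in `+Inf`. It mirrors Prometheus's approach but rejects `q` outside [0, 1] instead of returning ±Inf, and skips Prometheus's fixes for non-monotonic buckets:

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets."""
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be between 0 and 1")
    buckets = sorted(buckets)
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        raise ValueError("need at least one finite bucket plus a +Inf bucket")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    # Find the first finite bucket whose cumulative count reaches the rank.
    for i, (upper, count) in enumerate(buckets[:-1]):
        if count >= rank:
            break
    else:
        # Rank lies in the +Inf bucket: report the highest finite bound.
        return buckets[-2][0]
    if i == 0 and upper <= 0:
        return upper
    start, prev_count = (0.0, 0.0) if i == 0 else buckets[i - 1]
    if count == prev_count:
        return start
    # Linear interpolation inside the bucket [start, upper].
    return start + (upper - start) * (rank - prev_count) / (count - prev_count)


# Hypothetical cumulative latency buckets (ms) for 100 commands
latency_buckets = [(5, 20), (10, 60), (25, 95), (50, 99), (100, 100), (math.inf, 100)]
print(histogram_quantile(0.99, latency_buckets))  # → 50.0
print(histogram_quantile(0.50, latency_buckets))  # → 8.75
```

The estimate can be no finer than the bucket boundaries, so it helps to place a boundary near alert thresholds such as the 1000 ms command-latency alert.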
System Metrics
# Event bus throughput
rate(nats_in_msgs[5m])
# Event bus latency
nats_msg_latency_ms
# TimescaleDB write rate
rate(timescaledb_rows_inserted[5m])
# SPIRE SVID rotation
spire_svid_rotations_total
Business Metrics
# Energy savings (vs baseline)
(hvac_energy_baseline_kwh - hvac_energy_actual_kwh) /
hvac_energy_baseline_kwh * 100
# Security response time
histogram_quantile(0.95,
  sum by (le) (rate(citadel_security_incident_response_time_seconds_bucket[5m]))
)
# Comfort compliance rate
avg_over_time(citadel_comfort_score[1h])
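In PromQL a zero baseline turns the energy-savings ratio into +Inf or NaN rather than an error. The same formula as a plain function, a sketch with a hypothetical name that rejects a non-positive baseline explicitly:

```python
def energy_savings_pct(baseline_kwh: float, actual_kwh: float) -> float:
    """Percent of baseline energy saved; negative when usage exceeded the baseline."""
    if baseline_kwh <= 0:
        raise ValueError("baseline_kwh must be positive")
    return (baseline_kwh - actual_kwh) / baseline_kwh * 100


print(energy_savings_pct(800.0, 600.0))   # → 25.0
print(energy_savings_pct(800.0, 1000.0))  # → -25.0
```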
Dashboards
Agent Dashboard (Grafana)
{
  "dashboard": {
    "title": "CitadelMesh Agents",
    "panels": [
      {
        "title": "Command Execution Rate",
        "targets": [
          {
            "expr": "sum(rate(citadel_commands_total[5m])) by (agent)",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Command Latency (P99)",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(citadel_commands_duration_ms_bucket[5m])) by (agent, le))",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Policy Denials",
        "targets": [
          {
            "expr": "sum(rate(citadel_policy_denials_total[5m])) by (policy, reason)"
          }
        ]
      }
    ]
  }
}
.NET Aspire Dashboard
Built-in Aspire dashboard for .NET services:
# Access Aspire dashboard
http://localhost:18888
Shows:
- Service topology
- Dependency graph
- Resource utilization
- Distributed traces
- Logs with filtering
Building Operations Dashboard
# Custom dashboard queries (the query_* helpers are project-specific)
async def get_building_state() -> dict:
    return {
        "building_state": {
            "hvac_zones": await query_hvac_status(),
            "security_status": await query_security_incidents(),
            "energy_kpis": await query_energy_metrics(),
            "comfort_score": await query_comfort_score()
        }
    }
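The dashboard queries above are awaited one after another, so the payload takes as long as all four combined even though they are independent. A minimal sketch of running them concurrently with `asyncio.gather`; `gather_building_state` and the `fake_*` coroutines are hypothetical stand-ins for the real query helpers:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def gather_building_state(
    queries: Dict[str, Callable[[], Awaitable[Any]]],
) -> Dict[str, Any]:
    """Run independent dashboard queries concurrently, keyed by panel name."""
    names = list(queries)
    # gather() returns results in argument order, so they line up with names.
    results = await asyncio.gather(*(queries[name]() for name in names))
    return {"building_state": dict(zip(names, results))}


# Hypothetical stand-ins for query_hvac_status() and query_comfort_score()
async def fake_hvac_status() -> dict:
    await asyncio.sleep(0.01)
    return {"zones_ok": 12}


async def fake_comfort_score() -> float:
    return 0.93


state = asyncio.run(gather_building_state({
    "hvac_zones": fake_hvac_status,
    "comfort_score": fake_comfort_score,
}))
print(state)  # → {'building_state': {'hvac_zones': {'zones_ok': 12}, 'comfort_score': 0.93}}
```

Total latency then tracks the slowest query rather than the sum. Passing `return_exceptions=True` to `asyncio.gather` would let one failing panel return its exception instead of failing the whole payload.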
Alerting
Prometheus Alerting Rules
groups:
  - name: citadel_agents
    interval: 30s
    rules:
      - alert: AgentDown
        expr: citadel_agent_healthy == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Agent {{$labels.agent_id}} is down"
      - alert: HighPolicyDenialRate
        # $value is a ratio (0.10 = 10%), so render it as a percentage
        expr: |
          sum(rate(citadel_policy_denials_total[5m])) /
          sum(rate(citadel_policy_evaluations_total[5m])) > 0.10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High policy denial rate: {{ $value | humanizePercentage }}"
      - alert: CommandLatencyHigh
        expr: |
          histogram_quantile(0.99,
            sum by (agent, le) (rate(citadel_commands_duration_ms_bucket[5m]))
          ) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 command latency > 1s"
      - alert: SecurityIncidentCritical
        expr: citadel_security_incidents{severity="critical"} > 0
        for: 0s
        labels:
          severity: page
        annotations:
          summary: "Critical security incident detected"
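The `for:` field keeps an alert pending until its expression has been true on every evaluation for that long, so with `interval: 30s` AgentDown needs five consecutive true evaluations (0 s through 120 s) before it fires, while `for: 0s` fires on the first. A simplified model of that lifecycle for a single series (it ignores resolved notifications, `keep_firing_for`, and per-label-set tracking; `alert_states` is a hypothetical name):

```python
from typing import List, Optional, Tuple


def alert_states(evals: List[Tuple[float, bool]], hold_s: float) -> List[str]:
    """Return "inactive", "pending" or "firing" for each (timestamp_s, condition) evaluation."""
    active_since: Optional[float] = None
    states: List[str] = []
    for ts, condition in evals:
        if not condition:
            active_since = None  # any false evaluation resets the `for:` timer
            states.append("inactive")
            continue
        if active_since is None:
            active_since = ts
        states.append("firing" if ts - active_since >= hold_s else "pending")
    return states


# AgentDown: evaluated every 30s with for: 2m
evals = [(t, True) for t in range(0, 150, 30)]
print(alert_states(evals, hold_s=120))
# → ['pending', 'pending', 'pending', 'pending', 'firing']
```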
Alert Routing (Alertmanager)
route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 5m
  repeat_interval: 3h
  receiver: 'ops-team'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: page
      receiver: 'pagerduty-immediate'
receivers:
  - name: 'ops-team'
    slack_configs:
      - api_url: 'https://hooks.slack.com/...'
        channel: '#citadel-alerts'
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: '<pd-integration-key>'
  # Every receiver referenced in routes must be defined, or Alertmanager rejects the config
  - name: 'pagerduty-immediate'
    pagerduty_configs:
      - service_key: '<pd-integration-key>'
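Alertmanager walks child routes in order and, unless a route sets `continue: true`, stops at the first match; alerts that match no child fall back to the parent's receiver. A simplified sketch of that decision for the routing tree above (equality matchers only, no nested routes or grouping; `pick_receiver` is a hypothetical name):

```python
from typing import Dict, List, Tuple

# Child routes in config order: (required labels, receiver)
ROUTES: List[Tuple[Dict[str, str], str]] = [
    ({"severity": "critical"}, "pagerduty"),
    ({"severity": "page"}, "pagerduty-immediate"),
]
DEFAULT_RECEIVER = "ops-team"


def pick_receiver(labels: Dict[str, str]) -> str:
    """First matching child route wins (no `continue: true`); otherwise the root receiver."""
    for required, receiver in ROUTES:
        if all(labels.get(key) == value for key, value in required.items()):
            return receiver
    return DEFAULT_RECEIVER


print(pick_receiver({"alertname": "AgentDown", "severity": "critical"}))  # → pagerduty
print(pick_receiver({"alertname": "CommandLatencyHigh", "severity": "warning"}))  # → ops-team
```

Warning-level alerts such as HighPolicyDenialRate therefore reach Slack through the root route, while only `critical` and `page` alerts page someone.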
Distributed Tracing Examples
Cross-Service Trace
Energy optimization triggering multiple services:
TRACE: energy_optimization (152ms)
├─ energy_agent.optimize (150ms) [edge]
│ ├─ ebo_adapter.read_point (12ms)
│ ├─ twin_service.get_entity (18ms) [cloud gRPC]
│ ├─ weather_api.get_forecast (45ms) [external API]
│ ├─ opa.evaluate_policy (8ms)
│ └─ ebo_adapter.write_setpoint (22ms)
└─ telemetry.publish_result (2ms)
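A trace like this spans edge and cloud only because each hop forwards its context; OpenTelemetry's default propagator uses the W3C Trace Context `traceparent` header (`version-traceid-parentid-flags`). A minimal stdlib sketch of building and parsing a version-00 header, using the IDs from the log example above; the function names are hypothetical, and real services would let the OpenTelemetry propagator do this:

```python
import re
from typing import Tuple

_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Format a version-00 W3C traceparent header value."""
    flags = "01" if sampled else "00"
    return f"00-{trace_id:032x}-{span_id:016x}-{flags}"


def parse_traceparent(header: str) -> Tuple[int, int, bool]:
    """Return (trace_id, parent_span_id, sampled); reject malformed or all-zero IDs."""
    match = _TRACEPARENT.match(header.strip())
    if not match:
        raise ValueError(f"malformed traceparent: {header!r}")
    trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    if trace_id == 0 or span_id == 0:
        raise ValueError("all-zero trace or span id is invalid")
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)


header = make_traceparent(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
print(header)  # → 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
```

The receiving service starts its spans as children of the parsed span ID, which is how `twin_service.get_entity` appears nested under the edge agent's span.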
Error Trace with Context
Failed command with full context:
ERROR TRACE: door_unlock_failed (534ms)
├─ security_agent.handle_incident (532ms) [edge]
│ ├─ avigilon.get_camera_feed (89ms) ✓
│ ├─ opa.evaluate_policy (6ms) ✓
│ ├─ security_expert.unlock_door (425ms) ❌
│ │ └─ http_error: 503 Service Unavailable
│ └─ incident.escalate_to_ops (12ms) ✓
└─ audit.log_failure (2ms) ✓
Error Details:
- Service: security-expert-adapter
- Error: Connection timeout after 30s
- Retry Count: 3
- Last Error Time: 2025-09-30T15:30:25Z
Performance Monitoring
SLI/SLO Tracking
# SLI: Command Success Rate
sum(rate(citadel_commands_total{result="success"}[7d])) /
sum(rate(citadel_commands_total[7d]))
# SLO: 99.9% success rate
# Error Budget: 0.1% of a 30-day month ≈ 43.2 minutes
# Burn Rate Alert
(
  1 - sum(rate(citadel_commands_total{result="success"}[1h]))
    / sum(rate(citadel_commands_total[1h]))
) > 14.4 * (1 - 0.999)  # Fast burn (1 hour window)
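The budget and burn-rate numbers follow from simple arithmetic: 0.1% of a 30-day month (43,200 minutes) is 43.2 minutes, and burning at 14.4 times the allowed error rate for one hour uses 14.4 / 720 = 2% of the monthly budget, the common fast-burn paging threshold. For a request-based SLI like this one the budget is really 0.1% of commands; expressing it in minutes assumes uniform traffic. A sketch with hypothetical helper names:

```python
HOURS_PER_30_DAYS = 30 * 24


def error_budget_minutes(slo: float, days: int = 30) -> float:
    """Minutes of full unavailability the SLO tolerates over the window."""
    return (1 - slo) * days * 24 * 60


def budget_spent_fraction(burn_rate: float, window_h: float,
                          period_h: float = HOURS_PER_30_DAYS) -> float:
    """Fraction of the period's budget consumed by burning at burn_rate for window_h hours."""
    return burn_rate * window_h / period_h


print(round(error_budget_minutes(0.999), 1))     # → 43.2
print(round(budget_spent_fraction(14.4, 1), 4))  # → 0.02
```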
Resource Utilization
# CPU usage by agent pod
sum by (pod) (rate(container_cpu_usage_seconds_total{
  namespace="citadel-agents", container!=""
}[5m]))
# Memory usage by agent pod
sum by (pod) (container_memory_working_set_bytes{
  namespace="citadel-agents", container!=""
})
# Disk I/O by agent pod
sum by (pod) (rate(container_fs_writes_bytes_total{
  namespace="citadel-agents", container!=""
}[5m]))
Incident Investigation
Query Recent Traces
# Tempo TraceQL query
{
  resource.service.name = "energy-agent" &&
  status = error &&
  duration > 1s
}
| select(span.zone.id, span.error_message)
Correlate Logs and Traces
# Loki query with trace correlation
{namespace="citadel-agents"} |= "error"
| json
| trace_id != ""
| line_format "{{.timestamp}} [{{.level}}] {{.event}} trace={{.trace_id}}"
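Going the other way, from a trace in Tempo to its logs, only needs the trace ID as a label filter after `| json`. A small sketch of building that query; the helper name is hypothetical, and validating the ID against the 32-hex-digit format the agents log also keeps arbitrary text out of the LogQL string:

```python
import re

_TRACE_ID = re.compile(r"^[0-9a-f]{32}$")


def logs_for_trace(trace_id: str, namespace: str = "citadel-agents") -> str:
    """Build a LogQL query selecting JSON log lines that carry the given trace_id."""
    trace_id = trace_id.lower()
    if not _TRACE_ID.match(trace_id) or int(trace_id, 16) == 0:
        raise ValueError(f"not a valid 32-hex-digit trace id: {trace_id!r}")
    return f'{{namespace="{namespace}"}} | json | trace_id="{trace_id}"'


print(logs_for_trace("4bf92f3577b34da6a3ce929d0e0e4736"))
# → {namespace="citadel-agents"} | json | trace_id="4bf92f3577b34da6a3ce929d0e0e4736"
```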
Related Documentation
- Protocol Strategy - Instrumentation patterns
- Edge Architecture - Edge observability stack
- Cloud Integration - Cloud observability
- Agent Topology - Agent instrumentation