Observability Architecture

CitadelMesh implements comprehensive observability using OpenTelemetry for traces, metrics, and logs. This document describes the observability stack, instrumentation patterns, and monitoring dashboards.

Observability Stack

graph LR
    subgraph "Edge"
        Agent[Agents]
        Adapter[Adapters]
        OTelEdge[OTel Collector]

        Agent --> OTelEdge
        Adapter --> OTelEdge
    end

    subgraph "Cloud"
        OTelCloud[OTel Collector]
        Prometheus[Prometheus]
        Tempo[Tempo]
        Loki[Loki]
        Grafana[Grafana]

        OTelEdge --> OTelCloud
        OTelCloud --> Prometheus
        OTelCloud --> Tempo
        OTelCloud --> Loki

        Prometheus --> Grafana
        Tempo --> Grafana
        Loki --> Grafana
    end

OpenTelemetry Instrumentation

Traces

Distributed tracing across agents, adapters, and cloud services.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Initialize tracer
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(
    endpoint="otel-collector.citadel.svc:4317"
))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("citadel.agent.energy")

# Trace agent execution
async def optimize_hvac_setpoint(zone_id: str):
    with tracer.start_as_current_span("optimize_hvac") as span:
        span.set_attribute("zone.id", zone_id)
        span.set_attribute("agent.type", "energy")

        # Read current state
        with tracer.start_as_current_span("read_zone_state"):
            temp = await read_temperature(zone_id)
            span.set_attribute("temp.current", temp)

        # Calculate optimal setpoint
        with tracer.start_as_current_span("calculate_setpoint"):
            target = await calculate_optimal_temp(zone_id)
            span.set_attribute("temp.target", target)

        # Validate with OPA
        with tracer.start_as_current_span("opa_validate"):
            decision = await opa_client.evaluate("hvac.setpoint", {
                "value": target
            })
            span.set_attribute("opa.allow", decision["allow"])

        # Execute command
        if decision["allow"]:
            with tracer.start_as_current_span("write_setpoint"):
                result = await write_setpoint(zone_id, target)
                span.set_attribute("result.success", result.success)

            return result

Trace Visualization (Grafana Tempo):

optimize_hvac (42ms)
├─ read_zone_state (8ms)
│ └─ ebo_adapter.read_point (6ms)
├─ calculate_setpoint (12ms)
├─ opa_validate (5ms)
│ └─ opa.evaluate (4ms)
└─ write_setpoint (15ms)
└─ ebo_adapter.write_point (12ms)

Metrics

Prometheus-compatible metrics for SLIs/SLOs.

import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

# Initialize meter: expose a Prometheus scrape endpoint on :9090
# and export OTLP metrics to the collector
start_http_server(port=9090)
provider = MeterProvider(metric_readers=[
    PrometheusMetricReader(),
    PeriodicExportingMetricReader(OTLPMetricExporter())
])
metrics.set_meter_provider(provider)

meter = metrics.get_meter("citadel.agent.security")

# Create metrics
command_counter = meter.create_counter(
    "citadel.commands.total",
    description="Total commands executed",
    unit="1"
)

command_duration = meter.create_histogram(
    "citadel.commands.duration_ms",
    description="Command execution duration",
    unit="ms"
)

policy_evaluations = meter.create_counter(
    "citadel.policy.evaluations.total",
    description="OPA policy evaluations",
    unit="1"
)

policy_denials = meter.create_counter(
    "citadel.policy.denials.total",
    description="OPA policy denials",
    unit="1"
)

# Record metrics
async def execute_command(command: Command):
    start = time.time()

    command_counter.add(1, {
        "agent": config.agent_type,
        "action": command.action,
        "target": command.target_id
    })

    try:
        result = await adapter.execute(command)

        duration_ms = (time.time() - start) * 1000
        command_duration.record(duration_ms, {
            "agent": config.agent_type,
            "success": str(result.success)
        })

        return result

    except PolicyViolation as e:
        policy_denials.add(1, {
            "policy": e.policy_name,
            "reason": e.reason
        })
        raise
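
The policy_evaluations counter is incremented wherever the agent asks OPA for a decision. A minimal sketch of that path, assuming the opa_client helper and PolicyViolation exception used in the other examples:

# Sketch only: count every OPA evaluation; denials are counted by the caller
# (see execute_command above). opa_client and PolicyViolation are assumed helpers.
async def evaluate_policy(policy: str, payload: dict) -> dict:
    policy_evaluations.add(1, {"policy": policy})

    decision = await opa_client.evaluate(policy, payload)
    if not decision["allow"]:
        raise PolicyViolation(
            policy_name=policy,
            reason=decision.get("reason", "unspecified")
        )
    return decision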

Logs

Structured logging with OpenTelemetry correlation.

import structlog
from opentelemetry import trace

# Configure structlog with OTel context
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger("citadel.agent.security")

# Log with trace context
async def handle_security_incident(incident: Incident):
    span = trace.get_current_span()

    logger.info(
        "security_incident_detected",
        incident_id=incident.id,
        severity=incident.severity,
        trace_id=format(span.get_span_context().trace_id, '032x'),
        span_id=format(span.get_span_context().span_id, '016x')
    )

    # Logs automatically correlated with traces in Grafana

Log Output:

{
  "event": "security_incident_detected",
  "incident_id": "inc-001",
  "severity": "high",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "timestamp": "2025-09-30T15:30:00.123Z",
  "level": "info"
}
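
Rather than formatting trace IDs by hand in every log call, a custom structlog processor can attach the active span context to every event. A minimal sketch, building on the structlog configuration above; the processor name is illustrative:

import structlog
from opentelemetry import trace

def add_trace_context(logger, method_name, event_dict):
    # structlog processor: attach trace_id/span_id from the current span, if any
    ctx = trace.get_current_span().get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        add_trace_context,  # runs on every log call
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)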

Key Metrics

Agent Metrics

# Agent execution rate
rate(citadel_commands_total[5m])

# Command latency (P99)
histogram_quantile(0.99, rate(citadel_commands_duration_ms_bucket[5m]))

# Policy denial rate
rate(citadel_policy_denials_total[5m]) /
rate(citadel_policy_evaluations_total[5m])

# Agent health
citadel_agent_healthy == 1
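
The citadel_agent_healthy gauge used here is not created in the metrics example above; it can be published as an OpenTelemetry observable gauge. A minimal sketch, assuming a hypothetical agent_is_healthy() health check and the meter configured earlier:

from opentelemetry.metrics import CallbackOptions, Observation

def _observe_health(options: CallbackOptions):
    # 1 = healthy, 0 = unhealthy; agent_is_healthy() is an assumed health check
    yield Observation(
        1 if agent_is_healthy() else 0,
        {"agent": config.agent_type}
    )

agent_healthy = meter.create_observable_gauge(
    "citadel.agent.healthy",
    callbacks=[_observe_health],
    description="1 when the agent's health check passes, 0 otherwise",
    unit="1"
)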

System Metrics

# Event bus throughput
rate(nats_in_msgs[5m])

# Event bus latency
nats_msg_latency_ms

# TimescaleDB write rate
rate(timescaledb_rows_inserted[5m])

# SPIRE SVID rotation
spire_svid_rotations_total

Business Metrics

# Energy savings (vs baseline)
(hvac_energy_baseline_kwh - hvac_energy_actual_kwh) /
hvac_energy_baseline_kwh * 100

# Security response time
histogram_quantile(0.95,
citadel_security_incident_response_time_seconds_bucket
)

# Comfort compliance rate
avg_over_time(citadel_comfort_score[1h])

Dashboards

Agent Dashboard (Grafana)

{
  "dashboard": {
    "title": "CitadelMesh Agents",
    "panels": [
      {
        "title": "Command Execution Rate",
        "targets": [
          {
            "expr": "sum(rate(citadel_commands_total[5m])) by (agent)",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Command Latency (P99)",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(citadel_commands_duration_ms_bucket[5m])) by (agent, le))",
            "legendFormat": "{{agent}}"
          }
        ]
      },
      {
        "title": "Policy Denials",
        "targets": [
          {
            "expr": "sum(rate(citadel_policy_denials_total[5m])) by (policy, reason)"
          }
        ]
      }
    ]
  }
}

.NET Aspire Dashboard

Built-in Aspire dashboard for .NET services:

# Access Aspire dashboard
http://localhost:18888

Shows:

  • Service topology
  • Dependency graph
  • Resource utilization
  • Distributed traces
  • Logs with filtering

Building Operations Dashboard

# Custom dashboard queries, wrapped in an async helper so the awaits are valid;
# the query_* functions are the agent's own data-access helpers
async def get_building_state() -> dict:
    return {
        "building_state": {
            "hvac_zones": await query_hvac_status(),
            "security_status": await query_security_incidents(),
            "energy_kpis": await query_energy_metrics(),
            "comfort_score": await query_comfort_score()
        }
    }

Alerting

Prometheus Alerting Rules

groups:
- name: citadel_agents
  interval: 30s
  rules:
  - alert: AgentDown
    expr: citadel_agent_healthy == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Agent {{ $labels.agent_id }} is down"

  - alert: HighPolicyDenialRate
    expr: |
      rate(citadel_policy_denials_total[5m]) /
      rate(citadel_policy_evaluations_total[5m]) > 0.10
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High policy denial rate: {{ $value | humanizePercentage }}"

  - alert: CommandLatencyHigh
    expr: |
      histogram_quantile(0.99,
        sum(rate(citadel_commands_duration_ms_bucket[5m])) by (le)
      ) > 1000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "P99 command latency > 1s"

  - alert: SecurityIncidentCritical
    expr: citadel_security_incidents{severity="critical"} > 0
    for: 0s
    labels:
      severity: page
    annotations:
      summary: "Critical security incident detected"

Alert Routing (Alertmanager)

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 5m
  repeat_interval: 3h
  receiver: 'ops-team'

  routes:
  - match:
      severity: critical
    receiver: 'pagerduty'

  - match:
      severity: page
    receiver: 'pagerduty-immediate'

receivers:
- name: 'ops-team'
  slack_configs:
  - api_url: 'https://hooks.slack.com/...'
    channel: '#citadel-alerts'

- name: 'pagerduty'
  pagerduty_configs:
  - service_key: '<pd-integration-key>'

Distributed Tracing Examples

Cross-Service Trace

Energy optimization triggering multiple services:

TRACE: energy_optimization (152ms)
├─ energy_agent.optimize (150ms) [edge]
│ ├─ ebo_adapter.read_point (12ms)
│ ├─ twin_service.get_entity (18ms) [cloud gRPC]
│ ├─ weather_api.get_forecast (45ms) [external API]
│ ├─ opa.evaluate_policy (8ms)
│ └─ ebo_adapter.write_setpoint (22ms)
└─ telemetry.publish_result (2ms)
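
For a trace like this to stay connected across the edge agent, the cloud gRPC call, and the external weather API, the trace context must be propagated on every outbound request. A minimal sketch using the W3C Trace Context propagator over HTTP, assuming httpx as the HTTP client, a placeholder weather URL, and the tracer configured earlier:

import httpx
from opentelemetry.propagate import inject

async def get_weather_forecast(site: str) -> dict:
    # Child span for the external call; it nests under the caller's span
    with tracer.start_as_current_span("weather_api.get_forecast") as span:
        span.set_attribute("weather.site", site)

        # Copy the current trace context into outbound headers
        # (W3C traceparent by default), so downstream spans join this trace
        headers: dict[str, str] = {}
        inject(headers)

        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "https://weather.example.com/forecast",  # placeholder URL
                params={"site": site},
                headers=headers
            )
            resp.raise_for_status()
            return resp.json()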

Error Trace with Context

Failed command with full context:

ERROR TRACE: door_unlock_failed (534ms)
├─ security_agent.handle_incident (532ms) [edge]
│ ├─ avigilon.get_camera_feed (89ms) ✓
│ ├─ opa.evaluate_policy (6ms) ✓
│ ├─ security_expert.unlock_door (425ms) ❌
│ │ └─ http_error: 503 Service Unavailable
│ └─ incident.escalate_to_ops (12ms) ✓
└─ audit.log_failure (2ms) ✓

Error Details:
- Service: security-expert-adapter
- Error: Connection timeout after 30s
- Retry Count: 3
- Last Error Time: 2025-09-30T15:30:25Z
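
To produce an error trace like this, the failing span should record the exception and set an error status so Tempo marks it as failed. A minimal sketch, assuming a hypothetical security_expert_client wrapper and the tracer configured earlier:

from opentelemetry.trace import Status, StatusCode

async def unlock_door(door_id: str):
    with tracer.start_as_current_span("security_expert.unlock_door") as span:
        span.set_attribute("door.id", door_id)
        try:
            return await security_expert_client.unlock(door_id)
        except Exception as exc:
            # Attach the exception event and mark the span as errored,
            # so it is flagged in the trace view and matches status=error queries
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            raise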

Performance Monitoring

SLI/SLO Tracking

# SLI: Command Success Rate
sum(rate(citadel_commands_total{result="success"}[7d])) /
sum(rate(citadel_commands_total[7d]))

# SLO: 99.9% success rate
# Error Budget: 0.1% ≈ 43.8 minutes/month (≈ 10 minutes/week)

# Burn Rate Alert
(1 - sli) > 14.4 * (1 - slo) # Fast burn (1 hour window)
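
The fast-burn condition above can be checked directly against a measured SLI. A small sketch, with the SLO value and example SLI chosen for illustration:

# Multiwindow burn-rate check: fast burn fires when the last hour is
# consuming error budget at more than 14.4x the allowed rate
SLO = 0.999

def fast_burn(sli_1h: float, slo: float = SLO) -> bool:
    return (1 - sli_1h) > 14.4 * (1 - slo)

fast_burn(0.985)  # True: 1.5% errors vs. a 1.44% fast-burn threshold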

Resource Utilization

# CPU usage by agent
rate(container_cpu_usage_seconds_total{
namespace="citadel-agents"
}[5m])

# Memory usage by agent
container_memory_working_set_bytes{
namespace="citadel-agents"
}

# Disk I/O
rate(container_fs_writes_bytes_total{
namespace="citadel-agents"
}[5m])

Incident Investigation

Query Recent Traces

# Tempo TraceQL query
{
  resource.service.name = "energy-agent" &&
  status = error &&
  duration > 1s
}
| select(span.zone_id, span.error_message)

Correlate Logs and Traces

# Loki query with trace correlation
{namespace="citadel-agents"} |= "error"
| json
| trace_id != ""
| line_format "{{.timestamp}} [{{.level}}] {{.event}} trace={{.trace_id}}"

See Also