Safety Guardrails and Policy Architecture
Autonomous building operations demand rigorous safety guarantees. CitadelMesh implements defense-in-depth safety guardrails using policy-as-code, shadow mode evaluation, multi-step approvals, and circuit breakers. This document explains the safety architecture and fail-safe design.
Why this matters: Legacy automation often relies on a single BACnet permission or operator role before executing a control command. CitadelMesh requires every action to win consensus across policy, simulation, and human review layers so no single failure can create unsafe outcomes.
Safety Philosophy
Safe by Default, Autonomous by Permission
Every control action in CitadelMesh must pass through multiple safety layers:
- Static Policies: Hard-coded constraints (e.g., temperature bounds, emergency egress)
- OPA Policies: Dynamic policy evaluation with explain capability
- Shadow Mode: Test new policies without actuation
- Human Approvals: Multi-step gates for high-impact actions
- Circuit Breakers: Automatic rollback on anomaly detection
No single failure can cause unsafe building operations.
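The layering above can be read as a chain of independent checks in which any layer may veto and a crashing layer counts as a veto. The following is a minimal sketch under stated assumptions: the layer functions, the dict-based command, and the 60-78 hard limits are illustrative and do not come from the CitadelMesh codebase.

```python
from typing import Callable, NamedTuple, Optional


class Verdict(NamedTuple):
    allowed: bool
    layer: Optional[str] = None   # name of the layer that vetoed, if any
    reason: Optional[str] = None


# Hypothetical layer checks: each returns a denial reason, or None to pass.
def static_bounds(cmd: dict) -> Optional[str]:
    if cmd["action"] == "write_setpoint" and not 60 <= cmd["value"] <= 78:
        return "setpoint outside hard limits"
    return None


def emergency_lockout(cmd: dict) -> Optional[str]:
    if cmd.get("emergency_active") and cmd["action"] != "read_point":
        return "system in emergency mode"
    return None


LAYERS: list[tuple[str, Callable[[dict], Optional[str]]]] = [
    ("static", static_bounds),
    ("emergency", emergency_lockout),
]


def evaluate_layers(cmd: dict) -> Verdict:
    """Run every layer in order; the first veto wins."""
    for name, check in LAYERS:
        try:
            reason = check(cmd)
        except Exception as exc:  # a crashing layer must fail closed
            return Verdict(False, name, f"layer error: {exc}")
        if reason is not None:
            return Verdict(False, name, reason)
    return Verdict(True)
```

A malformed command (for example, one with no `value`) is denied by the `except` branch rather than slipping through, which is the "safe by default" behaviour described above.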
How This Differs From Legacy BMS
- Policy Transparency: Rego decisions explain "why" in plain language; typical controller logic is an opaque ladder diagram or vendor scripting language.
- Shadow Evaluation: New policies run in simulation against live telemetry before they can actuate real equipment; traditional systems deploy and hope for the best.
- Human-in-the-Loop by Design: High-impact actions automatically seek approvals with full context; most BMS rely on manual phone calls or email threads.
- Telemetry for Audit: Every allow/deny path emits CloudEvents and OTEL traces so compliance teams can replay decisions on demand.
Safety Architecture
Layer 1: OPA Policy Engine
All control commands are evaluated by Open Policy Agent (OPA) using Rego policies.
Policy Structure
Policies are organized by domain:
policies/
├── security/
│   ├── door_access.rego
│   ├── camera_control.rego
│   └── intrusion_response.rego
├── hvac/
│   ├── setpoint_control.rego
│   ├── occupancy_modes.rego
│   └── demand_response.rego
├── lighting/
│   ├── scene_control.rego
│   └── emergency_lighting.rego
└── common/
    ├── time_constraints.rego
    └── emergency_override.rego
Example: HVAC Setpoint Policy
# policies/hvac/setpoint_control.rego
package citadel.hvac.setpoint

import rego.v1

# Default deny all actions
default allow := false

# Temperature limits (Fahrenheit)
min_temp := 65
max_temp := 78

# Allow setpoint changes within safe range
allow if {
    input.action == "write_setpoint"
    input.value >= min_temp
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}

# Validate entity ID format
valid_entity_id if {
    startswith(input.entity_id, "hvac.")
    count(split(input.entity_id, ".")) >= 3
}

# Check for emergency lockout
emergency_mode if {
    data.system_state.emergency_active == true
}

# Read operations always allowed
allow if {
    input.action == "read_point"
}
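# Deny reasons for explain
# Note: deny_reason is a complete rule, so OPA reports a conflict error if two
# of these bodies match the same input with different messages.
deny_reason := "Temperature setpoint outside safe range" if {
    input.action == "write_setpoint"
    not value_in_safe_range
}

# Helper: Rego has no && operator; expressions in a body are ANDed together
value_in_safe_range if {
    input.value >= min_temp
    input.value <= max_temp
}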
deny_reason := "Invalid entity ID format" if {
    input.action == "write_setpoint"
    not valid_entity_id
}

deny_reason := "System in emergency mode" if {
    input.action == "write_setpoint"
    emergency_mode
}

# Time-based constraints (night setback)
allow if {
    input.action == "write_setpoint"
    is_nighttime
    input.value >= 60
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}

is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour >= 22
}

is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour <= 6
}
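To reason about boundary cases it can help to restate the rules in ordinary code. The following is an illustrative Python restatement of the policy above, not part of the policy bundle; the function names and the explicit `hour` and `emergency_active` arguments are assumptions made for the example.

```python
MIN_TEMP = 65        # daytime lower bound (°F), from the policy
MAX_TEMP = 78        # upper bound (°F), from the policy
NIGHT_MIN_TEMP = 60  # night-setback lower bound (°F), from the policy


def is_nighttime(hour: int) -> bool:
    # Mirrors the two is_nighttime rules: hour >= 22 or hour <= 6.
    return hour >= 22 or hour <= 6


def valid_entity_id(entity_id: str) -> bool:
    return entity_id.startswith("hvac.") and len(entity_id.split(".")) >= 3


def allow(action: str, entity_id: str = "", value: float = 0.0,
          hour: int = 12, emergency_active: bool = False) -> bool:
    if action == "read_point":
        return True   # reads are always allowed
    if action != "write_setpoint":
        return False  # default deny
    if emergency_active or not valid_entity_id(entity_id):
        return False
    lower = NIGHT_MIN_TEMP if is_nighttime(hour) else MIN_TEMP
    return lower <= value <= MAX_TEMP
```

Two details stand out when the rules are written this way: `hour <= 6` keeps the night window open until 06:59, and OPA's `time.clock` reports UTC unless a timezone is passed, so a deployment would likely need the building's local timezone for the setback window to line up with local night.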
Policy Evaluation Flow
from opa_client import OPAClient
from citadel.v1 import commands_pb2

# Agent creates command
command = commands_pb2.Command(
    id=ulid(),
    target_id="hvac.zone1.setpoint",
    action="write_setpoint",
    params={"value": "72"},
    issued_by="spiffe://citadel.mesh/energy-agent"
)

# Evaluate against OPA policy
opa = OPAClient("http://opa:8181")
decision = await opa.evaluate(
    policy="citadel.hvac.setpoint",
    input={
        "action": command.action,
        "entity_id": command.target_id,
        "value": float(command.params["value"]),
        "issued_by": command.issued_by
    },
    explain=True
)

if decision["allow"]:
    # Issue safety token
    command.safety_token = opa.issue_token(decision)
    await execute_command(command)
else:
    # Log denial with explanation
    logger.warning(
        f"Command denied: {decision.get('deny_reason')}",
        extra={
            "command_id": command.id,
            "explain_trace": decision["explain"]
        }
    )
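`OPAClient` above is the project's own wrapper and its internals are not shown here. As a rough sketch of what such a wrapper might do, OPA's REST Data API accepts `POST /v1/data/<package path>` with a JSON body of the form `{"input": ...}` and returns the evaluated document under `result`. The helper names below are hypothetical, and the fail-closed handling is an assumption about how a safety-critical caller would want to treat errors.

```python
import json
import urllib.request


def build_opa_request(base_url: str, package: str, input_doc: dict) -> urllib.request.Request:
    """Build a POST to OPA's Data API for a dotted package name."""
    path = package.replace(".", "/")
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/v1/data/{path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_decision(response_body: bytes) -> dict:
    """Fail closed: a missing, undefined, or malformed result is a denial."""
    try:
        result = json.loads(response_body).get("result")
    except (ValueError, AttributeError):
        result = None
    if not isinstance(result, dict):
        result = {}
    return {
        "allow": result.get("allow") is True,
        "deny_reason": result.get("deny_reason"),
    }


def evaluate(base_url: str, package: str, input_doc: dict, timeout: float = 2.0) -> dict:
    request = build_opa_request(base_url, package, input_doc)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return parse_decision(response.read())
    except OSError:
        # Connection errors, HTTP errors, and timeouts also deny.
        return {"allow": False, "deny_reason": "policy engine unreachable"}
```

Treating an unreachable policy engine as a denial keeps the default-deny guarantee intact even when the OPA sidecar itself is the component that failed.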
Policy Testing
All policies have comprehensive test suites:
# policies/hvac/setpoint_control_test.rego
package citadel.hvac.setpoint

test_allow_valid_setpoint {
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": false}
}

test_deny_too_hot {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 85
    }
}

test_deny_emergency_mode {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": true}
}
test_allow_nighttime_setback {
    # Mock time.clock so it returns [hour, minute, second] = 23:00:00
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 62
    } with time.clock as [23, 0, 0]
      with data.system_state as {"emergency_active": false}
}
Run tests in CI:
# Validate all policies
opa test policies/ -v
# Check for coverage
opa test policies/ --coverage
Layer 2: Shadow Mode
New policies and learning controllers can be evaluated in shadow mode without affecting real systems.
Shadow Mode Architecture
Shadow Policy Configuration
# config/policies/shadow.yaml
shadow_policies:
  - name: hvac.setpoint.v2
    base_policy: hvac.setpoint.v1
    changes:
      - Tighter temperature bounds (67-76°F)
      - Occupancy-based overrides
    duration: 7d
    metrics:
      - would_allow_rate
      - would_deny_rate
      - agreement_rate
  - name: security.door.ml_anomaly
    base_policy: security.door.v1
    changes:
      - Add ML anomaly detection
    duration: 14d
    alert_on_divergence: true
Shadow Evaluation
async def evaluate_with_shadow(command: Command, shadow_policy: str):
    # Evaluate production policy
    prod_decision = await opa.evaluate(
        policy="citadel.hvac.setpoint.v1",
        input=command_to_input(command)
    )

    # Evaluate shadow policy
    shadow_decision = await opa.evaluate(
        policy=shadow_policy,
        input=command_to_input(command)
    )

    # Record metrics
    metrics.counter("policy_shadow_evaluation", {
        "policy": shadow_policy,
        "prod_allow": prod_decision["allow"],
        "shadow_allow": shadow_decision["allow"],
        "agreement": prod_decision["allow"] == shadow_decision["allow"]
    })

    # Alert on divergence
    if prod_decision["allow"] != shadow_decision["allow"]:
        logger.info(
            f"Shadow policy divergence: {shadow_policy}",
            extra={
                "command": command.id,
                "prod": prod_decision["allow"],
                "shadow": shadow_decision["allow"],
                "prod_reason": prod_decision.get("deny_reason"),
                "shadow_reason": shadow_decision.get("deny_reason")
            }
        )

    # Execute based on production policy only
    return prod_decision
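The counters recorded above only become useful once they are aggregated into the rates named in the shadow configuration. A minimal sketch of that aggregation, assuming paired decisions are available as simple records (the `ShadowSample` type and `summarize` function are hypothetical):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ShadowSample:
    prod_allow: bool     # what the production policy decided
    shadow_allow: bool   # what the shadow policy would have decided


def summarize(samples: list[ShadowSample]) -> dict:
    """Aggregate paired decisions into would_allow, would_deny, and agreement rates."""
    total = len(samples)
    if total == 0:
        # No evidence yet: report None rather than a misleading 0% or 100%.
        return {"eval_count": 0, "would_allow_rate": None,
                "would_deny_rate": None, "agreement_rate": None}
    would_allow = sum(s.shadow_allow for s in samples)
    agree = sum(s.prod_allow == s.shadow_allow for s in samples)
    return {
        "eval_count": total,
        "would_allow_rate": would_allow / total,
        "would_deny_rate": (total - would_allow) / total,
        "agreement_rate": agree / total,
    }
```

Returning `None` for an empty sample set is a deliberate choice in this sketch: a policy that has seen no traffic should not appear to agree with production.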
Shadow Graduation
Promote a shadow policy to production only after its metrics meet every promotion criterion for the required duration:
# Promotion criteria
shadow_graduation:
  hvac.setpoint.v2:
    required_metrics:
      - agreement_rate > 95%
      - would_deny_unsafe > 0
      - eval_count > 10000
    required_duration: 7d
    approval_required: true
    rollback_plan: revert to v1
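The promotion criteria can be checked mechanically before a human is asked to sign off. The sketch below hard-codes the thresholds from the `hvac.setpoint.v2` entry above; the `ShadowStats` record and `graduation_blockers` function are hypothetical names, and a real implementation would presumably read the thresholds from the YAML instead.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ShadowStats:
    agreement_rate: float    # fraction in [0, 1]
    would_deny_unsafe: int   # unsafe commands the shadow policy would have blocked
    eval_count: int
    days_observed: float
    approved: bool           # human sign-off (approval_required: true)


def graduation_blockers(stats: ShadowStats) -> list[str]:
    """Return the unmet criteria; an empty list means ready to promote."""
    blockers = []
    if not stats.agreement_rate > 0.95:
        blockers.append("agreement_rate <= 95%")
    if not stats.would_deny_unsafe > 0:
        blockers.append("would_deny_unsafe == 0")
    if not stats.eval_count > 10_000:
        blockers.append("eval_count <= 10000")
    if stats.days_observed < 7:
        blockers.append("observed for less than 7d")
    if not stats.approved:
        blockers.append("human approval missing")
    return blockers
```

Returning the list of blockers rather than a bare boolean gives the approver the same "why" that the policy layer provides for individual commands.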
Layer 3: Approval Gates
High-impact actions require human-in-the-loop approval.
Approval Triggers
Actions requiring approval:
- Emergency overrides: Bypassing standard safety policies
- Large setpoint changes: > 5°F temperature adjustment
- Access control changes: Modifying user permissions or door schedules
- Policy changes: Updating OPA policies in production
- System mode changes: Emergency mode, maintenance mode
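The trigger list above can be expressed as a small predicate. This is a simplified sketch: the action names are hypothetical stand-ins for the real command vocabulary, and the function takes plain values rather than a `Command` object.

```python
from typing import Optional

# Hypothetical action names; the real command vocabulary may differ.
ALWAYS_GATED_ACTIONS = {
    "emergency_override",
    "modify_access_control",
    "update_policy",
    "set_system_mode",
}
MAX_UNAPPROVED_DELTA_F = 5.0  # from the "> 5°F" trigger in the list above


def needs_approval(action: str, new_value: Optional[float] = None,
                   current_value: Optional[float] = None) -> bool:
    if action in ALWAYS_GATED_ACTIONS:
        return True
    if action == "write_setpoint":
        if new_value is None or current_value is None:
            return True  # unknown delta: err on the side of asking
        return abs(new_value - current_value) > MAX_UNAPPROVED_DELTA_F
    return False
```

A change of exactly 5°F does not trigger the gate in this sketch, matching the strict "> 5°F" wording; whether the boundary should be inclusive is a policy decision.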
Approval Workflow
Approval Implementation
from citadel.approval import ApprovalGate, ApprovalRequest

async def execute_high_impact_command(command: Command):
    # Check if approval required
    if requires_approval(command):
        # Create approval request
        approval = ApprovalRequest(
            command_id=command.id,
            action=command.action,
            target=command.target_id,
            params=command.params,
            risk_level="high",
            justification=get_justification(command),
            timeout_seconds=300  # 5 minutes
        )

        # Submit to approval gate
        gate = ApprovalGate()
        result = await gate.request_approval(approval)

        if result.approved:
            # Record approval in command
            command.safety_token = result.approval_token
            logger.info(
                f"Command approved by {result.approver}",
                extra={"command_id": command.id}
            )
        else:
            raise PermissionError(
                f"Command rejected: {result.rejection_reason}"
            )

    # Execute command with safety token
    await actuate(command)
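The `timeout_seconds` field implies that an unanswered request must resolve to something other than approval. One way this could work, sketched with `asyncio.wait_for` (the `wait_for_approval` helper and the three outcome strings are assumptions, not the `ApprovalGate` API):

```python
import asyncio


async def wait_for_approval(decision: "asyncio.Future[bool]", timeout_seconds: float) -> str:
    """Return 'approved', 'rejected', or 'expired'; only 'approved' may actuate."""
    try:
        approved = await asyncio.wait_for(decision, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return "expired"  # silence is never consent
    return "approved" if approved else "rejected"


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    outcomes = []

    # An approver responds shortly after the request is raised.
    granted = loop.create_future()
    loop.call_later(0.01, granted.set_result, True)
    outcomes.append(await wait_for_approval(granted, timeout_seconds=1.0))

    # Nobody ever answers this one, so it times out.
    ignored = loop.create_future()
    outcomes.append(await wait_for_approval(ignored, timeout_seconds=0.05))
    return outcomes
```

Running `asyncio.run(demo())` yields one approved and one expired outcome. Keeping "expired" distinct from "rejected" lets the two be counted separately for audit purposes.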
Approval UI
Approvers receive rich context:
{
  "approval_request_id": "01HQZXYZ9ABCDEF",
  "command_id": "01HQZXYZ9GHIJKL",
  "timestamp": "2025-09-30T15:30:00Z",
  "action": "Emergency Override: Unlock All Doors",
  "target": "building_a.all_doors",
  "risk_level": "critical",
  "justification": "Fire alarm detected in Zone 3",
  "policy_explain": {
    "production_policy": "citadel.security.door",
    "decision": "deny",
    "reason": "Emergency egress requires override",
    "constraints": ["no_remote_unlock_all"]
  },
  "related_incidents": ["incident_01HQZXYZ9MNOPQR"],
  "approvers": ["ops.manager", "security.lead"],
  "timeout": 300
}
Layer 4: Circuit Breakers
Automatic rollback on anomaly detection protects against cascading failures.
Circuit Breaker States
Circuit Breaker Configuration
from citadel.safety import CircuitBreaker

# HVAC setpoint circuit breaker
hvac_breaker = CircuitBreaker(
    name="hvac_setpoint",
    failure_threshold=5,   # Open after 5 failures
    success_threshold=2,   # Close after 2 successes
    timeout_seconds=60,    # Half-open after 60s
    exceptions=[
        DeviceUnreachableError,
        SetpointOutOfBoundsError
    ]
)

@hvac_breaker.protect
async def write_hvac_setpoint(entity_id: str, value: float):
    # This function is protected by circuit breaker
    command = Command(
        target_id=entity_id,
        action="write_setpoint",
        params={"value": str(value)}
    )
    return await adapter.execute(command)
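The three parameters in the configuration above correspond to the transitions of the usual closed, open, half-open state machine. The class below is a minimal sketch of that pattern, not the `citadel.safety.CircuitBreaker` implementation; in particular, counting consecutive failures and reopening on any half-open failure are assumptions.

```python
import time


class SimpleBreaker:
    """closed -> open after N consecutive failures; open -> half_open after a
    timeout; half_open -> closed after M successes, or back to open on failure."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout_seconds=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock          # injectable so tests need not sleep
        self._state = "closed"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def state(self) -> str:
        # Lazily move from open to half_open once the timeout has elapsed.
        if self._state == "open" and self._clock() - self._opened_at >= self.timeout_seconds:
            self._state = "half_open"
            self._successes = 0
        return self._state

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        if self.state == "half_open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self._state = "closed"
                self._failures = 0
        else:
            self._failures = 0  # a success breaks the failure streak

    def record_failure(self) -> None:
        state = self.state
        if state == "open":
            return  # calls are already being rejected
        if state == "half_open":
            self._trip()
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self._state = "open"
        self._failures = 0
        self._opened_at = self._clock()
```

A caller would check `allow_request()` before each write and report the outcome with `record_success()` or `record_failure()`; a decorator such as `@hvac_breaker.protect` can wrap exactly that sequence.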
Anomaly Detection
ML-based anomaly detection triggers circuit breakers:
from citadel.safety import AnomalyDetector

detector = AnomalyDetector(
    model="isolation_forest",
    features=["setpoint_delta", "execution_time", "error_rate"],
    threshold=0.95
)

async def execute_with_anomaly_check(command: Command):
    # Extract features. current_setpoint, target_setpoint,
    # last_execution_time_ms, and recent_error_rate are assumed to be
    # read from recent telemetry for command.target_id.
    features = {
        "setpoint_delta": abs(current_setpoint - target_setpoint),
        "execution_time": last_execution_time_ms,
        "error_rate": recent_error_rate
    }

    # Check for anomaly
    is_anomaly, score = detector.predict(features)

    if is_anomaly:
        logger.warning(
            f"Anomaly detected for command {command.id}",
            extra={"score": score, "features": features}
        )
        # Trigger circuit breaker
        raise AnomalyDetectedError(
            f"Anomaly score {score} exceeds threshold"
        )

    # Execute normally
    return await execute(command)
Layer 5: Rollback and Recovery
Commands that fail or violate policy are rolled back automatically.
Rollback Strategy
from citadel.safety import RollbackManager

rollback = RollbackManager()

async def execute_with_rollback(command: Command):
    # Capture current state
    snapshot = await capture_state(command.target_id)
    rollback.save_snapshot(command.id, snapshot)

    try:
        # Execute command
        result = await execute(command)

        # Verify result
        new_state = await get_state(command.target_id)
        if not verify_state(new_state, command):
            raise StateVerificationError("State verification failed")

        return result

    except Exception as e:
        logger.error(
            "Command failed, initiating rollback",
            extra={"command_id": command.id, "error": str(e)}
        )

        # Rollback to previous state
        await rollback.restore_snapshot(command.id)

        # Publish rollback event
        await event_bus.publish(CloudEvent(
            type="citadel.safety.rollback",
            source="safety-manager",
            subject=command.target_id,
            data={"command_id": command.id, "reason": str(e)}
        ))

        raise
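The snapshot-then-verify pattern above can be exercised without any devices by treating the target's state as a plain dictionary. The following is a self-contained sketch under that assumption; `InMemoryRollback` and `apply_with_rollback` are illustrative names and not the `RollbackManager` API.

```python
import copy


class InMemoryRollback:
    """Keeps one deep-copied snapshot per command id."""

    def __init__(self):
        self._snapshots = {}

    def save_snapshot(self, command_id: str, state: dict) -> None:
        self._snapshots[command_id] = copy.deepcopy(state)

    def restore_snapshot(self, command_id: str, device: dict) -> None:
        snapshot = self._snapshots.pop(command_id)
        device.clear()
        device.update(snapshot)

    def discard(self, command_id: str) -> None:
        self._snapshots.pop(command_id, None)


def apply_with_rollback(rollback, command_id, device, changes, verify):
    """Apply changes to device; restore the snapshot if verification fails."""
    rollback.save_snapshot(command_id, device)
    try:
        device.update(changes)
        if not verify(device):
            raise ValueError("state verification failed")
    except Exception:
        rollback.restore_snapshot(command_id, device)
        raise  # the caller still sees the original failure
    rollback.discard(command_id)
    return device
```

Re-raising after the restore mirrors the bare `raise` in the strategy above: the rollback repairs state, but the failure still propagates so that circuit breakers and audit logging can react to it.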
Safety Scorecard
Track safety metrics across all operations:
# Safety metrics dashboard
safety_metrics = {
    "policy_evaluations_total": 125_430,
    "policy_denials_total": 3_245,
    "policy_denial_rate": 0.026,
    "shadow_divergence_rate": 0.012,
    "shadow_would_block_unsafe": 42,
    "approvals_requested": 156,
    "approvals_granted": 142,
    "approvals_rejected": 8,
    "approvals_expired": 6,
    "circuit_breaker_opens": 3,
    "rollbacks_executed": 2,
    "safe_days": 47,  # Days since last safety incident
}
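Rates such as `policy_denial_rate` are derivable from the raw counters, so computing them at read time avoids the two drifting apart. A small sketch, assuming the counter names from the dashboard above (`derive_rates` is a hypothetical helper, and `approval_grant_rate` and `approvals_pending` are extra derived values not present in the original dashboard):

```python
def derive_rates(counters: dict) -> dict:
    """Compute derived safety rates from raw counters."""
    evaluations = counters["policy_evaluations_total"]
    requested = counters["approvals_requested"]
    resolved = (counters["approvals_granted"]
                + counters["approvals_rejected"]
                + counters["approvals_expired"])
    return {
        "policy_denial_rate": (
            round(counters["policy_denials_total"] / evaluations, 3) if evaluations else None
        ),
        "approval_grant_rate": (
            round(counters["approvals_granted"] / requested, 3) if requested else None
        ),
        # Requests that are neither granted, rejected, nor expired yet.
        "approvals_pending": requested - resolved,
    }
```

Applied to the sample figures, 3,245 denials out of 125,430 evaluations rounds to the 0.026 shown on the dashboard, and the granted, rejected, and expired approvals sum to the 156 requested, leaving none pending.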
Compliance Integration
Safety policies map to compliance requirements:
# Policy compliance mapping
policies:
  citadel.security.door:
    compliance_frameworks:
      - IEC-62443: SC-3.3 (Access Control)
      - NIST-800-82: AC-2 (Account Management)
    controls:
      - Emergency egress never blocked
      - Audit trail for all access changes
      - Multi-person approval for bulk changes
  citadel.hvac.setpoint:
    compliance_frameworks:
      - ASHRAE-90.1: Temperature setpoint limits
      - ISO-50001: Energy management constraints
    controls:
      - Temperature bounds enforced
      - Occupancy-based optimization
      - Demand response compliance
Related Documentation
- Protocol Strategy - Safety tokens and signed commands
- Identity Foundation - SPIFFE-based authentication
- Observability - Safety metrics and alerting
- Agent Topology - Agent safety integration