Safety Guardrails and Policy Architecture
Autonomous building operations demand rigorous safety guarantees. CitadelMesh implements defense-in-depth safety guardrails using policy-as-code, shadow mode evaluation, multi-step approvals, and circuit breakers. This document explains the safety architecture and fail-safe design.
Safety Philosophy
Safe by Default, Autonomous by Permission
Every control action in CitadelMesh must pass through multiple safety layers:
- Static Policies: Hard-coded constraints (e.g., temperature bounds, emergency egress)
- OPA Policies: Dynamic policy evaluation with explain capability
- Shadow Mode: Test new policies without actuation
- Human Approvals: Multi-step gates for high-impact actions
- Circuit Breakers: Automatic rollback on anomaly detection
The layers are independent of one another, so a failure in any single layer should not by itself lead to an unsafe building operation.
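The layering idea can be sketched as an ordered chain of checks in which any layer may veto an action. This is a minimal illustration, not the CitadelMesh implementation: `Verdict`, `run_layers`, `static_bounds`, and `needs_human` are hypothetical names, and the 65-78°F bounds are borrowed from the HVAC policy example later in this document.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Verdict:
    allowed: bool
    layer: Optional[str] = None   # name of the layer that blocked, if any
    reason: Optional[str] = None


# A check returns None to pass, or a string explaining why it blocks.
Check = Callable[[dict], Optional[str]]


def run_layers(action: dict, layers: List[Tuple[str, Check]]) -> Verdict:
    """Run the checks in order; the first layer that objects blocks the action."""
    for name, check in layers:
        reason = check(action)
        if reason is not None:
            return Verdict(False, name, reason)
    return Verdict(True)


def static_bounds(action: dict) -> Optional[str]:
    # Hard-coded constraint that does not depend on any policy engine.
    if action["action"] == "write_setpoint" and not 65 <= action["value"] <= 78:
        return "setpoint outside hard limits"
    return None


def needs_human(action: dict) -> Optional[str]:
    # Placeholder gate: high-impact actions stop here until approved.
    if action.get("high_impact") and not action.get("approved"):
        return "awaiting human approval"
    return None


LAYERS = [("static", static_bounds), ("approval", needs_human)]
```

Because every layer can only block and never grant, adding a layer can make the system stricter but not more permissive.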
Safety Architecture
graph TB
Agent[Agent] --> Draft[Draft Command]
Draft --> Policy[OPA Policy Engine]
Policy --> Decision{Allow?}
Decision -->|Deny| Audit1[Audit: Blocked]
Decision -->|Allow + Shadow| Shadow[Shadow Evaluation]
Decision -->|Allow + Production| Token[Issue Safety Token]
Shadow --> Metrics[Metrics: Would-Allow/Deny]
Shadow --> Audit2[Audit: Shadow Result]
Token --> Approval{High Impact?}
Approval -->|Yes| Human[Human Approval Gate]
Approval -->|No| Execute[Execute Command]
Human -->|Approved| Execute
Human -->|Rejected| Audit3[Audit: Rejected]
Execute --> Monitor[Monitor Result]
Monitor --> CB{Circuit Breaker?}
CB -->|Anomaly| Rollback[Rollback + Alert]
CB -->|Normal| Success[Success]
Layer 1: OPA Policy Engine
All control commands are evaluated by Open Policy Agent (OPA) using Rego policies.
Policy Structure
Policies are organized by domain:
policies/
├── security/
│   ├── door_access.rego
│   ├── camera_control.rego
│   └── intrusion_response.rego
├── hvac/
│   ├── setpoint_control.rego
│   ├── occupancy_modes.rego
│   └── demand_response.rego
├── lighting/
│   ├── scene_control.rego
│   └── emergency_lighting.rego
└── common/
    ├── time_constraints.rego
    └── emergency_override.rego
Example: HVAC Setpoint Policy
# policies/hvac/setpoint_control.rego
package citadel.hvac.setpoint
import rego.v1
# Default deny all actions
default allow := false
# Temperature limits (Fahrenheit)
min_temp := 65
max_temp := 78
# Allow setpoint changes within safe range
allow if {
    input.action == "write_setpoint"
    input.value >= min_temp
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}
# Validate entity ID format
valid_entity_id if {
    startswith(input.entity_id, "hvac.")
    count(split(input.entity_id, ".")) >= 3
}
# Check for emergency lockout
emergency_mode if {
    data.system_state.emergency_active == true
}
# Read operations always allowed
allow if {
    input.action == "read_point"
}
# Deny reasons for explain. The else chain yields exactly one reason, so two
# failing checks cannot produce conflicting values for the same rule.
deny_reason := "System in emergency mode" if {
    input.action == "write_setpoint"
    emergency_mode
} else := "Invalid entity ID format" if {
    input.action == "write_setpoint"
    not valid_entity_id
} else := "Temperature setpoint outside safe range" if {
    # Reached only when the entity ID is valid and no emergency is active,
    # so a denied write here means the value is out of range.
    input.action == "write_setpoint"
    not allow
}
# Time-based constraints (night setback)
allow if {
    input.action == "write_setpoint"
    is_nighttime
    input.value >= 60
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}
# time.clock returns [hour, minute, second]
is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour >= 22
}
is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour <= 6
}
Policy Evaluation Flow
import logging
from opa_client import OPAClient
from citadel.v1 import commands_pb2
logger = logging.getLogger(__name__)
# ulid() and execute_command() are assumed to be project helpers defined elsewhere.
async def submit_setpoint():
    # Agent creates command
    command = commands_pb2.Command(
        id=ulid(),
        target_id="hvac.zone1.setpoint",
        action="write_setpoint",
        params={"value": "72"},
        issued_by="spiffe://citadel.mesh/energy-agent"
    )
    # Evaluate against OPA policy
    opa = OPAClient("http://opa:8181")
    decision = await opa.evaluate(
        policy="citadel.hvac.setpoint",
        input={
            "action": command.action,
            "entity_id": command.target_id,
            "value": float(command.params["value"]),
            "issued_by": command.issued_by
        },
        explain=True
    )
    if decision["allow"]:
        # Issue safety token
        command.safety_token = opa.issue_token(decision)
        await execute_command(command)
    else:
        # Log denial with explanation
        logger.warning(
            f"Command denied: {decision.get('deny_reason')}",
            extra={
                "command_id": command.id,
                "explain_trace": decision["explain"]
            }
        )
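The `OPAClient` wrapper used above is not defined in this document. If it talks to OPA's REST Data API, the underlying request could be built roughly as follows. The helper names `build_opa_request` and `parse_decision` are hypothetical; the `/v1/data/<path>` endpoint, the `{"input": ...}` request body, the `result` response field, and the `explain=full` query parameter are part of OPA's documented REST API.

```python
import json
import urllib.request


def build_opa_request(base_url: str, package: str, input_doc: dict,
                      explain: bool = False) -> urllib.request.Request:
    """Build a POST to OPA's Data API for the given Rego package."""
    # The package citadel.hvac.setpoint is served at /v1/data/citadel/hvac/setpoint
    path = package.replace(".", "/")
    url = f"{base_url.rstrip('/')}/v1/data/{path}"
    if explain:
        url += "?explain=full"
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_decision(response_body: bytes) -> dict:
    """Fail closed: a missing result or a missing `allow` is treated as deny."""
    result = json.loads(response_body).get("result") or {}
    return {
        "allow": result.get("allow") is True,
        "deny_reason": result.get("deny_reason"),
    }
```

Treating an undefined result as a denial keeps the client consistent with the policy's `default allow := false`.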
Policy Testing
Each policy ships with a Rego test suite that covers its allow and deny paths:
# policies/hvac/setpoint_control_test.rego
package citadel.hvac.setpoint
import rego.v1
test_allow_valid_setpoint if {
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": false}
}
test_deny_too_hot if {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 85
    }
}
test_deny_emergency_mode if {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": true}
}
# Mock time.clock so the policy sees 23:00:00 as [hour, minute, second]
test_allow_nighttime_setback if {
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 62
    } with time.clock as [23, 0, 0]
        with data.system_state as {"emergency_active": false}
}
Run tests in CI:
# Validate all policies
opa test policies/ -v
# Check for coverage
opa test policies/ --coverage
Layer 2: Shadow Mode
New policies and learning controllers can be evaluated in shadow mode without affecting real systems.
Shadow Mode Architecture
sequenceDiagram
participant Agent
participant OPA
participant Metrics
participant Actuator
Agent->>OPA: Evaluate (mode=shadow)
OPA->>OPA: Run production policy
OPA->>OPA: Run shadow policy
OPA-->>Agent: Production: allow<br/>Shadow: deny
OPA->>Metrics: Record shadow decision
Agent->>Actuator: Execute (production allowed)
Agent->>Metrics: Record would-block
Shadow Policy Configuration
# config/policies/shadow.yaml
shadow_policies:
  - name: hvac.setpoint.v2
    base_policy: hvac.setpoint.v1
    changes:
      - Tighter temperature bounds (67-76°F)
      - Occupancy-based overrides
    duration: 7d
    metrics:
      - would_allow_rate
      - would_deny_rate
      - agreement_rate
  - name: security.door.ml_anomaly
    base_policy: security.door.v1
    changes:
      - Add ML anomaly detection
    duration: 14d
    alert_on_divergence: true
Shadow Evaluation
async def evaluate_with_shadow(command: Command, shadow_policy: str):
    # Evaluate production policy
    prod_decision = await opa.evaluate(
        policy="citadel.hvac.setpoint.v1",
        input=command_to_input(command)
    )
    # Evaluate shadow policy
    shadow_decision = await opa.evaluate(
        policy=shadow_policy,
        input=command_to_input(command)
    )
    # Record metrics
    metrics.counter("policy_shadow_evaluation", {
        "policy": shadow_policy,
        "prod_allow": prod_decision["allow"],
        "shadow_allow": shadow_decision["allow"],
        "agreement": prod_decision["allow"] == shadow_decision["allow"]
    })
    # Alert on divergence
    if prod_decision["allow"] != shadow_decision["allow"]:
        logger.info(
            f"Shadow policy divergence: {shadow_policy}",
            extra={
                "command": command.id,
                "prod": prod_decision["allow"],
                "shadow": shadow_decision["allow"],
                "prod_reason": prod_decision.get("deny_reason"),
                "shadow_reason": shadow_decision.get("deny_reason")
            }
        )
    # Execute based on production policy only
    return prod_decision
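When reviewing shadow results, the direction of a divergence matters as much as the overall agreement rate: a shadow policy that would block an action production allowed is a different signal from one that would permit an action production denied. A small sketch of that tally, using the hypothetical helpers `classify` and `summarize` over `(prod_allow, shadow_allow)` pairs:

```python
from collections import Counter
from typing import Iterable, Tuple


def classify(prod_allow: bool, shadow_allow: bool) -> str:
    """Label one evaluation by how the shadow policy compares to production."""
    if prod_allow == shadow_allow:
        return "agree"
    # Production allowed but shadow denied: the shadow policy is stricter here.
    return "would_block" if prod_allow else "would_permit"


def summarize(decisions: Iterable[Tuple[bool, bool]]) -> dict:
    """Aggregate (prod_allow, shadow_allow) pairs into review metrics."""
    tally = Counter(classify(p, s) for p, s in decisions)
    total = sum(tally.values())
    return {
        "agreement_rate": tally["agree"] / total if total else 0.0,
        "would_block": tally["would_block"],
        "would_permit": tally["would_permit"],
    }
```

A rising `would_permit` count deserves particular attention, since it means the candidate policy is looser than the one currently protecting the building.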
Shadow Graduation
Promote a shadow policy to production only after its shadow metrics meet every graduation criterion:
# Promotion criteria
shadow_graduation:
  hvac.setpoint.v2:
    required_metrics:
      - agreement_rate > 95%
      - would_deny_unsafe > 0
      - eval_count > 10000
    required_duration: 7d
    approval_required: true
    rollback_plan: revert to v1
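The promotion criteria above could be checked mechanically before a human is asked to sign off. The sketch below hard-codes the thresholds from the `hvac.setpoint.v2` entry; `ShadowStats` and `unmet_criteria` are hypothetical names, and a real implementation would presumably read the thresholds from the YAML rather than embed them.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List


@dataclass
class ShadowStats:
    eval_count: int            # total shadow evaluations
    agreements: int            # evaluations where shadow matched production
    would_deny_unsafe: int     # unsafe actions the shadow policy would have blocked
    observed_for: timedelta    # how long the policy has run in shadow mode


def unmet_criteria(stats: ShadowStats, approved: bool) -> List[str]:
    """Return the criteria that still block promotion (empty list = ready)."""
    unmet = []
    rate = stats.agreements / stats.eval_count if stats.eval_count else 0.0
    if rate <= 0.95:
        unmet.append("agreement_rate > 95%")
    if stats.would_deny_unsafe <= 0:
        unmet.append("would_deny_unsafe > 0")
    if stats.eval_count <= 10_000:
        unmet.append("eval_count > 10000")
    if stats.observed_for < timedelta(days=7):
        unmet.append("required_duration: 7d")
    if not approved:
        unmet.append("approval_required")
    return unmet
```

Returning the list of unmet criteria, rather than a single boolean, gives reviewers a direct explanation of why a promotion is still blocked.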
Layer 3: Approval Gates
High-impact actions require human-in-the-loop approval.
Approval Triggers
Actions requiring approval:
- Emergency overrides: Bypassing standard safety policies
- Large setpoint changes: > 5°F temperature adjustment
- Access control changes: Modifying user permissions or door schedules
- Policy changes: Updating OPA policies in production
- System mode changes: Emergency mode, maintenance mode
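The trigger list above could translate into a predicate along these lines. This is a sketch under assumptions: the action names in `ALWAYS_APPROVE` are hypothetical placeholders for the four non-setpoint triggers, and the function takes plain values rather than a `Command` object.

```python
from typing import Optional

# Hypothetical action names standing in for: emergency overrides, access
# control changes, policy changes, and system mode changes.
ALWAYS_APPROVE = {
    "emergency_override",
    "modify_access_control",
    "update_policy",
    "change_system_mode",
}

MAX_UNAPPROVED_DELTA_F = 5.0  # setpoint changes larger than this need approval


def requires_approval(action: str,
                      new_value: Optional[float] = None,
                      current_value: Optional[float] = None) -> bool:
    """Decide whether an action must pass the human approval gate."""
    if action in ALWAYS_APPROVE:
        return True
    if action == "write_setpoint":
        if new_value is None or current_value is None:
            # Fail safe: without both values the delta cannot be bounded.
            return True
        return abs(new_value - current_value) > MAX_UNAPPROVED_DELTA_F
    return False
```

Note that a change of exactly 5°F does not trigger approval here, matching the strict "> 5°F" wording of the trigger.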
Approval Workflow
stateDiagram-v2
[*] --> PendingApproval: Command Issued
PendingApproval --> Approved: Human Approves
PendingApproval --> Rejected: Human Rejects
PendingApproval --> Expired: Timeout (5min)
Approved --> Executing: Command Sent
Executing --> Success: Actuated
Executing --> Failed: Error
Rejected --> [*]
Expired --> [*]
Success --> [*]
Failed --> [*]
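The workflow above is a small finite-state machine, and encoding it as an explicit transition table makes illegal moves (for example, approving an expired request) fail loudly. The sketch below mirrors the states in the diagram; the event names (`approve`, `reject`, `timeout`, `send`, `actuated`, `error`) are hypothetical labels for the diagram's edges.

```python
from enum import Enum


class ApprovalState(Enum):
    PENDING = "PendingApproval"
    APPROVED = "Approved"
    REJECTED = "Rejected"
    EXPIRED = "Expired"
    EXECUTING = "Executing"
    SUCCESS = "Success"
    FAILED = "Failed"


S = ApprovalState

# (current state, event) -> next state, one entry per edge in the diagram
TRANSITIONS = {
    (S.PENDING, "approve"): S.APPROVED,
    (S.PENDING, "reject"): S.REJECTED,
    (S.PENDING, "timeout"): S.EXPIRED,
    (S.APPROVED, "send"): S.EXECUTING,
    (S.EXECUTING, "actuated"): S.SUCCESS,
    (S.EXECUTING, "error"): S.FAILED,
}

# States with no outgoing edges
TERMINAL = {S.REJECTED, S.EXPIRED, S.SUCCESS, S.FAILED}


def advance(state: ApprovalState, event: str) -> ApprovalState:
    """Apply an event, raising ValueError for any transition not in the table."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(
            f"illegal transition: {state.value} on {event!r}"
        ) from None
```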
Approval Implementation
from citadel.approval import ApprovalGate, ApprovalRequest
async def execute_high_impact_command(command: Command):
    # Check if approval required
    if requires_approval(command):
        # Create approval request
        approval = ApprovalRequest(
            command_id=command.id,
            action=command.action,
            target=command.target_id,
            params=command.params,
            risk_level="high",
            justification=get_justification(command),
            timeout_seconds=300  # 5 minutes
        )
        # Submit to approval gate
        gate = ApprovalGate()
        result = await gate.request_approval(approval)
        if result.approved:
            # Record approval in command
            command.safety_token = result.approval_token
            logger.info(
                f"Command approved by {result.approver}",
                extra={"command_id": command.id}
            )
        else:
            raise PermissionError(
                f"Command rejected: {result.rejection_reason}"
            )
    # Execute command with safety token
    await actuate(command)
Approval UI
Approvers receive rich context:
{
  "approval_request_id": "01HQZXYZ9ABCDEF",
  "command_id": "01HQZXYZ9GHIJKL",
  "timestamp": "2025-09-30T15:30:00Z",
  "action": "Emergency Override: Unlock All Doors",
  "target": "building_a.all_doors",
  "risk_level": "critical",
  "justification": "Fire alarm detected in Zone 3",
  "policy_explain": {
    "production_policy": "citadel.security.door",
    "decision": "deny",
    "reason": "Emergency egress requires override",
    "constraints": ["no_remote_unlock_all"]
  },
  "related_incidents": ["incident_01HQZXYZ9MNOPQR"],
  "approvers": ["ops.manager", "security.lead"],
  "timeout": 300
}
Layer 4: Circuit Breakers
Automatic rollback on anomaly detection protects against cascading failures.
Circuit Breaker States
stateDiagram-v2
[*] --> Closed: Normal Operation
Closed --> Open: Failure Threshold
Open --> HalfOpen: Timeout
HalfOpen --> Closed: Success
HalfOpen --> Open: Failure
Closed: Requests Pass Through
Open: Fail Fast (Return Error)
HalfOpen: Test Single Request
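To make the three states concrete, here is a minimal, synchronous sketch of the state machine. `SimpleBreaker` is a hypothetical class, not the `citadel.safety.CircuitBreaker` API; its parameter names follow the configuration example in the next subsection, and the clock is injectable so the timeout can be exercised without waiting.

```python
import time
from typing import Callable


class SimpleBreaker:
    def __init__(self, failure_threshold: int = 5, success_threshold: int = 2,
                 timeout_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self.state = "closed"
        self._failures = 0     # consecutive failures while closed
        self._successes = 0    # consecutive successes while half-open
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        """Return True if a request may pass; moves open -> half_open on timeout."""
        if self.state == "open" and self._clock() - self._opened_at >= self.timeout_seconds:
            self.state = "half_open"
            self._successes = 0
        return self.state != "open"

    def record_success(self) -> None:
        if self.state == "half_open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "closed"
                self._failures = 0
        elif self.state == "closed":
            self._failures = 0  # a success resets the consecutive-failure count

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._trip()        # any failure during probing reopens the breaker
        elif self.state == "closed":
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self._opened_at = self._clock()
        self._failures = 0
```

A caller would check `allow_request()` before each command and report the outcome with `record_success()` or `record_failure()`.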
Circuit Breaker Configuration
from citadel.safety import CircuitBreaker
# HVAC setpoint circuit breaker
hvac_breaker = CircuitBreaker(
    name="hvac_setpoint",
    failure_threshold=5,   # Open after 5 failures
    success_threshold=2,   # Close after 2 successes
    timeout_seconds=60,    # Half-open after 60s
    exceptions=[
        DeviceUnreachableError,
        SetpointOutOfBoundsError
    ]
)
@hvac_breaker.protect
async def write_hvac_setpoint(entity_id: str, value: float):
    # This function is protected by circuit breaker
    command = Command(
        target_id=entity_id,
        action="write_setpoint",
        params={"value": str(value)}
    )
    return await adapter.execute(command)
Anomaly Detection
ML-based anomaly detection triggers circuit breakers:
from citadel.safety import AnomalyDetector
detector = AnomalyDetector(
    model="isolation_forest",
    features=["setpoint_delta", "execution_time", "error_rate"],
    threshold=0.95
)
async def execute_with_anomaly_check(command: Command):
    # Extract features (the setpoint, timing, and error-rate values are
    # assumed to come from telemetry gathered by the caller)
    features = {
        "setpoint_delta": abs(current_setpoint - target_setpoint),
        "execution_time": last_execution_time_ms,
        "error_rate": recent_error_rate
    }
    # Check for anomaly
    is_anomaly, score = detector.predict(features)
    if is_anomaly:
        logger.warning(
            f"Anomaly detected for command {command.id}",
            extra={"score": score, "features": features}
        )
        # Raise so a surrounding circuit breaker can count the failure; the
        # breaker must list AnomalyDetectedError among its exceptions
        raise AnomalyDetectedError(
            f"Anomaly score {score} exceeds threshold"
        )
    # Execute normally
    return await execute(command)
Layer 5: Rollback and Recovery
When a command fails or its result cannot be verified, the target is automatically restored to the state captured before execution.
Rollback Strategy
from citadel.safety import RollbackManager
rollback = RollbackManager()
async def execute_with_rollback(command: Command):
    # Capture current state
    snapshot = await capture_state(command.target_id)
    rollback.save_snapshot(command.id, snapshot)
    try:
        # Execute command
        result = await execute(command)
        # Verify result
        new_state = await get_state(command.target_id)
        if not verify_state(new_state, command):
            raise StateVerificationError("State verification failed")
        return result
    except Exception as e:
        logger.error(
            "Command failed, initiating rollback",
            extra={"command_id": command.id, "error": str(e)}
        )
        # Rollback to previous state
        await rollback.restore_snapshot(command.id)
        # Publish rollback event
        await event_bus.publish(CloudEvent(
            type="citadel.safety.rollback",
            source="safety-manager",
            subject=command.target_id,
            data={"command_id": command.id, "reason": str(e)}
        ))
        raise
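The snapshot-then-restore pattern above can be tried without any devices by treating a dictionary as the device state. This is a simplified, synchronous sketch: `InMemorySnapshots` and `apply_with_rollback` are hypothetical stand-ins for `RollbackManager` and the async flow, and they assume the target already has a value before the command runs.

```python
from typing import Callable, Dict, Tuple


class InMemorySnapshots:
    """Stores the pre-command value of a target, keyed by command ID."""

    def __init__(self, device: Dict[str, float]):
        self._device = device
        self._snapshots: Dict[str, Tuple[str, float]] = {}

    def save(self, command_id: str, target_id: str) -> None:
        self._snapshots[command_id] = (target_id, self._device[target_id])

    def restore(self, command_id: str) -> None:
        target_id, value = self._snapshots.pop(command_id)
        self._device[target_id] = value


def apply_with_rollback(device: Dict[str, float], snapshots: InMemorySnapshots,
                        command_id: str, target_id: str, value: float,
                        verify: Callable[[float], bool]) -> None:
    """Write a value, verify it, and restore the old value if anything fails."""
    snapshots.save(command_id, target_id)
    try:
        device[target_id] = value
        if not verify(device[target_id]):
            raise ValueError("state verification failed")
    except Exception:
        snapshots.restore(command_id)
        raise  # the caller still sees the original failure
```

Re-raising after the restore matters: the rollback repairs the device state, but the failure itself must still reach the caller and any circuit breaker wrapping it.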
Safety Scorecard
Track safety metrics across all operations:
# Safety metrics dashboard
safety_metrics = {
"policy_evaluations_total": 125_430,
"policy_denials_total": 3_245,
"policy_denial_rate": 0.026,
"shadow_divergence_rate": 0.012,
"shadow_would_block_unsafe": 42,
"approvals_requested": 156,
"approvals_granted": 142,
"approvals_rejected": 8,
"approvals_expired": 6,
"circuit_breaker_opens": 3,
"rollbacks_executed": 2,
"safe_days": 47, # Days since last safety incident
}
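The rate fields on the scorecard follow directly from the raw counters, so they can be derived rather than stored and cross-checked for consistency. A short sketch using the sample figures above; `ratio` and `derive` are hypothetical helper names.

```python
def ratio(numerator: int, denominator: int) -> float:
    """Divide safely, returning 0.0 when there is no data yet."""
    return numerator / denominator if denominator else 0.0


def derive(counters: dict) -> dict:
    """Compute derived scorecard fields from raw counters."""
    resolved = (counters["approvals_granted"]
                + counters["approvals_rejected"]
                + counters["approvals_expired"])
    return {
        "policy_denial_rate": round(
            ratio(counters["policy_denials_total"],
                  counters["policy_evaluations_total"]), 3),
        "approval_grant_rate": round(
            ratio(counters["approvals_granted"],
                  counters["approvals_requested"]), 3),
        # Requests that were neither granted, rejected, nor expired
        "approvals_unresolved": counters["approvals_requested"] - resolved,
    }


sample = {
    "policy_evaluations_total": 125_430,
    "policy_denials_total": 3_245,
    "approvals_requested": 156,
    "approvals_granted": 142,
    "approvals_rejected": 8,
    "approvals_expired": 6,
}
```

With the sample counters, the derived denial rate matches the 0.026 shown on the scorecard, and granted, rejected, and expired approvals account for all 156 requests.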
Compliance Integration
Safety policies map to compliance requirements:
# Policy compliance mapping
policies:
  citadel.security.door:
    compliance_frameworks:
      - IEC-62443: SC-3.3 (Access Control)
      - NIST-800-82: AC-2 (Account Management)
    controls:
      - Emergency egress never blocked
      - Audit trail for all access changes
      - Multi-person approval for bulk changes
  citadel.hvac.setpoint:
    compliance_frameworks:
      - ASHRAE-90.1: Temperature setpoint limits
      - ISO-50001: Energy management constraints
    controls:
      - Temperature bounds enforced
      - Occupancy-based optimization
      - Demand response compliance
Related Documentation
- Protocol Strategy - Safety tokens and signed commands
- Identity Foundation - SPIFFE-based authentication
- Observability - Safety metrics and alerting
- Agent Topology - Agent safety integration