Safety Guardrails and Policy Architecture

Autonomous building operations demand rigorous safety guarantees. CitadelMesh implements defense-in-depth safety guardrails using policy-as-code, shadow mode evaluation, multi-step approvals, and circuit breakers. This document explains the safety architecture and fail-safe design.

Safety Philosophy

Safe by Default, Autonomous by Permission

Every control action in CitadelMesh must pass through multiple safety layers:

  1. Static Policies: Hard-coded constraints (e.g., temperature bounds, emergency egress)
  2. OPA Policies: Dynamic policy evaluation with explain capability
  3. Shadow Mode: Test new policies without actuation
  4. Human Approvals: Multi-step gates for high-impact actions
  5. Circuit Breakers: Automatic rollback on anomaly detection

The layers are independent of one another, so no single failure should be able to cause an unsafe building operation.
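The layering can be pictured as a chain of checks in which any layer may veto a command and an error inside a layer counts as a denial. The following is a minimal sketch of that idea, not CitadelMesh's implementation; `Verdict`, `run_layers`, and the two example checks are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Verdict:
    allowed: bool
    blocked_by: Optional[str] = None  # name of the layer that vetoed, if any


# A layer is a (name, check) pair; check returns True to let the command pass.
Layer = tuple[str, Callable[[dict], bool]]


def run_layers(command: dict, layers: list[Layer]) -> Verdict:
    """Run every layer in order; the first veto or error blocks the command."""
    for name, check in layers:
        try:
            passed = check(command)
        except Exception:
            passed = False  # fail closed: a broken layer denies, never allows
        if not passed:
            return Verdict(allowed=False, blocked_by=name)
    return Verdict(allowed=True)


def static_bounds(cmd: dict) -> bool:
    # Illustrative hard-coded bounds (hypothetical values).
    return 60 <= cmd["value"] <= 78


def broken_policy_engine(cmd: dict) -> bool:
    # Simulates a layer whose backend is unavailable.
    raise ConnectionError("policy engine unreachable")


layers = [("static", static_bounds), ("opa", broken_policy_engine)]
```

With these layers, a setpoint of 90 is blocked by `static`, while a setpoint of 72 passes the static check and is then blocked by `opa` because the failing layer is treated as a denial.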

Safety Architecture

graph TB
Agent[Agent] --> Draft[Draft Command]
Draft --> Policy[OPA Policy Engine]
Policy --> Decision{Allow?}

Decision -->|Deny| Audit1[Audit: Blocked]
Decision -->|Allow + Shadow| Shadow[Shadow Evaluation]
Decision -->|Allow + Production| Token[Issue Safety Token]

Shadow --> Metrics[Metrics: Would-Allow/Deny]
Shadow --> Audit2[Audit: Shadow Result]

Token --> Approval{High Impact?}
Approval -->|Yes| Human[Human Approval Gate]
Approval -->|No| Execute[Execute Command]

Human -->|Approved| Execute
Human -->|Rejected| Audit3[Audit: Rejected]

Execute --> Monitor[Monitor Result]
Monitor --> CB{Circuit Breaker?}
CB -->|Anomaly| Rollback[Rollback + Alert]
CB -->|Normal| Success[Success]

Layer 1: OPA Policy Engine

All control commands are evaluated by Open Policy Agent (OPA) using Rego policies.

Policy Structure

Policies are organized by domain:

policies/
├── security/
│   ├── door_access.rego
│   ├── camera_control.rego
│   └── intrusion_response.rego
├── hvac/
│   ├── setpoint_control.rego
│   ├── occupancy_modes.rego
│   └── demand_response.rego
├── lighting/
│   ├── scene_control.rego
│   └── emergency_lighting.rego
└── common/
    ├── time_constraints.rego
    └── emergency_override.rego

Example: HVAC Setpoint Policy

# policies/hvac/setpoint_control.rego
package citadel.hvac.setpoint

import rego.v1

# Default deny all actions
default allow := false

# Temperature limits (Fahrenheit)
min_temp := 65
max_temp := 78

# Allow setpoint changes within safe range
allow if {
    input.action == "write_setpoint"
    input.value >= min_temp
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}

# Validate entity ID format
valid_entity_id if {
    startswith(input.entity_id, "hvac.")
    count(split(input.entity_id, ".")) >= 3
}

# Check for emergency lockout
emergency_mode if {
    data.system_state.emergency_active == true
}

# Read operations always allowed
allow if {
    input.action == "read_point"
}

# True when the value satisfies the daytime or the night-setback bounds
value_in_range if {
    input.value >= min_temp
    input.value <= max_temp
}

value_in_range if {
    is_nighttime
    input.value >= 60
    input.value <= max_temp
}

# Deny reason for explain. The else chain yields at most one reason,
# so overlapping conditions cannot raise a rule-conflict error.
deny_reason := "System in emergency mode" if {
    input.action == "write_setpoint"
    emergency_mode
} else := "Invalid entity ID format" if {
    input.action == "write_setpoint"
    not valid_entity_id
} else := "Temperature setpoint outside safe range" if {
    input.action == "write_setpoint"
    not value_in_range
}

# Time-based constraints (night setback)
allow if {
    input.action == "write_setpoint"
    is_nighttime
    input.value >= 60
    input.value <= max_temp
    valid_entity_id
    not emergency_mode
}

# time.clock returns [hour, minute, second] in UTC unless a timezone is supplied
is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour >= 22
}

is_nighttime if {
    hour := time.clock(time.now_ns())[0]
    hour <= 6
}

Policy Evaluation Flow

import logging

from opa_client import OPAClient
from citadel.v1 import commands_pb2

logger = logging.getLogger(__name__)

# Agent creates command
command = commands_pb2.Command(
    id=ulid(),
    target_id="hvac.zone1.setpoint",
    action="write_setpoint",
    params={"value": "72"},
    issued_by="spiffe://citadel.mesh/energy-agent"
)

# Evaluate against OPA policy
opa = OPAClient("http://opa:8181")
decision = await opa.evaluate(
    policy="citadel.hvac.setpoint",
    input={
        "action": command.action,
        "entity_id": command.target_id,
        "value": float(command.params["value"]),
        "issued_by": command.issued_by
    },
    explain=True
)

if decision["allow"]:
    # Issue safety token
    command.safety_token = opa.issue_token(decision)
    await execute_command(command)
else:
    # Log denial with explanation
    logger.warning(
        f"Command denied: {decision.get('deny_reason')}",
        extra={
            "command_id": command.id,
            "explain_trace": decision["explain"]
        }
    )
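`OPAClient` above is a project-specific wrapper. For orientation, the same kind of query can be made directly against OPA's REST Data API, where the package `citadel.hvac.setpoint` is addressed as `/v1/data/citadel/hvac/setpoint` and the request body wraps the input under an `input` key. The sketch below uses only the standard library; `build_opa_request`, `parse_decision`, and `query_opa` are hypothetical helper names, and treating a missing `result` as a denial is an assumption about how the caller wants to fail.

```python
import json
import urllib.request


def build_opa_request(base_url: str, package: str, opa_input: dict) -> urllib.request.Request:
    """Build a POST request for OPA's Data API from a dotted package name."""
    path = package.replace(".", "/")
    body = json.dumps({"input": opa_input}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/v1/data/{path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_decision(response_body: bytes) -> dict:
    """Extract the decision document, failing closed if it is undefined."""
    result = json.loads(response_body).get("result")
    if not isinstance(result, dict):
        # OPA omits "result" when the queried document is undefined.
        return {"allow": False, "deny_reason": "policy returned no result"}
    return {
        "allow": result.get("allow") is True,
        "deny_reason": result.get("deny_reason"),
    }


def query_opa(base_url: str, package: str, opa_input: dict, timeout: float = 2.0) -> dict:
    """Send the query; transport errors propagate so the caller can deny."""
    request = build_opa_request(base_url, package, opa_input)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_decision(response.read())
```

Comparing `allow` with `is True` means any non-boolean or missing value is treated as a denial rather than coerced into an allow.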

Policy Testing

Each policy has a companion Rego test file that covers its allow and deny paths:

# policies/hvac/setpoint_control_test.rego
package citadel.hvac.setpoint

import rego.v1

test_allow_valid_setpoint if {
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": false}
}

test_deny_too_hot if {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 85
    }
}

test_deny_emergency_mode if {
    not allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 72
    } with data.system_state as {"emergency_active": true}
}

# Mock time.clock so the test sees 23:00:00 regardless of wall-clock time
test_allow_nighttime_setback if {
    allow with input as {
        "action": "write_setpoint",
        "entity_id": "hvac.zone1.setpoint",
        "value": 62
    } with time.clock as [23, 0, 0]
        with data.system_state as {"emergency_active": false}
}

Run tests in CI:

# Validate all policies
opa test policies/ -v

# Check for coverage
opa test policies/ --coverage

Layer 2: Shadow Mode

New policies and learning controllers can be evaluated in shadow mode without affecting real systems.

Shadow Mode Architecture

sequenceDiagram
participant Agent
participant OPA
participant Metrics
participant Actuator

Agent->>OPA: Evaluate (mode=shadow)
OPA->>OPA: Run production policy
OPA->>OPA: Run shadow policy
OPA-->>Agent: Production: allow<br/>Shadow: deny
OPA->>Metrics: Record shadow decision
Agent->>Actuator: Execute (production allowed)
Agent->>Metrics: Record would-block

Shadow Policy Configuration

# config/policies/shadow.yaml
shadow_policies:
  - name: hvac.setpoint.v2
    base_policy: hvac.setpoint.v1
    changes:
      - Tighter temperature bounds (67-76°F)
      - Occupancy-based overrides
    duration: 7d
    metrics:
      - would_allow_rate
      - would_deny_rate
      - agreement_rate

  - name: security.door.ml_anomaly
    base_policy: security.door.v1
    changes:
      - Add ML anomaly detection
    duration: 14d
    alert_on_divergence: true

Shadow Evaluation

async def evaluate_with_shadow(command: Command, shadow_policy: str):
    # Evaluate production policy
    prod_decision = await opa.evaluate(
        policy="citadel.hvac.setpoint.v1",
        input=command_to_input(command)
    )

    # Evaluate shadow policy
    shadow_decision = await opa.evaluate(
        policy=shadow_policy,
        input=command_to_input(command)
    )

    # Record metrics
    metrics.counter("policy_shadow_evaluation", {
        "policy": shadow_policy,
        "prod_allow": prod_decision["allow"],
        "shadow_allow": shadow_decision["allow"],
        "agreement": prod_decision["allow"] == shadow_decision["allow"]
    })

    # Alert on divergence
    if prod_decision["allow"] != shadow_decision["allow"]:
        logger.info(
            f"Shadow policy divergence: {shadow_policy}",
            extra={
                "command": command.id,
                "prod": prod_decision["allow"],
                "shadow": shadow_decision["allow"],
                "prod_reason": prod_decision.get("deny_reason"),
                "shadow_reason": shadow_decision.get("deny_reason")
            }
        )

    # Execute based on production policy only
    return prod_decision
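The three metrics named in the shadow configuration (`would_allow_rate`, `would_deny_rate`, `agreement_rate`) can be derived from the recorded decision pairs. This is a small sketch of one way to aggregate them; `ShadowStats` is a hypothetical helper, and a real deployment would more likely compute these rates in its metrics backend.

```python
from dataclasses import dataclass


@dataclass
class ShadowStats:
    total: int = 0
    agreements: int = 0
    shadow_allows: int = 0

    def record(self, prod_allow: bool, shadow_allow: bool) -> None:
        """Count one evaluation of the production and shadow policies."""
        self.total += 1
        self.agreements += int(prod_allow == shadow_allow)
        self.shadow_allows += int(shadow_allow)

    def rates(self) -> dict:
        """Return the shadow metrics as fractions of all evaluations."""
        if self.total == 0:
            return {"would_allow_rate": 0.0, "would_deny_rate": 0.0, "agreement_rate": 0.0}
        return {
            "would_allow_rate": self.shadow_allows / self.total,
            "would_deny_rate": (self.total - self.shadow_allows) / self.total,
            "agreement_rate": self.agreements / self.total,
        }


stats = ShadowStats()
for prod, shadow in [(True, True), (True, False), (False, False), (True, True)]:
    stats.record(prod, shadow)
print(stats.rates())
# → {'would_allow_rate': 0.5, 'would_deny_rate': 0.5, 'agreement_rate': 0.75}
```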

Shadow Graduation

Promote a shadow policy to production only after its metrics meet the graduation criteria:

# Promotion criteria
shadow_graduation:
  hvac.setpoint.v2:
    required_metrics:
      - agreement_rate > 95%
      - would_deny_unsafe > 0
      - eval_count > 10000
    required_duration: 7d
    approval_required: true
    rollback_plan: revert to v1
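The criteria above could be checked mechanically before a promotion is offered for approval. The sketch below is an illustration under assumptions: `ShadowReport` and `graduation_blockers` are hypothetical names, and `would_deny_unsafe` is read here as a count of unsafe commands the shadow policy would have denied.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass
class ShadowReport:
    agreement_rate: float      # fraction in [0, 1]
    would_deny_unsafe: int     # assumed meaning: unsafe commands the shadow policy would deny
    eval_count: int
    observed_for: timedelta
    approved_by_human: bool


def graduation_blockers(report: ShadowReport) -> list[str]:
    """Return the unmet criteria; an empty list means the policy may be promoted."""
    blockers = []
    if not report.agreement_rate > 0.95:
        blockers.append("agreement_rate must exceed 95%")
    if not report.would_deny_unsafe > 0:
        blockers.append("would_deny_unsafe must be greater than 0")
    if not report.eval_count > 10_000:
        blockers.append("eval_count must exceed 10000")
    if report.observed_for < timedelta(days=7):
        blockers.append("shadow period shorter than 7 days")
    if not report.approved_by_human:
        blockers.append("human approval missing")
    return blockers
```

Returning the list of unmet criteria, rather than a single boolean, gives the approver a concrete reason when a promotion is refused.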

Layer 3: Approval Gates

High-impact actions require human-in-the-loop approval.

Approval Triggers

Actions requiring approval:

  1. Emergency overrides: Bypassing standard safety policies
  2. Large setpoint changes: > 5°F temperature adjustment
  3. Access control changes: Modifying user permissions or door schedules
  4. Policy changes: Updating OPA policies in production
  5. System mode changes: Emergency mode, maintenance mode
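A trigger check along these lines might look as follows. This is a sketch only: the action names in `ALWAYS_APPROVE` are hypothetical stand-ins for the categories listed above, and `needs_human_approval` is not the project's `requires_approval` function.

```python
from typing import Optional

# Hypothetical action names standing in for the categories listed above.
ALWAYS_APPROVE = {
    "emergency_override",
    "modify_access_control",
    "update_policy",
    "change_system_mode",
}

# Setpoint changes larger than this many degrees Fahrenheit need approval.
MAX_UNAPPROVED_DELTA_F = 5.0


def needs_human_approval(
    action: str,
    current_f: Optional[float] = None,
    requested_f: Optional[float] = None,
) -> bool:
    """Decide whether a command must pass through the approval gate."""
    if action in ALWAYS_APPROVE:
        return True
    if action == "write_setpoint":
        if current_f is None or requested_f is None:
            return True  # fail safe: an unknown delta is treated as high impact
        return abs(requested_f - current_f) > MAX_UNAPPROVED_DELTA_F
    return False
```

For example, moving a setpoint from 70 to 76 requires approval, while moving it from 70 to 75 does not, because the threshold is strictly greater than 5°F.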

Approval Workflow

stateDiagram-v2
[*] --> PendingApproval: Command Issued

PendingApproval --> Approved: Human Approves
PendingApproval --> Rejected: Human Rejects
PendingApproval --> Expired: Timeout (5min)

Approved --> Executing: Command Sent
Executing --> Success: Actuated
Executing --> Failed: Error

Rejected --> [*]
Expired --> [*]
Success --> [*]
Failed --> [*]
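The workflow above is a small state machine, which can be encoded as a transition table so that illegal moves (for example, approving an expired request) are rejected. This is a sketch; the event names (`approve`, `reject`, `timeout`, `send`, `actuated`, `error`) are hypothetical labels for the arrows in the diagram.

```python
from enum import Enum


class State(Enum):
    PENDING = "PendingApproval"
    APPROVED = "Approved"
    REJECTED = "Rejected"
    EXPIRED = "Expired"
    EXECUTING = "Executing"
    SUCCESS = "Success"
    FAILED = "Failed"


# (current state, event) -> next state, mirroring the diagram's arrows.
TRANSITIONS = {
    (State.PENDING, "approve"): State.APPROVED,
    (State.PENDING, "reject"): State.REJECTED,
    (State.PENDING, "timeout"): State.EXPIRED,
    (State.APPROVED, "send"): State.EXECUTING,
    (State.EXECUTING, "actuated"): State.SUCCESS,
    (State.EXECUTING, "error"): State.FAILED,
}

# States with no outgoing transitions.
TERMINAL = {State.REJECTED, State.EXPIRED, State.SUCCESS, State.FAILED}


def advance(state: State, event: str) -> State:
    """Return the next state, or raise ValueError for an illegal transition."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {state.value} on {event!r}") from None
```

Because terminal states have no entries in the table, a late approval that arrives after the 5-minute timeout cannot revive an expired request.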

Approval Implementation

from citadel.approval import ApprovalGate, ApprovalRequest

async def execute_high_impact_command(command: Command):
    # Check if approval required
    if requires_approval(command):
        # Create approval request
        approval = ApprovalRequest(
            command_id=command.id,
            action=command.action,
            target=command.target_id,
            params=command.params,
            risk_level="high",
            justification=get_justification(command),
            timeout_seconds=300  # 5 minutes
        )

        # Submit to approval gate
        gate = ApprovalGate()
        result = await gate.request_approval(approval)

        if result.approved:
            # Record approval in command
            command.safety_token = result.approval_token
            logger.info(
                f"Command approved by {result.approver}",
                extra={"command_id": command.id}
            )
        else:
            raise PermissionError(
                f"Command rejected: {result.rejection_reason}"
            )

    # Execute command with safety token
    await actuate(command)

Approval UI

Approvers receive rich context:

{
  "approval_request_id": "01HQZXYZ9ABCDEF",
  "command_id": "01HQZXYZ9GHIJKL",
  "timestamp": "2025-09-30T15:30:00Z",
  "action": "Emergency Override: Unlock All Doors",
  "target": "building_a.all_doors",
  "risk_level": "critical",
  "justification": "Fire alarm detected in Zone 3",
  "policy_explain": {
    "production_policy": "citadel.security.door",
    "decision": "deny",
    "reason": "Emergency egress requires override",
    "constraints": ["no_remote_unlock_all"]
  },
  "related_incidents": ["incident_01HQZXYZ9MNOPQR"],
  "approvers": ["ops.manager", "security.lead"],
  "timeout": 300
}

Layer 4: Circuit Breakers

Automatic rollback on anomaly detection protects against cascading failures.

Circuit Breaker States

stateDiagram-v2
[*] --> Closed: Normal Operation

Closed --> Open: Failure Threshold
Open --> HalfOpen: Timeout
HalfOpen --> Closed: Success
HalfOpen --> Open: Failure

Closed: Requests Pass Through
Open: Fail Fast (Return Error)
HalfOpen: Test Single Request

Circuit Breaker Configuration

from citadel.safety import CircuitBreaker

# HVAC setpoint circuit breaker
hvac_breaker = CircuitBreaker(
    name="hvac_setpoint",
    failure_threshold=5,   # Open after 5 failures
    success_threshold=2,   # Close after 2 successes
    timeout_seconds=60,    # Half-open after 60s
    exceptions=[
        DeviceUnreachableError,
        SetpointOutOfBoundsError
    ]
)

@hvac_breaker.protect
async def write_hvac_setpoint(entity_id: str, value: float):
    # This function is protected by circuit breaker
    command = Command(
        target_id=entity_id,
        action="write_setpoint",
        params={"value": str(value)}
    )
    return await adapter.execute(command)
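To make the three thresholds concrete, here is a minimal, self-contained breaker that follows the closed, open, and half-open states. It is a sketch, not the `citadel.safety.CircuitBreaker` implementation: `SimpleBreaker` and its methods are hypothetical, it counts consecutive failures, and unlike a strict single-probe design it lets every request through while half-open.

```python
import time


class SimpleBreaker:
    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout_seconds=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock          # injectable so tests can control time
        self.state = "closed"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        """Return False while open; move to half-open once the timeout elapses."""
        if self.state == "open":
            if self._clock() - self._opened_at < self.timeout_seconds:
                return False
            self.state = "half_open"
            self._successes = 0
        return True

    def record_success(self) -> None:
        if self.state == "half_open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "closed"
                self._failures = 0
        else:
            self._failures = 0  # a success resets the consecutive-failure count

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._trip()  # any failure while probing reopens the breaker
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self._failures = 0
        self._opened_at = self._clock()
```

Injecting the clock keeps the timeout behavior testable without sleeping.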

Anomaly Detection

ML-based anomaly detection triggers circuit breakers:

from citadel.safety import AnomalyDetector

detector = AnomalyDetector(
    model="isolation_forest",
    features=["setpoint_delta", "execution_time", "error_rate"],
    threshold=0.95
)

async def execute_with_anomaly_check(command: Command):
    # Extract features
    features = {
        "setpoint_delta": abs(current_setpoint - target_setpoint),
        "execution_time": last_execution_time_ms,
        "error_rate": recent_error_rate
    }

    # Check for anomaly
    is_anomaly, score = detector.predict(features)

    if is_anomaly:
        logger.warning(
            f"Anomaly detected for command {command.id}",
            extra={"score": score, "features": features}
        )

        # Trigger circuit breaker
        raise AnomalyDetectedError(
            f"Anomaly score {score} exceeds threshold"
        )

    # Execute normally
    return await execute(command)
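The `(is_anomaly, score)` contract can be illustrated without a trained model. The sketch below deliberately swaps the isolation forest for a plain z-score test on a single feature, which is a much simpler technique; `zscore_anomaly` and its default threshold of 3.0 are assumptions made for illustration only.

```python
import statistics


def zscore_anomaly(history: list[float], value: float,
                   z_threshold: float = 3.0) -> tuple[bool, float]:
    """Flag a value that lies more than z_threshold standard deviations
    from the mean of the (non-empty) history."""
    mean = statistics.fmean(history)
    stdev = statistics.pstdev(history)
    if stdev == 0:
        # Constant history: any different value is anomalous.
        differs = value != mean
        return differs, float("inf") if differs else 0.0
    z = abs(value - mean) / stdev
    return z > z_threshold, z
```

Given a history of setpoint deltas `[1, 2, 3, 4, 5]`, a new delta of 3 is not flagged, while a delta of 10 is.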

Layer 5: Rollback and Recovery

Automated rollback on policy violations or failures.

Rollback Strategy

from citadel.safety import RollbackManager

rollback = RollbackManager()

async def execute_with_rollback(command: Command):
    # Capture current state
    snapshot = await capture_state(command.target_id)
    rollback.save_snapshot(command.id, snapshot)

    try:
        # Execute command
        result = await execute(command)

        # Verify result
        new_state = await get_state(command.target_id)
        if not verify_state(new_state, command):
            raise StateVerificationError("State verification failed")

        return result

    except Exception as e:
        logger.error(
            "Command failed, initiating rollback",
            extra={"command_id": command.id, "error": str(e)}
        )

        # Rollback to previous state
        await rollback.restore_snapshot(command.id)

        # Publish rollback event
        await event_bus.publish(CloudEvent(
            type="citadel.safety.rollback",
            source="safety-manager",
            subject=command.target_id,
            data={"command_id": command.id, "reason": str(e)}
        ))

        # Re-raise so the caller still sees the original failure
        raise
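The snapshot-then-restore pattern can be exercised end to end with an in-memory store. This is a simplified, synchronous sketch rather than the `RollbackManager` API: `InMemoryRollback`, `apply_with_rollback`, and the `verify` callback are hypothetical, and a plain dict stands in for device state.

```python
import copy
from typing import Any, Callable


class InMemoryRollback:
    def __init__(self, store: dict):
        self._store = store
        self._snapshots: dict = {}

    def save_snapshot(self, command_id: str, target_id: str) -> None:
        """Remember the target's current value under the command ID."""
        previous = copy.deepcopy(self._store.get(target_id))
        self._snapshots[command_id] = (target_id, previous)

    def restore_snapshot(self, command_id: str) -> None:
        """Write the remembered value back and forget the snapshot."""
        target_id, previous = self._snapshots.pop(command_id)
        self._store[target_id] = previous


def apply_with_rollback(store: dict, rollback: InMemoryRollback, command_id: str,
                        target_id: str, new_value: Any,
                        verify: Callable[[Any], bool]) -> None:
    """Apply a change, verify it, and restore the snapshot on any failure."""
    rollback.save_snapshot(command_id, target_id)
    try:
        store[target_id] = new_value
        if not verify(store[target_id]):
            raise ValueError("state verification failed")
    except Exception:
        rollback.restore_snapshot(command_id)
        raise


store = {"hvac.zone1.setpoint": 70}
manager = InMemoryRollback(store)
apply_with_rollback(store, manager, "cmd-1", "hvac.zone1.setpoint", 72, lambda v: v <= 78)
```

After this call the stored setpoint is 72. A later attempt to write 95 with the same `verify` callback raises `ValueError` and leaves the setpoint at 72.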

Safety Scorecard

Track safety metrics across all operations:

# Safety metrics dashboard
safety_metrics = {
    "policy_evaluations_total": 125_430,
    "policy_denials_total": 3_245,
    "policy_denial_rate": 0.026,

    "shadow_divergence_rate": 0.012,
    "shadow_would_block_unsafe": 42,

    "approvals_requested": 156,
    "approvals_granted": 142,
    "approvals_rejected": 8,
    "approvals_expired": 6,

    "circuit_breaker_opens": 3,
    "rollbacks_executed": 2,

    "safe_days": 47,  # Days since last safety incident
}
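Some scorecard values are derived from others, so they can be cross-checked: the denial rate is denials divided by evaluations, and every approval request should end up granted, rejected, or expired. The helpers below are a hypothetical sketch of such a consistency check, using the figures shown above.

```python
def denial_rate(denials: int, evaluations: int) -> float:
    """Fraction of evaluations that were denied, rounded to three decimals."""
    return round(denials / evaluations, 3) if evaluations else 0.0


def approvals_consistent(requested: int, granted: int, rejected: int, expired: int) -> bool:
    """Every request must be accounted for by exactly one outcome."""
    return requested == granted + rejected + expired


print(denial_rate(3_245, 125_430))           # → 0.026
print(approvals_consistent(156, 142, 8, 6))  # → True
```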

Compliance Integration

Safety policies map to compliance requirements:

# Policy compliance mapping
policies:
  citadel.security.door:
    compliance_frameworks:
      - IEC-62443: SC-3.3 (Access Control)
      - NIST-800-82: AC-2 (Account Management)
    controls:
      - Emergency egress never blocked
      - Audit trail for all access changes
      - Multi-person approval for bulk changes

  citadel.hvac.setpoint:
    compliance_frameworks:
      - ASHRAE-90.1: Temperature setpoint limits
      - ISO-50001: Energy management constraints
    controls:
      - Temperature bounds enforced
      - Occupancy-based optimization
      - Demand response compliance

See Also