Skip to main content

Writing Safety Policies

This guide shows you how to create custom OPA policies for CitadelMesh, with real-world examples and best practices.

Policy Template

Use this template as a starting point for all policies:

# [Policy Name] - [Brief Description]
# Purpose: [What this policy protects]
# Scope: [What actions/systems this applies to]
#
# Replace every [bracketed] placeholder with real values before use.

package citadel.[domain].[subdomain]

import rego.v1

# Default deny for safety
default allow := false

# --- ALLOW RULES ---
# Each `allow if { ... }` body is an alternative: a request is allowed when
# ANY one body holds, so a new body can only widen access, never narrow it.

# [Rule 1: Description]
allow if {
input.action == "[action_name]"
[condition_1]
[condition_2]
# Add more conditions as needed
}

# [Rule 2: Description with different conditions]
allow if {
input.action == "[action_name]"
[alternative_conditions]
}

# --- HELPER RULES ---

[helper_rule_name] if {
# Helper logic
}

# --- VIOLATION REASONS ---
# violation_reason is a single-valued rule (:=).
# NOTE(review): when adding more messages, make sure two bodies cannot match
# the same input with different values (or chain them with `else`); confirm
# against the OPA documentation on complete rules.

violation_reason := "[Helpful error message]" if {
not allow
input.action == "[action_name]"
# Condition that caused denial
}

# --- METADATA ---
# Descriptive fields only; nothing in this template reads them.

policy_metadata := {
"version": "1.0.0",
"author": "Your Name",
"last_updated": "2025-01-15",
"severity": "high|medium|low"
}

Real-World Examples

Example 1: Access Control Policy

Use Case: Secure door access with multiple safety layers

File: policies/access/door_control.rego

# Door Access Control Policy
# Purpose: Prevent unauthorized or unsafe door operations
# Scope: All door lock/unlock operations

package citadel.access.door_control

import rego.v1

default allow := false

# Emergency unlock: granted for critical-priority requests that carry a
# non-empty reason. None of the normal unlock checks apply on this path.
allow if {
    input.emergency_reason != ""
    input.priority == "PRIORITY_CRITICAL"
    input.action == "emergency_unlock"
}

# Regular unlock: the door ID and duration must validate, the request must
# fall inside operating hours, the agent must be authorized, and the agent
# must not have hit the rate limit.
allow if {
    input.action == "door_unlock"
    not rate_limit_exceeded
    authorized_agent
    within_operating_hours
    valid_duration
    valid_door_id
}

# Locking only needs a valid door and an authorized agent.
allow if {
    authorized_agent
    valid_door_id
    input.action == "door_lock"
}

# Status queries are read-only; a valid door ID is sufficient.
allow if {
    valid_door_id
    input.action == "door_status"
}

# --- HELPER RULES ---

# valid_door_id holds when input.door_id has the form "door-<suffix>" with a
# non-empty suffix.
valid_door_id if {
    startswith(input.door_id, "door-")

    # The bare prefix "door-" names no door, so require at least one character
    # after it. The previous split/count check was always satisfied once the
    # prefix matched and therefore accepted "door-" on its own.
    count(input.door_id) > count("door-")
}

# valid_duration holds when the requested unlock duration is greater than
# zero and at most 900 seconds (15 minutes).
valid_duration if {
    max_seconds := 900
    input.duration_seconds <= max_seconds
    input.duration_seconds > 0
}

# within_operating_hours holds from 06:00 up to (not including) 23:00.
# time.clock is called without a timezone, so the hour is the UTC hour.
within_operating_hours if {
    hour := time.clock(time.now_ns())[0]
    hour >= 6 # 6 AM

    # Strictly before 11 PM. `hour <= 23` is true for every hour of the day,
    # which silently extended the window to midnight.
    hour < 23
}

# After-hours unlock: outside operating hours a door may still be unlocked,
# but only with authorization level 3 or higher. The agent and rate-limit
# checks of the daytime unlock rule are applied here as well, so the
# after-hours path is never weaker than the daytime path.
allow if {
    input.action == "door_unlock"
    not within_operating_hours
    input.authorization_level >= 3
    valid_door_id
    valid_duration
    authorized_agent
    not rate_limit_exceeded
}

# authorized_agent holds when the request carries an agent_id that is not the
# empty string. This is a presence check only: the ID is not compared against
# any list of known agents.
authorized_agent if {
input.agent_id != ""
# In production, check against data.authorized_agents
}

# rate_limit_exceeded holds when the requesting agent already has five or more
# "door_unlock" entries in data.recent_commands with a timestamp inside the
# last 30 minutes (timestamps compared in nanoseconds).
rate_limit_exceeded if {
    window_ns := 1800000000000 # 30 minutes
    cutoff := time.now_ns() - window_ns

    unlock_count := count([entry |
        some entry in data.recent_commands
        entry.action == "door_unlock"
        entry.agent_id == input.agent_id
        entry.timestamp > cutoff
    ])

    unlock_count >= 5
}

# --- VIOLATION REASONS ---

# violation_reason yields one human-readable explanation for a denied request.
#
# The alternatives are chained with `else` so that exactly one message is
# produced, in priority order. As separate `violation_reason := ...` rules,
# a request failing two checks at once (for example a bad door ID and a bad
# duration) would yield two different values for one single-valued rule,
# which OPA reports as an evaluation conflict.
violation_reason := "Invalid door ID format" if {
    not allow
    not valid_door_id
} else := "Door unlock denied: invalid duration (max 900 seconds)" if {
    not allow
    input.action == "door_unlock"
    not valid_duration
} else := "Door unlock denied: outside operating hours (6 AM - 11 PM)" if {
    not allow
    input.action == "door_unlock"
    not within_operating_hours

    # Negated form so a request with no authorization_level at all still
    # gets this message; `input.authorization_level < 3` is undefined then.
    not input.authorization_level >= 3
} else := "Door unlock denied: rate limit exceeded (5 per 30 min)" if {
    not allow
    input.action == "door_unlock"
    rate_limit_exceeded
}

# Descriptive metadata for the door access control policy.
policy_metadata := {
    "severity": "high",
    "domain": "access_control",
    "version": "1.0.0"
}

Example 2: Energy Optimization Policy

Use Case: Prevent wasteful HVAC operations while maintaining comfort

File: policies/energy/hvac_optimization.rego

# HVAC Energy Optimization Policy
# Purpose: Balance comfort and energy efficiency
# Scope: Temperature setpoints and HVAC mode changes

package citadel.energy.hvac_optimization

import rego.v1

default allow := false

# Allowed setpoint range per zone type, keyed by the zone's "type" value.
# The numbers are in the same unit as input.temperature.
temp_limits := {
    "warehouse": {
        "max": 80,
        "min": 60
    },
    "datacenter": {
        "max": 72,
        "min": 68
    },
    "lobby": {
        "max": 78,
        "min": 65
    },
    "office": {
        "max": 76,
        "min": 68
    }
}

# Occupied zone, no peak demand: the setpoint must lie within the zone type's
# limits and must not be an excessive jump from the current temperature.
#
# `not peak_demand_active` is required here. Rule bodies are alternatives, so
# without it this body would keep allowing any in-range setpoint during peak
# demand and the stricter peak body below would restrict nothing.
allow if {
    input.action == "set_temperature"
    not peak_demand_active
    zone_occupied(input.zone_id)
    within_zone_limits
    not excessive_change
}

# Unoccupied zone: a wider setback range is acceptable.
allow if {
    input.action == "set_temperature"
    not zone_occupied(input.zone_id)

    # Wider range for unoccupied spaces
    input.temperature >= 60
    input.temperature <= 85
}

# Occupied zone during peak demand: only the energy-saving band is allowed,
# and the setpoint must still respect the zone type's own limits so the peak
# band can never exceed them (the datacenter maximum is 72).
allow if {
    input.action == "set_temperature"
    peak_demand_active
    zone_occupied(input.zone_id)
    within_zone_limits

    # Stricter limits during peak
    input.temperature >= 72 # Higher cooling setpoint
    input.temperature <= 74
}

# Mode changes: the mode must be a known value and valid for the season.
allow if {
    input.action == "set_hvac_mode"
    input.mode in ["auto", "cool", "heat", "off"]

    valid_mode_change
}

# within_zone_limits holds when input.temperature lies inside the temp_limits
# entry for the zone's type.
within_zone_limits if {
    limits := temp_limits[get_zone_type(input.zone_id)]
    input.temperature >= limits.min
    input.temperature <= limits.max
}

# --- HELPER RULES ---

# get_zone_type returns the "type" recorded for the zone in data.zones.
get_zone_type(zone_id) := data.zones[zone_id].type

# Zones with no entry in data.zones are treated as "office".
get_zone_type(zone_id) := "office" if {
    not data.zones[zone_id]
}

# zone_occupied holds when the zone's recorded occupancy count is above zero.
zone_occupied(zone_id) if {
    data.zones[zone_id].occupancy_count > 0
}

# A zone with no entry in data.zones is assumed to be occupied (the safer
# assumption).
zone_occupied(zone_id) if {
    not data.zones[zone_id]
}

# excessive_change holds when the requested setpoint differs from the zone's
# recorded current temperature by more than 5 degrees.
excessive_change if {
    max_delta := 5
    delta := input.temperature - data.zones[input.zone_id].current_temperature
    abs(delta) > max_delta
}

# Peak demand is active when the grid state flags it explicitly ...
peak_demand_active if data.grid_state.peak_demand == true

# ... or on weekdays from 14:00 up to (not including) 19:00.
peak_demand_active if {
    weekday
    clock := time.clock(time.now_ns())
    clock[0] >= 14
    clock[0] < 19
}

# weekday holds when the current day is neither Saturday nor Sunday.
weekday if {
    weekend := {"Saturday", "Sunday"}
    not time.weekday(time.now_ns()) in weekend
}

# valid_mode_change holds unless the requested mode contradicts the season:
# no "heat" in June-September, no "cool" in December-March.
#
# The seasonal checks are separate helper rules because Rego has no `&&` or
# `||` operators and cannot negate a parenthesised group; conjunction is
# written as several lines in one body and negation applies to a single rule.
valid_mode_change if {
    not heat_in_summer
    not cool_in_winter
}

# heat_in_summer holds when heat mode is requested during months 6-9.
heat_in_summer if {
    input.mode == "heat"
    current_month in {6, 7, 8, 9}
}

# cool_in_winter holds when cool mode is requested during months 12-3.
cool_in_winter if {
    input.mode == "cool"
    current_month in {12, 1, 2, 3}
}

# Month number (1-12) of the current date. time.date returns
# [year, month, day]; time.clock returns [hour, minute, second], so its
# index 1 is the minute, not the month.
current_month := time.date(time.now_ns())[1]

# --- VIOLATION REASONS ---

# violation_reason yields one human-readable explanation for a denied request.
#
# The alternatives are chained with `else` so only the first matching message
# is produced. As independent `violation_reason := ...` rules, a denied
# set_temperature request could match several bodies with different values,
# which OPA reports as an evaluation conflict.
violation_reason := "Invalid mode for current season" if {
    not allow
    input.action == "set_hvac_mode"
    not valid_mode_change
} else := "Peak demand active: use energy-saving setpoints (72-74°F)" if {
    not allow
    input.action == "set_temperature"
    peak_demand_active
} else := "Excessive temperature change (max 5°F from current)" if {
    not allow
    input.action == "set_temperature"
    excessive_change
} else := sprintf(
    "Temperature outside safe range for %v zone (min: %v, max: %v)",
    [zone_type, limits.min, limits.max],
) if {
    not allow
    input.action == "set_temperature"
    zone_type := get_zone_type(input.zone_id)
    limits := temp_limits[zone_type]

    # Only report a range violation when the setpoint really is out of range.
    temperature_outside(limits)
}

# temperature_outside holds when input.temperature is below limits.min or
# above limits.max.
temperature_outside(limits) if input.temperature < limits.min

temperature_outside(limits) if input.temperature > limits.max

# Descriptive metadata for the HVAC energy optimization policy.
policy_metadata := {
    "energy_impact": "high",
    "severity": "medium",
    "domain": "energy_optimization",
    "version": "1.0.0"
}

Example 3: Safety Interlock Policy

Use Case: Prevent conflicting or unsafe simultaneous operations

File: policies/safety/interlocks.rego

# Safety Interlock Policy
# Purpose: Prevent hazardous combinations of actions
# Scope: All building control systems

package citadel.safety.interlocks

import rego.v1

default allow := false

# Any request with a non-empty action is allowed unless an interlock applies.
allow if {
    input.action != ""
    not has_active_interlock
}

# Interlock 1: a fire alarm blocks door unlocks.
has_active_interlock if {
    data.system_state.fire_alarm_active == true
    input.action == "door_unlock"
}

# Interlock 2: a security incident blocks disabling cameras.
# NOTE(review): this body matches "camera_disable" while high_risk_action
# lists "disable_camera" - confirm which action name agents actually send.
has_active_interlock if {
    data.system_state.security_incident_active == true
    input.action == "camera_disable"
}

# Interlock 3: HVAC adjustments are blocked for zones under maintenance.
has_active_interlock if {
    hvac_actions := {"set_temperature", "set_hvac_mode", "override_schedule"}
    input.action in hvac_actions
    zone_under_maintenance(input.zone_id)
}

# Interlock 4: heating and cooling must not run against each other.
has_active_interlock if {
    heating_active(input.zone_id)
    input.action == "enable_cooling"
}

has_active_interlock if {
    cooling_active(input.zone_id)
    input.action == "enable_heating"
}

# Interlock 5: a high-risk action is blocked while three or more high-risk
# commands are already active.
has_active_interlock if {
    max_concurrent := 3
    concurrent_high_risk_ops >= max_concurrent
    high_risk_action(input.action)
}

# --- HELPER RULES ---

# zone_under_maintenance holds when data.maintenance_schedule contains an
# active entry for the given zone.
zone_under_maintenance(zone_id) if {
    some entry in data.maintenance_schedule
    entry.active == true
    entry.zone_id == zone_id
}

# heating_active holds when the zone is in "heat" mode and its heating output
# is above zero.
heating_active(zone_id) if {
    data.zones[zone_id].heating_output > 0
    data.zones[zone_id].hvac_mode == "heat"
}

# cooling_active holds when the zone is in "cool" mode and its cooling output
# is above zero.
cooling_active(zone_id) if {
    data.zones[zone_id].cooling_output > 0
    data.zones[zone_id].hvac_mode == "cool"
}

# high_risk_action holds for the action names treated as high risk.
high_risk_action(action) if {
    risky := {
        "manual_override",
        "silence_alarm",
        "disable_camera",
        "override_hvac",
        "emergency_unlock"
    }
    action in risky
}

# Number of entries in data.active_commands whose action is high risk.
concurrent_high_risk_ops := count([op |
    some op in data.active_commands
    high_risk_action(op.action)
])

# --- VIOLATION REASONS ---

# violation_reason yields one human-readable explanation naming the interlock
# that blocked the request.
#
# The alternatives are chained with `else` so only the first matching message
# is produced. As independent rules, a request matching two bodies (for
# example a door unlock during a fire alarm that also carries a zone_id under
# maintenance) would give one single-valued rule two different values, which
# OPA reports as an evaluation conflict.
violation_reason := "Interlock active: Cannot unlock doors during fire alarm" if {
    not allow
    input.action == "door_unlock"
    data.system_state.fire_alarm_active == true
} else := "Interlock active: Cannot disable cameras during security incident" if {
    not allow
    input.action == "camera_disable"
    data.system_state.security_incident_active == true
} else := sprintf("Interlock active: Zone %v under maintenance", [input.zone_id]) if {
    not allow

    # Same action filter as the maintenance interlock itself, so the message
    # only appears when that interlock is what blocked the request.
    input.action in {"set_temperature", "set_hvac_mode", "override_schedule"}
    zone_under_maintenance(input.zone_id)
} else := "Interlock active: Cannot enable cooling while heating is active" if {
    not allow
    input.action == "enable_cooling"
    heating_active(input.zone_id)
} else := "Interlock active: Cannot enable heating while cooling is active" if {
    # Counterpart of the cooling message; the enable_heating interlock
    # previously denied without giving any reason.
    not allow
    input.action == "enable_heating"
    cooling_active(input.zone_id)
} else := "Interlock active: Too many concurrent high-risk operations (max 3)" if {
    not allow
    high_risk_action(input.action)
    concurrent_high_risk_ops >= 3
}

# Descriptive metadata for the safety interlock policy.
policy_metadata := {
    "type": "interlock",
    "severity": "critical",
    "domain": "safety",
    "version": "1.0.0"
}

Common Policy Patterns

1. Time-Based Restrictions

# business_hours holds Monday-Friday from 08:00 up to (not including) 18:00.
business_hours if {
    now := time.now_ns()

    not time.weekday(now) in {"Saturday", "Sunday"}

    hour := time.clock(now)[0]
    hour < 18
    hour >= 8
}

# is_holiday holds when today's date appears in data.holidays.
# NOTE(review): assumes each holiday entry stores "date" as "YYYY-MM-DD" in
# UTC - confirm against the holiday data source.
is_holiday if {
    # Format as a plain date. With a bare timestamp, time.format returns a full
    # RFC 3339 timestamp (date, time and zone), which does not equal a
    # date-only value.
    today := time.format([time.now_ns(), "UTC", "2006-01-02"])

    some holiday in data.holidays
    holiday.date == today
}

2. Role-Based Access

# Role hierarchy: a larger number means more privilege.
roles := {
    "superadmin": 4,
    "admin": 3,
    "operator": 2,
    "viewer": 1
}

# has_role_level holds when the caller's role maps to a level at or above
# required_level. A role missing from `roles` never satisfies it.
has_role_level(required_level) if {
    roles[input.user_role] >= required_level
}

# Critical operations need level 3 (admin) or higher.
allow if {
    has_role_level(3)
    input.action == "critical_operation"
}

3. Geofencing

# on_facility_network holds when the request's source IP lies inside the
# facility subnet 192.168.1.0/24.
on_facility_network if {
    # CIDR containment parses the address, so a value that merely begins with
    # the text "192.168.1." but is not an IP address in the range is rejected.
    net.cidr_contains("192.168.1.0/24", input.source_ip) # Facility IP range
}

# Pattern skeleton: requires the request to originate from the facility
# network; add the action-specific checks in place of the placeholder.
allow if {
on_facility_network
# other conditions...
}

4. Cascading Fallbacks

# Cascading fallback: a zone that has a dedicated rule is judged ONLY by that
# rule; every other zone falls back to the general rule.
allow if {
    has_specific_rule
    specific_zone_rule
}

allow if {
    # Fall back only when no dedicated rule exists for the zone. Testing
    # `not specific_zone_rule` here would also be true when the dedicated rule
    # applied but its stricter conditions FAILED, letting such a request
    # through under the weaker general conditions.
    not has_specific_rule
    general_rule
}

# has_specific_rule holds for zones that have a dedicated rule. It is kept
# separate from that rule's conditions so "does not apply" and "applies but
# failed" stay distinguishable.
has_specific_rule if input.zone_id == "datacenter"

specific_zone_rule if {
    input.zone_id == "datacenter"
    # stricter conditions
}

general_rule if {
    # normal conditions
}

Integration with Safety Service

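Policies are enforced via the CitadelMesh Safety service, which queries OPA's Data API using the policy's package path written with slashes instead of dots (for example, `citadel/access/door_control`):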

// C# Safety Service calling OPA
/// <summary>
/// Evaluates <paramref name="input"/> against the policy at
/// <paramref name="policyPath"/> by POSTing it to OPA's Data API.
/// </summary>
/// <param name="policyPath">Path appended to <c>/v1/data/</c>.</param>
/// <param name="input">Object serialized as the <c>input</c> document.</param>
/// <returns>
/// The policy decision. Evaluation failures (non-success HTTP status, or a
/// response with no result) are reported as a denial, never as an allow.
/// </returns>
public async Task<PolicyResult> EvaluatePolicy(
    string policyPath,
    object input)
{
    using var response = await _httpClient.PostAsJsonAsync(
        $"{_opaUrl}/v1/data/{policyPath}",
        new { input }
    );

    // Fail closed: an OPA error must not be mistaken for a decision.
    if (!response.IsSuccessStatusCode)
    {
        return new PolicyResult
        {
            Allowed = false,
            ViolationReason =
                $"Policy evaluation failed: OPA returned HTTP {(int)response.StatusCode}"
        };
    }

    var result = await response.Content
        .ReadFromJsonAsync<OpaResponse>();

    // OPA omits "result" when nothing is defined at the requested path;
    // treat that (and an empty body) as a denial instead of dereferencing null.
    if (result?.Result is null)
    {
        return new PolicyResult
        {
            Allowed = false,
            ViolationReason =
                $"Policy evaluation failed: no decision returned for '{policyPath}'"
        };
    }

    return new PolicyResult
    {
        Allowed = result.Result.Allow,
        ViolationReason = result.Result.ViolationReason
    };
}

Policy Versioning

Track policy versions for auditability:

# Versioned metadata: current version, newest-first changelog, and the minimum
# component versions this policy is compatible with.
policy_metadata := {
    "compatibility": {
        "min_safety_service_version": "1.1.0",
        "min_agent_version": "1.2.0"
    },
    "changelog": [
        "v2.1.0: Added peak demand restrictions",
        "v2.0.0: Breaking change - new temp limits",
        "v1.0.0: Initial release"
    ],
    "version": "2.1.0"
}

Testing Your Policy

Create a test file alongside your policy:

File: policies/access/door_control_test.rego

package citadel.access.door_control

import rego.v1

# Test: a well-formed unlock request during operating hours is allowed.
test_allow_valid_unlock if {
    request := {
        "agent_id": "security-agent-001",
        "duration_seconds": 300,
        "door_id": "door-lobby-main",
        "action": "door_unlock"
    }

    allow with input as request with time.now_ns as mock_time_10am
}

# Test: Deny excessive duration
test_deny_excessive_duration if {
    # The clock is pinned inside operating hours so the over-long duration is
    # the only failing check. With the real clock, a run outside operating
    # hours would also be denied by the time check, and the test would pass
    # even if the duration check were broken.
    not allow with input as {
        "action": "door_unlock",
        "door_id": "door-lobby-main",
        "duration_seconds": 1000,
        "agent_id": "security-agent-001"
    } with time.now_ns as mock_time_10am
}

# Test: a critical-priority emergency unlock with a stated reason is allowed.
test_allow_emergency_override if {
    request := {
        "door_id": "door-emergency-exit",
        "emergency_reason": "Fire evacuation",
        "priority": "PRIORITY_CRITICAL",
        "action": "emergency_unlock"
    }

    allow with input as request
}

# Mock time helpers: nanoseconds since the Unix epoch, in UTC (the policy calls
# time.clock without a timezone, so hours are UTC hours).
# The earlier values decoded to 11:00 and 03:00 UTC, one hour later than the
# names claim.
mock_time_10am := 1704016800000000000 # 2023-12-31T10:00:00Z
mock_time_2am := 1703988000000000000 # 2023-12-31T02:00:00Z

Run tests:

opa test policies/access/

Best Practices

1. Use Descriptive Names

# ✅ Good
# Each condition names the check it performs, so the rule reads on its own.
allow if {
valid_temperature_range
zone_is_occupied
not emergency_lockout
}

# ❌ Bad
# Opaque names: the reader must look up every helper to learn what is checked.
allow if {
check1
check2
not flag
}

2. Document Complex Logic

# Time-of-use pricing tier for the current moment.
#
#   Tier 3 (highest cost): peak hours, 2-7 PM on weekdays
#   Tier 1 (lowest cost):  off-peak, 10 PM - 6 AM
#   Tier 2:                mid-peak, all other times
#
# The mapping itself lives in determine_tier; this rule only supplies the
# current hour and weekday name.
pricing_tier := determine_tier(hour, day) if {
    now := time.now_ns()
    hour := time.clock(now)[0]
    day := time.weekday(now)
}

3. Fail Safe

# ✅ Good - missing data falls back to the safer answer
# Occupied when the recorded occupancy count is above zero ...
zone_occupied(zone_id) if {
    data.zones[zone_id].occupancy_count > 0
}

# ... and also when there is no record for the zone at all.
zone_occupied(zone_id) if {
    not data.zones[zone_id] # No data = assume occupied (safer)
}

4. Audit Everything

# Include metadata in every decision
decision := {
    "allow": allow,
    "violation_reason": decision_violation_reason,
    "policy_version": policy_metadata.version,
    "evaluated_at": time.now_ns(),
    "input": input
}

# violation_reason is undefined whenever no denial message applies (in
# particular for every allowed request). Referencing it directly inside the
# object would make the whole `decision` undefined in those cases, so allowed
# requests would produce no audit record. Fall back to null instead.
default decision_violation_reason := null

decision_violation_reason := violation_reason

Next Steps

Policy Checklist

Before deploying a policy, ensure:

  • Default deny is set
  • All rules have descriptive names
  • Violation reasons are helpful
  • Edge cases are handled
  • Tests cover success and failure cases
  • Policy metadata is complete
  • Documentation explains intent
  • Fail-safe fallbacks exist

Write safe, testable policies that protect your building! Continue to Testing Policies.