Writing Safety Policies
This guide shows you how to create custom OPA policies for CitadelMesh, with real-world examples and best practices.
Policy Template
Use this template as a starting point for all policies:
# [Policy Name] - [Brief Description]
# Purpose: [What this policy protects]
# Scope: [What actions/systems this applies to]
package citadel.[domain].[subdomain]
import rego.v1
# Default deny for safety
default allow := false
# --- ALLOW RULES ---
# Each `allow` body is an alternative: the request is allowed as soon as
# every expression in any single body holds.
# [Rule 1: Description]
allow if {
    input.action == "[action_name]"
    [condition_1]
    [condition_2]
    # Add more conditions as needed
}
# [Rule 2: Description with different conditions]
allow if {
    input.action == "[action_name]"
    [alternative_conditions]
}
# --- HELPER RULES ---
[helper_rule_name] if {
    # Helper logic
}
# --- VIOLATION REASONS ---
# NOTE(review): violation_reason is a single-value (complete) rule. If you add
# several violation_reason rules whose bodies can hold at the same time with
# different messages, OPA reports a conflict instead of picking one - keep the
# conditions mutually exclusive or chain the rules with `else`.
violation_reason := "[Helpful error message]" if {
    not allow
    input.action == "[action_name]"
    # Condition that caused denial
}
# --- METADATA ---
# Static descriptive fields for this policy; "severity" takes one of the
# listed alternatives (high, medium or low).
policy_metadata := {
    "version": "1.0.0",
    "author": "Your Name",
    "last_updated": "2025-01-15",
    "severity": "high|medium|low"
}
Real-World Examples
Example 1: Access Control Policy
Use Case: Secure door access with multiple safety layers
File: policies/access/door_control.rego
# Door Access Control Policy
# Purpose: Prevent unauthorized or unsafe door operations
# Scope: All door lock/unlock operations
package citadel.access.door_control
import rego.v1
default allow := false
# Emergency path: a critical-priority emergency unlock that carries a
# non-empty reason is allowed without the door, agent or time checks.
allow if {
    input.emergency_reason != ""
    input.priority == "PRIORITY_CRITICAL"
    input.action == "emergency_unlock"
}
# Regular unlock: the request must be well formed and permitted right now.
allow if {
    input.action == "door_unlock"
    unlock_request_well_formed
    unlock_permitted_now
}
# Basic validation of the unlock request itself
unlock_request_well_formed if {
    valid_duration
    valid_door_id
}
# Time restriction, authorization and rate limiting for a regular unlock
unlock_permitted_now if {
    authorized_agent
    within_operating_hours
    not rate_limit_exceeded
}
# Locking is the safer direction: only a valid door and a known agent are needed
allow if {
    authorized_agent
    valid_door_id
    input.action == "door_lock"
}
# Status queries are read-only and only need a valid door ID
allow if {
    valid_door_id
    input.action == "door_status"
}
# --- HELPER RULES ---
# A door ID is valid when it starts with "door-" and has at least one more
# character after the prefix (for example "door-lobby-main").
# The previous segment-count check was always satisfied once the prefix
# matched, so the bare prefix "door-" was accepted as a valid ID.
valid_door_id if {
    startswith(input.door_id, "door-")
    trim_prefix(input.door_id, "door-") != ""
}
# The unlock duration must be positive and no longer than 900 seconds (15 minutes).
valid_duration if {
    seconds := input.duration_seconds
    seconds <= 900
    seconds > 0
}
# Operating hours run from 06:00 up to, but not including, 23:00.
# The upper bound is strict: an hour value is never above 23, so `hour <= 23`
# held for every hour and left the 11 PM closing time unenforced.
# NOTE(review): time.clock is called without a timezone, so the hour is the
# UTC hour. If the schedule is meant in building-local time, pass the site
# timezone, e.g. time.clock([time.now_ns(), "America/New_York"]).
within_operating_hours if {
    hour := time.clock(time.now_ns())[0]
    hour >= 6  # opens at 6 AM
    hour < 23  # closes at 11 PM
}
# After-hours unlock: outside operating hours an unlock additionally requires
# a numeric authorization level of 3 or higher.
# - is_number guards the comparison: Rego orders strings above numbers, so a
#   string level such as "admin" would otherwise satisfy `>= 3`.
# - The agent and rate-limit checks of the regular unlock rule are repeated
#   here so the elevated path cannot be used to bypass them.
allow if {
    input.action == "door_unlock"
    not within_operating_hours
    is_number(input.authorization_level)
    input.authorization_level >= 3
    valid_door_id
    valid_duration
    authorized_agent
    not rate_limit_exceeded
}
# An agent counts as authorized when the request carries a non-empty agent_id.
# In production, check against data.authorized_agents
authorized_agent if {
    input.agent_id != ""
}
# True once the requesting agent has 5 or more door_unlock commands recorded
# in data.recent_commands within the last 30 minutes.
rate_limit_exceeded if {
    window_start := time.now_ns() - 1800000000000  # 30 minutes in nanoseconds
    count([1 |
        some entry in data.recent_commands
        entry.action == "door_unlock"
        entry.agent_id == input.agent_id
        entry.timestamp > window_start
    ]) >= 5
}
# --- VIOLATION REASONS ---
# violation_reason is a single-value rule, so at most one message may be
# produced per request. The reasons are chained with `else`: a request that
# fails several checks at once (for example an over-long duration while the
# rate limit is also exceeded) reports the first matching reason instead of
# causing a conflicting-output evaluation error.
violation_reason := "Door unlock denied: invalid duration (max 900 seconds)" if {
    not allow
    input.action == "door_unlock"
    not valid_duration
} else := "Door unlock denied: outside operating hours (6 AM - 11 PM)" if {
    not allow
    input.action == "door_unlock"
    not within_operating_hours
    # Negated form also covers a request with no authorization_level at all
    not input.authorization_level >= 3
} else := "Door unlock denied: rate limit exceeded (5 per 30 min)" if {
    not allow
    input.action == "door_unlock"
    rate_limit_exceeded
} else := "Invalid door ID format" if {
    not allow
    not valid_door_id
}
# Static descriptive fields for the door control policy (version, domain, severity).
policy_metadata := {
    "version": "1.0.0",
    "domain": "access_control",
    "severity": "high"
}
Example 2: Energy Optimization Policy
Use Case: Prevent wasteful HVAC operations while maintaining comfort
File: policies/energy/hvac_optimization.rego
# HVAC Energy Optimization Policy
# Purpose: Balance comfort and energy efficiency
# Scope: Temperature setpoints and HVAC mode changes
package citadel.energy.hvac_optimization
import rego.v1
default allow := false
# Temperature limits by zone type
# Each entry gives the inclusive min/max setpoint for one zone type; the
# values are in °F, matching the °F wording of this policy's messages.
temp_limits := {
    "office": {"min": 68, "max": 76},
    "lobby": {"min": 65, "max": 78},
    "datacenter": {"min": 68, "max": 72},
    "warehouse": {"min": 60, "max": 80}
}
# Normal operation (occupied zone, no peak demand): the setpoint must lie
# within the zone type's limits and must not jump too far from the current
# temperature.
# `not peak_demand_active` is what turns the peak rule below into a real
# restriction: allow rules are OR-ed, so without it this rule would keep
# admitting the full zone range during peak periods.
allow if {
    input.action == "set_temperature"
    not peak_demand_active
    zone_type := get_zone_type(input.zone_id)
    limits := temp_limits[zone_type]
    input.temperature >= limits.min
    input.temperature <= limits.max
    zone_occupied(input.zone_id)
    not excessive_change
}
# Unoccupied zones - allow setback
allow if {
    input.action == "set_temperature"
    not zone_occupied(input.zone_id)
    # Wider range for unoccupied spaces
    input.temperature >= 60
    input.temperature <= 85
}
# Peak demand - occupied zones are restricted to energy-saving setpoints.
# The zone limits still apply, so the peak band cannot push a zone past its
# own maximum (a datacenter, capped at 72, is left with exactly 72).
allow if {
    input.action == "set_temperature"
    peak_demand_active
    zone_occupied(input.zone_id)
    # Stricter limits during peak
    input.temperature >= 72  # Higher cooling setpoint
    input.temperature <= 74
    limits := temp_limits[get_zone_type(input.zone_id)]
    input.temperature >= limits.min
    input.temperature <= limits.max
}
# Allow mode changes with validation
allow if {
    input.action == "set_hvac_mode"
    input.mode in ["auto", "cool", "heat", "off"]
    valid_mode_change
}
# --- HELPER RULES ---
# Returns the zone's type as recorded under data.zones.
get_zone_type(zone_id) := data.zones[zone_id].type
# Zones without an entry in data.zones are treated as "office".
get_zone_type(zone_id) := "office" if {
    not data.zones[zone_id]
}
# A zone is occupied when its recorded occupancy count is above zero...
zone_occupied(zone_id) if data.zones[zone_id].occupancy_count > 0
# ...and is assumed occupied (the safer choice) when there is no data for it.
zone_occupied(zone_id) if {
    not data.zones[zone_id]
}
# True when the requested setpoint differs from the zone's current
# temperature by more than 5°F.
excessive_change if {
    delta := input.temperature - data.zones[input.zone_id].current_temperature
    abs(delta) > 5
}
# Peak demand is active when the grid state flags it explicitly...
peak_demand_active if data.grid_state.peak_demand == true
# ...or during peak hours: 2 PM - 7 PM on weekdays.
peak_demand_active if {
    weekday
    hour := time.clock(time.now_ns())[0]
    hour < 19
    hour >= 14
}
# Any day other than Saturday or Sunday
weekday if {
    today := time.weekday(time.now_ns())
    not today in {"Saturday", "Sunday"}
}
# A mode change is valid unless the requested mode is out of season:
# no "heat" in summer (months 6-9) and no "cool" in winter (months 12, 1-3).
# Rego has no `&&` / `||` operators and cannot negate a parenthesised group,
# so each out-of-season case is its own rule body and the check negates the
# helper as a whole.
valid_mode_change if {
    not mode_out_of_season
}
# Summer months (6-9): no heat mode
mode_out_of_season if {
    input.mode == "heat"
    current_month in {6, 7, 8, 9}
}
# Winter months (12-3): no cool mode
mode_out_of_season if {
    input.mode == "cool"
    current_month in {12, 1, 2, 3}
}
# The month comes from time.date, which returns [year, month, day];
# time.clock returns [hour, minute, second] and carries no month.
# NOTE(review): evaluated in UTC - confirm that is acceptable at month boundaries.
current_month := time.date(time.now_ns())[1]
# --- VIOLATION REASONS ---
# violation_reason is a single-value rule. The reasons are chained with
# `else`, most specific first, so exactly one message is produced per denied
# request. As independent rules, the generic range message matched every
# denied set_temperature request and clashed with the more specific reasons.
violation_reason := "Excessive temperature change (max 5°F from current)" if {
    not allow
    input.action == "set_temperature"
    excessive_change
} else := "Peak demand active: use energy-saving setpoints (72-74°F)" if {
    not allow
    input.action == "set_temperature"
    peak_demand_active
} else := "Invalid mode for current season" if {
    not allow
    input.action == "set_hvac_mode"
    not valid_mode_change
} else := sprintf("Temperature outside safe range for %v zone (min: %v, max: %v)",
    [zone_type, limits.min, limits.max]) if {
    # Generic fallback for any other denied setpoint request
    not allow
    input.action == "set_temperature"
    zone_type := get_zone_type(input.zone_id)
    limits := temp_limits[zone_type]
}
# Static descriptive fields for the HVAC optimization policy
# (version, domain, severity, energy impact).
policy_metadata := {
    "version": "1.0.0",
    "domain": "energy_optimization",
    "severity": "medium",
    "energy_impact": "high"
}
Example 3: Safety Interlock Policy
Use Case: Prevent conflicting or unsafe simultaneous operations
File: policies/safety/interlocks.rego
# Safety Interlock Policy
# Purpose: Prevent hazardous combinations of actions
# Scope: All building control systems
package citadel.safety.interlocks
import rego.v1
default allow := false
# Any request with a non-empty action passes unless an interlock below is active
allow if {
    input.action != ""
    not has_active_interlock
}
# Interlock 1: fire alarm active -> no door unlocks
has_active_interlock if {
    data.system_state.fire_alarm_active == true
    input.action == "door_unlock"
}
# Interlock 2: security incident active -> cameras must stay enabled
has_active_interlock if {
    data.system_state.security_incident_active == true
    input.action == "camera_disable"
}
# Interlock 3: zone under maintenance -> no HVAC adjustments
has_active_interlock if {
    zone_under_maintenance(input.zone_id)
    input.action in {"set_temperature", "set_hvac_mode", "override_schedule"}
}
# Interlock 4: heating and cooling must not run against each other
has_active_interlock if {
    heating_active(input.zone_id)
    input.action == "enable_cooling"
}
has_active_interlock if {
    cooling_active(input.zone_id)
    input.action == "enable_heating"
}
# Interlock 5: at most 3 high-risk operations may be in flight at once
has_active_interlock if {
    concurrent_high_risk_ops >= 3
    high_risk_action(input.action)
}
# --- HELPER RULES ---
# True when the maintenance schedule holds an active entry for the zone.
zone_under_maintenance(zone_id) if {
    some window in data.maintenance_schedule
    window.active == true
    window.zone_id == zone_id
}
# True when the zone is in heat mode and actually producing heating output.
heating_active(zone_id) if {
    data.zones[zone_id].heating_output > 0
    data.zones[zone_id].hvac_mode == "heat"
}
# True when the zone is in cool mode and actually producing cooling output.
cooling_active(zone_id) if {
    data.zones[zone_id].cooling_output > 0
    data.zones[zone_id].hvac_mode == "cool"
}
# True for actions that count toward the concurrent high-risk operation limit.
# The camera action is spelled "camera_disable" to match the name used by the
# camera interlock and its violation message in this policy; the previous
# spelling "disable_camera" never matched that action, so camera disables were
# not counted as high-risk.
# NOTE(review): confirm "camera_disable" is the action name agents actually send.
high_risk_action(action) if {
    action in [
        "emergency_unlock",
        "override_hvac",
        "camera_disable",
        "silence_alarm",
        "manual_override"
    ]
}
# Number of entries in data.active_commands whose action is high-risk.
concurrent_high_risk_ops := count([running |
    some running in data.active_commands
    high_risk_action(running.action)
])
# --- VIOLATION REASONS ---
# violation_reason is a single-value rule, so the reasons are chained with
# `else`: when several interlocks are active at once the first match is
# reported instead of producing conflicting outputs.
# Each reason repeats the conditions of the interlock it explains; the
# maintenance reason is limited to the HVAC actions that interlock covers, and
# heating-while-cooling has its own message alongside cooling-while-heating.
violation_reason := "Interlock active: Cannot unlock doors during fire alarm" if {
    not allow
    input.action == "door_unlock"
    data.system_state.fire_alarm_active == true
} else := "Interlock active: Cannot disable cameras during security incident" if {
    not allow
    input.action == "camera_disable"
    data.system_state.security_incident_active == true
} else := sprintf("Interlock active: Zone %v under maintenance", [input.zone_id]) if {
    not allow
    input.action in ["set_temperature", "set_hvac_mode", "override_schedule"]
    zone_under_maintenance(input.zone_id)
} else := "Interlock active: Cannot enable cooling while heating is active" if {
    not allow
    input.action == "enable_cooling"
    heating_active(input.zone_id)
} else := "Interlock active: Cannot enable heating while cooling is active" if {
    not allow
    input.action == "enable_heating"
    cooling_active(input.zone_id)
} else := "Interlock active: Too many concurrent high-risk operations (max 3)" if {
    not allow
    high_risk_action(input.action)
    concurrent_high_risk_ops >= 3
}
# Static descriptive fields for the safety interlock policy
# (version, domain, severity, policy type).
policy_metadata := {
    "version": "1.0.0",
    "domain": "safety",
    "severity": "critical",
    "type": "interlock"
}
Common Policy Patterns
1. Time-Based Restrictions
# Business hours: 8 AM up to (not including) 6 PM, Monday through Friday
business_hours if {
    now := time.now_ns()
    today := time.weekday(now)
    not today in {"Saturday", "Sunday"}
    hour := time.clock(now)[0]
    hour < 18
    hour >= 8
}
# Holiday check (requires holiday data)
# time.format with a bare timestamp yields a full RFC 3339 date-time
# (e.g. "2024-01-01T10:00:00Z"), which can never equal a calendar date, so an
# explicit date-only layout is requested instead.
# NOTE(review): assumes data.holidays[_].date is a "YYYY-MM-DD" string and
# that the UTC calendar day is the right one - confirm against the data feed.
is_holiday if {
    current_date := time.format([time.now_ns(), "UTC", "2006-01-02"])
    holiday := data.holidays[_]
    holiday.date == current_date
}
2. Role-Based Access
# Role hierarchy: a higher number means more privilege
roles := {"viewer": 1, "operator": 2, "admin": 3, "superadmin": 4}
# True when the caller's role maps to at least the required level;
# roles missing from the table never qualify.
has_role_level(required_level) if roles[input.user_role] >= required_level
# Critical operations need admin (level 3) or higher
allow if {
    has_role_level(3)
    input.action == "critical_operation"
}
3. Geofencing
# Allow only if request originates from facility
# The source address is checked against the facility range with a real CIDR
# test. A string-prefix comparison also accepted values that merely start
# with "192.168.1." without being valid addresses in that range.
on_facility_network if {
    net.cidr_contains("192.168.1.0/24", input.source_ip)  # Facility IP range
}
allow if {
    on_facility_network
    # other conditions...
}
4. Cascading Fallbacks
# Apply the specific rule to the zones it targets and the general rule to
# everything else. The fallback is keyed on whether the specific rule
# *applies*, not on whether it *passed*: keyed on `not specific_zone_rule`,
# a datacenter request that fails the stricter conditions would simply be
# re-admitted by general_rule.
allow if {
    specific_zone_applies
    specific_zone_rule
}
allow if {
    not specific_zone_applies
    general_rule
}
# Which requests the specific rule is responsible for
specific_zone_applies if {
    input.zone_id == "datacenter"
}
specific_zone_rule if {
    input.zone_id == "datacenter"
    # stricter conditions
}
general_rule if {
    # normal conditions
}
Integration with Safety Service
Policies are enforced via the CitadelMesh Safety service:
// C# Safety Service calling OPA
/// <summary>
/// Evaluates <paramref name="input"/> against the OPA document at
/// <paramref name="policyPath"/> and maps the answer to a PolicyResult.
/// The call fails closed: a non-success HTTP status or a response without a
/// decision is reported as a denial rather than surfacing as an exception
/// from dereferencing a missing result.
/// </summary>
/// <param name="policyPath">Path appended to the OPA /v1/data/ endpoint.</param>
/// <param name="input">Object sent as the "input" document of the query.</param>
/// <returns>The policy decision, or a denial describing why evaluation failed.</returns>
public async Task<PolicyResult> EvaluatePolicy(
    string policyPath,
    object input)
{
    using var response = await _httpClient.PostAsJsonAsync(
        $"{_opaUrl}/v1/data/{policyPath}",
        new { input }
    );
    if (!response.IsSuccessStatusCode)
    {
        return new PolicyResult
        {
            Allowed = false,
            ViolationReason =
                $"Policy evaluation failed: OPA returned HTTP {(int)response.StatusCode}"
        };
    }
    var result = await response.Content
        .ReadFromJsonAsync<OpaResponse>();
    // OPA omits "result" when the queried document is undefined.
    if (result?.Result is null)
    {
        return new PolicyResult
        {
            Allowed = false,
            ViolationReason = "Policy evaluation failed: OPA returned no decision"
        };
    }
    return new PolicyResult
    {
        Allowed = result.Result.Allow,
        ViolationReason = result.Result.ViolationReason
    };
}
Policy Versioning
Track policy versions for auditability:
# Versioned metadata: the current version, a newest-first changelog, and the
# minimum agent / safety-service versions listed under "compatibility".
policy_metadata := {
    "version": "2.1.0",
    "changelog": [
        "v2.1.0: Added peak demand restrictions",
        "v2.0.0: Breaking change - new temp limits",
        "v1.0.0: Initial release"
    ],
    "compatibility": {
        "min_agent_version": "1.2.0",
        "min_safety_service_version": "1.1.0"
    }
}
Testing Your Policy
Create a test file alongside your policy:
File: policies/access/door_control_test.rego
package citadel.access.door_control
import rego.v1
# Every test that reaches a time-dependent rule pins time.now_ns, and tests
# that reach the rate limiter pin data.recent_commands, so results do not
# depend on when or where the suite runs.
# Test: Allow valid unlock
test_allow_valid_unlock if {
    allow with input as {
        "action": "door_unlock",
        "door_id": "door-lobby-main",
        "duration_seconds": 300,
        "agent_id": "security-agent-001"
    } with time.now_ns as mock_time_10am
        with data.recent_commands as []
}
# Test: Deny excessive duration
test_deny_excessive_duration if {
    not allow with input as {
        "action": "door_unlock",
        "door_id": "door-lobby-main",
        "duration_seconds": 1000,
        "agent_id": "security-agent-001"
    } with time.now_ns as mock_time_10am
        with data.recent_commands as []
}
# Test: Deny after-hours unlock without elevated authorization
test_deny_after_hours_unlock if {
    not allow with input as {
        "action": "door_unlock",
        "door_id": "door-lobby-main",
        "duration_seconds": 300,
        "agent_id": "security-agent-001"
    } with time.now_ns as mock_time_2am
        with data.recent_commands as []
}
# Test: Allow after-hours unlock with authorization level 3
test_allow_after_hours_unlock_with_authorization if {
    allow with input as {
        "action": "door_unlock",
        "door_id": "door-lobby-main",
        "duration_seconds": 300,
        "agent_id": "security-agent-001",
        "authorization_level": 3
    } with time.now_ns as mock_time_2am
        with data.recent_commands as []
}
# Test: Emergency override
test_allow_emergency_override if {
    allow with input as {
        "action": "emergency_unlock",
        "priority": "PRIORITY_CRITICAL",
        "emergency_reason": "Fire evacuation",
        "door_id": "door-emergency-exit"
    }
}
# Mock time helpers: nanoseconds since the Unix epoch. The policy reads the
# hour with time.clock and no timezone, i.e. in UTC, so the values are UTC.
mock_time_10am := 1704016800000000000  # 2023-12-31T10:00:00Z
mock_time_2am := 1703988000000000000   # 2023-12-31T02:00:00Z
Run tests:
opa test policies/access/
Best Practices
1. Use Descriptive Names
# ✅ Good
# Each condition names the property it checks, so the rule reads as a sentence.
allow if {
    valid_temperature_range
    zone_is_occupied
    not emergency_lockout
}
# ❌ Bad
# Opaque names force the reader to open every helper to learn what is enforced.
allow if {
    check1
    check2
    not flag
}
2. Document Complex Logic
# Time-of-use pricing tier for the current moment
#   Tier 3 (highest cost): peak hours, 2-7 PM on weekdays
#   Tier 1 (lowest cost):  off-peak, 10 PM - 6 AM
#   Tier 2:                mid-peak, all other times
pricing_tier := determine_tier(hour, day_name) if {
    now := time.now_ns()
    day_name := time.weekday(now)
    hour := time.clock(now)[0]
}
3. Fail Safe
# ✅ Good - a missing record is treated as "occupied", the safer assumption
zone_occupied(zone_id) if data.zones[zone_id].occupancy_count > 0
zone_occupied(zone_id) if {
    # No data = assume occupied (safer)
    not data.zones[zone_id]
}
4. Audit Everything
# Include metadata in every decision
# violation_reason is undefined whenever the request is allowed, and an object
# literal with an undefined member is itself undefined - so referencing it
# directly would make `decision` vanish for exactly the allowed requests.
# Routing the reason through a rule with a default keeps `decision` defined,
# and therefore audited, for allowed and denied requests alike.
default decision_violation_reason := ""
decision_violation_reason := reason if {
    reason := violation_reason
}
decision := {
    "allow": allow,
    "violation_reason": decision_violation_reason,
    "policy_version": policy_metadata.version,
    "evaluated_at": time.now_ns(),
    "input": input
}
Next Steps
- Testing Policies - Comprehensive testing guide
- Agent Integration - Connect agents to policies
- OPA Basics - Rego fundamentals
- Example Policies - More examples
Policy Checklist
Before deploying a policy, ensure:
- Default deny is set
- All rules have descriptive names
- Violation reasons are helpful
- Edge cases are handled
- Tests cover success and failure cases
- Policy metadata is complete
- Documentation explains intent
- Fail-safe fallbacks exist
Write safe, testable policies that protect your building! Continue to Testing Policies.