OPA Policy Basics
Open Policy Agent (OPA) is the foundation of CitadelMesh's safety-first architecture. This guide introduces OPA, the Rego language, and how to write effective safety policies.
What is OPA?
Open Policy Agent (OPA) is a general-purpose policy engine: a service sends it a query, OPA evaluates the rules and returns a decision, and the caller enforces that decision. In CitadelMesh:
- Agents check policies before executing building control actions
- Policies are written in Rego, a declarative language
- All decisions are audited for compliance and debugging
Why OPA?
- ✅ Declarative - Express what's allowed, not how to check it
- ✅ Testable - Unit test policies just like code
- ✅ Auditable - Every decision is logged
- ✅ Fast - Sub-millisecond policy evaluation
- ✅ Flexible - Complex rules with simple syntax
Rego Language Fundamentals
Basic Syntax
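Rego policies consist of rules. A rule is true (or takes its assigned value) when every expression in its body holds; otherwise it is undefined unless a default is declared.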
Simple rule:
package citadel.example
import rego.v1 # Enables the if keyword on OPA versions before 1.0
# Rule that's always true
always_true := true
# Rule that checks a condition
temperature_safe if {
input.temperature >= 18
input.temperature <= 26
}
Input and Data:
- input - The request being evaluated (from the agent)
- data - Historical data and system state (from the database/cache)
Package and Imports
Every policy file starts with a package declaration:
package citadel.security
import rego.v1 # Modern Rego syntax
Package names map to API endpoints:
package citadel.security → /v1/data/citadel/security
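As a rough illustration of that mapping, here is a minimal Python sketch of how an agent might build a query against OPA's Data API, which takes a POST with the request wrapped under an "input" key. The helper names (decision_path, build_query) and the localhost:8181 address are assumptions made for this example, not part of CitadelMesh. The request is only constructed here, not sent.

```python
import json
import urllib.request


def decision_path(package: str, rule: str = "") -> str:
    """Map a Rego package (and optional rule name) to a Data API path."""
    parts = package.split(".")
    if rule:
        parts.append(rule)
    return "/v1/data/" + "/".join(parts)


def build_query(base_url: str, package: str, rule: str, input_doc: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request asking OPA for one decision."""
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + decision_path(package, rule),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical local OPA server address
req = build_query(
    "http://localhost:8181",
    "citadel.security",
    "allow",
    {"action": "door_unlock", "duration_seconds": 120},
)
print(req.full_url)  # http://localhost:8181/v1/data/citadel/security/allow
```

Querying the package path returns every rule in the package. Appending a rule name, as above, returns only that rule's value.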
Default Values
Set defaults for safety:
# Default deny - safest approach
default allow := false
# Only allow if explicitly permitted
allow if {
# conditions here
}
Logical Operators
# AND - all conditions must be true
allow if {
input.user == "admin" # AND
input.action == "unlock" # AND
input.duration <= 300 # AND
}
# OR - use multiple rules
allow if {
input.user == "admin"
}
allow if {
input.priority == "emergency"
}
# NOT - negate a condition
allow if {
not emergency_mode
}
Comparisons
# Equality
input.action == "door_unlock"
# Inequality
input.duration != 0
# Numerical comparisons
input.temperature >= 18
input.temperature <= 26
# String operations
startswith(input.entity_id, "hvac.")
contains(input.message, "emergency")
Collections
# Arrays
allowed_actions := ["unlock", "lock", "status"]
# Check membership
allow if {
input.action in allowed_actions
}
# Sets (unique values)
critical_zones := {"datacenter", "server_room", "vault"}
# Check membership
zone_critical if {
input.zone in critical_zones
}
# Objects (dictionaries)
temperature_limits := {
"lobby": {"min": 18, "max": 24},
"datacenter": {"min": 18, "max": 22}
}
# Access values
allow if {
limits := temperature_limits[input.zone]
input.temperature >= limits.min
input.temperature <= limits.max
}
CitadelMesh Policy Structure
1. Security Policy Example
File: policies/security.rego
package citadel.security
import rego.v1
# Default deny for safety
default allow := false
# Door unlock policy
allow if {
input.action == "door_unlock"
door_unlock_allowed
}
door_unlock_allowed if {
# Emergency override
input.priority == "PRIORITY_EMERGENCY"
}
door_unlock_allowed if {
# Normal operations
input.priority != "PRIORITY_EMERGENCY"
# Duration limits
input.duration_seconds <= 300 # Max 5 minutes
input.duration_seconds > 0
# Time restrictions (time.clock returns [hour, minute, second] in UTC unless a timezone is passed)
current_hour := time.clock(time.now_ns())[0]
current_hour >= 6 # 6 AM or later
current_hour < 22 # Before 10 PM
# Rate limiting
not excessive_unlock_requests
}
# Rate limiting logic
excessive_unlock_requests if {
count(recent_door_unlocks) > 10
}
recent_door_unlocks contains cmd if {
cmd := data.recent_commands[_]
cmd.action == "door_unlock"
cmd.agent_id == input.agent_id
cmd.timestamp > (time.now_ns() - 3600000000000) # Last hour
}
# Violation reasons for debugging
violation_reason := "Door unlock denied: duration exceeds 300 seconds" if {
not allow
input.action == "door_unlock"
input.duration_seconds > 300
}
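violation_reason := "Door unlock denied: outside allowed hours (6 AM - 10 PM)" if {
not allow
input.action == "door_unlock"
input.duration_seconds <= 300 # Keeps this rule from conflicting with the duration reason
outside_allowed_hours
}
# "Before 6 AM" OR "10 PM or later" needs two rules; one body would AND them and never match
outside_allowed_hours if time.clock(time.now_ns())[0] < 6
outside_allowed_hours if time.clock(time.now_ns())[0] >= 22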
2. HVAC Policy Example
File: policies/hvac/setpoint_control.rego
package citadel.hvac.setpoint
import rego.v1
default allow := false
# Temperature limits
min_temp := 65 # Fahrenheit
max_temp := 78
# Allow setpoint changes within safe range (all entities except the datacenter)
allow if {
input.action == "write_setpoint"
not is_datacenter
input.value >= min_temp
input.value <= max_temp
valid_entity_id
not emergency_mode
}
# Validate entity ID format
valid_entity_id if {
startswith(input.entity_id, "hvac.")
count(split(input.entity_id, ".")) >= 3
}
# Check for emergency lockout
emergency_mode if {
data.system_state.emergency_active == true
}
# Always allow read operations
allow if {
input.action == "read_point"
}
# Datacenter has stricter limits. Multiple allow rules are OR'ed, so the
# general and night rules exclude the datacenter; otherwise they would override these limits.
is_datacenter if input.entity_id == "hvac.datacenter.setpoint"
allow if {
input.action == "write_setpoint"
is_datacenter
input.value >= 68
input.value <= 75
not emergency_mode
}
# Night setback - lower temps allowed
allow if {
input.action == "write_setpoint"
is_nighttime
not is_datacenter
input.value >= 60 # Lower minimum at night
input.value <= max_temp
valid_entity_id
not emergency_mode
}
is_nighttime if {
hour := time.clock(time.now_ns())[0]
hour >= 22 # After 10 PM
}
is_nighttime if {
hour := time.clock(time.now_ns())[0]
hour < 6 # Before 6 AM
}
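The night window crosses midnight, which is why the policy expresses it as two rules (an OR) rather than one body. A small Python sketch of the same check can serve as a reference when working out expected results for tests. The helper name in_hour_window is hypothetical, and it assumes the window is half-open: the start hour is included and the end hour is excluded.

```python
def in_hour_window(hour: int, start: int, end: int) -> bool:
    """True if hour falls in the half-open window [start, end).

    When start > end the window wraps past midnight (for example 22 -> 6).
    """
    if start <= end:
        return start <= hour < end
    return hour >= start or hour < end


print(in_hour_window(23, 22, 6))  # True: late evening
print(in_hour_window(5, 22, 6))   # True: early morning
print(in_hour_window(6, 22, 6))   # False: 6 AM is the excluded end of the window
print(in_hour_window(12, 22, 6))  # False: midday
```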
Testing Policies with the OPA CLI and Playground
1. Install OPA CLI
# macOS
brew install opa
# Linux
curl -L -o opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64
chmod +x opa
sudo mv opa /usr/local/bin/
# Windows
choco install opa
2. Evaluate Policy Locally
Test input file (test-input.json):
{
"action": "door_unlock",
"duration_seconds": 300,
"priority": "PRIORITY_NORMAL",
"agent_id": "security-agent-001"
}
Evaluate policy:
opa eval -i test-input.json \
-d policies/security.rego \
'data.citadel.security.allow'
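Output (during allowed hours; time.clock evaluates in UTC by default, so the result depends on when you run the command):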
{
"result": [
{
"expressions": [
{
"value": true,
"text": "data.citadel.security.allow"
}
]
}
]
}
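When opa eval is called from a script, the decision has to be extracted from this nested JSON. The sketch below shows one way to do that in Python. The function name first_value is an assumption for illustration, and it treats an empty result (which is what OPA prints when the queried rule is undefined) as a caller-supplied default.

```python
import json


def first_value(opa_eval_output: str, default=None):
    """Return the first expression's value from `opa eval` JSON output.

    Falls back to `default` when the result set is empty (rule undefined).
    """
    doc = json.loads(opa_eval_output)
    results = doc.get("result", [])
    if not results:
        return default
    return results[0]["expressions"][0]["value"]


sample = """
{"result": [{"expressions": [
  {"value": true, "text": "data.citadel.security.allow"}
]}]}
"""
print(first_value(sample))               # True
print(first_value("{}", default=False))  # False
```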
3. Test Denial Scenario
Test input (test-deny.json):
{
"action": "door_unlock",
"duration_seconds": 600,
"priority": "PRIORITY_NORMAL"
}
Evaluate:
opa eval -i test-deny.json \
-d policies/security.rego \
'data.citadel.security.allow'
Output:
{
"result": [
{
"expressions": [
{
"value": false,
"text": "data.citadel.security.allow"
}
]
}
]
}
Get denial reason:
opa eval -i test-deny.json \
-d policies/security.rego \
--format pretty \
'data.citadel.security.violation_reason'
Output:
"Door unlock denied: duration exceeds 300 seconds"
4. OPA Web Playground
Use the interactive playground at: https://play.openpolicyagent.org
- Paste policy in left panel
- Add test input in right panel
- Click Evaluate
- See results instantly
Debugging Policies
1. Print Debugging
Use trace() to debug:
allow if {
trace(sprintf("Checking action: %v", [input.action]))
input.action == "door_unlock"
trace(sprintf("Duration: %v seconds", [input.duration_seconds]))
input.duration_seconds <= 300
}
Run with --explain (use --explain=notes to show only the trace() notes):
opa eval -i test-input.json \
-d policies/security.rego \
--explain=full \
'data.citadel.security.allow'
2. Check Policy Syntax
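opa check policies/
Output:
opa check prints nothing and exits with status 0 when every file parses and compiles; otherwise it reports each error with its file and line. Passing the directory rather than policies/*.rego also covers nested files such as policies/hvac/setpoint_control.rego.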
3. Format Policy Code
opa fmt -w policies/security.rego
Auto-formats Rego code with consistent style.
4. Evaluate Specific Rules
Test individual rules:
# Test only the rate limiting rule
opa eval -i test-input.json \
-d policies/security.rego \
'data.citadel.security.excessive_unlock_requests'
Common Patterns
1. Time-Based Restrictions
# Business hours only (9 AM - 5 PM)
business_hours if {
hour := time.clock(time.now_ns())[0]
hour >= 9
hour < 17
}
# Weekdays only
weekday if {
day := time.weekday(time.now_ns())
day != "Saturday"
day != "Sunday"
}
allow if {
business_hours
weekday
# other conditions...
}
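Both time.clock and time.weekday interpret the timestamp in UTC when no timezone is passed, so a rule like business_hours follows UTC rather than the building's local time. To work out what a given timestamp should produce, a Python sketch along these lines can help. The function name utc_hour_and_weekday is hypothetical.

```python
from datetime import datetime, timezone

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday"]


def utc_hour_and_weekday(ns: int) -> tuple[int, str]:
    """Return (hour, weekday name) in UTC for a nanosecond Unix timestamp."""
    dt = datetime.fromtimestamp(ns // 1_000_000_000, tz=timezone.utc)
    return dt.hour, WEEKDAYS[dt.weekday()]


# The Unix epoch (1970-01-01 00:00:00 UTC) fell on a Thursday
print(utc_hour_and_weekday(0))  # (0, 'Thursday')
```

If the rules should follow local time, note that time.clock and time.weekday also accept a two-element array of the timestamp and an IANA timezone name.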
2. Role-Based Access
# User roles
admin_users := {"admin@citadel.local", "security@citadel.local"}
operator_users := {"operator1@citadel.local", "operator2@citadel.local"}
# Admin can do anything
allow if {
input.user in admin_users
}
# Operators have limited access
allow if {
input.user in operator_users
input.action in ["status", "read"]
}
3. Rate Limiting
# Max 10 actions per hour per agent
rate_limit_exceeded if {
recent_actions := [cmd |
cmd := data.recent_commands[_]
cmd.agent_id == input.agent_id
cmd.timestamp > (time.now_ns() - 3600000000000)
]
count(recent_actions) >= 10
}
allow if {
not rate_limit_exceeded
# other conditions...
}
4. Multi-Factor Conditions
# High-risk actions require multiple checks
high_risk_action if {
input.action in ["emergency_unlock", "override_hvac", "disable_camera"]
}
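allow if {
high_risk_action
input.user in admin_users
input.approval_code != ""
input.justification != ""
verify_approval_code(input.approval_code)
}
# verify_approval_code is not a built-in; it must be defined in the package.
# Hypothetical example: accept codes listed under data.approval_codes
verify_approval_code(code) if {
code in data.approval_codes
}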
Best Practices
1. Always Use Default Deny
# ✅ Good - explicit default
default allow := false
allow if {
# specific conditions
}
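# ❌ Bad - no default: allow is undefined (not false) when no rule matches,
# so every caller has to handle a missing result correctly
allow if {
# might miss cases
}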
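The caller should fail closed as well. When a queried rule is undefined, OPA's Data API returns a response with no "result" key, so code that only checks for an explicit false can let a request through by accident. A minimal Python sketch of a safe check, using a hypothetical helper named is_allowed:

```python
def is_allowed(opa_response: dict) -> bool:
    """Fail closed: only an explicit boolean true counts as permission."""
    return opa_response.get("result") is True


print(is_allowed({"result": True}))   # True
print(is_allowed({"result": False}))  # False
print(is_allowed({}))                 # False: undefined decision is treated as deny
```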
2. Provide Violation Reasons
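# ✅ Good - helpful error messages
violation_reason := "Temperature outside safe range (18-26°C)" if {
not allow
input.action == "set_temperature"
not temperature_in_range
}
# Rego has no && operator; expressions in a body are AND'ed, and not applies to a named rule
temperature_in_range if {
input.temperature >= 18
input.temperature <= 26
}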
3. Use Named Rules for Clarity
# ✅ Good - readable
allow if {
valid_temperature
valid_zone
not emergency_mode
}
valid_temperature if {
input.temperature >= 18
input.temperature <= 26
}
# ❌ Bad - hard to read
allow if {
input.temperature >= 18
input.temperature <= 26
startswith(input.zone, "hvac.")
data.system_state.emergency != true
}
4. Test Edge Cases
# Test boundary conditions
test_at_min_boundary if {
allow with input as {"temperature": 18}
}
test_below_min_boundary if {
not allow with input as {"temperature": 17.9}
}
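Boundary inputs like these can be listed systematically instead of by hand. The Python sketch below generates values just below, at, and just above each end of an inclusive range, with the decision each one should produce. The function name boundary_cases and the 0.1 step are assumptions for illustration; the output is meant as a checklist for writing test rules, not as a replacement for them.

```python
def boundary_cases(lo: float, hi: float, eps: float = 0.1) -> list[tuple[float, bool]]:
    """Return (value, expected_allow) pairs around both inclusive bounds."""
    return [
        (lo - eps, False),  # just below the minimum
        (lo, True),         # exactly at the minimum
        (lo + eps, True),   # just above the minimum
        (hi - eps, True),   # just below the maximum
        (hi, True),         # exactly at the maximum
        (hi + eps, False),  # just above the maximum
    ]


for value, expected in boundary_cases(18, 26):
    print(f"temperature={value:.1f} expect allow={expected}")
```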
Next Steps
- Writing Policies - Create custom safety policies
- Testing Policies - Comprehensive testing guide
- Agent Integration - Connect agents to OPA
- Architecture: Safety - Safety framework deep dive
Common Pitfalls
Issue: Policy always returns false
Solution: Check for unintentional AND conditions:
# All conditions must be true (AND)
allow if {
input.action == "unlock"
input.action == "lock" # Can never be both!
}
# Use separate rules for OR
allow if { input.action == "unlock" }
allow if { input.action == "lock" }
Issue: Time-based rules not working
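Solution: time.now_ns() returns nanoseconds since the Unix epoch, not an hour. Pass it to time.clock to get [hour, minute, second] (UTC unless you supply a timezone):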
# ✅ Correct
current_hour := time.clock(time.now_ns())[0]
# ❌ Wrong
current_hour := time.now_ns() # This is nanoseconds, not hours!
Issue: Data not available in policy
Solution: Check data path and use mock data for testing:
# Provide mock data with --data (a comment after a trailing backslash would break the line continuation)
opa eval -i input.json \
-d policies/security.rego \
--data test-data.json \
'data.citadel.security.allow'
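A mock data file for the rate-limiting rules needs nanosecond timestamps relative to the current time, which is tedious to write by hand. The Python sketch below builds one, assuming the same recent_commands structure and one-hour window used in the security policy. The function name make_recent_commands and the agent ID are illustrative.

```python
import json
import time

HOUR_NS = 3_600_000_000_000  # Same one-hour window the policy uses


def make_recent_commands(agent_id: str, n_recent: int, n_old: int, now_ns: int) -> dict:
    """Build mock data with n_recent unlocks inside the last hour and n_old outside it."""
    recent = [
        {"action": "door_unlock", "agent_id": agent_id,
         "timestamp": now_ns - (i + 1) * 1_000_000_000}  # 1s, 2s, ... ago
        for i in range(n_recent)
    ]
    old = [
        {"action": "door_unlock", "agent_id": agent_id,
         "timestamp": now_ns - 2 * HOUR_NS - i}  # About two hours ago
        for i in range(n_old)
    ]
    return {"recent_commands": recent + old}


now = time.time_ns()
data = make_recent_commands("security-agent-001", n_recent=11, n_old=3, now_ns=now)
payload = json.dumps(data, indent=2)  # Write this string to test-data.json
print(len(data["recent_commands"]))  # 14
```

With 11 recent unlocks for the same agent, excessive_unlock_requests should evaluate to true, which makes it easy to exercise the deny path.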
Master OPA fundamentals and build safe, auditable policies! Continue to Writing Policies.