OPA Policy Basics

Open Policy Agent (OPA) is the foundation of CitadelMesh's safety-first architecture. This guide introduces OPA, the Rego language, and how to write effective safety policies.

What is OPA?

Open Policy Agent (OPA) is a general-purpose policy engine that separates policy decisions from the code that enforces them. In CitadelMesh:

  • Agents check policies before executing building control actions
  • Policies are written in Rego, a declarative language
  • All decisions are audited for compliance and debugging

Why OPA?

  • ✅ Declarative - Express what's allowed, not how to check it
  • ✅ Testable - Unit test policies just like code
  • ✅ Auditable - Every decision is logged
  • ✅ Fast - Sub-millisecond policy evaluation
  • ✅ Flexible - Complex rules with simple syntax

Rego Language Fundamentals

Basic Syntax

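Rego policies consist of rules. A rule is true (or takes its assigned value) when every expression in its body holds; otherwise it is undefined, unless a default value is declared.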

Simple rule:

package citadel.example

import rego.v1

# Rule that's always true
always_true := true

# Rule that checks a condition
temperature_safe if {
input.temperature >= 18
input.temperature <= 26
}
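
The difference between "false" and "undefined" is easiest to see in a unit test. The following is a minimal sketch, assuming the rule above is saved in a standalone file; the test names are illustrative and not part of CitadelMesh.

```rego
package citadel.example

import rego.v1

temperature_safe if {
    input.temperature >= 18
    input.temperature <= 26
}

# 20 is inside the range, so the rule is defined and true.
test_temperature_safe_in_range if {
    temperature_safe with input as {"temperature": 20}
}

# 30 fails the upper bound, so the rule is undefined rather than false.
# `not` succeeds when its operand is undefined.
test_temperature_safe_out_of_range if {
    not temperature_safe with input as {"temperature": 30}
}
```

Running `opa test` on the file should report both tests as passing.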

Input and Data:

  • input - The request being evaluated (from agent)
  • data - Historical data and system state (from database/cache)

Package and Imports

Every policy file starts with a package declaration:

package citadel.security

import rego.v1 # Modern Rego syntax (if, contains, in); the default from OPA 1.0 onward

Package names map to paths in OPA's REST Data API:

  • package citadel.security → /v1/data/citadel/security

Default Values

Set defaults for safety:

# Default deny - safest approach
default allow := false

# Only allow if explicitly permitted
allow if {
# conditions here
}
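
To make the effect of the default concrete, here is a small sketch. The package name `citadel.defaults_sketch` and the admin check are placeholders chosen for illustration, not CitadelMesh policy.

```rego
package citadel.defaults_sketch

import rego.v1

default allow := false

allow if {
    input.user == "admin"
}

# With the default in place, a non-matching request yields false,
# so callers always receive a concrete decision.
test_default_is_false if {
    allow == false with input as {"user": "guest"}
}

test_admin_allowed if {
    allow with input as {"user": "admin"}
}
```

Without the `default` line, the first test would fail because `allow` would be undefined and the comparison with `false` would not hold.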

Logical Operators

# AND - all conditions must be true
allow if {
input.user == "admin" # AND
input.action == "unlock" # AND
input.duration <= 300 # AND
}

# OR - use multiple rules
allow if {
input.user == "admin"
}

allow if {
input.priority == "emergency"
}

# NOT - negate a condition
allow if {
not emergency_mode
}

Comparisons

# Equality
input.action == "door_unlock"

# Inequality
input.duration != 0

# Numerical comparisons
input.temperature >= 18
input.temperature <= 26

# String operations
startswith(input.entity_id, "hvac.")
contains(input.message, "emergency")

Collections

# Arrays
allowed_actions := ["unlock", "lock", "status"]

# Check membership
allow if {
input.action in allowed_actions
}

# Sets (unique values)
critical_zones := {"datacenter", "server_room", "vault"}

# Check membership
zone_critical if {
input.zone in critical_zones
}

# Objects (dictionaries)
temperature_limits := {
"lobby": {"min": 18, "max": 24},
"datacenter": {"min": 18, "max": 22}
}

# Access values
allow if {
limits := temperature_limits[input.zone]
input.temperature >= limits.min
input.temperature <= limits.max
}
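
One consequence of object lookups is worth checking explicitly: a key that is missing makes the lookup undefined, so the rule body fails. The sketch below repeats the limits table in a hypothetical package and assumes a default deny; the zone "garage" is made up to represent an unknown zone.

```rego
package citadel.collections_sketch

import rego.v1

default allow := false

temperature_limits := {
    "lobby": {"min": 18, "max": 24},
    "datacenter": {"min": 18, "max": 22},
}

allow if {
    limits := temperature_limits[input.zone]
    input.temperature >= limits.min
    input.temperature <= limits.max
}

test_lobby_within_limits if {
    allow with input as {"zone": "lobby", "temperature": 23}
}

# 23 is fine for the lobby but above the datacenter maximum of 22.
test_datacenter_above_max if {
    not allow with input as {"zone": "datacenter", "temperature": 23}
}

# "garage" has no entry: the lookup is undefined, the body fails,
# and the default deny applies.
test_unknown_zone_denied if {
    not allow with input as {"zone": "garage", "temperature": 20}
}
```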

CitadelMesh Policy Structure

1. Security Policy Example

File: policies/security.rego

package citadel.security

import rego.v1

# Default deny for safety
default allow := false

# Door unlock policy
allow if {
input.action == "door_unlock"
door_unlock_allowed
}

door_unlock_allowed if {
# Emergency override
input.priority == "PRIORITY_EMERGENCY"
}

door_unlock_allowed if {
# Normal operations
input.priority != "PRIORITY_EMERGENCY"

# Duration limits
input.duration_seconds <= 300 # Max 5 minutes
input.duration_seconds > 0

# Time restrictions (time.clock returns the hour in UTC)
current_hour := time.clock(time.now_ns())[0]
current_hour >= 6 # From 6 AM
current_hour < 22 # Before 10 PM

# Rate limiting
not excessive_unlock_requests
}

# Rate limiting logic
excessive_unlock_requests if {
count(recent_door_unlocks) > 10
}

recent_door_unlocks contains cmd if {
cmd := data.recent_commands[_]
cmd.action == "door_unlock"
cmd.agent_id == input.agent_id
cmd.timestamp > (time.now_ns() - 3600000000000) # Last hour
}

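# Violation reasons for debugging. The else chain returns a single reason
# even when several checks fail, so the complete rule never conflicts.
violation_reason := "Door unlock denied: duration exceeds 300 seconds" if {
not allow
input.action == "door_unlock"
input.duration_seconds > 300
} else := "Door unlock denied: outside allowed hours (6 AM - 10 PM)" if {
not allow
input.action == "door_unlock"
outside_allowed_hours
}

# Either condition puts the request outside the window (OR, not AND)
outside_allowed_hours if {
time.clock(time.now_ns())[0] < 6
}

outside_allowed_hours if {
time.clock(time.now_ns())[0] >= 22
}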
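
Because this policy depends on the clock and on `data.recent_commands`, tests are only repeatable if both are pinned. The sketch below is one way to do that with the `with` keyword, assuming the policy above is loaded from policies/security.rego. The test package name, the 120-second duration, and the fixed timestamps are illustrative choices.

```rego
package citadel.security_test

import rego.v1

import data.citadel.security

# Fixed instants (UTC) so results do not depend on when the tests run.
noon := time.parse_rfc3339_ns("2024-01-15T12:00:00Z")

three_am := time.parse_rfc3339_ns("2024-01-15T03:00:00Z")

normal_request := {
    "action": "door_unlock",
    "duration_seconds": 120,
    "priority": "PRIORITY_NORMAL",
    "agent_id": "security-agent-001",
}

test_normal_unlock_allowed_at_noon if {
    security.allow with input as normal_request with time.now_ns as noon with data.recent_commands as []
}

test_normal_unlock_denied_at_night if {
    not security.allow with input as normal_request with time.now_ns as three_am with data.recent_commands as []
}

test_emergency_unlock_allowed_at_night if {
    security.allow with input as {"action": "door_unlock", "priority": "PRIORITY_EMERGENCY"} with time.now_ns as three_am
}

# Eleven unlocks in the last hour exceed the limit of 10. Each entry gets a
# distinct timestamp because recent_door_unlocks is a set and would collapse
# identical entries into one.
test_rate_limit_blocks_unlock if {
    recent := [cmd |
        some i in numbers.range(1, 11)
        ts := noon - (i * 1000000000)
        cmd := {"action": "door_unlock", "agent_id": "security-agent-001", "timestamp": ts}
    ]
    not security.allow with input as normal_request with time.now_ns as noon with data.recent_commands as recent
}
```

If the test file sits next to the policy, `opa test policies/ -v` should run all four tests.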

2. HVAC Policy Example

File: policies/hvac/setpoint_control.rego

package citadel.hvac.setpoint

import rego.v1

default allow := false

# Temperature limits
min_temp := 65 # Fahrenheit
max_temp := 78

# Allow setpoint changes within safe range (all zones except the datacenter)
allow if {
input.action == "write_setpoint"
not datacenter_setpoint
input.value >= min_temp
input.value <= max_temp
valid_entity_id
not emergency_mode
}

# Validate entity ID format
valid_entity_id if {
startswith(input.entity_id, "hvac.")
count(split(input.entity_id, ".")) >= 3
}

# Check for emergency lockout
emergency_mode if {
data.system_state.emergency_active == true
}

# Always allow read operations
allow if {
input.action == "read_point"
}

# Datacenter has stricter limits. Rules with the same name are ORed, so the
# general and night-setback rules must exclude the datacenter explicitly.
datacenter_setpoint if {
input.entity_id == "hvac.datacenter.setpoint"
}

allow if {
input.action == "write_setpoint"
datacenter_setpoint
input.value >= 68
input.value <= 75
not emergency_mode
}

# Night setback - lower temps allowed
allow if {
input.action == "write_setpoint"
not datacenter_setpoint
is_nighttime
input.value >= 60 # Lower minimum at night
input.value <= max_temp
valid_entity_id
not emergency_mode
}

is_nighttime if {
hour := time.clock(time.now_ns())[0]
hour >= 22 # After 10 PM
}

is_nighttime if {
hour := time.clock(time.now_ns())[0]
hour < 6 # Before 6 AM
}
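
A few tests help confirm that the emergency lockout and the read exemption behave as described. This is a sketch only: it assumes the policy above is loaded, and the entity ID `hvac.zone1.setpoint` and the test package name are invented for illustration.

```rego
package citadel.hvac.setpoint_test

import rego.v1

import data.citadel.hvac.setpoint

# Midday (UTC), so the night setback rule does not apply.
noon := time.parse_rfc3339_ns("2024-01-15T12:00:00Z")

write_request := {
    "action": "write_setpoint",
    "entity_id": "hvac.zone1.setpoint",
    "value": 72,
}

# Same request with a value below the 65 °F daytime minimum.
cold_request := object.union(write_request, {"value": 62})

test_write_within_range_allowed if {
    setpoint.allow with input as write_request with time.now_ns as noon with data.system_state as {"emergency_active": false}
}

test_write_blocked_during_emergency if {
    not setpoint.allow with input as write_request with time.now_ns as noon with data.system_state as {"emergency_active": true}
}

test_read_allowed_during_emergency if {
    setpoint.allow with input as {"action": "read_point"} with data.system_state as {"emergency_active": true}
}

test_write_below_minimum_denied if {
    not setpoint.allow with input as cold_request with time.now_ns as noon with data.system_state as {"emergency_active": false}
}
```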

Testing Policies with the OPA CLI and Playground

1. Install OPA CLI

# macOS
brew install opa

# Linux
curl -L -o opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64
chmod +x opa
sudo mv opa /usr/local/bin/

# Windows
choco install opa

2. Evaluate Policy Locally

Test input file (test-input.json):

{
"action": "door_unlock",
"duration_seconds": 300,
"priority": "PRIORITY_NORMAL",
"agent_id": "security-agent-001"
}

Evaluate policy:

opa eval -i test-input.json \
-d policies/security.rego \
'data.citadel.security.allow'

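Output (the policy checks the current hour, so this result assumes the command runs between 6 AM and 10 PM UTC):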

{
"result": [
{
"expressions": [
{
"value": true,
"text": "data.citadel.security.allow"
}
]
}
]
}

3. Test Denial Scenario

Test input (test-deny.json):

{
"action": "door_unlock",
"duration_seconds": 600,
"priority": "PRIORITY_NORMAL"
}

Evaluate:

opa eval -i test-deny.json \
-d policies/security.rego \
'data.citadel.security.allow'

Output:

{
"result": [
{
"expressions": [
{
"value": false,
"text": "data.citadel.security.allow"
}
]
}
]
}

Get denial reason (--format pretty prints only the value):

opa eval -i test-deny.json \
-d policies/security.rego \
--format pretty \
'data.citadel.security.violation_reason'

Output:

"Door unlock denied: duration exceeds 300 seconds"

4. OPA Web Playground

Use the interactive playground at: https://play.openpolicyagent.org

  1. Paste policy in left panel
  2. Add test input in right panel
  3. Click Evaluate
  4. See results instantly

Debugging Policies

1. Print Debugging

Use trace() to debug:

allow if {
trace(sprintf("Checking action: %v", [input.action]))
input.action == "door_unlock"

trace(sprintf("Duration: %v seconds", [input.duration_seconds]))
input.duration_seconds <= 300
}

Run with --explain to see the trace output (--explain=notes shows only the trace() messages):

opa eval -i test-input.json \
-d policies/security.rego \
--explain=full \
'data.citadel.security.allow'
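
Recent OPA releases also provide a `print` built-in, which is often more convenient than `trace()` for quick checks. The sketch below uses a hypothetical package name; to my knowledge `opa eval` writes print output to standard error, so it does not mix with the JSON result.

```rego
package citadel.debug_sketch

import rego.v1

default allow := false

allow if {
    # print accepts any number of arguments and always succeeds,
    # so it does not change the outcome of the rule.
    print("checking action:", input.action)
    input.action == "door_unlock"

    print("duration (seconds):", input.duration_seconds)
    input.duration_seconds <= 300
}
```

Only the print calls that evaluation reaches produce output, so a missing second message indicates that the action check failed.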

2. Check Policy Syntax

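opa check policies/

opa check loads the directory recursively (a policies/*.rego glob would miss policies/hvac/setpoint_control.rego). It prints nothing when every file parses and compiles, and exits with a non-zero status and a list of errors otherwise.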

3. Format Policy Code

opa fmt -w policies/security.rego

Auto-formats Rego code with consistent style.

4. Evaluate Specific Rules

Test individual rules:

# Test only the rate limiting rule
opa eval -i test-input.json \
-d policies/security.rego \
'data.citadel.security.excessive_unlock_requests'

Common Patterns

1. Time-Based Restrictions

# Business hours only (9 AM - 5 PM)
business_hours if {
hour := time.clock(time.now_ns())[0]
hour >= 9
hour < 17
}

# Weekdays only
weekday if {
day := time.weekday(time.now_ns())
day != "Saturday"
day != "Sunday"
}

allow if {
business_hours
weekday
# other conditions...
}
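
Time-based rules are hard to test against the real clock, so one option is to replace `time.now_ns` with a fixed instant. The sketch below copies the two rules into a hypothetical package; the dates are arbitrary (15 January 2024 was a Monday, 13 January 2024 a Saturday) and all times are UTC.

```rego
package citadel.time_sketch

import rego.v1

business_hours if {
    hour := time.clock(time.now_ns())[0]
    hour >= 9
    hour < 17
}

weekday if {
    day := time.weekday(time.now_ns())
    day != "Saturday"
    day != "Sunday"
}

monday_10am := time.parse_rfc3339_ns("2024-01-15T10:00:00Z")

monday_5pm := time.parse_rfc3339_ns("2024-01-15T17:00:00Z")

saturday_10am := time.parse_rfc3339_ns("2024-01-13T10:00:00Z")

test_monday_morning_is_business_time if {
    business_hours with time.now_ns as monday_10am
    weekday with time.now_ns as monday_10am
}

test_saturday_is_not_weekday if {
    not weekday with time.now_ns as saturday_10am
}

# hour < 17 excludes 17:00 itself.
test_5pm_is_outside_business_hours if {
    not business_hours with time.now_ns as monday_5pm
}
```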

2. Role-Based Access

# User roles
admin_users := {"admin@citadel.local", "security@citadel.local"}
operator_users := {"operator1@citadel.local", "operator2@citadel.local"}

# Admin can do anything
allow if {
input.user in admin_users
}

# Operators have limited access
allow if {
input.user in operator_users
input.action in ["status", "read"]
}

3. Rate Limiting

# Max 10 actions per hour per agent
rate_limit_exceeded if {
recent_actions := [cmd |
cmd := data.recent_commands[_]
cmd.agent_id == input.agent_id
cmd.timestamp > (time.now_ns() - 3600000000000)
]
count(recent_actions) >= 10
}

allow if {
not rate_limit_exceeded
# other conditions...
}

4. Multi-Factor Conditions

# High-risk actions require multiple checks
high_risk_action if {
input.action in ["emergency_unlock", "override_hvac", "disable_camera"]
}

allow if {
high_risk_action
input.user in admin_users
input.approval_code != ""
input.justification != ""
# verify_approval_code is not a built-in; define it as a function in your policy
verify_approval_code(input.approval_code)
}
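
The example calls `verify_approval_code` without defining it. One possible definition is sketched below, under the assumption (not taken from CitadelMesh) that `data.approval_codes` maps each code to an expiry time in nanoseconds since the epoch. The package name, data layout, and sample codes are all hypothetical.

```rego
package citadel.approval_sketch

import rego.v1

# A code is valid if it exists and has not expired yet.
verify_approval_code(code) if {
    expires_ns := data.approval_codes[code]
    expires_ns > time.now_ns()
}

noon := time.parse_rfc3339_ns("2024-01-15T12:00:00Z")

# Mock data: one code expiring a minute after noon, one a minute before.
codes := {"A1B2": valid_until, "OLD1": expired_at} if {
    valid_until := noon + 60000000000
    expired_at := noon - 60000000000
}

test_valid_code if {
    verify_approval_code("A1B2") with data.approval_codes as codes with time.now_ns as noon
}

test_expired_code if {
    not verify_approval_code("OLD1") with data.approval_codes as codes with time.now_ns as noon
}

test_unknown_code if {
    not verify_approval_code("NOPE") with data.approval_codes as codes with time.now_ns as noon
}
```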

Best Practices

1. Always Use Default Deny

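# ✅ Good - explicit default
default allow := false

allow if {
# specific conditions
}

# ❌ Bad - no default: allow is undefined (not false) when no rule matches,
# and callers must remember to treat a missing result as a denial
allow if {
# might miss cases
}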

2. Provide Violation Reasons

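# ✅ Good - helpful error messages
violation_reason := "Temperature outside safe range (18-26°C)" if {
not allow
input.action == "set_temperature"
not temperature_in_range
}

# Rego has no && operator; group the checks in a helper rule and negate it
temperature_in_range if {
input.temperature >= 18
input.temperature <= 26
}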
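
When several checks can fail at once, a single string cannot carry every reason. An alternative pattern, sketched here with hypothetical names (`violations`, `known_zones`, `zone_known`), collects all messages in a set.

```rego
package citadel.violations_sketch

import rego.v1

known_zones := {"lobby", "datacenter"}

temperature_in_range if {
    input.temperature >= 18
    input.temperature <= 26
}

zone_known if {
    input.zone in known_zones
}

# Each rule adds one message to the set when its body holds.
violations contains "Temperature outside safe range (18-26°C)" if {
    input.action == "set_temperature"
    not temperature_in_range
}

violations contains "Unknown zone" if {
    not zone_known
}

test_collects_every_reason if {
    result := violations with input as {"action": "set_temperature", "zone": "garage", "temperature": 30}
    count(result) == 2
}

test_clean_request_has_no_violations if {
    count(violations) == 0 with input as {"action": "set_temperature", "zone": "lobby", "temperature": 21}
}
```

The trade-off is that callers receive a set instead of a string, so the consuming agent needs to handle zero, one, or several messages.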

3. Use Named Rules for Clarity

# ✅ Good - readable
allow if {
valid_temperature
valid_zone
not emergency_mode
}

valid_temperature if {
input.temperature >= 18
input.temperature <= 26
}

# ❌ Bad - hard to read
allow if {
input.temperature >= 18
input.temperature <= 26
startswith(input.zone, "hvac.")
data.system_state.emergency != true
}

4. Test Edge Cases

# Test boundary conditions
test_at_min_boundary if {
allow with input as {"temperature": 18}
}

test_below_min_boundary if {
not allow with input as {"temperature": 17.9}
}
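
The two tests above need a policy to run against. A self-contained sketch, assuming a simple range rule in a hypothetical package, might extend them to both boundaries and to missing input:

```rego
package citadel.boundary_sketch

import rego.v1

default allow := false

allow if {
    input.temperature >= 18
    input.temperature <= 26
}

test_at_min_boundary if {
    allow with input as {"temperature": 18}
}

test_below_min_boundary if {
    not allow with input as {"temperature": 17.9}
}

test_at_max_boundary if {
    allow with input as {"temperature": 26}
}

test_above_max_boundary if {
    not allow with input as {"temperature": 26.1}
}

# A request with no temperature at all must also be denied.
test_missing_temperature_denied if {
    not allow with input as {}
}
```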

Common Pitfalls

Issue: Policy always returns false

Solution: Check for unintentional AND conditions:

# All conditions must be true (AND)
allow if {
input.action == "unlock"
input.action == "lock" # Can never be both!
}

# Use separate rules for OR
allow if { input.action == "unlock" }
allow if { input.action == "lock" }

Issue: Time-based rules not working

Solution: time.now_ns() returns nanoseconds since the Unix epoch; pass it to time.clock() to get [hour, minute, second] in UTC:

# ✅ Correct
current_hour := time.clock(time.now_ns())[0]

# ❌ Wrong
current_hour := time.now_ns() # This is nanoseconds, not hours!
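
A related surprise is that `time.clock` reports UTC, which rarely matches a building's local time. As far as I know, `time.clock` also accepts a two-element array of nanoseconds and an IANA time zone name. The sketch below assumes that form; "America/New_York" is only an example zone, and the names are hypothetical.

```rego
package citadel.timezone_sketch

import rego.v1

# Hour of day in the building's local time zone (example zone).
local_hour := hour if {
    hour := time.clock([time.now_ns(), "America/New_York"])[0]
}

after_hours if {
    local_hour >= 22
}

after_hours if {
    local_hour < 6
}

# 2024-01-15T03:30:00Z is 22:30 on 14 January in New York (UTC-5 in winter).
test_local_hour_uses_time_zone if {
    ts := time.parse_rfc3339_ns("2024-01-15T03:30:00Z")
    local_hour == 22 with time.now_ns as ts
    after_hours with time.now_ns as ts
}

# The same instant is 03:30 in UTC, so the plain UTC hour is 3.
test_utc_hour if {
    ts := time.parse_rfc3339_ns("2024-01-15T03:30:00Z")
    time.clock(ts)[0] == 3
}
```

Time zone lookups depend on the zone database available to the OPA binary, so it is worth running such a test in the deployment environment.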

Issue: Data not available in policy

Solution: Check data path and use mock data for testing:

# Provide mock data with an additional --data (-d) flag
opa eval -i input.json \
-d policies/security.rego \
--data test-data.json \
'data.citadel.security.allow'

Master OPA fundamentals and build safe, auditable policies! Continue to Writing Policies.