Skip to main content

Testing OPA Policies

Comprehensive testing ensures your safety policies work correctly. This guide covers unit testing, integration testing, and CI/CD integration for OPA policies.

Unit Testing with OPA

Test File Structure

Every policy should have a corresponding test file:

policies/
├── security.rego
├── security_test.rego # Unit tests
├── hvac/
│ ├── setpoint_control.rego
│ └── setpoint_control_test.rego
└── energy/
├── optimization.rego
└── optimization_test.rego

Writing Tests

Policy: policies/security.rego

package citadel.security

import rego.v1

default allow := false

allow if {
input.action == "door_unlock"
input.duration_seconds <= 300
input.duration_seconds > 0
}

Tests: policies/security_test.rego

package citadel.security

import rego.v1

# Test 1: Valid unlock should be allowed
# Uses a duration well inside the permitted range (0 < d <= 300) so this case
# stays distinct from the boundary test at exactly 300 seconds.
test_allow_valid_unlock if {
    allow with input as {
        "action": "door_unlock",
        "duration_seconds": 60
    }
}

# Test 2: Zero duration should be denied
test_deny_zero_duration if {
not allow with input as {
"action": "door_unlock",
"duration_seconds": 0
}
}

# Test 3: Excessive duration should be denied
test_deny_excessive_duration if {
not allow with input as {
"action": "door_unlock",
"duration_seconds": 600
}
}

# Test 4: Boundary condition - exactly at limit
test_allow_at_max_duration if {
allow with input as {
"action": "door_unlock",
"duration_seconds": 300
}
}

# Test 5: Boundary condition - one second over
test_deny_one_over_limit if {
not allow with input as {
"action": "door_unlock",
"duration_seconds": 301
}
}

# Test 6: Wrong action should be denied
test_deny_wrong_action if {
not allow with input as {
"action": "door_lock",
"duration_seconds": 300
}
}

# Test 7: Missing field should be denied
test_deny_missing_duration if {
not allow with input as {
"action": "door_unlock"
}
}

Running Tests

# Run all tests in policies directory
opa test policies/

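# Run tests for a specific policy (pass the policy file together with its test file,
# otherwise the rules under test are not loaded)
opa test policies/security.rego policies/security_test.rego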

# Verbose output
opa test -v policies/

# Coverage report
opa test --coverage policies/

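Expected Output (with -v; timings will vary):

policies/security_test.rego:
data.citadel.security.test_allow_valid_unlock: PASS (1.2ms)
data.citadel.security.test_deny_zero_duration: PASS (0.4ms)
data.citadel.security.test_deny_excessive_duration: PASS (0.4ms)
data.citadel.security.test_allow_at_max_duration: PASS (0.4ms)
data.citadel.security.test_deny_one_over_limit: PASS (0.4ms)
data.citadel.security.test_deny_wrong_action: PASS (0.4ms)
data.citadel.security.test_deny_missing_duration: PASS (0.4ms)
--------------------------------------------------------------------------------
PASS: 7/7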

Advanced Testing Techniques

1. Mocking Data

Test policies that depend on external data:

package citadel.hvac.setpoint

import rego.v1

# Policy uses zone data
allow if {
zone_data := data.zones[input.zone_id]
zone_data.occupancy_count > 0
input.temperature >= 18
}

# Test with mock data
test_allow_occupied_zone if {
allow with input as {
"zone_id": "zone-1",
"temperature": 22
} with data.zones as {
"zone-1": {
"occupancy_count": 5,
"type": "office"
}
}
}

test_deny_unoccupied_zone if {
not allow with input as {
"zone_id": "zone-2",
"temperature": 22
} with data.zones as {
"zone-2": {
"occupancy_count": 0,
"type": "office"
}
}
}

2. Time-Based Tests

Mock time for time-dependent policies:

package citadel.security

import rego.v1

# Policy: Only allow unlocks during business hours (9 AM - 5 PM)
# NOTE: time.clock() is called with a bare nanosecond timestamp (no timezone
# argument), so the hour is evaluated in UTC - confirm that matches the site's
# local business hours before relying on this rule.
allow if {
input.action == "door_unlock"
hour := time.clock(time.now_ns())[0] # time.clock returns [hour, minute, second]; index 0 is the hour
hour >= 9
hour < 17 # strict: 17:00 itself is already outside the window
}

# Test during business hours
# The policy reads the hour via time.clock() on a bare timestamp, which is
# interpreted in UTC, so the mocked instant must be 10:00 UTC.
test_allow_during_business_hours if {
    allow with input as {
        "action": "door_unlock"
    } with time.now_ns as 1704103200000000000 # 2024-01-01 10:00:00 UTC
}

# Test outside business hours
# The policy reads the hour via time.clock() on a bare timestamp, which is
# interpreted in UTC, so the mocked instant must be 19:00 UTC.
test_deny_after_hours if {
    not allow with input as {
        "action": "door_unlock"
    } with time.now_ns as 1704135600000000000 # 2024-01-01 19:00:00 UTC
}

# Helper to create timestamps.
# Returns the nanosecond timestamp for the given hour on January 1, 2024:
# 1704067200000000000 is midnight of that day and one hour is 3600000000000 ns.
mock_time_at_hour(hour) := 1704067200000000000 + (hour * 3600000000000)

test_allow_at_9am if {
allow with input as {
"action": "door_unlock"
} with time.now_ns as mock_time_at_hour(9)
}

test_deny_at_8am if {
not allow with input as {
"action": "door_unlock"
} with time.now_ns as mock_time_at_hour(8)
}

3. Testing Collections

Test policies with arrays and sets:

package citadel.access

import rego.v1

# Policy: Allow admin actions
admin_users := {"admin@citadel.local", "security@citadel.local"}

allow if {
input.user in admin_users
}

# Test admin access
test_allow_admin if {
allow with input as {"user": "admin@citadel.local"}
}

test_allow_security if {
allow with input as {"user": "security@citadel.local"}
}

# Test non-admin denial
test_deny_regular_user if {
not allow with input as {"user": "user@citadel.local"}
}

4. Testing Violation Reasons

Verify helpful error messages:

package citadel.security

import rego.v1

default allow := false

allow if {
input.duration_seconds <= 300
}

violation_reason := "Duration exceeds 300 seconds" if {
not allow
input.duration_seconds > 300
}

# Test that violation reason is set correctly
test_violation_reason_excessive_duration if {
not allow with input as {"duration_seconds": 600}
violation_reason == "Duration exceeds 300 seconds" with input as {"duration_seconds": 600}
}

Integration Testing with Safety Service

Test policies through the Safety service API:

Setup Test Environment

# Start OPA with test policies
docker run -d --name opa-test \
-p 8181:8181 \
-v $(pwd)/policies:/policies \
openpolicyagent/opa:latest \
run --server --addr 0.0.0.0:8181 /policies

Python Integration Test

File: tests/integration/test_opa_integration.py

import pytest
import httpx
import asyncio

OPA_URL = "http://localhost:8181"

@pytest.mark.asyncio
async def test_door_unlock_policy():
    """Test door unlock policy through OPA API.

    Posts two inputs to the ``citadel/security/allow`` decision endpoint and
    checks that a 300-second unlock is allowed while a 600-second unlock is
    denied.
    """
    async with httpx.AsyncClient() as client:
        # Test: Valid unlock
        response = await client.post(
            f"{OPA_URL}/v1/data/citadel/security/allow",
            json={
                "input": {
                    "action": "door_unlock",
                    "duration_seconds": 300
                }
            }
        )
        # Surface HTTP failures directly instead of as a KeyError on "result".
        response.raise_for_status()
        result = response.json()
        # Identity check: the decision must be the JSON boolean true, not merely truthy.
        assert result["result"] is True

        # Test: Excessive duration
        response = await client.post(
            f"{OPA_URL}/v1/data/citadel/security/allow",
            json={
                "input": {
                    "action": "door_unlock",
                    "duration_seconds": 600
                }
            }
        )
        response.raise_for_status()
        result = response.json()
        assert result["result"] is False

@pytest.mark.asyncio
async def test_policy_violation_reason():
    """Test that violation reasons are returned.

    Queries the whole ``citadel/security`` package so that both ``allow`` and
    ``violation_reason`` are present in the result document.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{OPA_URL}/v1/data/citadel/security",
            json={
                "input": {
                    "action": "door_unlock",
                    "duration_seconds": 600
                }
            }
        )
        # Surface HTTP failures directly instead of as a KeyError on "result".
        response.raise_for_status()
        result = response.json()
        assert result["result"]["allow"] is False
        assert "Duration exceeds" in result["result"]["violation_reason"]

Run integration tests:

pytest tests/integration/test_opa_integration.py -v

C# Integration Test

File: tests/CitadelMesh.Safety.Tests/PolicyIntegrationTests.cs

using Xunit;
using FluentAssertions;
using System.Net.Http.Json;

/// <summary>
/// Integration tests that evaluate the door-unlock policy through a running
/// OPA server's REST API.
/// </summary>
public class PolicyIntegrationTests : IDisposable
{
    private const string OpaUrl = "http://localhost:8181";
    private const string AllowPath = "/v1/data/citadel/security/allow";

    private readonly HttpClient _opaClient;

    public PolicyIntegrationTests()
    {
        _opaClient = new HttpClient { BaseAddress = new Uri(OpaUrl) };
    }

    /// <summary>
    /// Releases the HTTP client; xUnit disposes the test class after each test.
    /// </summary>
    public void Dispose() => _opaClient.Dispose();

    [Fact]
    public async Task DoorUnlock_WithValidDuration_ShouldAllow()
    {
        // Arrange + Act
        var result = await EvaluateDoorUnlockAsync(durationSeconds: 300);

        // Assert
        result.Result.Should().BeTrue();
    }

    [Fact]
    public async Task DoorUnlock_WithExcessiveDuration_ShouldDeny()
    {
        // Arrange + Act
        var result = await EvaluateDoorUnlockAsync(durationSeconds: 600);

        // Assert
        result.Result.Should().BeFalse();
    }

    /// <summary>
    /// Posts a door_unlock request with the given duration to the allow
    /// endpoint and returns the decoded decision.
    /// </summary>
    private async Task<OpaResponse> EvaluateDoorUnlockAsync(int durationSeconds)
    {
        var request = new
        {
            input = new
            {
                action = "door_unlock",
                duration_seconds = durationSeconds
            }
        };

        using var response = await _opaClient.PostAsJsonAsync(AllowPath, request);

        // Fail on HTTP errors instead of deserializing an error body into a
        // default (false) decision.
        response.EnsureSuccessStatusCode();

        var result = await response.Content
            .ReadFromJsonAsync<OpaResponse>();

        result.Should().NotBeNull();
        return result!;
    }

    private record OpaResponse(bool Result);
}

Test Coverage

Generate Coverage Report

# Run tests with coverage
opa test --coverage policies/ > coverage.txt

# View coverage report
cat coverage.txt

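Example Output (abridged; `opa test --coverage` prints a JSON report):

{
  "files": {
    "policies/security.rego": {
      "covered": [
        {"start": {"row": 4}, "end": {"row": 4}},
        {"start": {"row": 8}, "end": {"row": 8}}
      ],
      "not_covered": [
        {"start": {"row": 15}, "end": {"row": 15}}
      ],
      "coverage": 66.7
    }
  },
  "coverage": 66.7
}

Here the allow and door_unlock_allowed rules (lines 4 and 8) are covered, while the excessive_unlock_requests rule (line 15) is not.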

Improve Coverage

Add tests for uncovered rules:

# Previously uncovered rule
excessive_unlock_requests if {
count(recent_door_unlocks) > 10
}

# New test to cover it
# Builds 11 door_unlock commands (one per second) so the count exceeds the
# limit of 10, then asserts on the rate-limit rule itself rather than on the
# recent_door_unlocks helper.
# NOTE(review): assumes recent_door_unlocks is derived from
# data.recent_commands without further filtering (e.g. a time window) - confirm.
test_rate_limit_exceeded if {
    commands := [command |
        some i in numbers.range(0, 10) # inclusive range: 11 entries
        command := {
            "action": "door_unlock",
            "agent_id": "agent-1",
            "timestamp": 1704067200000000000 + (i * 1000000000),
        }
    ]
    excessive_unlock_requests with data.recent_commands as commands
}

CI/CD Integration

GitHub Actions Workflow

File: .github/workflows/opa-tests.yml

name: OPA Policy Tests

on:
  push:
    branches: [main, develop]
    paths:
      - 'policies/**'
  pull_request:
    paths:
      - 'policies/**'

jobs:
  test-policies:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Setup OPA
        uses: open-policy-agent/setup-opa@v2
        with:
          version: latest

      - name: Validate policy syntax
        # Pass the directory rather than a *.rego glob so policies in nested
        # folders (policies/hvac/, policies/energy/) are checked as well.
        run: opa check policies/

      - name: Run unit tests
        run: opa test -v policies/

      - name: Check coverage
        run: |
          opa test --coverage policies/ > coverage.txt
          cat coverage.txt

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Integration tests
        run: |
          # Install what the integration tests import
          python -m pip install pytest pytest-asyncio httpx

          # Start OPA server
          docker run -d --name opa \
            -p 8181:8181 \
            -v "$(pwd)/policies:/policies" \
            openpolicyagent/opa:latest \
            run --server --addr 0.0.0.0:8181 /policies

          # Wait for OPA to be ready (poll the health endpoint instead of a fixed sleep)
          for attempt in $(seq 1 30); do
            if curl --silent --fail http://localhost:8181/health > /dev/null; then
              break
            fi
            sleep 1
          done
          # Fail the step here if OPA never became healthy
          curl --silent --fail http://localhost:8181/health > /dev/null

          # Run integration tests
          pytest tests/integration/test_opa_integration.py

      - name: Upload coverage report
        uses: codecov/codecov-action@v3
        with:
          files: ./coverage.txt
          flags: opa-policies

GitLab CI/CD

File: .gitlab-ci.yml

opa-tests:
  stage: test
  image:
    # The default OPA image uses `opa` as its entrypoint and ships without a
    # shell, so GitLab cannot run script lines in it. The -debug variant
    # includes a shell; the entrypoint is cleared so the runner can use it.
    name: openpolicyagent/opa:latest-debug
    entrypoint: [""]
  script:
    # Check the directory, not a *.rego glob, so nested policy folders are included.
    - opa check policies/
    - opa test -v policies/
    # The coverage report is JSON; print it so the coverage regex can read the job log.
    - opa test --coverage policies/ > coverage.json
    - cat coverage.json
  # Matches the "coverage" fields of the JSON report; GitLab uses the last
  # match, which is expected to be the overall figure - TODO confirm field order.
  coverage: '/"coverage":\s+(\d+\.?\d*)/'
  artifacts:
    # OPA does not emit Cobertura XML, so the report is kept as a plain artifact.
    paths:
      - coverage.json

Test Data Management

Fixture Files

Store test data in JSON files:

File: tests/fixtures/zone_data.json

{
"zones": {
"zone-lobby": {
"type": "lobby",
"occupancy_count": 12,
"current_temperature": 72
},
"zone-datacenter": {
"type": "datacenter",
"occupancy_count": 2,
"current_temperature": 70
}
}
}

Use in tests:

package citadel.hvac.setpoint

import rego.v1

# Rego has no built-in for reading files, so the fixture cannot be loaded from
# inside the test. Pass tests/fixtures/zone_data.json to `opa test` as an extra
# path instead; its contents are then available under `data` (here: data.zones).
test_allow_occupied_lobby if {
    allow with input as {
        "zone_id": "zone-lobby",
        "temperature": 72
    }
}

Load the fixture by passing its path to opa test alongside the policies (JSON and YAML files given as paths are loaded into data):

opa test policies/ tests/fixtures/zone_data.json

Performance Testing

Benchmark Policy Evaluation

package citadel.security

import rego.v1

# Benchmark: How long does evaluation take?
test_benchmark_door_unlock if {
allow with input as {
"action": "door_unlock",
"duration_seconds": 300
}
}

Run with timing:

opa test --bench policies/security.rego policies/security_test.rego

Output:

Benchmark results:
test_benchmark_door_unlock: sub-millisecond evaluation

Load Testing

Simulate high load with concurrent requests:

import asyncio
import httpx
import time

async def evaluate_policy(client, input_data):
    """Evaluate the door-unlock ``allow`` decision for ``input_data``.

    Args:
        client: An ``httpx.AsyncClient`` used to send the request.
        input_data: Dict sent to OPA as the policy ``input`` document.

    Returns:
        The decoded JSON response body.

    Raises:
        httpx.HTTPStatusError: If OPA answers with a 4xx/5xx status, so failed
            requests are not silently counted as completed evaluations.
    """
    response = await client.post(
        "http://localhost:8181/v1/data/citadel/security/allow",
        json={"input": input_data}
    )
    response.raise_for_status()
    return response.json()

async def load_test(num_requests=1000):
    """Fire ``num_requests`` concurrent policy evaluations and print throughput.

    Args:
        num_requests: Number of concurrent requests to issue (default 1000).
    """
    # Requests beyond the connection-pool limit wait for a free connection;
    # a generous timeout keeps that queueing from aborting the run.
    timeout = httpx.Timeout(30.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        # Simulate concurrent requests
        tasks = [
            evaluate_policy(client, {
                "action": "door_unlock",
                "duration_seconds": 300
            })
            for _ in range(num_requests)
        ]

        # perf_counter is monotonic, so the interval is not skewed by
        # wall-clock adjustments.
        start = time.perf_counter()
        results = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start

    allowed = sum(1 for item in results if item.get("result") is True)

    print(f"{num_requests} requests completed in {duration:.2f}s")
    print(f"Throughput: {num_requests/duration:.2f} req/s")
    print(f"Allowed decisions: {allowed}/{num_requests}")

asyncio.run(load_test())

Troubleshooting Tests

Issue: Test fails with "undefined ref"

Error: test_allow_valid_unlock: undefined ref: allow

Solution: Ensure test is in same package as policy:

# Policy
package citadel.security

# Test - must match package
package citadel.security # ← Same package name

Issue: Mock data not working

Error: Policy still reads production data

Solution: Use correct mock syntax:

# ✅ Correct
test_with_mock_data if {
allow with input as {...} with data.zones as {...}
}

# ❌ Wrong - data is global, can't reassign
test_with_mock_data if {
data.zones := {...}
allow with input as {...}
}

Issue: Time-based tests inconsistent

Solution: Always mock time:

# ✅ Correct - deterministic
test_business_hours if {
allow with input as {...} with time.now_ns as fixed_timestamp
}

# ❌ Wrong - uses real time, can fail
test_business_hours if {
allow with input as {...} # Time changes each run!
}

Best Practices

  1. Test all code paths - Comprehensive coverage
  2. Test edge cases - Boundaries, nulls, missing fields
  3. Use descriptive test names - test_deny_excessive_duration not test_1
  4. Mock external dependencies - Time, data, API calls
  5. Automate in CI/CD - Fail builds on test failures
  6. Performance test - Ensure sub-millisecond evaluation
  7. Version test data - Track test fixtures with policies

Next Steps


Test-driven policy development ensures safety and reliability! Continue to Agent Basics.