Skip to main content

Agent Policy Integration

This guide shows how to integrate OPA safety policies into your CitadelMesh agents for safety-first building control.

Why Policy Integration?

Every building control action must pass OPA policy validation:

✅ Safety - Prevent dangerous operations
✅ Compliance - Enforce regulations
✅ Auditability - Log all decisions
✅ Flexibility - Update policies without code changes

Integration Architecture

Agent → Safety Service → OPA → Policy Decision
↓ ↓
Execute (if allowed) Audit Log

Policy Check Methods

Method 1: Direct OPA HTTP API

import httpx

class Agent:
    """Agent that validates each action against OPA before executing it.

    NOTE(review): ``unlock_door`` reads ``self.agent_id``, ``self.logger`` and
    ``self.mcp_client``, none of which are assigned in ``__init__``. They are
    presumably supplied by the surrounding agent framework -- confirm.
    """

    def __init__(self):
        self.opa_url = "http://localhost:8181"

    @staticmethod
    def _parse_decision(payload) -> tuple[bool, str]:
        """Convert an OPA Data API response body into ``(allowed, reason)``.

        Fails closed: when the body has no ``result`` object (OPA omits it if
        the queried document is undefined) the action is denied instead of
        raising ``KeyError``.
        """
        result = payload.get("result") if isinstance(payload, dict) else None
        if not isinstance(result, dict):
            return False, "Policy decision unavailable"

        return (
            # Only an explicit boolean ``true`` counts as permission.
            result.get("allow", False) is True,
            result.get("violation_reason", ""),
        )

    async def check_policy(self, action: str, params: dict) -> tuple[bool, str]:
        """Check OPA policy via HTTP.

        Args:
            action: Name of the action being requested.
            params: Extra input fields merged next to ``action``.

        Returns:
            ``(allowed, violation_reason)``.

        Raises:
            httpx.HTTPStatusError: If OPA answers with a 4xx/5xx status.
        """
        policy_input = {
            "input": {
                "action": action,
                **params,
            }
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.opa_url}/v1/data/citadel/security",
                json=policy_input,
            )
            # An error body must never be interpreted as a policy decision.
            response.raise_for_status()

        return self._parse_decision(response.json())

    async def unlock_door(self, door_id: str, duration: int):
        """Unlock a door after the policy check passes.

        Raises:
            PolicyViolationError: If the policy denies the unlock.
        """
        # Check policy first
        allowed, reason = await self.check_policy("door_unlock", {
            "door_id": door_id,
            "duration_seconds": duration,
            "agent_id": self.agent_id,
        })

        if not allowed:
            self.logger.warning(f"Policy denied: {reason}")
            raise PolicyViolationError(reason)

        # Execute if allowed
        await self.mcp_client.unlock_door(door_id, duration)

Method 2: Safety Service Client

Safety Service provides:

  • Centralized policy enforcement
  • Caching for performance
  • Audit logging
  • Policy version management
import httpx

class SafetyClient:
    """Client for CitadelMesh Safety Service"""

    def __init__(self, safety_url: str = "http://localhost:5001"):
        self.safety_url = safety_url

    async def evaluate_policy(
        self,
        policy_path: str,
        input_data: dict,
    ) -> dict:
        """Evaluate a policy via the Safety Service.

        Args:
            policy_path: Policy identifier sent as ``policyPath``.
            input_data: Policy input sent as ``input``.

        Returns:
            The decoded JSON body of the service response.

        Raises:
            httpx.HTTPStatusError: If the service answers with a 4xx/5xx
                status, so an error payload is never mistaken for a decision.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.safety_url}/api/policy/evaluate",
                json={
                    "policyPath": policy_path,
                    "input": input_data,
                },
            )
            response.raise_for_status()

        return response.json()

# Use in agent
class MyAgent(BaseAgent):
    """Agent that routes every action through the Safety Service."""

    def __init__(self, config):
        super().__init__(config)
        self.safety = SafetyClient()

    async def execute_action(self, action: str, params: dict):
        """Execute an action only if the Safety Service allows it.

        Returns:
            ``{"success": True}`` when executed, otherwise
            ``{"success": False, "reason": <violation reason>}``.
        """
        # NOTE(review): the policy path is fixed to door_unlock regardless of
        # ``action`` -- confirm this is intended for non-door actions.
        result = await self.safety.evaluate_policy(
            "citadel.security.door_unlock",
            {
                "action": action,
                "agent_id": self.config.agent_id,
                **params,
            },
        )

        # Fail closed: a response without "allowed" is treated as a denial
        # instead of crashing with KeyError.
        if not result.get("allowed", False):
            reason = result.get("violation_reason", "")
            self.logger.warning(f"Denied: {reason}")
            return {"success": False, "reason": reason}

        # Execute
        await self._do_execute(action, params)
        return {"success": True}

Method 3: BaseAgent Built-in Method

class MyAgent(BaseAgent):
    """Agent that relies on BaseAgent's built-in policy check."""

    async def _execute_door_control(self, state: MyState) -> MyState:
        """Unlock each requested door that the safety policy allows.

        Allowed doors are appended to ``state.unlocked_doors`` and denied
        doors to ``state.denied_doors``.
        """
        # Use one value for both validation and execution so the executed
        # unlock is exactly the one the policy approved.
        duration_seconds = 300

        for door_id in state.doors_to_unlock:
            # Use BaseAgent's check_safety_policy method
            allowed = await self.check_safety_policy(
                "door_unlock",
                {
                    "door_id": door_id,
                    "duration_seconds": duration_seconds,
                    "agent_id": self.config.agent_id,
                },
            )

            if allowed:
                await self.mcp_client.unlock_door(door_id, duration_seconds)
                state.unlocked_doors.append(door_id)
            else:
                self.logger.warning(f"Policy denied unlock for {door_id}")
                state.denied_doors.append(door_id)

        return state

Policy Integration Patterns

Pattern 1: Pre-Action Validation

Check policy before every action:

async def _execute_setpoint_change(self, state: HVACState) -> HVACState:
    """Apply each planned setpoint change that passes pre-action validation.

    Every entry of ``state.setpoint_changes`` gets an ``executed`` flag, and
    denied entries also get a ``denial_reason``.
    """
    for change in state.setpoint_changes:
        # check_policy returns (allowed, reason). Testing the tuple itself
        # is always truthy and would let every change through.
        allowed, reason = await self.check_policy(
            "write_setpoint",
            {
                "entity_id": change["entity_id"],
                "value": change["new_setpoint"],
                "priority": 8,
            },
        )

        if allowed:
            await self.mcp_client.write_setpoint(
                change["entity_id"],
                change["new_setpoint"],
            )
            change["executed"] = True
        else:
            change["executed"] = False
            # Keep the generic text when the policy gave no specific reason.
            change["denial_reason"] = reason or "Policy violation"

    return state

Pattern 2: Batch Validation

Validate multiple actions together:

async def _validate_batch(self, actions: List[dict]) -> List[dict]:
"""Validate multiple actions efficiently"""

results = []

for action in actions:
result = await self.safety.evaluate_policy(
f"citadel.{action['domain']}.{action['action']}",
action['params']
)
results.append({
**action,
"allowed": result["allowed"],
"reason": result.get("violation_reason")
})

return results

Pattern 3: Policy-Aware State Machine

Integrate policy checks into state machine:

def build_graph(self):
    """Assemble and compile the workflow with a dedicated policy-check step.

    Flow: plan_actions -> check_policies -> execute_approved or log_denials,
    depending on whether any action was approved.
    """

    def route_after_check(s):
        # Anything approved goes to execution; otherwise record the denials.
        return "execute" if s.approved_actions else "log"

    graph = StateGraph(MyState)

    node_handlers = (
        ("plan_actions", self._plan),
        ("check_policies", self._check_all_policies),
        ("execute_approved", self._execute_approved),
        ("log_denials", self._log_denials),
    )
    for node_name, handler in node_handlers:
        graph.add_node(node_name, handler)

    graph.add_edge("plan_actions", "check_policies")

    branch_targets = {
        "execute": "execute_approved",
        "log": "log_denials",
    }
    graph.add_conditional_edges("check_policies", route_after_check, branch_targets)

    for terminal_node in ("execute_approved", "log_denials"):
        graph.add_edge(terminal_node, END)

    graph.set_entry_point("plan_actions")

    return graph.compile()

async def _check_all_policies(self, state: MyState) -> MyState:
    """Sort planned actions into approved and denied lists on the state.

    Denied actions are stored as copies carrying a ``denial_reason``.
    """
    state.approved_actions = []
    state.denied_actions = []

    for planned in state.planned_actions:
        verdict = await self.check_policy(planned["type"], planned["params"])
        allowed, reason = verdict

        if not allowed:
            rejected = dict(planned)
            rejected["denial_reason"] = reason
            state.denied_actions.append(rejected)
            continue

        state.approved_actions.append(planned)

    return state

Handling Policy Denials

Graceful Degradation

async def _execute_with_fallback(self, state: MyState) -> MyState:
    """Try the primary cooling action and fall back when policy denies it.

    Sets ``state.action_taken`` to the label of the first permitted option.
    If every option is denied it is set to ``"none"`` and ``state.error``
    records why.
    """
    # Ordered from most to least aggressive. The first permitted one wins.
    cooling_options = (
        (68, "aggressive_cooling"),
        (70, "moderate_cooling"),
    )

    for temperature, label in cooling_options:
        # check_policy returns (allowed, reason). Testing the tuple itself
        # is always truthy and would skip the fallback entirely.
        allowed, _reason = await self.check_policy("set_temperature", {
            "zone_id": state.zone,
            "temperature": temperature,
        })

        if allowed:
            await self.mcp_client.set_temperature(state.zone, temperature)
            state.action_taken = label
            return state

    state.action_taken = "none"
    state.error = "All cooling options denied by policy"
    return state

User Notification

async def _handle_policy_denial(self, action: dict, reason: str):
    """Publish a warning event so operators learn about a denied action.

    Args:
        action: The denied action; its ``type`` appears in the message.
        reason: Denial reason reported by the policy.
    """
    denied_type = action["type"]
    notification = {
        "level": "warning",
        "message": f"Action denied by policy: {denied_type}",
        "reason": reason,
        "agent_id": self.config.agent_id,
        "timestamp": datetime.now().isoformat(),
        "action_details": action,
    }

    # Wrap the payload in an event and send it to the notification topic.
    event_id = f"denial-{uuid.uuid4()}"
    event_source = f"/agents/{self.config.agent_type}"
    denial_event = CloudEventMessage(
        id=event_id,
        source=event_source,
        type="citadel.policy.denial",
        data=notification,
    )

    await self.event_bus.publish("citadel.notifications", denial_event)

Audit Trail

Every policy decision should be logged:

async def _audit_policy_decision(
self,
action: str,
params: dict,
allowed: bool,
reason: str = ""
):
"""Log policy decision for audit"""

audit_record = {
"timestamp": datetime.now().isoformat(),
"agent_id": self.config.agent_id,
"action": action,
"params": params,
"policy_decision": "ALLOW" if allowed else "DENY",
"denial_reason": reason if not allowed else None,
"policy_version": "1.0.0" # Get from OPA metadata
}

self.logger.info(f"Policy audit: {audit_record}")

# Store in audit database
# await self.audit_service.log_decision(audit_record)

Testing Policy Integration

Mock Policy Responses

@pytest.mark.asyncio
async def test_agent_with_mock_policy():
    """Run the agent graph against a stubbed safety client and expect denials."""

    class MockSafetyClient:
        async def evaluate_policy(self, path, input_data):
            # Stub rule: any temperature above 75 is rejected.
            too_hot = input_data.get("temperature", 0) > 75
            if not too_hot:
                return {"allowed": True}
            return {
                "allowed": False,
                "violation_reason": "Temperature too high",
            }

    agent = MyAgent(config)
    agent.safety = MockSafetyClient()

    final_state = await agent.graph.ainvoke(initial_state)

    # The run must have produced at least one denied action.
    assert final_state.denied_actions

Integration Test with Real OPA

@pytest.mark.integration
@pytest.mark.asyncio
async def test_agent_with_real_opa():
    """Test agent with real OPA instance"""

    # Start OPA container
    import docker
    client = docker.from_env()
    opa = client.containers.run(
        "openpolicyagent/opa:latest",
        # Bind to all interfaces so the published port is reachable from the
        # host, and load the mounted directory: without the path argument
        # OPA starts with no policies at all.
        command=["run", "--server", "--addr=0.0.0.0:8181", "/policies"],
        ports={"8181/tcp": 8181},
        volumes={
            f"{os.getcwd()}/policies": {"bind": "/policies", "mode": "ro"}
        },
        detach=True,
    )

    try:
        # Wait for OPA to start
        await asyncio.sleep(2)

        # Run agent
        agent = MyAgent(config)
        agent.opa_url = "http://localhost:8181"

        result = await agent.execute_action("door_unlock", {
            "door_id": "test-door",
            "duration_seconds": 300,
        })

        assert result["success"] is True

    finally:
        # Always remove the container, even when stopping it fails.
        try:
            opa.stop()
        finally:
            opa.remove(force=True)

Performance Optimization

Policy Result Caching

from functools import lru_cache
import hashlib

class CachedSafetyClient:
    """Safety client that memoizes policy decisions for a short TTL.

    NOTE(review): ``_call_opa`` is not defined in this class; it is expected
    to be provided by a subclass or mixin -- confirm.
    """

    def __init__(self):
        self.cache = {}  # cache_key -> (stored_at, result)
        self.cache_ttl = 60  # seconds

    async def evaluate_policy(self, path, input_data):
        """Return the decision for ``path`` and ``input_data``, using the cache.

        A cached result younger than ``cache_ttl`` seconds is returned
        directly. Otherwise the policy is evaluated and the result stored.
        """
        # Create cache key (sort_keys makes it independent of dict ordering)
        cache_key = hashlib.md5(
            f"{path}:{json.dumps(input_data, sort_keys=True)}".encode()
        ).hexdigest()

        # Check cache
        now = time.time()
        cached = self.cache.get(cache_key)
        if cached is not None:
            cached_time, result = cached
            if now - cached_time < self.cache_ttl:
                return result

        # Drop stale entries on every miss so keys that are never requested
        # again cannot accumulate forever.
        self._evict_expired(now)

        # Call OPA
        result = await self._call_opa(path, input_data)

        # Update cache
        self.cache[cache_key] = (time.time(), result)

        return result

    def _evict_expired(self, now):
        """Remove every cache entry whose age has reached ``cache_ttl``."""
        expired_keys = [
            key
            for key, (stored_at, _result) in self.cache.items()
            if now - stored_at >= self.cache_ttl
        ]
        for key in expired_keys:
            del self.cache[key]

Async Batch Processing

async def _check_policies_batch(self, actions: List[dict]) -> List[dict]:
"""Check multiple policies in parallel"""

async def check_one(action):
result = await self.safety.evaluate_policy(
action["policy_path"],
action["input"]
)
return {**action, **result}

# Process in parallel
results = await asyncio.gather(*[
check_one(action) for action in actions
])

return results

Best Practices

  1. Always check policies before executing actions
  2. Handle denials gracefully with fallback options
  3. Log all policy decisions for audit trail
  4. Cache policy results for performance (with TTL)
  5. Test with both mock and real OPA
  6. Provide helpful denial messages to operators
  7. Version your policies and track in audit logs

Troubleshooting

Issue: Policy always denies

Check policy input format:

# Print the exact payload being sent so it can be compared with the policy
formatted_input = json.dumps(policy_input, indent=2)
print(f"Policy input: {formatted_input}")

# The same input can be evaluated directly with the OPA CLI:
# opa eval -i input.json -d policies/security.rego 'data.citadel.security.allow'

Issue: OPA not reachable

async def check_policy_with_retry(self, action, params, max_retries=3):
    """Check policy, retrying with linear backoff, and fail safe on error.

    Args:
        action: Action name forwarded to ``check_policy``.
        params: Parameters forwarded to ``check_policy``.
        max_retries: Maximum number of attempts. With zero or fewer no call
            is made and the action is denied.

    Returns:
        The ``check_policy`` result, or ``(False, "Policy engine
        unreachable")`` when no attempt succeeds.
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            return await self.check_policy(action, params)
        except Exception as exc:
            # Deliberately broad: any failure to obtain a decision must end
            # in a denial rather than an unhandled error.
            last_error = exc
            if attempt < max_retries - 1:
                await asyncio.sleep(1 * (attempt + 1))

    # Reached when every attempt failed, or when max_retries <= 0. The
    # original fell off the end and returned None in the latter case.
    self.logger.error(
        f"OPA unreachable after {max_retries} attempts: {last_error!r}"
    )
    # Fail safe: deny action
    return False, "Policy engine unreachable"

Next Steps


Safety-first agents protect your building! Continue to Testing Agents.