Agent Policy Integration
This guide shows how to integrate OPA safety policies into your CitadelMesh agents for safety-first building control.
Why Policy Integration?
Every building control action must pass OPA policy validation:
- ✅ Safety - Prevent dangerous operations
- ✅ Compliance - Enforce regulations
- ✅ Auditability - Log all decisions
- ✅ Flexibility - Update policies without code changes
Integration Architecture
Agent → Safety Service → OPA → Policy Decision
↓ ↓
Execute (if allowed) Audit Log
Policy Check Methods
Method 1: Direct OPA HTTP API
import httpx
class Agent:
    """Example agent that validates actions against OPA over its HTTP data API.

    ``unlock_door`` reads ``self.agent_id``, ``self.logger`` and
    ``self.mcp_client``. None of them is set in ``__init__`` here, so they are
    presumably provided by the surrounding agent framework -- TODO confirm.
    """

    def __init__(self):
        self.opa_url = "http://localhost:8181"

    @staticmethod
    def _parse_decision(payload) -> tuple[bool, str]:
        """Convert a decoded OPA response body into ``(allowed, reason)``.

        Fails closed: a body without a ``result`` object (what OPA sends when
        the queried document is undefined) is treated as a denial instead of
        raising ``KeyError``. Only a literal ``True`` under ``allow`` grants
        permission.
        """
        result = payload.get("result") if isinstance(payload, dict) else None
        if not isinstance(result, dict):
            return False, "Policy decision undefined"
        allowed = result.get("allow", False) is True
        reason = result.get("violation_reason") or ""
        return allowed, reason

    async def check_policy(self, action: str, params: dict) -> tuple[bool, str]:
        """Check OPA policy via HTTP.

        Args:
            action: Name of the action being requested (sent as ``input.action``).
            params: Extra input fields merged into the policy input.

        Returns:
            ``(allowed, violation_reason)``; the reason is ``""`` when the
            policy does not supply one.

        Raises:
            httpx.HTTPStatusError: OPA answered with a non-2xx status.
            httpx.HTTPError: The request could not be completed.
        """
        policy_input = {
            "input": {
                "action": action,
                **params,
            }
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.opa_url}/v1/data/citadel/security",
                json=policy_input,
            )
        # An error response must never be parsed as a policy decision; raising
        # lets callers (e.g. a retry wrapper) treat it as "engine unavailable".
        response.raise_for_status()
        return self._parse_decision(response.json())

    async def unlock_door(self, door_id: str, duration: int):
        """Unlock a door, but only after the policy allows it.

        Raises:
            PolicyViolationError: The policy denied the unlock; the message is
                the violation reason returned by the policy.
        """
        # Check policy first
        allowed, reason = await self.check_policy("door_unlock", {
            "door_id": door_id,
            "duration_seconds": duration,
            "agent_id": self.agent_id,
        })
        if not allowed:
            self.logger.warning(f"Policy denied: {reason}")
            raise PolicyViolationError(reason)
        # Execute if allowed
        await self.mcp_client.unlock_door(door_id, duration)
Method 2: Using Safety Service (Recommended)
The Safety Service provides:
- Centralized policy enforcement
- Caching for performance
- Audit logging
- Policy version management
import httpx
class SafetyClient:
    """Thin async HTTP client for the CitadelMesh Safety Service."""

    def __init__(self, safety_url: str = "http://localhost:5001"):
        # Base URL of the Safety Service; the evaluate endpoint is appended per call.
        self.safety_url = safety_url

    async def evaluate_policy(self, policy_path: str, input_data: dict) -> dict:
        """POST a policy path plus its input and return the decoded JSON reply.

        Args:
            policy_path: Sent to the service as ``policyPath``.
            input_data: Sent to the service as ``input``.

        Returns:
            Whatever JSON document the service responds with.
        """
        endpoint = f"{self.safety_url}/api/policy/evaluate"
        request_body = {"policyPath": policy_path, "input": input_data}
        async with httpx.AsyncClient() as http:
            reply = await http.post(endpoint, json=request_body)
            return reply.json()
# Use in agent
class MyAgent(BaseAgent):
    """Example agent that gates every action on a Safety Service decision."""

    def __init__(self, config):
        super().__init__(config)
        self.safety = SafetyClient()

    async def execute_action(self, action: str, params: dict):
        """Execute ``action`` only if the Safety Service allows it.

        Args:
            action: Action name; included in the policy input and passed to
                ``_do_execute``.
            params: Action parameters; merged into the policy input and passed
                to ``_do_execute``.

        Returns:
            ``{"success": True}`` after execution, or
            ``{"success": False, "reason": ...}`` when the policy denies.

        NOTE(review): the policy path is fixed to the door-unlock policy
        whatever ``action`` is -- confirm whether it should be derived from
        the action.
        """
        # Evaluate policy
        result = await self.safety.evaluate_policy(
            "citadel.security.door_unlock",
            {
                "action": action,
                "agent_id": self.config.agent_id,
                **params,
            },
        )
        # Fail closed: a reply without an explicit ``allowed: True`` is a
        # denial, and a missing reason must not raise KeyError on the way out.
        if result.get("allowed") is not True:
            reason = result.get("violation_reason") or "Denied by policy (no reason provided)"
            self.logger.warning(f"Denied: {reason}")
            return {"success": False, "reason": reason}
        # Execute
        await self._do_execute(action, params)
        return {"success": True}
Method 3: BaseAgent Built-in Method
class MyAgent(BaseAgent):
    """Example agent using the policy check inherited from ``BaseAgent``."""

    async def _execute_door_control(self, state: MyState) -> MyState:
        """Unlock each requested door that the safety policy allows.

        Allowed doors are appended to ``state.unlocked_doors`` and denied ones
        to ``state.denied_doors``; the same ``state`` object is returned.
        """
        # One value feeds both the policy input and the unlock call, so the
        # duration that gets executed is exactly the duration that was approved.
        unlock_seconds = 300
        for door_id in state.doors_to_unlock:
            # Use BaseAgent's check_safety_policy method
            # (assumed to return a plain boolean -- TODO confirm against BaseAgent)
            allowed = await self.check_safety_policy(
                "door_unlock",
                {
                    "door_id": door_id,
                    "duration_seconds": unlock_seconds,
                    "agent_id": self.config.agent_id,
                },
            )
            if allowed:
                await self.mcp_client.unlock_door(door_id, unlock_seconds)
                state.unlocked_doors.append(door_id)
            else:
                self.logger.warning(f"Policy denied unlock for {door_id}")
                state.denied_doors.append(door_id)
        return state
Policy Integration Patterns
Pattern 1: Pre-Action Validation
Check policy before every action:
async def _execute_setpoint_change(self, state: HVACState) -> HVACState:
    """Apply each pending setpoint change that passes pre-action validation.

    Every entry in ``state.setpoint_changes`` is marked ``executed`` True or
    False; denied entries also get a ``denial_reason``. The policy input
    carries a fixed ``priority`` of 8, while the write call itself receives
    only the entity id and the new value.
    """
    for change in state.setpoint_changes:
        # check_policy returns an (allowed, reason) pair. Testing the pair
        # itself is always truthy, which would let every denial through, so it
        # must be unpacked before branching.
        allowed, reason = await self.check_policy(
            "write_setpoint",
            {
                "entity_id": change["entity_id"],
                "value": change["new_setpoint"],
                "priority": 8,
            },
        )
        if allowed:
            await self.mcp_client.write_setpoint(
                change["entity_id"],
                change["new_setpoint"],
            )
            change["executed"] = True
        else:
            change["executed"] = False
            # Prefer the policy's own explanation; keep the generic text as fallback.
            change["denial_reason"] = reason or "Policy violation"
    return state
Pattern 2: Batch Validation
Validate multiple actions together:
async def _validate_batch(self, actions: List[dict]) -> List[dict]:
    """Evaluate the policy for each action and annotate it with the outcome.

    The policy path is ``citadel.<domain>.<action>`` built from each entry, and
    the entry's ``params`` are the policy input. Actions are evaluated one
    after another, in order. Each returned dict is a copy of the action plus
    ``allowed`` and ``reason`` (``None`` when no violation reason is given).
    """
    validated: List[dict] = []
    for item in actions:
        policy_path = f"citadel.{item['domain']}.{item['action']}"
        decision = await self.safety.evaluate_policy(policy_path, item['params'])
        annotated = {
            **item,
            "allowed": decision["allowed"],
            "reason": decision.get("violation_reason"),
        }
        validated.append(annotated)
    return validated
Pattern 3: Policy-Aware State Machine
Integrate policy checks into the state machine:
def build_graph(self):
    """Build the workflow graph: plan, check policies, execute, log denials.

    Flow: ``plan_actions`` -> ``check_policies`` -> ``execute_approved`` (when
    anything was approved) -> ``log_denials`` (when anything was denied) -> END.

    Returns:
        The compiled workflow.
    """
    workflow = StateGraph(MyState)

    workflow.add_node("plan_actions", self._plan)
    workflow.add_node("check_policies", self._check_all_policies)
    workflow.add_node("execute_approved", self._execute_approved)
    workflow.add_node("log_denials", self._log_denials)

    workflow.set_entry_point("plan_actions")
    workflow.add_edge("plan_actions", "check_policies")
    workflow.add_conditional_edges(
        "check_policies",
        lambda s: "execute" if s.approved_actions else "log",
        {
            "execute": "execute_approved",
            "log": "log_denials",
        },
    )
    # A mixed batch (some approved, some denied) must still reach the denial
    # logger; routing straight to END here would silently drop those denials.
    workflow.add_conditional_edges(
        "execute_approved",
        lambda s: "log" if s.denied_actions else "done",
        {
            "log": "log_denials",
            "done": END,
        },
    )
    workflow.add_edge("log_denials", END)
    return workflow.compile()
async def _check_all_policies(self, state: MyState) -> MyState:
    """Policy-check node: sort planned actions into approved and denied.

    Resets ``state.approved_actions`` and ``state.denied_actions``, then checks
    each entry of ``state.planned_actions`` in order. Denied entries are stored
    as a copy of the action with an added ``denial_reason``.
    """
    state.approved_actions = []
    state.denied_actions = []
    for planned in state.planned_actions:
        verdict = await self.check_policy(planned["type"], planned["params"])
        is_allowed, why = verdict
        if not is_allowed:
            rejected = {**planned, "denial_reason": why}
            state.denied_actions.append(rejected)
            continue
        state.approved_actions.append(planned)
    return state
Handling Policy Denials
Graceful Degradation
async def _execute_with_fallback(self, state: MyState) -> MyState:
    """Try the primary cooling action and fall back to a milder one if denied.

    Sets ``state.action_taken`` to the label of the first option the policy
    allows. If none is allowed, sets it to ``"none"`` and records the failure
    in ``state.error``.
    """
    # Ordered from most to least aggressive; the first allowed option wins.
    cooling_options = (
        ("aggressive_cooling", 68),
        ("moderate_cooling", 70),
    )
    for label, temperature in cooling_options:
        # check_policy returns an (allowed, reason) pair. Branching on the pair
        # itself is always truthy, so the first option would run even when denied.
        allowed, _reason = await self.check_policy("set_temperature", {
            "zone_id": state.zone,
            "temperature": temperature,
        })
        if allowed:
            await self.mcp_client.set_temperature(state.zone, temperature)
            state.action_taken = label
            return state
    state.action_taken = "none"
    state.error = "All cooling options denied by policy"
    return state
User Notification
async def _handle_policy_denial(self, action: dict, reason: str):
    """Notify operators that an action was denied by policy.

    Builds a warning-level notification and publishes it as a
    ``citadel.policy.denial`` event on the ``citadel.notifications`` topic.

    Args:
        action: The denied action; its ``type`` is used in the message and the
            whole dict is attached as ``action_details``.
        reason: Denial reason to include in the notification.
    """
    # Function-scope import so the snippet only relies on ``datetime`` already
    # being in scope, as the surrounding code does.
    from datetime import timezone

    notification = {
        "level": "warning",
        "message": f"Action denied by policy: {action['type']}",
        "reason": reason,
        "agent_id": self.config.agent_id,
        # Timezone-aware UTC: a naive local timestamp is ambiguous once
        # notifications from hosts in different zones are compared.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action_details": action,
    }
    # Publish to notification topic
    event = CloudEventMessage(
        id=f"denial-{uuid.uuid4()}",
        source=f"/agents/{self.config.agent_type}",
        type="citadel.policy.denial",
        data=notification,
    )
    await self.event_bus.publish("citadel.notifications", event)
Audit Trail
Every policy decision should be logged:
async def _audit_policy_decision(
    self,
    action: str,
    params: dict,
    allowed: bool,
    reason: str = "",
    policy_version: str = "1.0.0",
):
    """Log one policy decision for the audit trail.

    Args:
        action: Action that was evaluated.
        params: Parameters that were evaluated with it.
        allowed: Outcome of the policy check.
        reason: Denial reason; recorded only when ``allowed`` is false.
        policy_version: Version of the policy that produced the decision.
            Defaults to the previously hard-coded ``"1.0.0"``; callers that
            know the real version (e.g. from OPA metadata) should pass it.
    """
    # Function-scope import so the snippet only relies on ``datetime`` already
    # being in scope, as the surrounding code does.
    from datetime import timezone

    audit_record = {
        # Timezone-aware UTC keeps audit records unambiguous across hosts and
        # daylight-saving changes.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": self.config.agent_id,
        "action": action,
        "params": params,
        "policy_decision": "ALLOW" if allowed else "DENY",
        "denial_reason": reason if not allowed else None,
        "policy_version": policy_version,
    }
    self.logger.info(f"Policy audit: {audit_record}")
    # TODO: store the record in the audit database, e.g.
    # await self.audit_service.log_decision(audit_record)
Testing Policy Integration
Mock Policy Responses
@pytest.mark.asyncio
async def test_agent_with_mock_policy():
    """Run the agent graph against a stubbed safety client and expect denials."""

    class MockSafetyClient:
        """Stand-in safety client that denies any temperature above 75."""

        async def evaluate_policy(self, path, input_data):
            too_hot = input_data.get("temperature", 0) > 75
            if not too_hot:
                return {"allowed": True}
            return {
                "allowed": False,
                "violation_reason": "Temperature too high",
            }

    agent = MyAgent(config)
    # Swap the real client for the stub before the graph runs.
    agent.safety = MockSafetyClient()

    state = await agent.graph.ainvoke(initial_state)

    # At least one action must have been denied.
    assert state.denied_actions
Integration Test with Real OPA
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agent_with_real_opa():
    """Run a door-unlock action against a real OPA container.

    Starts OPA in Docker with the local ``policies`` directory mounted
    read-only, executes one action through the agent, and always stops and
    removes the container afterwards.
    """
    # Start OPA container
    import docker

    client = docker.from_env()
    opa = client.containers.run(
        "openpolicyagent/opa:latest",
        # "/policies" makes OPA load the mounted policy files; without it the
        # server starts with no policies and every decision is undefined.
        # The explicit --addr binds on all interfaces so the published port is
        # reachable from the host.
        command=["run", "--server", "--addr=0.0.0.0:8181", "/policies"],
        ports={"8181/tcp": 8181},
        volumes={
            f"{os.getcwd()}/policies": {"bind": "/policies", "mode": "ro"}
        },
        detach=True,
    )
    try:
        # Wait for OPA to start
        await asyncio.sleep(2)

        # Run agent
        agent = MyAgent(config)
        agent.opa_url = "http://localhost:8181"
        result = await agent.execute_action("door_unlock", {
            "door_id": "test-door",
            "duration_seconds": 300,
        })
        assert result["success"] is True
    finally:
        opa.stop()
        opa.remove()
Performance Optimization
Policy Result Caching
from functools import lru_cache
import hashlib
class CachedSafetyClient:
    """Safety client that reuses recent policy decisions for a short TTL.

    Decisions are keyed by policy path plus canonicalised input. The actual
    policy call is delegated to ``self._call_opa(path, input_data)``, which is
    not defined in this class and must be provided elsewhere (e.g. by a
    subclass).
    """

    def __init__(self, cache_ttl: float = 60):
        # Maps cache key -> (monotonic time stored, decision).
        self.cache = {}
        self.cache_ttl = cache_ttl  # seconds

    @staticmethod
    def _cache_key(path, input_data) -> str:
        """Return a stable digest of the path and the key-sorted JSON input."""
        serialized = f"{path}:{json.dumps(input_data, sort_keys=True)}"
        # SHA-256 rather than MD5: MD5 can be unavailable on FIPS-restricted builds.
        return hashlib.sha256(serialized.encode()).hexdigest()

    def _evict_expired(self, now: float) -> None:
        """Drop every entry older than the TTL so the cache cannot grow forever."""
        expired = [
            key
            for key, (stored_at, _) in self.cache.items()
            if now - stored_at >= self.cache_ttl
        ]
        for key in expired:
            del self.cache[key]

    async def evaluate_policy(self, path, input_data):
        """Return the decision for ``path``/``input_data``, using the cache when fresh.

        Args:
            path: Policy path to evaluate.
            input_data: JSON-serialisable policy input.

        Returns:
            The cached decision if it is younger than ``cache_ttl`` seconds,
            otherwise the fresh result of ``_call_opa``.
        """
        cache_key = self._cache_key(path, input_data)

        # Monotonic clock: wall-clock adjustments must not extend or cut the TTL.
        entry = self.cache.get(cache_key)
        if entry is not None:
            cached_time, result = entry
            if time.monotonic() - cached_time < self.cache_ttl:
                return result

        # Call OPA
        result = await self._call_opa(path, input_data)

        # Update cache, pruning stale entries first.
        stored_at = time.monotonic()
        self._evict_expired(stored_at)
        self.cache[cache_key] = (stored_at, result)
        return result
Async Batch Processing
async def _check_policies_batch(self, actions: List[dict]) -> List[dict]:
    """Evaluate the policy for every action concurrently.

    Each action supplies its own ``policy_path`` and ``input``. The result for
    an action is the action dict merged with the policy reply (reply keys win
    on collision), returned in the same order as ``actions``.
    """

    async def evaluate(entry):
        verdict = await self.safety.evaluate_policy(
            entry["policy_path"],
            entry["input"],
        )
        return {**entry, **verdict}

    # Launch all evaluations at once and wait for all of them.
    pending = [evaluate(entry) for entry in actions]
    return await asyncio.gather(*pending)
Best Practices
- Always check policies before executing actions
- Handle denials gracefully with fallback options
- Log all policy decisions for audit trail
- Cache policy results for performance (with TTL)
- Test with both mock and real OPA
- Provide helpful denial messages to operators
- Version your policies and track in audit logs
Troubleshooting
Issue: Policy always denies
Check the policy input format:
# Debug policy input: print the exact document about to be sent for
# evaluation, pretty-printed so missing or misnamed fields are easy to spot.
print(f"Policy input: {json.dumps(policy_input, indent=2)}")
# Test directly with OPA CLI, using the same data saved as input.json:
# opa eval -i input.json -d policies/security.rego 'data.citadel.security.allow'
# NOTE(review): if policy_input is wrapped as {"input": {...}} (as the HTTP
# request body is), input.json presumably needs only the inner object -- confirm.
Issue: OPA not reachable
async def check_policy_with_retry(self, action, params, max_retries=3, backoff_seconds=1.0):
    """Check policy with retry logic, denying the action if the engine stays down.

    Args:
        action: Action name passed through to ``check_policy``.
        params: Policy parameters passed through to ``check_policy``.
        max_retries: Maximum number of attempts. Values below 1 make no
            attempt and deny immediately.
        backoff_seconds: Base delay; the wait after attempt ``k`` (1-based) is
            ``backoff_seconds * k``.

    Returns:
        The ``(allowed, reason)`` pair from ``check_policy``, or
        ``(False, "Policy engine unreachable")`` when every attempt failed.
    """
    unreachable = (False, "Policy engine unreachable")
    for attempt in range(max_retries):
        try:
            return await self.check_policy(action, params)
        except Exception as exc:
            if attempt == max_retries - 1:
                # Include the last error so operators can see why OPA was unreachable.
                self.logger.error(
                    f"OPA unreachable after {max_retries} attempts: {exc}"
                )
                # Fail safe: deny action
                return unreachable
            await asyncio.sleep(backoff_seconds * (attempt + 1))
    # Reached only when max_retries < 1: still fail safe rather than return None,
    # which callers unpacking (allowed, reason) would crash on.
    return unreachable
Next Steps
- Testing Agents - Test policy integration
- Writing Policies - Create custom policies
- Safety Architecture - Safety framework
Safety-first agents protect your building! Continue to Testing Agents.