Agent Policy Integration
This guide shows how to integrate OPA safety policies into your CitadelMesh agents for safety-first building control.
Why Policy Integration?
Every building control action must pass OPA policy validation:
- ✅ Safety – prevent dangerous operations
- ✅ Compliance – enforce regulations
- ✅ Auditability – log every decision
- ✅ Flexibility – update policies without code changes
Integration Architecture
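Agent → Safety Service → OPA → Policy Decision
  ↓                              ↓
Execute (if allowed)         Audit Log

The agent executes the action only when the decision is "allow". Every decision, allow or deny, is written to the audit log.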
Policy Check Methods
Method 1: Direct OPA HTTP API
import httpx
class Agent:
    """Minimal agent that asks OPA's Data API for a decision before acting.

    NOTE(review): ``agent_id``, ``logger`` and ``mcp_client`` are used below but
    are not set in ``__init__``; they are expected to come from elsewhere
    (e.g. a base class) — confirm.
    """

    def __init__(self):
        self.opa_url = "http://localhost:8181"
        # Seconds to wait for OPA; a timeout raises and the caller must fail closed.
        self.opa_timeout = 5.0

    @staticmethod
    def _parse_opa_decision(payload: dict) -> tuple[bool, str]:
        """Convert an OPA Data API response body into ``(allowed, reason)``.

        OPA omits ``result`` when the queried document is undefined (for example,
        a mistyped package path). That case, and any non-object result, is
        treated as a denial so a misconfiguration can never authorize an action.
        """
        result = payload.get("result")
        if not isinstance(result, dict):
            return False, "No policy decision returned by OPA"
        # Only an explicit boolean true grants permission.
        allowed = result.get("allow") is True
        reason = result.get("violation_reason", "") or ""
        return allowed, reason

    async def check_policy(self, action: str, params: dict) -> tuple[bool, str]:
        """Evaluate ``action`` against the ``citadel/security`` OPA document.

        Args:
            action: Action name, sent to OPA as ``input.action``.
            params: Additional input fields (door id, duration, agent id, ...).

        Returns:
            ``(allowed, reason)``. ``reason`` is the policy's ``violation_reason``,
            or an empty string when none was given.

        Raises:
            httpx.HTTPError: OPA is unreachable, timed out, or returned a
                non-2xx status.
        """
        # ``action`` is applied last so that a stray "action" key in params cannot
        # make OPA evaluate a different action from the one being executed.
        policy_input = {"input": {**params, "action": action}}
        timeout = getattr(self, "opa_timeout", 5.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                f"{self.opa_url}/v1/data/citadel/security",
                json=policy_input,
            )
            # Surface HTTP errors instead of parsing an error body as a decision.
            response.raise_for_status()
            return self._parse_opa_decision(response.json())

    async def unlock_door(self, door_id: str, duration: int):
        """Unlock ``door_id`` for ``duration`` seconds if the policy allows it.

        Raises:
            PolicyViolationError: The policy denied the unlock.
        """
        # Check the policy first.
        allowed, reason = await self.check_policy("door_unlock", {
            "door_id": door_id,
            "duration_seconds": duration,
            "agent_id": self.agent_id,
        })
        if not allowed:
            self.logger.warning(f"Policy denied: {reason}")
            raise PolicyViolationError(reason)
        # Execute exactly what was approved: same door, same duration.
        await self.mcp_client.unlock_door(door_id, duration)
Method 2: Using Safety Service (Recommended)
Safety Service provides:
- Centralized policy enforcement
- Caching for performance
- Audit logging
- Policy version management
import httpx
class SafetyClient:
    """Client for CitadelMesh Safety Service"""

    def __init__(self, safety_url: str = "http://localhost:5001", timeout: float = 5.0):
        self.safety_url = safety_url
        # Per-request timeout in seconds. A timeout raises, and callers should deny.
        self.timeout = timeout

    @staticmethod
    def _normalize_decision(body) -> dict:
        """Return ``body`` if it carries a boolean ``allowed``; otherwise return a denial.

        Callers index ``result["allowed"]`` directly, so a body without that key
        (or with a non-boolean value) is replaced by an explicit denial instead
        of being passed through.
        """
        if isinstance(body, dict) and isinstance(body.get("allowed"), bool):
            return body
        return {
            "allowed": False,
            "violation_reason": "Malformed response from Safety Service",
        }

    async def evaluate_policy(
        self,
        policy_path: str,
        input_data: dict
    ) -> dict:
        """Evaluate ``policy_path`` against ``input_data`` via the Safety Service.

        Returns:
            The service's decision dict, which always contains a boolean ``allowed``.

        Raises:
            httpx.HTTPError: The service is unreachable, timed out, or returned a
                non-2xx status. Error bodies are never treated as decisions.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                f"{self.safety_url}/api/policy/evaluate",
                json={
                    "policyPath": policy_path,
                    "input": input_data
                }
            )
            response.raise_for_status()
            return self._normalize_decision(response.json())
# Use in agent
class MyAgent(BaseAgent):
    """Agent that gates every action on a Safety Service policy decision."""

    def __init__(self, config):
        super().__init__(config)
        self.safety = SafetyClient()

    async def execute_action(self, action: str, params: dict, policy_path=None):
        """Execute ``action`` only if the Safety Service allows it.

        Args:
            action: Action name; also selects the policy
                ``citadel.security.<action>`` unless ``policy_path`` is given.
            params: Action parameters, sent to the policy and to ``_do_execute``.
            policy_path: Optional explicit policy path overriding the default.

        Returns:
            ``{"success": True}`` after execution, or
            ``{"success": False, "reason": ...}`` when denied.
        """
        if policy_path is None:
            policy_path = f"citadel.security.{action}"
        # Identity fields go last so that params cannot spoof the action or agent.
        policy_input = {
            **params,
            "action": action,
            "agent_id": self.config.agent_id,
        }
        result = await self.safety.evaluate_policy(policy_path, policy_input)
        # Fail closed: anything other than an explicit True is a denial.
        if result.get("allowed") is not True:
            reason = result.get("violation_reason") or "Denied by policy"
            self.logger.warning(f"Denied: {reason}")
            return {"success": False, "reason": reason}
        await self._do_execute(action, params)
        return {"success": True}
Method 3: BaseAgent Built-in Method
class MyAgent(BaseAgent):
    """Agent example that uses BaseAgent's built-in ``check_safety_policy``."""

    # Unlock duration in seconds. The same value is policy-checked and executed.
    UNLOCK_DURATION_SECONDS = 300

    async def _execute_door_control(self, state: MyState) -> MyState:
        """Unlock each door in ``state.doors_to_unlock`` that the policy allows.

        Allowed doors are appended to ``state.unlocked_doors``; denied doors are
        appended to ``state.denied_doors``.
        """
        duration = self.UNLOCK_DURATION_SECONDS
        for door_id in state.doors_to_unlock:
            # NOTE(review): assumes check_safety_policy returns a bool. An
            # (allowed, reason) tuple would always be truthy here — confirm.
            allowed = await self.check_safety_policy(
                "door_unlock",
                {
                    "door_id": door_id,
                    "duration_seconds": duration,
                    "agent_id": self.config.agent_id
                }
            )
            if allowed:
                # Execute exactly what was approved: same door, same duration.
                await self.mcp_client.unlock_door(door_id, duration)
                state.unlocked_doors.append(door_id)
            else:
                self.logger.warning(f"Policy denied unlock for {door_id}")
                state.denied_doors.append(door_id)
        return state
Policy Integration Patterns
Pattern 1: Pre-Action Validation
Check policy before every action:
async def _execute_setpoint_change(self, state: HVACState) -> HVACState:
    """Write each planned setpoint change that the policy allows.

    ``self.check_policy`` returns an ``(allowed, reason)`` tuple. The tuple must
    be unpacked, because a non-empty tuple is always truthy and would let every
    change through.

    Each dict in ``state.setpoint_changes`` gets ``executed`` set to True or
    False. Denied changes also get ``denial_reason``.
    """
    for change in state.setpoint_changes:
        entity_id = change["entity_id"]
        new_setpoint = change["new_setpoint"]
        allowed, reason = await self.check_policy(
            "write_setpoint",
            {
                "entity_id": entity_id,
                "value": new_setpoint,
                "priority": 8
            }
        )
        if allowed:
            await self.mcp_client.write_setpoint(entity_id, new_setpoint)
            change["executed"] = True
        else:
            change["executed"] = False
            change["denial_reason"] = reason or "Policy violation"
    return state
Pattern 2: Batch Validation
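Validate a list of actions and collect one decision per action. The calls run one after another; see Async Batch Processing below for a concurrent version.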
async def _validate_batch(self, actions: List[dict]) -> List[dict]:
    """Validate multiple actions efficiently"""
    results = []
    for action in actions:
        result = await self.safety.evaluate_policy(
            f"citadel.{action['domain']}.{action['action']}",
            action['params']
        )
        results.append({
            **action,
            "allowed": result["allowed"],
            "reason": result.get("violation_reason")
        })
    return results
Pattern 3: Policy-Aware State Machine
Integrate policy checks into state machine:
def build_graph(self):
    """Build the plan → check policies → execute / log-denials workflow.

    Approved actions are executed. Denied actions are routed to ``log_denials``
    even when other actions in the same plan were approved, so a mixed plan
    cannot drop its denials from the log.
    """
    workflow = StateGraph(MyState)
    workflow.add_node("plan_actions", self._plan)
    workflow.add_node("check_policies", self._check_all_policies)
    workflow.add_node("execute_approved", self._execute_approved)
    workflow.add_node("log_denials", self._log_denials)
    workflow.add_edge("plan_actions", "check_policies")
    workflow.add_conditional_edges(
        "check_policies",
        lambda s: "execute" if s.approved_actions else "log",
        {
            "execute": "execute_approved",
            "log": "log_denials"
        }
    )
    # After executing the approved subset, still log any denials from the same plan.
    # NOTE(review): assumes _execute_approved leaves state.denied_actions intact.
    workflow.add_conditional_edges(
        "execute_approved",
        lambda s: "log" if s.denied_actions else "done",
        {
            "log": "log_denials",
            "done": END
        }
    )
    workflow.add_edge("log_denials", END)
    workflow.set_entry_point("plan_actions")
    return workflow.compile()
async def _check_all_policies(self, state: MyState) -> MyState:
    """Sort ``state.planned_actions`` into approved and denied buckets.

    Both buckets are reset on ``state`` before any policy is queried. A denied
    entry is a shallow copy of the planned action with ``denial_reason`` added.
    """
    state.approved_actions = []
    state.denied_actions = []
    for planned in state.planned_actions:
        ok, why = await self.check_policy(planned["type"], planned["params"])
        if not ok:
            state.denied_actions.append({**planned, "denial_reason": why})
            continue
        state.approved_actions.append(planned)
    return state
Handling Policy Denials
Graceful Degradation
async def _execute_with_fallback(self, state: MyState) -> MyState:
    """Apply the most aggressive cooling setpoint the policy allows.

    Candidates are tried in order (68, then 70). The first allowed candidate is
    written and its label is stored in ``state.action_taken``. If every candidate
    is denied, nothing is written, ``state.action_taken`` is ``"none"`` and
    ``state.error`` is set.
    """
    # (setpoint, label for state.action_taken), most aggressive first.
    candidates = ((68, "aggressive_cooling"), (70, "moderate_cooling"))
    for temperature, label in candidates:
        # check_policy returns (allowed, reason). Unpack it: the tuple itself is always truthy.
        allowed, _reason = await self.check_policy("set_temperature", {
            "zone_id": state.zone,
            "temperature": temperature
        })
        if allowed:
            await self.mcp_client.set_temperature(state.zone, temperature)
            state.action_taken = label
            return state
    state.action_taken = "none"
    state.error = "All cooling options denied by policy"
    return state
User Notification
async def _handle_policy_denial(self, action: dict, reason: str):
    """Publish an operator notification about a denied action.

    Args:
        action: The denied action. Its ``type`` (if present) goes into the
            message, and the whole dict is attached as ``action_details``.
        reason: Denial reason reported by the policy engine.
    """
    import uuid
    from datetime import datetime, timezone

    # A missing "type" must not raise here: that would lose the denial notification.
    action_type = action.get("type", "<unknown>")
    notification = {
        "level": "warning",
        "message": f"Action denied by policy: {action_type}",
        "reason": reason,
        "agent_id": self.config.agent_id,
        # Timezone-aware UTC, so events from different hosts compare correctly.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action_details": action
    }
    # Publish to the notification topic.
    event = CloudEventMessage(
        id=f"denial-{uuid.uuid4()}",
        source=f"/agents/{self.config.agent_type}",
        type="citadel.policy.denial",
        data=notification
    )
    await self.event_bus.publish("citadel.notifications", event)
Audit Trail
Every policy decision should be logged:
async def _audit_policy_decision(
    self,
    action: str,
    params: dict,
    allowed: bool,
    reason: str = ""
):
    """Log policy decision for audit"""
    audit_record = {
        "timestamp": datetime.now().isoformat(),
        "agent_id": self.config.agent_id,
        "action": action,
        "params": params,
        "policy_decision": "ALLOW" if allowed else "DENY",
        "denial_reason": reason if not allowed else None,
        "policy_version": "1.0.0"  # Get from OPA metadata
    }
    self.logger.info(f"Policy audit: {audit_record}")
    # Store in audit database
    # await self.audit_service.log_decision(audit_record)
Testing Policy Integration
Mock Policy Responses
@pytest.mark.asyncio
async def test_agent_with_mock_policy():
    """Drive the agent graph against a stub Safety Service that rejects hot setpoints."""

    class MockSafetyClient:
        async def evaluate_policy(self, path, input_data):
            # Anything above 75 is rejected; everything else is allowed.
            too_hot = input_data.get("temperature", 0) > 75
            if not too_hot:
                return {"allowed": True}
            return {
                "allowed": False,
                "violation_reason": "Temperature too high"
            }

    agent_under_test = MyAgent(config)
    agent_under_test.safety = MockSafetyClient()
    final_state = await agent_under_test.graph.ainvoke(initial_state)
    # NOTE(review): assumes initial_state plans at least one setpoint above 75.
    assert final_state.denied_actions  # Should have denials
Integration Test with Real OPA
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agent_with_real_opa():
    """Run the agent against a throwaway OPA container that serves ./policies.

    NOTE(review): ``MyAgent.execute_action`` evaluates policies through its
    Safety Service client, not ``opa_url``. This test therefore also needs the
    Safety Service running and pointed at this OPA instance — confirm.
    """
    import asyncio
    import os

    import docker
    import httpx

    opa_base = "http://localhost:8181"
    client = docker.from_env()
    opa = client.containers.run(
        "openpolicyagent/opa:latest",
        # Load the mounted policy directory and listen on all interfaces.
        # OPA 1.0+ binds to localhost by default, which Docker's port mapping
        # cannot reach.
        command=["run", "--server", "--addr", "0.0.0.0:8181", "/policies"],
        ports={"8181/tcp": 8181},
        volumes={
            os.path.join(os.getcwd(), "policies"): {"bind": "/policies", "mode": "ro"}
        },
        detach=True
    )
    try:
        # Poll OPA's health endpoint instead of sleeping a fixed, possibly too short, time.
        async with httpx.AsyncClient(timeout=1.0) as http:
            for _ in range(50):
                try:
                    if (await http.get(f"{opa_base}/health")).status_code == 200:
                        break
                except httpx.TransportError:
                    pass  # Container still starting; retry.
                await asyncio.sleep(0.2)
            else:
                raise AssertionError("OPA did not become healthy within 10 seconds")
        # Run agent
        agent = MyAgent(config)
        agent.opa_url = opa_base
        result = await agent.execute_action("door_unlock", {
            "door_id": "test-door",
            "duration_seconds": 300
        })
        assert result["success"] is True, result
    finally:
        opa.stop()
        opa.remove()
Performance Optimization
Policy Result Caching
from functools import lru_cache
import hashlib
class CachedSafetyClient:
    """Safety client that memoizes policy decisions for a short TTL.

    A cached decision keeps being served after the underlying policy changes,
    until the entry expires, so keep ``cache_ttl`` short. Concrete clients must
    provide ``_call_opa(path, input_data)``.
    """

    def __init__(self, cache_ttl: float = 60, max_entries: int = 1024):
        # Maps key -> (monotonic time when cached, decision).
        self.cache = {}
        self.cache_ttl = cache_ttl  # seconds
        # Upper bound on cached decisions; oldest entries are evicted first.
        self.max_entries = max_entries

    @staticmethod
    def _cache_key(path, input_data) -> str:
        """Return a stable key for ``path`` + ``input_data`` (dict key order ignored)."""
        import hashlib
        import json

        payload = f"{path}:{json.dumps(input_data, sort_keys=True)}"
        return hashlib.sha256(payload.encode()).hexdigest()

    async def evaluate_policy(self, path, input_data):
        """Return a cached decision younger than ``cache_ttl``, else query OPA and cache it."""
        import time

        key = self._cache_key(path, input_data)
        entry = self.cache.get(key)
        if entry is not None:
            cached_at, result = entry
            # Monotonic clock: wall-clock adjustments cannot stretch the TTL.
            if time.monotonic() - cached_at < self.cache_ttl:
                return result
            # Expired: drop it so stale decisions do not accumulate.
            del self.cache[key]
        result = await self._call_opa(path, input_data)
        self._store(key, result)
        return result

    def _evict_expired(self):
        """Remove every entry older than ``cache_ttl``."""
        import time

        now = time.monotonic()
        stale = [k for k, (t, _) in self.cache.items() if now - t >= self.cache_ttl]
        for k in stale:
            del self.cache[k]

    def _store(self, key, result):
        """Cache ``result`` under ``key``, evicting old entries to respect ``max_entries``."""
        import time

        if self.max_entries <= 0:
            return  # Caching disabled.
        if len(self.cache) >= self.max_entries:
            self._evict_expired()
            while self.cache and len(self.cache) >= self.max_entries:
                # Dicts preserve insertion order, so the first key is the oldest entry.
                self.cache.pop(next(iter(self.cache)))
        self.cache[key] = (time.monotonic(), result)
Async Batch Processing
async def _check_policies_batch(self, actions: List[dict]) -> List[dict]:
    """Check multiple policies in parallel"""
    async def check_one(action):
        result = await self.safety.evaluate_policy(
            action["policy_path"],
            action["input"]
        )
        return {**action, **result}
    # Process in parallel
    results = await asyncio.gather(*[
        check_one(action) for action in actions
    ])
    return results
Best Practices
- Always check policies before executing actions, and execute exactly the parameters that were checked
- Handle denials gracefully with fallback options
- Log all policy decisions for audit trail
- Cache policy results for performance, with a short TTL so that policy updates take effect promptly
- Test with both mock and real OPA
- Provide helpful denial messages to operators
- Version your policies and track in audit logs
Troubleshooting
Issue: Policy always denies
Check policy input format:
# Debug policy input: print exactly what is sent to OPA, so field names and
# nesting can be compared with what the policy reads from `input`.
print(f"Policy input: {json.dumps(policy_input, indent=2)}")
# Test directly with the OPA CLI.
# NOTE: `opa eval -i` uses the file's contents as `input` itself, so write only
# the inner object (policy_input["input"]) to input.json, not the {"input": ...} wrapper.
# opa eval -i input.json -d policies/security.rego 'data.citadel.security.allow'
Issue: OPA not reachable
async def check_policy_with_retry(self, action, params, max_retries=3):
    """Call ``self.check_policy`` with linear backoff, and deny if every attempt fails.

    Args:
        action: Action name forwarded to ``check_policy``.
        params: Policy input forwarded to ``check_policy``.
        max_retries: Total number of attempts. Values below 1 still make one
            attempt, so the function always returns a decision.

    Returns:
        The ``(allowed, reason)`` tuple from ``check_policy``, or
        ``(False, "Policy engine unreachable")`` if every attempt raised
        (fail safe: never allow on error).
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            return await self.check_policy(action, params)
        except Exception as exc:
            # Deliberately broad at this boundary, but never silent.
            self.logger.warning(
                "Policy check attempt %d/%d failed: %r", attempt, attempts, exc
            )
            if attempt < attempts:
                # Linear backoff: 1 s, 2 s, ...
                await asyncio.sleep(attempt)
    self.logger.error(f"OPA unreachable after {attempts} attempts")
    # Fail safe: deny the action.
    return False, "Policy engine unreachable"
Next Steps
- Testing Agents - Test policy integration
- Writing Policies - Create custom policies
- Safety Architecture - Safety framework
Safety-first agents protect your building! Continue to Testing Agents.