Creating a New Agent from Scratch
This guide walks you through creating a custom CitadelMesh agent using the BaseAgent framework and LangGraph state machines.
Prerequisites
- Python 3.12+ environment set up
- CitadelMesh dependencies installed
- OPA and NATS running (via Aspire or Docker Compose)
- Familiarity with LangGraph basics
Agent Architecture
Every CitadelMesh agent consists of:
- State Definition - Data structure for agent state
- BaseAgent Subclass - Core agent logic
- State Machine - LangGraph workflow
- Event Handlers - Process CloudEvents
- MCP Integration - Call vendor tools
- Policy Checks - OPA safety validation
Step-by-Step: HVAC Control Agent
Let's build an agent that optimizes HVAC based on occupancy and energy prices.
Step 1: Define Agent State
File: src/agents/hvac/hvac_optimizer.py
"""HVAC Optimization Agent for CitadelMesh"""
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
import logging
sys.path.insert(0, 'src/proto_gen')
sys.path.insert(0, 'src')
from agents.runtime.base_agent import BaseAgent, AgentConfig, AgentStatus
from agents.runtime.event_bus import CloudEventMessage
from agents.runtime.telemetry import TelemetryCollector
from langgraph.graph import StateGraph, END
# Agent-specific state
class OptimizationMode(Enum):
    """Strategy the optimizer applies when computing new zone setpoints."""
    COMFORT = "comfort"           # Prioritize comfort: setpoints are left as they are
    EFFICIENCY = "efficiency"     # Prioritize energy savings: largest setpoint setback
    BALANCED = "balanced"         # Balance both: smaller setback than EFFICIENCY
@dataclass
class HVACState:
    """Mutable state handed from node to node in the HVAC optimization graph.

    Every field has a default, so ``HVACState()`` is a valid empty start state.
    """
    # Sensor data
    # Per-zone readings keyed by zone id; the simulated data uses the keys
    # "current_temp", "setpoint" and "mode".
    zones: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # Occupancy per zone id (presumably a head count -- verify against the sensor feed).
    occupancy_data: Dict[str, int] = field(default_factory=dict)
    outdoor_temp: float = 0.0
    # Energy context
    electricity_price: float = 0.0  # $/kWh
    peak_demand: bool = False
    # Optimization results
    optimization_mode: OptimizationMode = OptimizationMode.BALANCED
    # One dict per zone with "zone_id", "current", "new" and
    # "estimated_savings_kwh"; the policy node later adds "policy_approved".
    setpoint_changes: List[Dict[str, Any]] = field(default_factory=list)
    energy_savings: float = 0.0  # sum of the per-change estimated savings, in kWh
    # Control flow
    error: Optional[str] = None
Step 2: Implement Agent Class
class HVACOptimizer(BaseAgent):
    """HVAC optimization agent driven by a LangGraph state machine.

    Workflow: ``collect_data`` -> ``analyze`` -> ``optimize`` ->
    ``check_policy``. If at least one change is approved the graph continues
    to ``execute`` and then ``audit``; otherwise it goes straight to ``audit``.

    NOTE(review): ``self.graph``, ``self.telemetry`` and ``self.config`` are
    read here but never assigned in this class; they are assumed to be set up
    by ``BaseAgent`` -- confirm against the runtime.
    """

    # Electricity price ($/kWh) above which energy savings take priority.
    _EFFICIENCY_PRICE_THRESHOLD = 0.20
    # Outdoor temperature above which comfort takes priority (presumably in
    # the same unit as the setpoints, which are logged as °F -- TODO confirm).
    _EXTREME_HEAT_THRESHOLD = 95
    # Degrees added to a zone's current setpoint, as (unoccupied, occupied).
    _SETPOINT_OFFSETS = {
        OptimizationMode.EFFICIENCY: (4, 2),
        OptimizationMode.COMFORT: (0, 0),
        OptimizationMode.BALANCED: (2, 1),
    }
    # Simplified estimate of kWh saved per degree of setpoint change.
    _KWH_PER_DEGREE = 0.5

    def __init__(self, config: AgentConfig):
        """Create the agent; the MCP client is only connected in ``start()``."""
        super().__init__(config)
        self.mcp_client = None  # Will be initialized in start()
        self.logger = logging.getLogger("citadel.agents.hvac_optimizer")

    async def start(self):
        """Start agent and initialize MCP client"""
        await super().start()
        # TODO: Initialize MCP client for EcoStruxure
        # self.mcp_client = await connect_mcp("ecostruxure-ebo")
        self.logger.info("HVAC Optimizer agent started")

    def build_graph(self) -> StateGraph:
        """Build and compile the LangGraph state machine.

        Returns:
            The compiled graph (the result of ``workflow.compile()``).
        """
        workflow = StateGraph(HVACState)
        # Define nodes
        workflow.add_node("collect_data", self._collect_sensor_data)
        workflow.add_node("analyze", self._analyze_conditions)
        workflow.add_node("optimize", self._optimize_setpoints)
        workflow.add_node("check_policy", self._check_safety_policy)
        workflow.add_node("execute", self._execute_changes)
        workflow.add_node("audit", self._audit_log)
        # Define flow
        workflow.add_edge("collect_data", "analyze")
        workflow.add_edge("analyze", "optimize")
        workflow.add_edge("optimize", "check_policy")
        # Conditional: execute if policy allows
        workflow.add_conditional_edges(
            "check_policy",
            self._policy_decision,
            {
                "approved": "execute",
                "denied": "audit",
            }
        )
        workflow.add_edge("execute", "audit")
        workflow.add_edge("audit", END)
        # Set entry point
        workflow.set_entry_point("collect_data")
        return workflow.compile()

    def _load_simulated_readings(self, state: HVACState) -> None:
        """Fill *state* with simulated sensor and energy readings."""
        # Simulate reading from MCP adapter
        # In production: state.zones = await self.mcp_client.read_all_zones()
        state.zones = {
            "zone-lobby": {
                "current_temp": 73.0,
                "setpoint": 72.0,
                "mode": "cool"
            },
            "zone-office-1": {
                "current_temp": 75.0,
                "setpoint": 72.0,
                "mode": "cool"
            }
        }
        state.occupancy_data = {
            "zone-lobby": 15,
            "zone-office-1": 8
        }
        state.outdoor_temp = 85.0
        # $/kWh; below the efficiency threshold, so together with the outdoor
        # temperature above this simulated data selects balanced mode.
        state.electricity_price = 0.15

    async def _collect_sensor_data(self, state: HVACState) -> HVACState:
        """Collect data from building sensors.

        Readings are collected whether or not telemetry is enabled; telemetry
        only decides whether the work is wrapped in a trace span. A state that
        already carries zone data (for example one seeded by a test) is left
        untouched so the simulated readings do not overwrite it.
        """
        if state.zones:
            self.logger.info(f"Using {len(state.zones)} pre-seeded zones")
            return state
        if self.telemetry:
            with self.telemetry.trace_span("collect_sensor_data"):
                self._load_simulated_readings(state)
        else:
            self._load_simulated_readings(state)
        self.logger.info(f"Collected data from {len(state.zones)} zones")
        return state

    async def _analyze_conditions(self, state: HVACState) -> HVACState:
        """Pick the optimization mode from energy price and outdoor temperature.

        A high electricity price wins over extreme heat; anything else is
        balanced mode.
        """
        if state.electricity_price > self._EFFICIENCY_PRICE_THRESHOLD:
            state.optimization_mode = OptimizationMode.EFFICIENCY
            self.logger.info("High electricity prices - efficiency mode")
        elif state.outdoor_temp > self._EXTREME_HEAT_THRESHOLD:
            state.optimization_mode = OptimizationMode.COMFORT
            self.logger.info("Extreme heat - comfort mode")
        else:
            state.optimization_mode = OptimizationMode.BALANCED
            self.logger.info("Normal conditions - balanced mode")
        return state

    async def _optimize_setpoints(self, state: HVACState) -> HVACState:
        """Calculate a proposed setpoint for each zone.

        Writes one record per zone to ``state.setpoint_changes`` and the sum
        of the estimated savings to ``state.energy_savings``. Zones without a
        "setpoint" reading are skipped with a warning instead of aborting the
        whole run.
        """
        # Any mode without an explicit entry is treated as balanced.
        unoccupied_offset, occupied_offset = self._SETPOINT_OFFSETS.get(
            state.optimization_mode,
            self._SETPOINT_OFFSETS[OptimizationMode.BALANCED],
        )
        setpoint_changes = []
        for zone_id, zone_data in state.zones.items():
            current_setpoint = zone_data.get("setpoint")
            if current_setpoint is None:
                self.logger.warning(f"Zone {zone_id} has no setpoint reading - skipped")
                continue
            # Zones missing from the occupancy data count as unoccupied.
            occupancy = state.occupancy_data.get(zone_id, 0)
            offset = unoccupied_offset if occupancy == 0 else occupied_offset
            new_setpoint = current_setpoint + offset
            # Calculate energy savings
            temp_diff = new_setpoint - current_setpoint
            savings = abs(temp_diff) * self._KWH_PER_DEGREE  # Simplified calculation
            setpoint_changes.append({
                "zone_id": zone_id,
                "current": current_setpoint,
                "new": new_setpoint,
                "estimated_savings_kwh": savings
            })
        state.setpoint_changes = setpoint_changes
        state.energy_savings = sum(c["estimated_savings_kwh"]
                                    for c in setpoint_changes)
        self.logger.info(f"Optimized {len(setpoint_changes)} zones, "
                        f"est. savings: {state.energy_savings:.2f} kWh")
        return state

    async def _check_safety_policy(self, state: HVACState) -> HVACState:
        """Mark each proposed change with a ``policy_approved`` flag.

        When safety checks are disabled in the config, every change is marked
        approved (with a warning), because the router only lets changes through
        that carry the flag. Returning without setting it would silently turn
        "checks disabled" into "everything denied".
        """
        if not self.config.enable_safety_checks:
            for change in state.setpoint_changes:
                change["policy_approved"] = True
            self.logger.warning(
                "Safety checks disabled - setpoint changes approved without policy evaluation"
            )
            return state
        # Check each proposed change against OPA policy
        for change in state.setpoint_changes:
            # Kept so the production OPA call below can be enabled as-is;
            # unused while the decision is mocked.
            policy_input = {
                "action": "write_setpoint",
                "entity_id": f"hvac.{change['zone_id']}.setpoint",
                "value": change['new'],
                "priority": 8,
                "agent_id": self.config.agent_id
            }
            # In production: call OPA via Safety service
            # allowed = await self.check_safety_policy("hvac.setpoint", policy_input)
            allowed = True  # Mock for now
            change["policy_approved"] = allowed
            if not allowed:
                self.logger.warning(f"Policy denied setpoint change for {change['zone_id']}")
        return state

    def _policy_decision(self, state: HVACState) -> str:
        """Route to "approved" if any change was approved, else "denied".

        Deliberately free of side effects: the router only picks the next
        node. Dropping denied changes from the state here would make the audit
        record's "changes_proposed" count under-report, and the execute node
        already skips changes that are not approved.
        """
        any_approved = any(
            change.get("policy_approved", False)
            for change in state.setpoint_changes
        )
        return "approved" if any_approved else "denied"

    async def _execute_changes(self, state: HVACState) -> HVACState:
        """Execute approved setpoint changes; unapproved ones are skipped."""
        for change in state.setpoint_changes:
            if not change.get("policy_approved"):
                continue
            # In production: await self.mcp_client.write_setpoint(...)
            self.logger.info(
                f"Setting {change['zone_id']} to {change['new']}°F "
                f"(was {change['current']}°F)"
            )
        return state

    async def _audit_log(self, state: HVACState) -> HVACState:
        """Log optimization results for audit trail"""
        audit_record = {
            "agent_id": self.config.agent_id,
            "optimization_mode": state.optimization_mode.value,
            "changes_proposed": len(state.setpoint_changes),
            "changes_executed": sum(1 for c in state.setpoint_changes
                                   if c.get("policy_approved")),
            "estimated_savings_kwh": state.energy_savings,
            "electricity_price": state.electricity_price
        }
        self.logger.info(f"Audit log: {audit_record}")
        # In production: publish to audit topic
        # await self.event_bus.publish("citadel.audit.hvac", ...)
        return state

    @staticmethod
    def _result_field(result: Any, name: str) -> Any:
        """Read field *name* from a graph result that is a mapping or an object.

        NOTE(review): LangGraph is documented to return the final state as a
        dict-like mapping rather than an instance of the state schema, so
        mapping access comes first; attribute access is kept as a fallback.
        """
        if isinstance(result, dict):
            return result[name]
        return getattr(result, name)

    async def process_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Process incoming CloudEvents.

        Returns:
            For ``citadel.hvac.optimize.request``: a dict with "success",
            "changes" and "savings". For every other event type: ``None``.
        """
        self.logger.info(f"Processing event: {event.type}")
        if event.type == "citadel.hvac.optimize.request":
            # Trigger optimization
            initial_state = HVACState()
            result = await self.graph.ainvoke(initial_state)
            return {
                "success": True,
                "changes": self._result_field(result, "setpoint_changes"),
                "savings": self._result_field(result, "energy_savings")
            }
        elif event.type == "citadel.sensor.occupancy.changed":
            # Update occupancy and re-optimize
            self.logger.info("Occupancy changed, triggering optimization")
            # ... handle occupancy update
        return None
Step 3: Configuration
# Agent configuration
# Topic lists and metadata are named first so the AgentConfig call stays short.
_subscribe_topics = [
    "citadel.hvac.optimize.request",
    "citadel.sensor.occupancy.changed",
    "citadel.energy.price.updated",
]
_publish_topics = [
    "citadel.hvac.setpoint.changed",
    "citadel.audit.hvac",
]
_agent_metadata = {
    "version": "1.0.0",
    "zone_coverage": ["zone-lobby", "zone-office-1", "zone-office-2"],
}

config = AgentConfig(
    # Identity
    agent_id="hvac-optimizer-001",
    agent_type="hvac_optimizer",
    spiffe_id="spiffe://citadel.local/agent/hvac-optimizer",
    # Messaging
    nats_url="nats://localhost:4222",
    subscribe_topics=_subscribe_topics,
    publish_topics=_publish_topics,
    # Runtime switches
    enable_telemetry=True,
    enable_safety_checks=True,
    log_level="INFO",
    metadata=_agent_metadata,
)
Step 4: Run Agent
async def main():
    """Run the HVAC optimizer agent until the process is interrupted.

    The agent is stopped in a ``finally`` block. When this coroutine runs
    under ``asyncio.run``, Ctrl+C reaches it as task cancellation rather than
    as ``KeyboardInterrupt``, so an ``except KeyboardInterrupt`` here would
    never fire and the agent would not be shut down.
    """
    # Create agent
    agent = HVACOptimizer(config)
    # Start agent
    await agent.start()
    # Keep running until cancelled, then always stop the agent
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await agent.stop()
# Script entry point: only runs when the file is executed directly, not on import.
if __name__ == "__main__":
    # asyncio is already imported at the top of the module; this local import
    # is redundant but harmless.
    import asyncio
    asyncio.run(main())
Testing Your Agent
Unit Test
File: src/agents/hvac/test_hvac_optimizer.py
import pytest
from hvac_optimizer import HVACOptimizer, HVACState, OptimizationMode, AgentConfig
@pytest.mark.asyncio
async def test_efficiency_mode():
    """A high electricity price with an unoccupied zone selects efficiency mode."""
    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        enable_safety_checks=False
    )
    agent = HVACOptimizer(config)
    # Build state
    state = HVACState(
        electricity_price=0.25,  # High price
        zones={"zone-1": {"setpoint": 72, "current_temp": 73}},
        occupancy_data={"zone-1": 0}  # Unoccupied
    )
    # Run through graph
    # NOTE(review): the graph's entry node is collect_data; this test assumes
    # it leaves the seeded zones and price in place -- verify.
    result = await agent.graph.ainvoke(state)
    # The compiled graph returns the final state as a mapping rather than an
    # HVACState instance; accept either form.
    final = result if isinstance(result, dict) else vars(result)
    # Verify
    assert final["optimization_mode"] == OptimizationMode.EFFICIENCY
    changes = final["setpoint_changes"]
    assert len(changes) > 0
    assert changes[0]["new"] > 72  # Setback
Integration Test
@pytest.mark.asyncio
async def test_full_optimization_flow():
    """Send an optimize request through a started agent and check the reply."""
    # Imported here because the test module's import line does not bring
    # CloudEventMessage into scope.
    from agents.runtime.event_bus import CloudEventMessage

    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        nats_url="nats://localhost:4222"
    )
    agent = HVACOptimizer(config)
    await agent.start()
    try:
        # Simulate optimization request
        event = CloudEventMessage(
            id="test-001",
            source="/test",
            type="citadel.hvac.optimize.request",
            data={}
        )
        result = await agent.process_event(event)
        assert result["success"] is True
        assert "changes" in result
    finally:
        # Stop the agent even when an assertion above fails.
        await agent.stop()
Best Practices
1. Error Handling
async def _collect_sensor_data(self, state: HVACState) -> HVACState:
    """Read all zones via the MCP client, recording any failure on the state.

    On success ``state.zones`` is replaced with the fresh readings. On failure
    the error is logged, its message is stored in ``state.error`` and the
    existing ``state.zones`` is left as it was.
    """
    try:
        fresh_zones = await self.mcp_client.read_all_zones()
    except Exception as exc:
        self.logger.error(f"Failed to read zones: {exc}")
        state.error = str(exc)
        # Return early or use fallback data
    else:
        state.zones = fresh_zones
    return state
2. Telemetry
async def _optimize_setpoints(self, state: HVACState) -> HVACState:
    """Optimize setpoints, adding span attributes when telemetry is enabled.

    The optimization logic runs whether or not telemetry is configured; the
    trace span is only an optional wrapper around it. With telemetry off a
    no-op context manager is used and ``span`` is ``None``.
    """
    from contextlib import nullcontext

    span_context = (
        self.telemetry.trace_span("optimize_setpoints")
        if self.telemetry
        else nullcontext()
    )
    with span_context as span:
        if span is not None:
            span.set_attribute("zone_count", len(state.zones))
        # ... optimization logic ...
        if span is not None:
            span.set_attribute("changes_made", len(state.setpoint_changes))
            span.set_attribute("savings_kwh", state.energy_savings)
    return state
3. Graceful Degradation
async def _execute_changes(self, state: HVACState) -> HVACState:
    """Write approved setpoint changes, continuing past per-zone failures.

    Only changes flagged ``policy_approved`` are written. A failure on one
    zone is logged and the remaining zones are still attempted.
    """
    for change in state.setpoint_changes:
        if not change.get("policy_approved"):
            # Never write a setpoint the policy check did not approve.
            continue
        try:
            await self.mcp_client.write_setpoint(
                change['zone_id'],
                change['new']
            )
        except Exception as e:
            # Broad on purpose: one failing zone must not block the others.
            self.logger.error(f"Failed to set {change['zone_id']}: {e}")
    return state
Agent Checklist
Before deploying your agent:
- State is a typed dataclass that each node updates and returns (dataclasses are mutable unless declared with frozen=True)
- All nodes are async functions
- Error handling in all nodes
- Telemetry spans for observability
- OPA policy checks for safety
- Unit tests for state machine logic
- Integration tests with real services
- Logging at appropriate levels
- Configuration via AgentConfig
- Documentation with docstrings
Next Steps
- Policy Integration - Connect to OPA policies
- Testing Agents - Comprehensive testing guide
- MCP Integration - Call vendor tools
- Production Deployment - Deploy to K3s
Build intelligent, safe agents for your building! Continue to Policy Integration.