Creating a New Agent from Scratch
This guide walks you through creating a custom CitadelMesh agent using the BaseAgent framework and LangGraph state machines.
Prerequisites
- Python 3.12+ environment set up
- CitadelMesh dependencies installed
- OPA and NATS running (via Aspire or Docker Compose)
- Familiarity with LangGraph basics
Agent Architecture
Every CitadelMesh agent consists of:
- State Definition - Data structure for agent state
- BaseAgent Subclass - Core agent logic
- State Machine - LangGraph workflow
- Event Handlers - Process CloudEvents
- MCP Integration - Call vendor tools
- Policy Checks - OPA safety validation
Step-by-Step: HVAC Control Agent
Let's build an agent that optimizes HVAC setpoints based on zone occupancy and electricity prices.
Step 1: Define Agent State
File: src/agents/hvac/hvac_optimizer.py
"""HVAC Optimization Agent for CitadelMesh"""
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
import logging
sys.path.insert(0, 'src/proto_gen')
sys.path.insert(0, 'src')
from agents.runtime.base_agent import BaseAgent, AgentConfig, AgentStatus
from agents.runtime.event_bus import CloudEventMessage
from agents.runtime.telemetry import TelemetryCollector
from langgraph.graph import StateGraph, END
# Agent-specific state
class OptimizationMode(Enum):
    """Strategy the optimizer applies when choosing new setpoints.

    COMFORT     -- keep occupants comfortable; leave setpoints alone.
    EFFICIENCY  -- favour energy savings with larger setbacks.
    BALANCED    -- trade comfort against savings with modest setbacks.
    """

    COMFORT = "comfort"
    EFFICIENCY = "efficiency"
    BALANCED = "balanced"
@dataclass
class HVACState:
    """Mutable state threaded through every node of the HVAC optimization graph.

    Fields are grouped as sensor readings, energy context, optimization
    results and control-flow bookkeeping. Mutable defaults use
    ``default_factory`` so instances never share containers.
    """

    # --- Sensor readings ---
    # zone_id -> reading dict; keys used in this guide: "current_temp",
    # "setpoint", "mode".
    zones: dict[str, dict] = field(default_factory=dict)
    # zone_id -> occupant count.
    occupancy_data: dict[str, int] = field(default_factory=dict)
    # Presumably °F, matching the °F used in the execute log output; TODO confirm.
    outdoor_temp: float = 0.0

    # --- Energy context ---
    electricity_price: float = 0.0  # $/kWh
    peak_demand: bool = False  # NOTE(review): not read by any node in this guide

    # --- Optimization results ---
    optimization_mode: OptimizationMode = OptimizationMode.BALANCED
    setpoint_changes: list[dict] = field(default_factory=list)
    energy_savings: float = 0.0  # Estimated kWh, summed over setpoint_changes

    # --- Control flow ---
    error: str | None = None
Step 2: Implement Agent Class
class HVACOptimizer(BaseAgent):
    """HVAC optimization agent built on a LangGraph state machine.

    Graph: collect_data -> analyze -> optimize -> check_policy, then
    execute -> audit when at least one change is approved, otherwise
    straight to audit.
    """

    def __init__(self, config: AgentConfig):
        super().__init__(config)
        self.mcp_client = None  # Will be initialized in start()
        self.logger = logging.getLogger("citadel.agents.hvac_optimizer")

    def _node_span(self, name: str):
        """Return a trace-span context manager, or a no-op one without telemetry.

        Keeps node logic identical whether or not telemetry is enabled; only
        the span itself is optional.
        """
        if self.telemetry:
            return self.telemetry.trace_span(name)
        from contextlib import nullcontext

        return nullcontext()

    async def start(self):
        """Start the agent; MCP client initialization is still a TODO."""
        await super().start()
        # TODO: Initialize MCP client for EcoStruxure
        # self.mcp_client = await connect_mcp("ecostruxure-ebo")
        self.logger.info("HVAC Optimizer agent started")

    def build_graph(self) -> StateGraph:
        """Build and compile the optimization state machine.

        Returns:
            The compiled graph produced by ``workflow.compile()`` -- the
            runnable graph, not the ``StateGraph`` builder itself.
        """
        workflow = StateGraph(HVACState)

        # Nodes
        workflow.add_node("collect_data", self._collect_sensor_data)
        workflow.add_node("analyze", self._analyze_conditions)
        workflow.add_node("optimize", self._optimize_setpoints)
        workflow.add_node("check_policy", self._check_safety_policy)
        workflow.add_node("execute", self._execute_changes)
        workflow.add_node("audit", self._audit_log)

        # Linear pipeline up to the policy gate
        workflow.add_edge("collect_data", "analyze")
        workflow.add_edge("analyze", "optimize")
        workflow.add_edge("optimize", "check_policy")

        # Execute only if the policy gate approved at least one change
        workflow.add_conditional_edges(
            "check_policy",
            self._policy_decision,
            {
                "approved": "execute",
                "denied": "audit",
            },
        )
        workflow.add_edge("execute", "audit")
        workflow.add_edge("audit", END)

        workflow.set_entry_point("collect_data")
        return workflow.compile()

    async def _collect_sensor_data(self, state: HVACState) -> HVACState:
        """Populate zone, occupancy, weather and price readings (simulated for now).

        Collection runs whether or not telemetry is enabled; telemetry only
        adds a trace span. Zone readings already present in ``state.zones``
        (e.g. injected by a test or replay) are kept instead of being
        overwritten by the simulated data.
        """
        with self._node_span("collect_sensor_data"):
            if state.zones:
                self.logger.info(f"Using {len(state.zones)} pre-supplied zones")
                return state

            # Simulate reading from MCP adapter
            # In production: state.zones = await self.mcp_client.read_all_zones()
            state.zones = {
                "zone-lobby": {
                    "current_temp": 73.0,
                    "setpoint": 72.0,
                    "mode": "cool",
                },
                "zone-office-1": {
                    "current_temp": 75.0,
                    "setpoint": 72.0,
                    "mode": "cool",
                },
            }
            state.occupancy_data = {
                "zone-lobby": 15,
                "zone-office-1": 8,
            }
            state.outdoor_temp = 85.0
            state.electricity_price = 0.15  # Peak hours
            self.logger.info(f"Collected data from {len(state.zones)} zones")
        return state

    async def _analyze_conditions(self, state: HVACState) -> HVACState:
        """Pick the optimization mode from electricity price and outdoor temperature.

        Price above 0.20 $/kWh wins over heat: EFFICIENCY; otherwise outdoor
        temp above 95 gives COMFORT; anything else is BALANCED.
        """
        if state.electricity_price > 0.20:
            state.optimization_mode = OptimizationMode.EFFICIENCY
            self.logger.info("High electricity prices - efficiency mode")
        elif state.outdoor_temp > 95:
            state.optimization_mode = OptimizationMode.COMFORT
            self.logger.info("Extreme heat - comfort mode")
        else:
            state.optimization_mode = OptimizationMode.BALANCED
            self.logger.info("Normal conditions - balanced mode")
        return state

    async def _optimize_setpoints(self, state: HVACState) -> HVACState:
        """Propose a new setpoint for every zone and estimate the energy saved.

        Every zone gets an entry in ``state.setpoint_changes`` (COMFORT mode
        proposes the unchanged setpoint). Savings are a simplified 0.5 kWh
        per degree of change.
        """
        setpoint_changes = []
        for zone_id, zone_data in state.zones.items():
            current_setpoint = zone_data["setpoint"]
            occupancy = state.occupancy_data.get(zone_id, 0)

            # NOTE(review): raising the setpoint only saves energy while the
            # zone is cooling; zones whose "mode" is heat would need the
            # opposite direction -- TODO confirm before using in heat mode.
            if state.optimization_mode == OptimizationMode.EFFICIENCY:
                if occupancy == 0:
                    # Unoccupied: setback
                    new_setpoint = current_setpoint + 4
                else:
                    # Occupied: slight increase
                    new_setpoint = current_setpoint + 2
            elif state.optimization_mode == OptimizationMode.COMFORT:
                # Maintain comfort
                new_setpoint = current_setpoint
            else:  # BALANCED
                if occupancy == 0:
                    new_setpoint = current_setpoint + 2
                else:
                    new_setpoint = current_setpoint + 1

            temp_diff = new_setpoint - current_setpoint
            savings = abs(temp_diff) * 0.5  # Simplified calculation
            setpoint_changes.append({
                "zone_id": zone_id,
                "current": current_setpoint,
                "new": new_setpoint,
                "estimated_savings_kwh": savings,
            })

        state.setpoint_changes = setpoint_changes
        state.energy_savings = sum(c["estimated_savings_kwh"] for c in setpoint_changes)
        self.logger.info(f"Optimized {len(setpoint_changes)} zones, "
                         f"est. savings: {state.energy_savings:.2f} kWh")
        return state

    async def _check_safety_policy(self, state: HVACState) -> HVACState:
        """Mark every proposed change with a ``policy_approved`` flag.

        With ``enable_safety_checks`` off, every change is approved (and a
        warning logged) so the flag means "skip OPA" rather than silently
        "execute nothing". Otherwise each change is checked individually;
        the OPA call is currently mocked to always allow.
        """
        if not self.config.enable_safety_checks:
            self.logger.warning(
                "Safety checks disabled - approving all setpoint changes without OPA"
            )
            for change in state.setpoint_changes:
                change["policy_approved"] = True
            return state

        for change in state.setpoint_changes:
            policy_input = {
                "action": "write_setpoint",
                "entity_id": f"hvac.{change['zone_id']}.setpoint",
                "value": change['new'],
                "priority": 8,
                "agent_id": self.config.agent_id,
            }
            # In production: call OPA via Safety service
            # allowed = await self.check_safety_policy("hvac.setpoint", policy_input)
            allowed = True  # Mock for now
            change["policy_approved"] = allowed
            if not allowed:
                self.logger.warning(f"Policy denied setpoint change for {change['zone_id']}")
        return state

    def _policy_decision(self, state: HVACState) -> str:
        """Route to "approved" if any change passed the policy check, else "denied".

        This is a routing function: it only selects the next node and must
        not modify ``state`` -- reassigning fields here is not written back
        to the graph state. Unapproved changes are instead skipped by
        ``_execute_changes`` and still counted as proposed by ``_audit_log``.
        """
        if any(c.get("policy_approved", False) for c in state.setpoint_changes):
            return "approved"
        return "denied"

    async def _execute_changes(self, state: HVACState) -> HVACState:
        """Apply (currently: log) every change flagged ``policy_approved``."""
        for change in state.setpoint_changes:
            if change.get("policy_approved"):
                # In production: await self.mcp_client.write_setpoint(...)
                self.logger.info(
                    f"Setting {change['zone_id']} to {change['new']}°F "
                    f"(was {change['current']}°F)"
                )
        return state

    async def _audit_log(self, state: HVACState) -> HVACState:
        """Log a summary record of this optimization run for the audit trail.

        ``changes_executed`` counts approved changes; since execution is
        simulated, it does not confirm that a controller write succeeded.
        """
        audit_record = {
            "agent_id": self.config.agent_id,
            "optimization_mode": state.optimization_mode.value,
            "changes_proposed": len(state.setpoint_changes),
            "changes_executed": sum(1 for c in state.setpoint_changes
                                    if c.get("policy_approved")),
            "estimated_savings_kwh": state.energy_savings,
            "electricity_price": state.electricity_price,
        }
        self.logger.info(f"Audit log: {audit_record}")
        # In production: publish to audit topic
        # await self.event_bus.publish("citadel.audit.hvac", ...)
        return state

    async def process_event(self, event: CloudEventMessage):
        """Handle an incoming CloudEvent.

        "citadel.hvac.optimize.request" runs the full graph and returns a
        dict with ``success``, ``changes`` and ``savings``. Any other event
        type (including occupancy changes, still a stub) returns None.
        """
        self.logger.info(f"Processing event: {event.type}")

        if event.type == "citadel.hvac.optimize.request":
            initial_state = HVACState()
            # self.graph presumably holds the compiled build_graph() result,
            # set up by BaseAgent -- confirm.
            result = await self.graph.ainvoke(initial_state)
            # LangGraph returns the final state as a dict of field values,
            # not as an HVACState instance; accept either shape.
            final = result if isinstance(result, dict) else vars(result)
            return {
                "success": True,
                "changes": final.get("setpoint_changes", []),
                "savings": final.get("energy_savings", 0.0),
            }
        elif event.type == "citadel.sensor.occupancy.changed":
            # Update occupancy and re-optimize
            self.logger.info("Occupancy changed, triggering optimization")
            # ... handle occupancy update
        # NOTE(review): "citadel.energy.price.updated" is subscribed in the
        # config but not handled here.
        return None
Step 3: Configuration
# Agent configuration, grouped as identity, feature flags, messaging, metadata.
config = AgentConfig(
    # Identity
    agent_id="hvac-optimizer-001",
    agent_type="hvac_optimizer",
    spiffe_id="spiffe://citadel.local/agent/hvac-optimizer",
    # Feature flags
    enable_telemetry=True,
    enable_safety_checks=True,
    log_level="INFO",
    # Messaging
    nats_url="nats://localhost:4222",
    subscribe_topics=[
        "citadel.hvac.optimize.request",
        "citadel.sensor.occupancy.changed",
        "citadel.energy.price.updated",  # NOTE(review): no handler in process_event yet
    ],
    publish_topics=[
        "citadel.hvac.setpoint.changed",
        "citadel.audit.hvac",
    ],
    # Descriptive metadata
    metadata={
        "version": "1.0.0",
        "zone_coverage": ["zone-lobby", "zone-office-1", "zone-office-2"],
    },
)
Step 4: Run Agent
async def main():
    """Run the HVAC optimizer agent until the process is interrupted.

    ``agent.stop()`` runs from ``finally``: on Python 3.11+ ``asyncio.run``
    handles Ctrl+C by cancelling the main task (``CancelledError`` inside
    this coroutine) and only raises ``KeyboardInterrupt`` afterwards, so an
    ``except KeyboardInterrupt`` here would never run.
    """
    agent = HVACOptimizer(config)
    await agent.start()
    try:
        # Block until this task is cancelled (Ctrl+C or shutdown).
        await asyncio.Event().wait()
    finally:
        await agent.stop()


if __name__ == "__main__":
    asyncio.run(main())
Testing Your Agent
Unit Test
File: src/agents/hvac/test_hvac_optimizer.py
import pytest
from hvac_optimizer import HVACOptimizer, HVACState, OptimizationMode, AgentConfig
@pytest.mark.asyncio
async def test_efficiency_mode():
    """A high price and an unoccupied zone should produce an efficiency setback.

    Calls the analyze and optimize nodes directly instead of the whole graph.
    The graph starts at ``collect_data``, which can replace the injected
    readings with its simulated ones (price 0.15 -> balanced mode), and
    ``ainvoke`` returns the final state as a dict rather than an HVACState.
    """
    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        enable_safety_checks=False,
    )
    agent = HVACOptimizer(config)

    state = HVACState(
        electricity_price=0.25,  # High price
        zones={"zone-1": {"setpoint": 72, "current_temp": 73}},
        occupancy_data={"zone-1": 0},  # Unoccupied
    )

    state = await agent._analyze_conditions(state)
    state = await agent._optimize_setpoints(state)

    assert state.optimization_mode == OptimizationMode.EFFICIENCY
    assert len(state.setpoint_changes) == 1
    assert state.setpoint_changes[0]["new"] == 76  # 72 + 4 degree unoccupied setback
Integration Test (requires NATS running at nats://localhost:4222)
@pytest.mark.asyncio
async def test_full_optimization_flow():
    """Run an optimize request end-to-end through ``process_event``.

    Presumably needs a NATS server at ``nats_url``. ``agent.stop()`` runs in
    ``finally`` so a failing assertion does not leave the agent running.
    """
    # hvac_optimizer imports CloudEventMessage from the runtime; reuse it here.
    from hvac_optimizer import CloudEventMessage

    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        nats_url="nats://localhost:4222",
    )
    agent = HVACOptimizer(config)
    await agent.start()
    try:
        event = CloudEventMessage(
            id="test-001",
            source="/test",
            type="citadel.hvac.optimize.request",
            data={},
        )
        result = await agent.process_event(event)

        assert result["success"] is True
        assert "changes" in result
    finally:
        await agent.stop()
Best Practices
1. Error Handling
async def _collect_sensor_data(self, state: HVACState) -> HVACState:
    """Read zone data from the MCP adapter, recording failures on the state.

    On failure the error is logged (with traceback) and stored in
    ``state.error``; ``state.zones`` keeps its previous value, so later
    nodes could check ``state.error`` or fall back to cached data.
    """
    if self.mcp_client is None:
        # Clear message instead of an opaque AttributeError on None.
        state.error = "MCP client not initialized"
        self.logger.error("Failed to read zones: %s", state.error)
        return state
    try:
        state.zones = await self.mcp_client.read_all_zones()
    except Exception as e:
        # Node boundary: log with traceback and record, don't crash the graph.
        self.logger.exception("Failed to read zones: %s", e)
        state.error = str(e)
        # Return early or use fallback data
    return state
2. Telemetry
async def _optimize_setpoints(self, state: HVACState) -> HVACState:
    """Optimize setpoints inside a trace span when telemetry is enabled.

    The optimization itself always runs; only the span and its attributes
    depend on telemetry being configured.
    """
    from contextlib import nullcontext

    span_cm = (
        self.telemetry.trace_span("optimize_setpoints")
        if self.telemetry
        else nullcontext()
    )
    with span_cm as span:
        if span is not None:
            span.set_attribute("zone_count", len(state.zones))

        # ... optimization logic ...

        if span is not None:
            span.set_attribute("changes_made", len(state.setpoint_changes))
            span.set_attribute("savings_kwh", state.energy_savings)
    return state
3. Graceful Degradation
async def _execute_changes(self, state: HVACState) -> HVACState:
    """Write approved setpoints, continuing past failures in individual zones.

    Changes without ``policy_approved`` are skipped: the policy router only
    decides whether this node runs, so denied changes can still be present
    in ``state.setpoint_changes`` and must never reach the controller.
    """
    for change in state.setpoint_changes:
        if not change.get("policy_approved"):
            continue
        try:
            await self.mcp_client.write_setpoint(
                change['zone_id'],
                change['new']
            )
        except Exception as e:
            # One failing zone shouldn't block the remaining zones.
            self.logger.exception(f"Failed to set {change['zone_id']}: {e}")
    return state
Agent Checklist
Before deploying your agent:
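- State schema is a dataclass (use `field(default_factory=...)` for mutable defaults); only nodes update it, and routing functions never modify state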
- All nodes are async functions
- Error handling in all nodes
- Telemetry spans for observability
- OPA policy checks for safety
- Unit tests for state machine logic
- Integration tests with real services
- Logging at appropriate levels
- Configuration via AgentConfig
- Documentation with docstrings
Next Steps
- Policy Integration - Connect to OPA policies
- Testing Agents - Comprehensive testing guide
- MCP Integration - Call vendor tools
- Production Deployment - Deploy to K3s
Build intelligent, safe agents for your building! Continue to Policy Integration.