Skip to main content

Creating a New Agent from Scratch

This guide walks you through creating a custom CitadelMesh agent using the BaseAgent framework and LangGraph state machines.

Prerequisites

  • Python 3.12+ environment set up
  • CitadelMesh dependencies installed
  • OPA and NATS running (via Aspire or Docker Compose)
  • Familiarity with LangGraph basics

Agent Architecture

Every CitadelMesh agent consists of:

  1. State Definition - Data structure for agent state
  2. BaseAgent Subclass - Core agent logic
  3. State Machine - LangGraph workflow
  4. Event Handlers - Process CloudEvents
  5. MCP Integration - Call vendor tools
  6. Policy Checks - OPA safety validation

Step-by-Step: HVAC Control Agent

Let's build an agent that optimizes HVAC based on occupancy and energy prices.

Step 1: Define Agent State

File: src/agents/hvac/hvac_optimizer.py

"""HVAC Optimization Agent for CitadelMesh"""

import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
import logging

sys.path.insert(0, 'src/proto_gen')
sys.path.insert(0, 'src')

from agents.runtime.base_agent import BaseAgent, AgentConfig, AgentStatus
from agents.runtime.event_bus import CloudEventMessage
from agents.runtime.telemetry import TelemetryCollector
from langgraph.graph import StateGraph, END

# Agent-specific state
class OptimizationMode(Enum):
    """Strategy the optimizer applies when proposing setpoint changes."""

    COMFORT = "comfort"        # occupant comfort first
    EFFICIENCY = "efficiency"  # energy savings first
    BALANCED = "balanced"      # trade one against the other

@dataclass
class HVACState:
    """Working state handed from node to node in the HVAC optimizer graph."""

    # --- Sensor inputs ---
    zones: dict[str, dict] = field(default_factory=dict)  # zone id -> zone readings
    occupancy_data: dict[str, int] = field(default_factory=dict)  # zone id -> occupancy (0 = unoccupied)
    outdoor_temp: float = 0.0

    # --- Energy context ---
    electricity_price: float = 0.0  # $/kWh
    peak_demand: bool = False

    # --- Optimizer outputs ---
    optimization_mode: OptimizationMode = OptimizationMode.BALANCED
    setpoint_changes: list[dict] = field(default_factory=list)
    energy_savings: float = 0.0

    # --- Control flow ---
    error: str | None = None

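Step 2: Implement the Agent Class

Add the agent class to the same file, src/agents/hvac/hvac_optimizer.py, below the state definition.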

class HVACOptimizer(BaseAgent):
    """HVAC optimization agent built on a LangGraph state machine.

    Graph flow: collect_data -> analyze -> optimize -> check_policy, then
    execute -> audit when at least one change is policy-approved, or straight
    to audit otherwise.
    """

    def __init__(self, config: AgentConfig):
        super().__init__(config)
        self.mcp_client = None  # Will be initialized in start()
        self.logger = logging.getLogger("citadel.agents.hvac_optimizer")

    async def start(self):
        """Start the agent and initialize the MCP client."""
        await super().start()

        # TODO: Initialize MCP client for EcoStruxure
        # self.mcp_client = await connect_mcp("ecostruxure-ebo")

        self.logger.info("HVAC Optimizer agent started")

    def _span(self, name: str):
        """Return a telemetry span named *name*, or a no-op context manager.

        Node logic runs inside this context, so it executes identically whether
        or not telemetry is enabled.
        """
        from contextlib import nullcontext

        if self.telemetry:
            return self.telemetry.trace_span(name)
        return nullcontext()

    @staticmethod
    def _result_field(result: Any, name: str) -> Any:
        """Read *name* from a graph result that may be a dict or a state object.

        NOTE(review): compiled LangGraph graphs commonly return the final state
        as a dict even when the schema is a dataclass; both shapes are accepted.
        """
        if isinstance(result, dict):
            return result[name]
        return getattr(result, name)

    def build_graph(self) -> StateGraph:
        """Build and compile the LangGraph state machine."""
        workflow = StateGraph(HVACState)

        # Define nodes
        workflow.add_node("collect_data", self._collect_sensor_data)
        workflow.add_node("analyze", self._analyze_conditions)
        workflow.add_node("optimize", self._optimize_setpoints)
        workflow.add_node("check_policy", self._check_safety_policy)
        workflow.add_node("execute", self._execute_changes)
        workflow.add_node("audit", self._audit_log)

        # Define flow
        workflow.add_edge("collect_data", "analyze")
        workflow.add_edge("analyze", "optimize")
        workflow.add_edge("optimize", "check_policy")

        # Conditional: execute only if policy approved at least one change
        workflow.add_conditional_edges(
            "check_policy",
            self._policy_decision,
            {
                "approved": "execute",
                "denied": "audit",
            },
        )

        workflow.add_edge("execute", "audit")
        workflow.add_edge("audit", END)

        # Set entry point
        workflow.set_entry_point("collect_data")

        return workflow.compile()

    async def _collect_sensor_data(self, state: HVACState) -> HVACState:
        """Collect zone, occupancy and energy-price data.

        Readings are currently simulated. Collection always runs; telemetry,
        when enabled, only wraps it in a span.
        """
        with self._span("collect_sensor_data"):
            # Simulate reading from MCP adapter
            # In production: state.zones = await self.mcp_client.read_all_zones()
            state.zones = {
                "zone-lobby": {
                    "current_temp": 73.0,
                    "setpoint": 72.0,
                    "mode": "cool",
                },
                "zone-office-1": {
                    "current_temp": 75.0,
                    "setpoint": 72.0,
                    "mode": "cool",
                },
            }

            state.occupancy_data = {
                "zone-lobby": 15,
                "zone-office-1": 8,
            }

            state.outdoor_temp = 85.0
            state.electricity_price = 0.15  # Peak hours

        self.logger.info(f"Collected data from {len(state.zones)} zones")

        return state

    async def _analyze_conditions(self, state: HVACState) -> HVACState:
        """Choose the optimization mode from electricity price and outdoor temp.

        Price above 0.20 $/kWh wins (efficiency); otherwise outdoor temperature
        above 95 selects comfort; everything else is balanced.
        """
        if state.electricity_price > 0.20:
            state.optimization_mode = OptimizationMode.EFFICIENCY
            self.logger.info("High electricity prices - efficiency mode")
        elif state.outdoor_temp > 95:
            state.optimization_mode = OptimizationMode.COMFORT
            self.logger.info("Extreme heat - comfort mode")
        else:
            state.optimization_mode = OptimizationMode.BALANCED
            self.logger.info("Normal conditions - balanced mode")

        return state

    async def _optimize_setpoints(self, state: HVACState) -> HVACState:
        """Propose a new setpoint for every zone based on mode and occupancy.

        Offsets relax the setpoint away from the conditioning direction. They
        move up for cooling zones and down for zones whose ``mode`` is
        ``"heat"``, because raising a heating setpoint would increase energy use.
        Zones without a ``mode`` are treated as cooling.
        """
        setpoint_changes = []

        for zone_id, zone_data in state.zones.items():
            current_setpoint = zone_data["setpoint"]
            occupancy = state.occupancy_data.get(zone_id, 0)

            if state.optimization_mode == OptimizationMode.EFFICIENCY:
                # Unoccupied: deep setback; occupied: slight relaxation
                offset = 4 if occupancy == 0 else 2
            elif state.optimization_mode == OptimizationMode.COMFORT:
                # Maintain comfort
                offset = 0
            else:  # BALANCED
                offset = 2 if occupancy == 0 else 1

            direction = -1 if zone_data.get("mode") == "heat" else 1
            new_setpoint = current_setpoint + direction * offset

            # Simplified estimate: 0.5 kWh per degree of adjustment
            savings = abs(new_setpoint - current_setpoint) * 0.5

            setpoint_changes.append({
                "zone_id": zone_id,
                "current": current_setpoint,
                "new": new_setpoint,
                "estimated_savings_kwh": savings,
            })

        state.setpoint_changes = setpoint_changes
        state.energy_savings = sum(c["estimated_savings_kwh"]
                                   for c in setpoint_changes)

        self.logger.info(f"Optimized {len(setpoint_changes)} zones, "
                         f"est. savings: {state.energy_savings:.2f} kWh")

        return state

    async def _check_safety_policy(self, state: HVACState) -> HVACState:
        """Mark each proposed change with ``policy_approved``.

        When safety checks are disabled every change is approved explicitly.
        Otherwise ``_policy_decision`` would find nothing approved and the agent
        would never execute anything.
        """
        if not self.config.enable_safety_checks:
            for change in state.setpoint_changes:
                change["policy_approved"] = True
            self.logger.warning(
                "Safety checks disabled - approving all setpoint changes"
            )
            return state

        # Check each proposed change against OPA policy
        for change in state.setpoint_changes:
            policy_input = {
                "action": "write_setpoint",
                "entity_id": f"hvac.{change['zone_id']}.setpoint",
                "value": change['new'],
                "priority": 8,
                "agent_id": self.config.agent_id,
            }

            # In production: call OPA via Safety service
            # allowed = await self.check_safety_policy("hvac.setpoint", policy_input)
            allowed = True  # Mock for now

            change["policy_approved"] = allowed

            if not allowed:
                self.logger.warning(f"Policy denied setpoint change for {change['zone_id']}")

        return state

    def _policy_decision(self, state: HVACState) -> str:
        """Route to "approved" if any change passed policy, else "denied".

        This is a routing function, so it must not mutate state. LangGraph only
        persists updates returned by nodes. Unapproved changes are skipped in
        ``_execute_changes``, which keeps them visible to the audit step.
        """
        if any(c.get("policy_approved", False) for c in state.setpoint_changes):
            return "approved"
        return "denied"

    async def _execute_changes(self, state: HVACState) -> HVACState:
        """Apply the policy-approved setpoint changes."""
        for change in state.setpoint_changes:
            if not change.get("policy_approved"):
                continue  # Never apply a change that policy did not approve

            # In production: await self.mcp_client.write_setpoint(...)
            self.logger.info(
                f"Setting {change['zone_id']} to {change['new']}°F "
                f"(was {change['current']}°F)"
            )

        return state

    async def _audit_log(self, state: HVACState) -> HVACState:
        """Log a summary of proposed vs. approved changes for the audit trail."""
        audit_record = {
            "agent_id": self.config.agent_id,
            "optimization_mode": state.optimization_mode.value,
            "changes_proposed": len(state.setpoint_changes),
            "changes_executed": sum(1 for c in state.setpoint_changes
                                    if c.get("policy_approved")),
            "estimated_savings_kwh": state.energy_savings,
            "electricity_price": state.electricity_price,
        }

        self.logger.info(f"Audit log: {audit_record}")

        # In production: publish to audit topic
        # await self.event_bus.publish("citadel.audit.hvac", ...)

        return state

    async def process_event(self, event: CloudEventMessage):
        """Handle an incoming CloudEvent.

        Returns a result dict for optimization requests, otherwise ``None``.
        """
        self.logger.info(f"Processing event: {event.type}")

        if event.type == "citadel.hvac.optimize.request":
            # Trigger optimization
            result = await self.graph.ainvoke(HVACState())

            return {
                "success": True,
                "changes": self._result_field(result, "setpoint_changes"),
                "savings": self._result_field(result, "energy_savings"),
            }

        elif event.type == "citadel.sensor.occupancy.changed":
            # Update occupancy and re-optimize
            self.logger.info("Occupancy changed, triggering optimization")
            # ... handle occupancy update

        return None

Step 3: Configuration

# Agent configuration: identity, messaging, feature flags and metadata.
config = AgentConfig(
    # Identity
    agent_id="hvac-optimizer-001",
    agent_type="hvac_optimizer",
    spiffe_id="spiffe://citadel.local/agent/hvac-optimizer",
    # Messaging: topics consumed and produced on NATS
    nats_url="nats://localhost:4222",
    subscribe_topics=[
        "citadel.hvac.optimize.request",
        "citadel.sensor.occupancy.changed",
        "citadel.energy.price.updated",
    ],
    publish_topics=[
        "citadel.hvac.setpoint.changed",
        "citadel.audit.hvac",
    ],
    # Feature flags
    enable_telemetry=True,
    enable_safety_checks=True,
    log_level="INFO",
    # Free-form descriptive metadata
    metadata=dict(
        version="1.0.0",
        zone_coverage=["zone-lobby", "zone-office-1", "zone-office-2"],
    ),
)

Step 4: Run Agent

async def main():
    """Run the HVAC optimizer agent until the process is interrupted.

    ``agent.stop()`` runs in a ``finally`` block because of how ``asyncio.run``
    handles Ctrl+C: it cancels this coroutine with ``CancelledError`` instead of
    raising ``KeyboardInterrupt`` inside it. An ``except KeyboardInterrupt``
    here would therefore never run, and the agent would not shut down cleanly.
    """
    agent = HVACOptimizer(config)
    await agent.start()

    try:
        # Keep the process alive while the agent runs.
        await asyncio.Event().wait()
    finally:
        await agent.stop()


if __name__ == "__main__":
    asyncio.run(main())

Testing Your Agent

Unit Test

These tests use `@pytest.mark.asyncio`, so the pytest-asyncio plugin must be installed.

File: src/agents/hvac/test_hvac_optimizer.py

import pytest
from hvac_optimizer import HVACOptimizer, HVACState, OptimizationMode, AgentConfig

@pytest.mark.asyncio
async def test_efficiency_mode():
    """High prices plus an unoccupied zone should produce an efficiency setback.

    This test drives the analyze and optimize nodes directly rather than the
    whole graph, for two reasons. The collect_data node can overwrite the
    injected readings with its simulated data (price 0.15, which selects
    balanced mode). The compiled graph may also return a dict rather than an
    HVACState.
    """
    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        enable_safety_checks=False,
    )

    agent = HVACOptimizer(config)

    state = HVACState(
        electricity_price=0.25,  # Above the 0.20 efficiency threshold
        zones={"zone-1": {"setpoint": 72, "current_temp": 73}},
        occupancy_data={"zone-1": 0},  # Unoccupied
    )

    state = await agent._analyze_conditions(state)
    state = await agent._optimize_setpoints(state)

    assert state.optimization_mode == OptimizationMode.EFFICIENCY
    assert len(state.setpoint_changes) == 1
    change = state.setpoint_changes[0]
    assert change["zone_id"] == "zone-1"
    assert change["new"] > 72  # Setback for an unoccupied zone
    assert state.energy_savings > 0

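Integration Test

This test starts a real agent, so the NATS server from the prerequisites must be reachable at nats://localhost:4222.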

@pytest.mark.asyncio
async def test_full_optimization_flow():
    """Run an optimize request end to end; needs NATS at nats://localhost:4222."""
    # CloudEventMessage is not in the test module's top-level imports;
    # hvac_optimizer imports it, so pull it from there like AgentConfig.
    from hvac_optimizer import CloudEventMessage

    config = AgentConfig(
        agent_id="test-agent",
        agent_type="hvac_optimizer",
        spiffe_id="spiffe://test",
        nats_url="nats://localhost:4222",
    )

    agent = HVACOptimizer(config)
    await agent.start()

    try:
        # Simulate optimization request
        event = CloudEventMessage(
            id="test-001",
            source="/test",
            type="citadel.hvac.optimize.request",
            data={},
        )

        result = await agent.process_event(event)

        assert result is not None
        assert result["success"] is True
        assert "changes" in result
    finally:
        # Always disconnect, even when an assertion fails.
        await agent.stop()

Best Practices

1. Error Handling

async def _collect_sensor_data(self, state: HVACState) -> HVACState:
    """Collect zone data, recording failures on the state instead of raising."""

    # start() may leave mcp_client as None; report that clearly rather than
    # surfacing an AttributeError on NoneType.
    if self.mcp_client is None:
        state.error = "MCP client not initialized"
        self.logger.error(state.error)
        return state

    try:
        state.zones = await self.mcp_client.read_all_zones()
    except Exception as e:
        # logger.exception keeps the traceback for debugging.
        self.logger.exception(f"Failed to read zones: {e}")
        state.error = str(e)
        # Return early or use fallback data

    return state

2. Telemetry

async def _optimize_setpoints(self, state: HVACState) -> HVACState:
    """Optimize setpoints, recording span attributes when telemetry is enabled.

    Only the span handling is conditional. The optimization logic itself must
    run whether or not telemetry is configured.
    """
    from contextlib import nullcontext

    span_cm = (self.telemetry.trace_span("optimize_setpoints")
               if self.telemetry else nullcontext())

    with span_cm as span:  # span is None when telemetry is disabled
        if span is not None:
            span.set_attribute("zone_count", len(state.zones))

        # ... optimization logic ...

        if span is not None:
            span.set_attribute("changes_made", len(state.setpoint_changes))
            span.set_attribute("savings_kwh", state.energy_savings)

    return state

3. Graceful Degradation

async def _execute_changes(self, state: HVACState) -> HVACState:
    """Apply approved setpoint changes; a failing zone doesn't stop the rest."""

    for change in state.setpoint_changes:
        # Never write a change that policy did not approve.
        if not change.get("policy_approved"):
            continue

        try:
            await self.mcp_client.write_setpoint(
                change['zone_id'],
                change['new']
            )
        except Exception as e:
            # Log and continue with the remaining zones.
            self.logger.error(f"Failed to set {change['zone_id']}: {e}")

    return state

Agent Checklist

Before deploying your agent:

  • State is modeled explicitly as a dataclass; nodes return the updated state, and routing functions never mutate it
  • All nodes are async functions
  • Error handling in all nodes
  • Telemetry spans for observability
  • OPA policy checks for safety
  • Unit tests for state machine logic
  • Integration tests with real services
  • Logging at appropriate levels
  • Configuration via AgentConfig
  • Documentation with docstrings

Next Steps


Build intelligent, safe agents for your building! Continue to Policy Integration.