Agent Topology and Architecture
CitadelMesh uses a multi-agent architecture where specialized AI agents collaborate to manage building operations. Each agent is implemented as a LangGraph state machine with deterministic execution, event-driven communication, and built-in safety guardrails.
Agent Design Philosophy
Specialists, Not Generalists: Rather than one monolithic "building AI", we deploy focused agents with clear responsibilities:
- Single Responsibility: Each agent manages one domain (security, energy, ops)
- Event-Driven: Agents react to CloudEvents from sensors, systems, and other agents
- Stateful: LangGraph state machines maintain context across interactions
- Observable: Full tracing and replay of agent decisions
- Safe by Default: Every action is validated against OPA policies before execution
Agent Mesh Overview
```mermaid
graph TB
    subgraph "Building Edge Agents"
        Security[Security Agent<br/>Threat Detection]
        Energy[Energy Agent<br/>HVAC Optimization]
        Automation[Automation Agent<br/>Scenes & Schedules]
        DER[DER/Grid Agent<br/>Battery & PV Control]
    end

    subgraph "Digital Infrastructure"
        Twin[Twin Agent<br/>State Synchronization]
        Compliance[Compliance Agent<br/>Policy Monitoring]
    end

    subgraph "Operations"
        Ops[Ops Agent<br/>Incident Management]
    end

    subgraph "Event Bus"
        Events[CloudEvents + NATS]
    end

    Security --> Events
    Energy --> Events
    Automation --> Events
    DER --> Events
    Twin --> Events
    Compliance --> Events

    Events --> Security
    Events --> Energy
    Events --> Automation
    Events --> Ops
    Events --> Twin
    Events --> Compliance

    Security -.Escalate.-> Ops
    Energy -.Escalate.-> Ops
    Automation -.Escalate.-> Ops

    Twin -.State Queries.-> Security
    Twin -.State Queries.-> Energy
    Twin -.State Queries.-> Automation
```
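Every event in the diagram reaches agents through NATS subjects. The function below is a rough sketch of how subscriptions fan events out to agents. It implements NATS wildcard matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The subscription table is hypothetical and is not CitadelMesh's actual topic layout.

```python
from typing import Dict, List


def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a subscription pattern.

    '*' matches exactly one token; '>' (last token only) matches one or more tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the final pattern token and needs at least one subject token left
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


# Hypothetical subscription table: agent -> subject patterns it listens on
SUBSCRIPTIONS: Dict[str, List[str]] = {
    "security": ["access.events.*", "camera.events.>"],
    "energy": ["telemetry.zone.*", "automation.scene.>"],
    "ops": ["incidents.events.*"],
    "twin": [">"],  # the twin ingests everything
}


def route(subject: str) -> List[str]:
    """List the agents whose subscriptions match a subject."""
    return sorted(
        agent
        for agent, patterns in SUBSCRIPTIONS.items()
        if any(subject_matches(p, subject) for p in patterns)
    )
```

For example, `route("incidents.events.building_a")` delivers the escalation to the Ops and Twin agents only.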
Agent Descriptions
Security Agent
Responsibility: Physical security threat detection and response
Inputs:
- Access control events (Schneider Security Expert)
- Camera analytics (Avigilon person detection, loitering, intrusion)
- Door sensor state changes
- After-hours motion detection
Outputs:
- Security incidents (CloudEvents)
- Door lock/unlock commands (with OPA approval)
- Security officer alerts
- Camera tracking requests
State Machine:
```mermaid
stateDiagram-v2
    [*] --> MonitorEvents: Start
    MonitorEvents --> AssessThreat: Event Received
    AssessThreat --> ClassifyIncident: Threat Detected
    AssessThreat --> MonitorEvents: No Threat
    ClassifyIncident --> PlanResponse: Incident Type Known
    PlanResponse --> ExecuteActions: Response Plan Ready
    ExecuteActions --> NotifyOps: Actions Complete
    NotifyOps --> MonitorEvents: Continue Monitoring
    ExecuteActions --> EscalateHuman: High Risk
    EscalateHuman --> AwaitApproval: Escalated
    AwaitApproval --> ExecuteActions: Approved
    AwaitApproval --> MonitorEvents: Rejected
```
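One way to stop the diagram and the implementation from drifting apart is to encode the diagram as data and check recorded execution paths against it. The sketch below is a minimal version of that idea. The `Start` pseudo-state stands for `[*]`, and `TRANSITIONS` and `is_valid_path` are hypothetical names.

```python
from typing import Dict, Iterable, Set

# Allowed transitions, transcribed from the Security Agent state diagram
TRANSITIONS: Dict[str, Set[str]] = {
    "Start": {"MonitorEvents"},
    "MonitorEvents": {"AssessThreat"},
    "AssessThreat": {"ClassifyIncident", "MonitorEvents"},
    "ClassifyIncident": {"PlanResponse"},
    "PlanResponse": {"ExecuteActions"},
    "ExecuteActions": {"NotifyOps", "EscalateHuman"},
    "NotifyOps": {"MonitorEvents"},
    "EscalateHuman": {"AwaitApproval"},
    "AwaitApproval": {"ExecuteActions", "MonitorEvents"},
}


def is_valid_path(path: Iterable[str]) -> bool:
    """Check that every consecutive pair of states is an allowed transition."""
    states = list(path)
    return all(b in TRANSITIONS.get(a, set()) for a, b in zip(states, states[1:]))
```

A check like this would flag that the example implementation below escalates from `plan_response`, whereas the diagram escalates from `ExecuteActions`.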
Example Implementation:
```python
from typing import Any, List, Optional, TypedDict

from langgraph.graph import END, StateGraph
from langgraph.graph.state import CompiledStateGraph

# SecurityEvent, BaseAgent and CloudEvent are CitadelMesh types defined elsewhere.


class SecurityAgentState(TypedDict):
    """State for Security Agent."""
    events: List[SecurityEvent]
    threat_level: str  # low, medium, high, critical
    incident_type: Optional[str]
    response_plan: List[str]
    context: dict
    messages: List[Any]


class SecurityAgent(BaseAgent):
    """LangGraph-based security agent."""

    def build_graph(self) -> CompiledStateGraph:
        """Build and compile the security agent state machine."""
        workflow = StateGraph(SecurityAgentState)

        # Add nodes (_classify_incident, _plan_response, _escalate_human and the
        # routers _should_classify / _needs_escalation are not shown here)
        workflow.add_node("assess_threat", self._assess_threat)
        workflow.add_node("classify_incident", self._classify_incident)
        workflow.add_node("plan_response", self._plan_response)
        workflow.add_node("execute_actions", self._execute_actions)
        workflow.add_node("escalate_human", self._escalate_human)

        # Define flow
        workflow.set_entry_point("assess_threat")
        workflow.add_conditional_edges(
            "assess_threat",
            self._should_classify,
            {
                "classify": "classify_incident",
                "monitor": END,  # no threat: end this run and keep monitoring
            },
        )
        workflow.add_edge("classify_incident", "plan_response")
        workflow.add_conditional_edges(
            "plan_response",
            self._needs_escalation,
            {
                "escalate": "escalate_human",
                "execute": "execute_actions",
            },
        )
        workflow.add_edge("execute_actions", END)
        workflow.add_edge("escalate_human", END)

        return workflow.compile()

    async def _assess_threat(self, state: SecurityAgentState) -> SecurityAgentState:
        """Assess threat level from incoming events."""
        events = state["events"]

        # Rule-based indicators (an LLM could be used for more complex reasoning)
        threat_indicators = {
            "after_hours": any(e.event_type == "after_hours_access" for e in events),
            "forced_entry": any(e.event_type == "forced_entry" for e in events),
            "loitering": any(e.event_type == "loitering" for e in events),
        }

        # The most severe indicator determines the threat level
        if threat_indicators["forced_entry"]:
            state["threat_level"] = "critical"
        elif threat_indicators["after_hours"]:
            state["threat_level"] = "high"
        elif threat_indicators["loitering"]:
            state["threat_level"] = "medium"
        else:
            state["threat_level"] = "low"

        return state

    async def _execute_actions(self, state: SecurityAgentState) -> SecurityAgentState:
        """Execute planned response actions."""
        for action in state["response_plan"]:
            if action == "lock_doors":
                # Lock the door via the MCP adapter
                await self.mcp_client.call_tool(
                    "securityexpert_door_lock",
                    {"door_id": "door.lobby.main"},
                )
            elif action == "alert_security":
                # Publish a security alert event on the bus
                await self.event_bus.publish(CloudEvent(
                    type="citadel.security.alert",
                    source=self.config.spiffe_id,
                    data={"incident": state["incident_type"], "level": state["threat_level"]},
                ))

        return state
```
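`_assess_threat` is pure rule evaluation, so its logic can be factored into a plain function that is easy to unit-test and to replay. The sketch below keeps the same precedence: forced entry outranks after-hours access, which outranks loitering. The names `SEVERITY_ORDER` and `assess_threat_level` are hypothetical.

```python
from typing import Iterable

# Indicator event type -> threat level, ordered from most to least severe
SEVERITY_ORDER = [
    ("forced_entry", "critical"),
    ("after_hours_access", "high"),
    ("loitering", "medium"),
]


def assess_threat_level(event_types: Iterable[str]) -> str:
    """Return the threat level for a batch of security event types."""
    seen = set(event_types)
    for indicator, level in SEVERITY_ORDER:
        if indicator in seen:
            return level
    return "low"
```

Inside the node, the logic then reduces to `state["threat_level"] = assess_threat_level(e.event_type for e in state["events"])`.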
Energy Agent
Responsibility: Building energy optimization and HVAC control
Inputs:
- Zone temperature telemetry (EcoStruxure EBO)
- Occupancy sensors
- Weather forecasts
- Energy tariffs and grid carbon intensity
- Demand response (DR) events
Outputs:
- HVAC setpoint adjustments (within OPA-enforced temperature bounds)
- Lighting scene changes
- Pre-cooling/pre-heating schedules
- Energy KPI metrics
State Machine:
```mermaid
stateDiagram-v2
    [*] --> ReadZoneState: Optimization Trigger
    ReadZoneState --> AnalyzeConditions: State Loaded
    AnalyzeConditions --> CalculateSetpoint: Comfort vs Efficiency
    CalculateSetpoint --> ValidatePolicy: Setpoint Computed
    ValidatePolicy --> AdjustSetpoint: Policy Allows
    ValidatePolicy --> [*]: Policy Denies
    AdjustSetpoint --> MonitorResult: Command Sent
    MonitorResult --> [*]: Success
```
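OPA enforces temperature bounds at the `ValidatePolicy` step. The real rule lives in Rego. As a language-neutral illustration of the kind of rule it might apply, the function below denies setpoints that fall outside a comfort band or that jump too far in one step. The 65 to 80 °F band, the 4 °F step limit and all names here are hypothetical, not CitadelMesh policy.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SetpointBounds:
    """Hypothetical limits in °F; the real values come from the OPA policy."""
    min_f: float = 65.0
    max_f: float = 80.0
    max_step_f: float = 4.0


@dataclass(frozen=True)
class Decision:
    allow: bool
    deny_reason: Optional[str] = None


def validate_setpoint(current_setpoint: float, proposed: float,
                      bounds: SetpointBounds = SetpointBounds()) -> Decision:
    """Deny setpoints outside the comfort band or changing too much at once."""
    if not bounds.min_f <= proposed <= bounds.max_f:
        return Decision(False, f"setpoint {proposed} outside [{bounds.min_f}, {bounds.max_f}]")
    step = abs(proposed - current_setpoint)
    if step > bounds.max_step_f:
        return Decision(False, f"step of {step} exceeds {bounds.max_step_f}")
    return Decision(True)
```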
Example Implementation:
```python
from typing import Optional, TypedDict

from langgraph.graph import END, StateGraph
from langgraph.graph.state import CompiledStateGraph

# BaseAgent is defined under Base Agent Framework.


class EnergyAgentState(TypedDict):
    """State for Energy Agent (temperatures in °F)."""
    zone_id: str
    current_temp: Optional[float]
    target_temp: Optional[float]
    occupancy: Optional[bool]
    action: Optional[str]  # "adjust" or "skip"
    setpoint_adjusted: bool


class EnergyAgent(BaseAgent):
    """Energy optimization agent."""

    def build_graph(self) -> CompiledStateGraph:
        """Build and compile the energy agent state machine."""
        workflow = StateGraph(EnergyAgentState)

        # _read_zone_state, _analyze_conditions and _adjust_setpoint are not shown
        workflow.add_node("read_zone_state", self._read_zone_state)
        workflow.add_node("analyze_conditions", self._analyze_conditions)
        workflow.add_node("calculate_optimal_setpoint", self._calculate_optimal_setpoint)
        workflow.add_node("adjust_setpoint", self._adjust_setpoint)

        workflow.set_entry_point("read_zone_state")
        workflow.add_edge("read_zone_state", "analyze_conditions")
        workflow.add_edge("analyze_conditions", "calculate_optimal_setpoint")
        # The diagram's ValidatePolicy step is not a separate node here
        workflow.add_conditional_edges(
            "calculate_optimal_setpoint",
            self._should_adjust,
            {
                "adjust": "adjust_setpoint",
                "skip": END,
            },
        )
        workflow.add_edge("adjust_setpoint", END)

        return workflow.compile()

    def _should_adjust(self, state: EnergyAgentState) -> str:
        """Route on the decision made by _calculate_optimal_setpoint."""
        return state["action"] or "skip"

    async def _calculate_optimal_setpoint(self, state: EnergyAgentState) -> EnergyAgentState:
        """Calculate optimal setpoint balancing comfort and energy."""
        current = state["current_temp"]
        occupancy = state["occupancy"]

        # Comfort-first (72 °F) if occupied; energy-saving setback (76 °F) otherwise.
        # Unknown occupancy (None) is treated as unoccupied.
        target = 72.0 if occupancy else 76.0

        # Future: adjust for weather forecast; shift load to off-peak tariffs

        state["target_temp"] = target
        if current is None:
            # No temperature reading: never act on missing telemetry
            state["action"] = "skip"
        else:
            # 2 °F deadband avoids chasing small fluctuations
            state["action"] = "adjust" if abs(current - target) > 2.0 else "skip"
        return state
```
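The control flow that `build_graph` declares can be mimicked in a few lines of plain Python. Nodes transform state, plain edges go to a fixed next node, and a conditional edge calls a router whose return value is looked up in a mapping. The code below is a toy illustration of those semantics, not LangGraph's implementation. `MiniGraph`, `build_energy_graph` and the stand-in node functions are hypothetical.

```python
from typing import Callable, Dict, Optional, Tuple

END = "__end__"
State = dict
Node = Callable[[State], State]
Router = Callable[[State], str]


class MiniGraph:
    """Toy sequential runner mimicking add_node / add_edge / add_conditional_edges."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}
        self.edges: Dict[str, str] = {}
        self.routers: Dict[str, Tuple[Router, Dict[str, str]]] = {}
        self.entry: Optional[str] = None

    def add_node(self, name: str, fn: Node) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: str) -> None:
        self.edges[src] = dst

    def add_conditional_edges(self, src: str, router: Router, mapping: Dict[str, str]) -> None:
        self.routers[src] = (router, mapping)

    def set_entry_point(self, name: str) -> None:
        self.entry = name

    def invoke(self, state: State) -> State:
        current = self.entry
        while current != END:
            state = self.nodes[current](dict(state))  # each node gets a copy
            if current in self.routers:
                router, mapping = self.routers[current]
                current = mapping[router(state)]
            else:
                current = self.edges[current]
        return state


def calculate_optimal_setpoint(s: State) -> State:
    """Same rule as the Energy Agent example: 72 °F occupied, 76 °F setback, 2 °F deadband."""
    target = 72.0 if s["occupancy"] else 76.0
    action = "adjust" if abs(s["current_temp"] - target) > 2.0 else "skip"
    return {**s, "target_temp": target, "action": action}


def build_energy_graph(temp_f: float, occupied: bool) -> MiniGraph:
    """Wire the Energy Agent flow with stand-in nodes (fixed sensor reading)."""
    g = MiniGraph()
    g.add_node("read_zone_state", lambda s: {**s, "current_temp": temp_f, "occupancy": occupied})
    g.add_node("analyze_conditions", lambda s: s)
    g.add_node("calculate_optimal_setpoint", calculate_optimal_setpoint)
    g.add_node("adjust_setpoint", lambda s: {**s, "setpoint_adjusted": True})
    g.set_entry_point("read_zone_state")
    g.add_edge("read_zone_state", "analyze_conditions")
    g.add_edge("analyze_conditions", "calculate_optimal_setpoint")
    g.add_conditional_edges("calculate_optimal_setpoint", lambda s: s["action"],
                            {"adjust": "adjust_setpoint", "skip": END})
    g.add_edge("adjust_setpoint", END)
    return g
```

One difference from the real library: LangGraph nodes may return only the keys they change, and LangGraph merges those into the state. The toy runner instead expects each node to return the full state.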
Automation Agent
Responsibility: Orchestrate scenes, schedules, and user-triggered actions
Inputs:
- User scene requests (mobile app, voice commands)
- Schedule triggers (morning routine, evening mode)
- Occupancy pattern detection
- Meeting room bookings
Outputs:
- Lighting scene activations (Home Assistant)
- HVAC mode changes
- Window blind adjustments
- AV system control
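Much of the Automation Agent's job is fan-out: one scene request becomes commands for several systems. The sketch below uses the scene payload shape (`zones`, `lighting`, `hvac`) from the Command Chain example later in this section. The `Command` type, its action names and `expand_scene` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Command:
    """Hypothetical per-system command produced from a scene."""
    system: str   # e.g. "lighting", "hvac"
    target: str   # zone id
    action: str
    value: Any


def expand_scene(scene: Dict[str, Any]) -> List[Command]:
    """Turn a scene payload into one command per (zone, system)."""
    commands: List[Command] = []
    preset = scene.get("lighting", {}).get("preset")
    temp = scene.get("hvac", {}).get("temp_override")
    for zone in scene.get("zones", []):
        if preset is not None:
            commands.append(Command("lighting", zone, "apply_preset", preset))
        if temp is not None:
            commands.append(Command("hvac", zone, "set_setpoint", temp))
    return commands
```

Each resulting command would still need OPA approval before execution, like any other agent action.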
Twin Agent
Responsibility: Maintain synchronized digital twin of building state
Inputs:
- Telemetry from all adapters
- State change events
- Reconciliation requests
Outputs:
- Digital twin mutations (graph database)
- Derived KPIs (energy efficiency, comfort score)
- State snapshots for replay
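Keeping the twin synchronized means deciding what to do when updates arrive late or out of order. A common approach is per-attribute last-writer-wins keyed on the source timestamp, plus immutable snapshots for replay. The sketch below shows that approach as an assumption; it is not CitadelMesh's actual reconciliation logic, and `TwinStore` is a hypothetical name.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass
class TwinStore:
    """In-memory twin: entity_id -> attribute -> (timestamp, value)."""
    entities: Dict[str, Dict[str, Tuple[float, Any]]] = field(default_factory=dict)

    def apply(self, entity_id: str, attribute: str, value: Any, ts: float) -> bool:
        """Apply an update unless an equal or newer one is stored. Returns True if applied."""
        attrs = self.entities.setdefault(entity_id, {})
        current = attrs.get(attribute)
        if current is not None and current[0] >= ts:
            return False  # stale or duplicate update
        attrs[attribute] = (ts, value)
        return True

    def get(self, entity_id: str) -> Dict[str, Any]:
        """Current attribute values for an entity (timestamps stripped)."""
        return {k: v for k, (_, v) in self.entities.get(entity_id, {}).items()}

    def snapshot(self) -> Dict[str, Dict[str, Tuple[float, Any]]]:
        """Deep copy of the full state, safe to store for replay."""
        return copy.deepcopy(self.entities)
```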
Ops Agent
Responsibility: Human-in-the-loop operations and incident management
Inputs:
- Escalated incidents from Security/Energy/Automation agents
- User approval requests
- SLA violation alerts
Outputs:
- Approval/rejection decisions
- Incident reports
- Maintenance work orders
- SLA tracking metrics
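Human approval is the one step in the mesh that can block indefinitely. The sketch below shows an approval gate that fails safe: if no operator responds within a timeout, the request is treated as rejected. This matches the `AwaitApproval --> MonitorEvents: Rejected` path in the Security Agent diagram. `ApprovalGate` and the timeout parameter are hypothetical.

```python
import asyncio
from typing import Dict


class ApprovalGate:
    """Tracks pending approval requests; unanswered requests are rejected on timeout."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    async def request(self, incident_id: str, timeout_s: float) -> bool:
        """Wait for an operator decision; returns False if none arrives in time."""
        fut = asyncio.get_running_loop().create_future()
        self._pending[incident_id] = fut
        try:
            return await asyncio.wait_for(fut, timeout_s)
        except asyncio.TimeoutError:
            return False  # fail safe: no answer means no action
        finally:
            self._pending.pop(incident_id, None)

    def decide(self, incident_id: str, approved: bool) -> bool:
        """Record an operator decision. Returns False if the request is unknown or expired."""
        fut = self._pending.get(incident_id)
        if fut is None or fut.done():
            return False
        fut.set_result(approved)
        return True
```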
DER/Grid Agent
Responsibility: Distributed energy resource and grid integration
Inputs:
- Battery state-of-charge
- Solar PV generation
- Grid carbon intensity signals
- Demand response events (OpenADR)
- EVSE charging requests (OCPP)
Outputs:
- Battery charge/discharge commands
- EV charging schedule optimization
- Demand limit enforcement
- DR participation confirmations
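A DER dispatch decision combines several of the inputs above. The rule-based sketch below is purely illustrative: it discharges the battery during a demand response event while keeping a reserve, charges from surplus PV, and otherwise holds. The thresholds, the reserve level and all names are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DerInputs:
    soc_pct: float          # battery state of charge, 0 to 100
    pv_kw: float            # current solar generation
    load_kw: float          # current building load
    dr_event_active: bool   # OpenADR demand response event in progress


def battery_command(x: DerInputs, reserve_pct: float = 20.0, full_pct: float = 95.0) -> str:
    """Return 'discharge', 'charge' or 'hold' (hypothetical rule set)."""
    if x.dr_event_active and x.soc_pct > reserve_pct:
        return "discharge"  # shed grid load during the DR event, keep a backup reserve
    if x.pv_kw > x.load_kw and x.soc_pct < full_pct:
        return "charge"     # store surplus solar instead of exporting it
    return "hold"
```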
Compliance Agent
Responsibility: Continuous compliance monitoring and attestation
Inputs:
- Control actions from all agents
- Policy evaluation results
- Audit log events
Outputs:
- Compliance violation alerts
- Attestation reports (SOC 2, IEC 62443)
- Control effectiveness metrics
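At its simplest, continuous compliance monitoring is a join between two of the inputs above: every control action should have a matching policy decision that allowed it. The sketch below implements that check. It assumes, hypothetically, that actions and decisions share a `command_id`; all type and function names are illustrative.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class ControlAction:
    command_id: str
    agent: str
    action: str


@dataclass(frozen=True)
class PolicyDecision:
    command_id: str
    allow: bool


def find_violations(actions: Iterable[ControlAction],
                    decisions: Iterable[PolicyDecision]) -> List[ControlAction]:
    """Return actions executed without an allowing policy decision."""
    allowed = {d.command_id for d in decisions if d.allow}
    return [a for a in actions if a.command_id not in allowed]
```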
Base Agent Framework
All agents inherit from BaseAgent:
```python
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from langgraph.graph.state import CompiledStateGraph

# AgentState, AgentStatus, EventBus, TelemetryCollector and CloudEventMessage
# are CitadelMesh types defined elsewhere.


@dataclass
class AgentConfig:
    """Configuration for a CitadelMesh agent."""
    agent_id: str
    agent_type: str
    spiffe_id: str
    nats_url: str = "nats://localhost:4222"
    subscribe_topics: List[str] = field(default_factory=list)
    publish_topics: List[str] = field(default_factory=list)
    enable_telemetry: bool = True
    enable_safety_checks: bool = True
    log_level: str = "INFO"


class BaseAgent(ABC):
    """Base class for all CitadelMesh LangGraph agents."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.state = AgentState()
        self.logger = logging.getLogger(f"citadel.agent.{config.agent_type}")
        self.logger.setLevel(config.log_level)

        # Event bus integration (connected in start())
        self.event_bus: Optional[EventBus] = None

        # Telemetry collector (None when telemetry is disabled)
        self.telemetry: Optional[TelemetryCollector] = None
        if config.enable_telemetry:
            self.telemetry = TelemetryCollector(
                service_name=f"agent.{config.agent_type}",
                agent_id=config.agent_id,
            )

        # Compiled LangGraph state machine (built in start())
        self.graph: Optional[CompiledStateGraph] = None

    async def start(self) -> None:
        """Start agent and connect to infrastructure."""
        # Build the graph before subscribing so early events can be processed
        self.graph = self.build_graph()

        # Connect to NATS event bus
        self.event_bus = EventBus(
            nats_url=self.config.nats_url,
            agent_id=self.config.agent_id,
        )
        await self.event_bus.connect()

        # Subscribe to topics
        for topic in self.config.subscribe_topics:
            await self.event_bus.subscribe(topic, self._handle_event)

        # Update status (_publish_state_update is not shown)
        self.state.status = AgentStatus.ACTIVE
        await self._publish_state_update()

    @abstractmethod
    def build_graph(self) -> CompiledStateGraph:
        """Build and compile the LangGraph state machine for this agent."""

    @abstractmethod
    async def process_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Process incoming CloudEvent."""

    async def _handle_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Internal event handler; traces the call when telemetry is enabled."""
        if self.telemetry is None:
            return await self._process_logged(event, span=None)
        with self.telemetry.trace_span("handle_event") as span:
            span.set_attribute("event.type", event.type)
            span.set_attribute("event.source", event.source)
            return await self._process_logged(event, span)

    async def _process_logged(self, event: CloudEventMessage, span: Any) -> Optional[Dict[str, Any]]:
        """Run process_event, logging and recording any failure before re-raising."""
        try:
            return await self.process_event(event)
        except Exception as e:
            self.logger.exception("Event processing failed: %s", e)
            if span is not None:
                span.record_exception(e)
            raise
```
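`BaseAgent` and the agents above rely on the event bus only through `connect()`, `subscribe(topic, handler)` and `publish(...)`. That makes it possible to unit-test agents against an in-memory stand-in for the NATS-backed `EventBus`. The sketch below assumes that interface and the `publish(topic, event)` form used in the escalation example. `InMemoryEventBus` is hypothetical and delivers only on exact subjects, without wildcards.

```python
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

Handler = Callable[[Any], Awaitable[Any]]


class InMemoryEventBus:
    """Test double for EventBus: exact-subject delivery, no network."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.published: List[Tuple[str, Any]] = []
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, event: Any) -> None:
        """Record the event and deliver it to every handler on that exact topic."""
        self.published.append((topic, event))
        for handler in self._handlers[topic]:
            await handler(event)
```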
Inter-Agent Communication
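Agents communicate mainly by publishing and subscribing to CloudEvents on the NATS event bus; state queries to the Twin Agent go over gRPC instead. Three coordination patterns recur: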
Coordination Pattern: Escalation
```python
# Security Agent escalates to Ops Agent
incident_event = CloudEvent(
    type="citadel.security.incident",
    source="spiffe://citadel.mesh/agent/security",
    subject="incident.forced_entry.001",
    data={
        "incident_type": "forced_entry",
        "threat_level": "critical",
        "location": "door.lobby.main",
        "actions_taken": ["lock_doors", "alert_security"],
        "requires_human": True,
    },
)

await event_bus.publish("incidents.events.building_a", incident_event)
```
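On the wire, a CloudEvent in structured JSON mode is a single object. It carries the required context attributes `specversion`, `id`, `source` and `type`, alongside optional ones such as `subject`, `time` and `datacontenttype`. The example above passes only `type`, `source`, `subject` and `data`, but `id` and `specversion` must still appear in the serialized event. The stdlib sketch below builds such an envelope. The helper name is hypothetical; the attribute names follow CloudEvents 1.0.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def to_structured_json(type_: str, source: str, data: Dict[str, Any],
                       subject: Optional[str] = None) -> str:
    """Serialize a CloudEvents 1.0 event in structured JSON mode."""
    event: Dict[str, Any] = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),  # must be unique per event from a given source
        "source": source,
        "type": type_,
        "time": datetime.now(timezone.utc).isoformat(),  # RFC 3339 timestamp
        "datacontenttype": "application/json",
        "data": data,
    }
    if subject is not None:
        event["subject"] = subject
    return json.dumps(event)
```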
Coordination Pattern: State Query
```python
# Energy Agent queries Twin Agent for zone state
async def get_zone_state(zone_id: str) -> dict:
    """Query digital twin for zone state via gRPC.

    grpc_channel must be a grpc.aio channel so that the stub call can be awaited.
    """
    twin_client = TwinServiceStub(grpc_channel)
    request = GetEntityRequest(entity_id=zone_id)
    entity = await twin_client.GetEntity(request)

    return {
        "temp": entity.attributes["current_temp"],
        "setpoint": entity.attributes["target_setpoint"],
        "occupancy": entity.attributes["occupied"],
    }
```
Coordination Pattern: Command Chain
```python
# Automation Agent triggers a scene; Energy Agent adjusts HVAC
scene_event = CloudEvent(
    type="citadel.automation.scene.activated",
    source="spiffe://citadel.mesh/agent/automation",
    subject="scene.meeting_mode",
    data={
        "scene_id": "meeting_mode",
        "zones": ["conference_room_a"],
        "lighting": {"preset": "presentation"},
        "hvac": {"temp_override": 70},
    },
)


# Energy Agent subscribes to scene events
async def handle_scene_event(event: CloudEventMessage):
    if event.type != "citadel.automation.scene.activated":
        return
    hvac_override = event.data.get("hvac", {}).get("temp_override")
    if hvac_override is None:
        return
    # event.subject names the scene, not a zone; target zones are in the payload
    for zone_id in event.data.get("zones", []):
        await adjust_setpoint(zone_id, hvac_override)
```
Deterministic Execution and Replay
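LangGraph executions can be recorded and replayed. A replay reproduces the original result only if every node is deterministic given its inputs. Non-deterministic inputs such as LLM responses and sensor reads must therefore be served from the recorded trace rather than fetched live: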
```python
import copy

# Record execution trace
execution_id = ulid()
execution_trace = {
    "execution_id": execution_id,
    "agent_id": self.config.agent_id,
    "graph_version": "v1.2.0",
    # Deep copy so later mutation of the live state cannot alter the recording
    "initial_state": copy.deepcopy(initial_state),
    "events": [],
    "nodes_executed": [],
    "final_state": None,
}

# Execute graph with the tracing callback attached
result = await self.graph.ainvoke(
    initial_state,
    config={"callbacks": [ExecutionTracer(execution_trace)]},
)
execution_trace["final_state"] = result

# Store trace for replay
await store_execution_trace(execution_trace)

# Replay for debugging: rerun from the recorded initial state. This matches only
# if every node is deterministic and external inputs are served from the trace.
replayed_result = await self.graph.ainvoke(execution_trace["initial_state"])
assert replayed_result == execution_trace["final_state"]
```
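The reason replay needs recorded inputs can be shown without LangGraph. In the stdlib sketch below, a node reads a sensor through a `Recorder`. In record mode the reading is fetched live and logged; in replay mode it is served from the log. The run therefore reproduces exactly even after the live sensor has changed. `Recorder` and `run_energy_step` are hypothetical names.

```python
from typing import Any, Callable, List, Optional


class Recorder:
    """Records external inputs on the first run; replays them on later runs."""

    def __init__(self, log: Optional[List[Any]] = None) -> None:
        self.replaying = log is not None
        self.log: List[Any] = list(log) if log is not None else []
        self._pos = 0

    def fetch(self, live: Callable[[], Any]) -> Any:
        """Return the next recorded value when replaying, else call live() and record it."""
        if self.replaying:
            value = self.log[self._pos]
            self._pos += 1
            return value
        value = live()
        self.log.append(value)
        return value


def run_energy_step(rec: Recorder, read_temp: Callable[[], float]) -> dict:
    """Deterministic given its inputs: all non-determinism goes through rec.fetch."""
    temp = rec.fetch(read_temp)
    target = 72.0
    return {"current_temp": temp, "action": "adjust" if abs(temp - target) > 2.0 else "skip"}
```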
Safety Integration
Every agent action is validated by OPA before it reaches an adapter. A denied action raises `PolicyViolation` and is never executed:
```python
async def execute_command(self, command: Command) -> CommandResult:
    """Execute command with OPA safety check."""
    # Evaluate the policy package for this agent type (e.g. citadel.security)
    decision = await opa_client.evaluate(
        policy=f"citadel.{self.config.agent_type}",
        input={
            "action": command.action,
            "target": command.target_id,
            "params": command.params,
            "issued_by": self.config.spiffe_id,
        },
    )

    # Fail closed: a missing "allow" is treated as a denial
    if not decision.get("allow", False):
        raise PolicyViolation(decision.get("deny_reason", "denied by policy"))

    # Execute with the safety token issued by the policy decision
    command.safety_token = decision["token"]
    return await adapter.execute(command)
```
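The check above handles a policy engine that answers. If the OPA call itself hangs or errors, the agent should also fail closed. The sketch below wraps the evaluation and turns timeouts, exceptions and malformed answers into a denial. `evaluate_fail_closed`, the 0.5 s default and the decision dict shape (mirroring the `allow`/`deny_reason` keys above) are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Decision = Dict[str, Any]


async def evaluate_fail_closed(evaluate: Callable[[], Awaitable[Decision]],
                               timeout_s: float = 0.5) -> Decision:
    """Run a policy evaluation; any timeout, error or malformed answer becomes a denial."""
    try:
        decision = await asyncio.wait_for(evaluate(), timeout_s)
    except asyncio.TimeoutError:
        return {"allow": False, "deny_reason": "policy evaluation timed out"}
    except Exception as exc:  # network errors, bad responses, ...
        return {"allow": False, "deny_reason": f"policy evaluation failed: {exc}"}
    if not isinstance(decision, dict):
        return {"allow": False, "deny_reason": "malformed policy decision"}
    if decision.get("allow") is not True:
        return {"allow": False, "deny_reason": decision.get("deny_reason", "denied by policy")}
    return decision
```

In `execute_command`, the OPA call would then become `await evaluate_fail_closed(lambda: opa_client.evaluate(...))`.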
Agent Deployment
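Each agent runs as a Kubernetes Deployment. The SPIRE agent socket is mounted into the pod so the agent can obtain its SPIFFE identity: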
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: security-agent
  namespace: citadel-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      serviceAccountName: security-agent
      containers:
        - name: agent
          image: citadel/security-agent:v1.0.0
          env:
            - name: AGENT_ID
              value: "security-agent-building-a"
            - name: SPIFFE_ENDPOINT_SOCKET
              value: "unix:///run/spire/sockets/agent.sock"
            - name: NATS_URL
              value: "nats://nats.citadel.svc:4222"
          volumeMounts:
            - name: spire-agent-socket
              mountPath: /run/spire/sockets
              readOnly: true
      volumes:
        - name: spire-agent-socket
          hostPath:
            path: /run/spire/sockets
```
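The Deployment passes identity and connection settings through environment variables. The sketch below shows how an agent entrypoint might map them onto the `AgentConfig` fields from Base Agent Framework, repeated here in trimmed form so the block is self-contained. `AGENT_TYPE`, `SPIFFE_ID` and `SUBSCRIBE_TOPICS` are hypothetical variables. In practice the SPIFFE ID would come from the Workload API at `SPIFFE_ENDPOINT_SOCKET`; the fallback simply follows the ID pattern used elsewhere in this document.

```python
import os
from dataclasses import dataclass, field
from typing import List, Mapping


@dataclass
class AgentConfig:
    """Trimmed copy of AgentConfig from Base Agent Framework."""
    agent_id: str
    agent_type: str
    spiffe_id: str
    nats_url: str = "nats://localhost:4222"
    subscribe_topics: List[str] = field(default_factory=list)


def config_from_env(env: Mapping[str, str] = os.environ) -> AgentConfig:
    """Build AgentConfig from environment variables; AGENT_ID is required."""
    agent_id = env["AGENT_ID"]  # KeyError if missing: fail fast at startup
    agent_type = env.get("AGENT_TYPE", agent_id.split("-")[0])
    return AgentConfig(
        agent_id=agent_id,
        agent_type=agent_type,
        # Placeholder; the real identity should come from the SPIRE Workload API
        spiffe_id=env.get("SPIFFE_ID", f"spiffe://citadel.mesh/agent/{agent_type}"),
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        subscribe_topics=[t for t in env.get("SUBSCRIBE_TOPICS", "").split(",") if t],
    )
```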
Related Documentation
- Protocol Strategy - CloudEvents and event bus design
- Safety Guardrails - OPA policy integration
- Observability - Agent tracing and metrics
- Identity Foundation - SPIFFE identity for agents