
Agent Topology and Architecture

CitadelMesh uses a multi-agent architecture where specialized AI agents collaborate to manage building operations. Each agent is implemented as a LangGraph state machine with deterministic execution, event-driven communication, and built-in safety guardrails.

Agent Design Philosophy

Specialists, Not Generalists: Rather than one monolithic "building AI", we deploy focused agents with clear responsibilities:

  • Single Responsibility: Each agent manages one domain (security, energy, ops)
  • Event-Driven: Agents react to CloudEvents from sensors, systems, and other agents
  • Stateful: LangGraph state machines maintain context across interactions
  • Observable: Full tracing and replay of agent decisions
  • Safe by Default: Every action validated by OPA policies
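
As an illustration of the event-driven principle, the envelope below sketches the attributes a CloudEvent carries between agents. The class name `CloudEventEnvelope` is hypothetical and is not the project's own `CloudEvent` type; only the attribute names (`specversion`, `id`, `source`, `type`, `subject`, `data`) come from the CloudEvents 1.0 specification.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class CloudEventEnvelope:
    """Hypothetical minimal envelope with the required CloudEvents 1.0 attributes."""
    type: str
    source: str
    data: Dict[str, Any] = field(default_factory=dict)
    subject: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    specversion: str = "1.0"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, omitting the optional subject when unset."""
        event: Dict[str, Any] = {
            "specversion": self.specversion,
            "id": self.id,
            "type": self.type,
            "source": self.source,
            "data": self.data,
        }
        if self.subject is not None:
            event["subject"] = self.subject
        return event
```

Generating the `id` per event keeps each message individually traceable, which supports the "Observable" goal above.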

Agent Mesh Overview

graph TB
subgraph "Building Edge Agents"
Security[Security Agent<br/>Threat Detection]
Energy[Energy Agent<br/>HVAC Optimization]
Automation[Automation Agent<br/>Scenes & Schedules]
DER[DER/Grid Agent<br/>Battery & PV Control]
end

subgraph "Digital Infrastructure"
Twin[Twin Agent<br/>State Synchronization]
Compliance[Compliance Agent<br/>Policy Monitoring]
end

subgraph "Operations"
Ops[Ops Agent<br/>Incident Management]
end

subgraph "Event Bus"
Events[CloudEvents + NATS]
end

Security --> Events
Energy --> Events
Automation --> Events
DER --> Events
Twin --> Events
Compliance --> Events

Events --> Security
Events --> Energy
Events --> Automation
Events --> Ops
Events --> Twin
Events --> Compliance

Security -.Escalate.-> Ops
Energy -.Escalate.-> Ops
Automation -.Escalate.-> Ops

Twin -.State Queries.-> Security
Twin -.State Queries.-> Energy
Twin -.State Queries.-> Automation

Agent Descriptions

Security Agent

Responsibility: Physical security threat detection and response

Inputs:

  • Access control events (Schneider Security Expert)
  • Camera analytics (Avigilon person detection, loitering, intrusion)
  • Door sensor state changes
  • After-hours motion detection

Outputs:

  • Security incidents (CloudEvents)
  • Door lock/unlock commands (with OPA approval)
  • Security officer alerts
  • Camera tracking requests

State Machine:

stateDiagram-v2
[*] --> MonitorEvents: Start

MonitorEvents --> AssessThreat: Event Received
AssessThreat --> ClassifyIncident: Threat Detected
AssessThreat --> MonitorEvents: No Threat

ClassifyIncident --> PlanResponse: Incident Type Known
PlanResponse --> ExecuteActions: Response Plan Ready

ExecuteActions --> NotifyOps: Actions Complete
NotifyOps --> MonitorEvents: Continue Monitoring

ExecuteActions --> EscalateHuman: High Risk
EscalateHuman --> AwaitApproval: Escalated
AwaitApproval --> ExecuteActions: Approved
AwaitApproval --> MonitorEvents: Rejected

Example Implementation:

from langgraph.graph import StateGraph, END
from typing import Any, TypedDict, List, Optional

# SecurityEvent, BaseAgent, and CloudEvent are defined elsewhere (not shown here).

class SecurityAgentState(TypedDict):
    """State for Security Agent."""
    events: List[SecurityEvent]
    threat_level: str  # low, medium, high, critical
    incident_type: Optional[str]
    response_plan: List[str]
    context: dict
    messages: List[Any]

class SecurityAgent(BaseAgent):
    """LangGraph-based security agent."""

    def build_graph(self) -> StateGraph:
        """Build the security agent state machine."""
        workflow = StateGraph(SecurityAgentState)

        # Add nodes
        workflow.add_node("assess_threat", self._assess_threat)
        workflow.add_node("classify_incident", self._classify_incident)
        workflow.add_node("plan_response", self._plan_response)
        workflow.add_node("execute_actions", self._execute_actions)
        workflow.add_node("escalate_human", self._escalate_human)

        # Define flow
        workflow.set_entry_point("assess_threat")
        workflow.add_conditional_edges(
            "assess_threat",
            self._should_classify,
            {
                "classify": "classify_incident",
                "monitor": END
            }
        )
        workflow.add_edge("classify_incident", "plan_response")
        workflow.add_conditional_edges(
            "plan_response",
            self._needs_escalation,
            {
                "escalate": "escalate_human",
                "execute": "execute_actions"
            }
        )
        workflow.add_edge("execute_actions", END)
        workflow.add_edge("escalate_human", END)

        return workflow.compile()

    async def _assess_threat(self, state: SecurityAgentState) -> SecurityAgentState:
        """Assess threat level from incoming events."""
        events = state["events"]

        # Analyze events (could use LLM for complex reasoning)
        threat_indicators = {
            "after_hours": any(e.event_type == "after_hours_access" for e in events),
            "forced_entry": any(e.event_type == "forced_entry" for e in events),
            "loitering": any(e.event_type == "loitering" for e in events),
        }

        # Calculate threat level (most severe indicator wins)
        if threat_indicators["forced_entry"]:
            state["threat_level"] = "critical"
        elif threat_indicators["after_hours"]:
            state["threat_level"] = "high"
        elif threat_indicators["loitering"]:
            state["threat_level"] = "medium"
        else:
            state["threat_level"] = "low"

        return state

    async def _execute_actions(self, state: SecurityAgentState) -> SecurityAgentState:
        """Execute planned response actions."""
        for action in state["response_plan"]:
            if action == "lock_doors":
                # Execute door lock via MCP adapter
                await self.mcp_client.call_tool(
                    "securityexpert_door_lock",
                    {"door_id": "door.lobby.main"}
                )

            elif action == "alert_security":
                # Publish security alert event
                await self.event_bus.publish(CloudEvent(
                    type="citadel.security.alert",
                    source=self.config.spiffe_id,
                    data={"incident": state["incident_type"], "level": state["threat_level"]}
                ))

        return state
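
The graph above references two routing callbacks, `_should_classify` and `_needs_escalation`, that the example does not define. A minimal sketch of what they might look like follows, written as standalone functions so it runs without LangGraph. The thresholds (anything above "low" is classified, only "critical" is escalated) are assumptions for illustration, not the project's actual rules.

```python
from typing import List, Optional, TypedDict


class SecurityAgentState(TypedDict, total=False):
    """Trimmed copy of the agent state, limited to the fields the routers read."""
    threat_level: str  # low, medium, high, critical
    incident_type: Optional[str]
    response_plan: List[str]


def should_classify(state: SecurityAgentState) -> str:
    """Return the edge label after threat assessment.

    Assumption: a "low" (or missing) threat level goes back to monitoring.
    """
    return "monitor" if state.get("threat_level", "low") == "low" else "classify"


def needs_escalation(state: SecurityAgentState) -> str:
    """Return the edge label after response planning.

    Assumption: only "critical" threats require human approval.
    """
    return "escalate" if state.get("threat_level") == "critical" else "execute"
```

The returned strings match the keys of the mapping passed to `add_conditional_edges`, which is how LangGraph selects the next node.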

Energy Agent

Responsibility: Building energy optimization and HVAC control

Inputs:

  • Zone temperature telemetry (EcoStruxure EBO)
  • Occupancy sensors
  • Weather forecasts
  • Energy tariffs and grid carbon intensity
  • Demand response (DR) events

Outputs:

  • HVAC setpoint adjustments (within OPA temperature bounds)
  • Lighting scene changes
  • Pre-cooling/pre-heating schedules
  • Energy KPI metrics

State Machine:

stateDiagram-v2
[*] --> ReadZoneState: Optimization Trigger

ReadZoneState --> AnalyzeConditions: State Loaded
AnalyzeConditions --> CalculateSetpoint: Comfort vs Efficiency

CalculateSetpoint --> ValidatePolicy: Setpoint Computed
ValidatePolicy --> AdjustSetpoint: Policy Allows
ValidatePolicy --> [*]: Policy Denies

AdjustSetpoint --> MonitorResult: Command Sent
MonitorResult --> [*]: Success

Example Implementation:

class EnergyAgentState(TypedDict):
    """State for Energy Agent."""
    zone_id: str
    current_temp: Optional[float]
    target_temp: Optional[float]
    occupancy: Optional[bool]
    action: Optional[str]
    setpoint_adjusted: bool

class EnergyAgent(BaseAgent):
    """Energy optimization agent."""

    def build_graph(self) -> StateGraph:
        """Build energy agent state machine."""
        workflow = StateGraph(EnergyAgentState)

        workflow.add_node("read_zone_state", self._read_zone_state)
        workflow.add_node("analyze_conditions", self._analyze_conditions)
        workflow.add_node("calculate_optimal_setpoint", self._calculate_optimal_setpoint)
        workflow.add_node("adjust_setpoint", self._adjust_setpoint)

        workflow.set_entry_point("read_zone_state")
        workflow.add_edge("read_zone_state", "analyze_conditions")
        workflow.add_edge("analyze_conditions", "calculate_optimal_setpoint")
        workflow.add_conditional_edges(
            "calculate_optimal_setpoint",
            self._should_adjust,
            {
                "adjust": "adjust_setpoint",
                "skip": END
            }
        )
        workflow.add_edge("adjust_setpoint", END)

        return workflow.compile()

    async def _calculate_optimal_setpoint(self, state: EnergyAgentState) -> EnergyAgentState:
        """Calculate optimal setpoint balancing comfort and energy."""
        current = state["current_temp"]
        occupancy = state["occupancy"]

        # Setpoints appear to be in °F, with a cooling-mode setback (assumption)
        # Comfort-first if occupied
        if occupancy:
            target = 72.0
        else:
            # Energy-saving setback when unoccupied
            target = 76.0

        # Adjust for weather (future: use forecast)
        # Adjust for tariffs (future: shift load to off-peak)

        state["target_temp"] = target
        if current is None:
            # No temperature reading available, so do not issue a command
            state["action"] = "skip"
        else:
            # Adjust only when the zone is more than 2 degrees off target
            state["action"] = "adjust" if abs(current - target) > 2.0 else "skip"

        return state
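
The Energy Agent graph routes on `_should_adjust`, which the example leaves undefined, and the outputs list mentions policy temperature bounds. The sketch below shows one way both pieces could look. The bound values and the names `MIN_SETPOINT_F`, `MAX_SETPOINT_F`, and `clamp_setpoint` are hypothetical; real limits would come from the OPA policy.

```python
from typing import Optional, TypedDict

# Hypothetical bounds for illustration only; the real limits live in OPA policy.
MIN_SETPOINT_F = 65.0
MAX_SETPOINT_F = 78.0


class EnergyAgentState(TypedDict, total=False):
    """Trimmed copy of the agent state, limited to the fields used here."""
    current_temp: Optional[float]
    target_temp: Optional[float]
    action: Optional[str]


def clamp_setpoint(target: float,
                   low: float = MIN_SETPOINT_F,
                   high: float = MAX_SETPOINT_F) -> float:
    """Keep a proposed setpoint inside the allowed range."""
    return max(low, min(high, target))


def should_adjust(state: EnergyAgentState) -> str:
    """Return the edge label computed by the setpoint node ("adjust" or "skip")."""
    return "adjust" if state.get("action") == "adjust" else "skip"
```

Clamping locally does not replace the policy check; it only avoids sending requests that the policy would be expected to deny.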

Automation Agent

Responsibility: Orchestrate scenes, schedules, and user-triggered actions

Inputs:

  • User scene requests (mobile app, voice commands)
  • Schedule triggers (morning routine, evening mode)
  • Occupancy pattern detection
  • Meeting room bookings

Outputs:

  • Lighting scene activations (Home Assistant)
  • HVAC mode changes
  • Window blind adjustments
  • AV system control
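
A rough sketch of how a scene request could be expanded into per-zone commands is shown below. The payload shape (`zones`, `lighting`, `hvac`) mirrors the scene event used later under Command Chain; `DeviceCommand` and `expand_scene` are hypothetical names introduced for this example.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class DeviceCommand:
    """One command for one subsystem in one zone (hypothetical type)."""
    zone_id: str
    system: str  # "lighting" or "hvac"
    params: Dict[str, Any]


def expand_scene(scene: Dict[str, Any]) -> List[DeviceCommand]:
    """Turn a scene definition into one command per zone and subsystem."""
    commands: List[DeviceCommand] = []
    for zone_id in scene.get("zones", []):
        for system in ("lighting", "hvac"):
            params = scene.get(system)
            if params:
                # Copy params so later edits to the scene do not alter the command
                commands.append(DeviceCommand(zone_id, system, dict(params)))
    return commands
```

Each resulting command would still pass through the policy check before reaching an adapter.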

Twin Agent

Responsibility: Maintain synchronized digital twin of building state

Inputs:

  • Telemetry from all adapters
  • State change events
  • Reconciliation requests

Outputs:

  • Digital twin mutations (graph database)
  • Derived KPIs (energy efficiency, comfort score)
  • State snapshots for replay
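
To make the twin's role concrete, here is a minimal in-memory sketch of applying telemetry and deriving a comfort score. The real twin is backed by a graph database; `TwinStore`, `comfort_score`, and the linear scoring formula with its 4-degree tolerance are assumptions chosen for illustration.

```python
from typing import Dict


class TwinStore:
    """Hypothetical in-memory stand-in for the digital twin's entity store."""

    def __init__(self) -> None:
        self._entities: Dict[str, Dict[str, float]] = {}

    def apply_telemetry(self, entity_id: str, attributes: Dict[str, float]) -> None:
        """Merge new attribute values into the entity, creating it if needed."""
        self._entities.setdefault(entity_id, {}).update(attributes)

    def get(self, entity_id: str) -> Dict[str, float]:
        """Return a copy of the entity's attributes (empty if unknown)."""
        return dict(self._entities.get(entity_id, {}))


def comfort_score(current_temp: float, setpoint: float, tolerance: float = 4.0) -> float:
    """Score 1.0 at the setpoint, falling linearly to 0.0 at +/- tolerance."""
    deviation = abs(current_temp - setpoint)
    return max(0.0, 1.0 - deviation / tolerance)
```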

Ops Agent

Responsibility: Human-in-the-loop operations and incident management

Inputs:

  • Escalated incidents from Security/Energy/Automation agents
  • User approval requests
  • SLA violation alerts

Outputs:

  • Approval/rejection decisions
  • Incident reports
  • Maintenance work orders
  • SLA tracking metrics
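
The human-in-the-loop step can be pictured as a queue of pending approvals. The sketch below is an assumption about how such a queue might be modeled; `ApprovalQueue`, `ApprovalRequest`, and `Decision` are hypothetical names, and persistence and notifications are left out.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class Decision(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"


@dataclass
class ApprovalRequest:
    incident_id: str
    requested_action: str
    decision: Decision = Decision.PENDING


class ApprovalQueue:
    """Hypothetical in-memory queue of escalated actions awaiting a human."""

    def __init__(self) -> None:
        self._requests: Dict[str, ApprovalRequest] = {}

    def submit(self, request: ApprovalRequest) -> None:
        self._requests[request.incident_id] = request

    def decide(self, incident_id: str, approved: bool) -> ApprovalRequest:
        """Record a decision once; a second decision on the same request is an error."""
        request = self._requests[incident_id]
        if request.decision is not Decision.PENDING:
            raise ValueError(f"{incident_id} already decided")
        request.decision = Decision.APPROVED if approved else Decision.REJECTED
        return request

    def pending(self) -> List[ApprovalRequest]:
        return [r for r in self._requests.values() if r.decision is Decision.PENDING]
```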

DER/Grid Agent

Responsibility: Distributed energy resource and grid integration

Inputs:

  • Battery state-of-charge
  • Solar PV generation
  • Grid carbon intensity signals
  • Demand response events (OpenADR)
  • EVSE charging requests (OCPP)

Outputs:

  • Battery charge/discharge commands
  • EV charging schedule optimization
  • Demand limit enforcement
  • DR participation confirmations
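
A simple rule-based dispatch illustrates how these inputs could map to a battery command. This is a deliberately naive sketch: the function name, the state-of-charge limits, and the priority order (demand response first, then PV surplus) are all assumptions, and a production agent would also weigh tariffs and carbon intensity.

```python
def plan_battery_action(soc: float,
                        pv_kw: float,
                        load_kw: float,
                        dr_event_active: bool,
                        min_soc: float = 0.2,
                        max_soc: float = 0.9) -> str:
    """Return "charge", "discharge", or "idle" for the current interval.

    soc is the state of charge as a fraction between 0.0 and 1.0.
    """
    # During a demand response event, discharge if there is usable energy
    if dr_event_active and soc > min_soc:
        return "discharge"

    surplus_kw = pv_kw - load_kw
    # Store excess solar while there is headroom in the battery
    if surplus_kw > 0 and soc < max_soc:
        return "charge"
    # Cover a shortfall from the battery while above the reserve level
    if surplus_kw < 0 and soc > min_soc:
        return "discharge"
    return "idle"
```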

Compliance Agent

Responsibility: Continuous compliance monitoring and attestation

Inputs:

  • Control actions from all agents
  • Policy evaluation results
  • Audit log events

Outputs:

  • Compliance violation alerts
  • Attestation reports (SOC 2, IEC 62443)
  • Control effectiveness metrics
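
One way to picture the compliance check is to join control actions with their policy results and flag anything that ran without approval. The record shape and both function names below are hypothetical, and the metric definition (share of denied actions that were actually blocked) is an assumption.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class AuditRecord:
    """Hypothetical audit entry joining an action with its policy decision."""
    action_id: str
    agent: str
    policy_allowed: bool
    executed: bool


def find_violations(records: Sequence[AuditRecord]) -> List[AuditRecord]:
    """Actions that were executed even though policy did not allow them."""
    return [r for r in records if r.executed and not r.policy_allowed]


def control_effectiveness(records: Sequence[AuditRecord]) -> float:
    """Fraction of denied actions that were not executed (1.0 if none were denied)."""
    denied = [r for r in records if not r.policy_allowed]
    if not denied:
        return 1.0
    blocked = sum(1 for r in denied if not r.executed)
    return blocked / len(denied)
```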

Base Agent Framework

All agents inherit from BaseAgent:

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any

from langgraph.graph import StateGraph

# AgentState, AgentStatus, EventBus, TelemetryCollector, and CloudEventMessage
# are defined elsewhere (not shown here).

@dataclass
class AgentConfig:
    """Configuration for a CitadelMesh agent."""
    agent_id: str
    agent_type: str
    spiffe_id: str
    nats_url: str = "nats://localhost:4222"
    subscribe_topics: List[str] = field(default_factory=list)
    publish_topics: List[str] = field(default_factory=list)
    enable_telemetry: bool = True
    enable_safety_checks: bool = True
    log_level: str = "INFO"

class BaseAgent(ABC):
    """Base class for all CitadelMesh LangGraph agents."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.state = AgentState()
        self.logger = logging.getLogger(f"citadel.agent.{config.agent_type}")

        # Event bus integration
        self.event_bus: Optional[EventBus] = None

        # Telemetry collector
        self.telemetry: Optional[TelemetryCollector] = None
        if config.enable_telemetry:
            self.telemetry = TelemetryCollector(
                service_name=f"agent.{config.agent_type}",
                agent_id=config.agent_id
            )

        # LangGraph state machine
        self.graph: Optional[StateGraph] = None

    async def start(self):
        """Start agent and connect to infrastructure."""
        # Connect to NATS event bus
        self.event_bus = EventBus(
            nats_url=self.config.nats_url,
            agent_id=self.config.agent_id
        )
        await self.event_bus.connect()

        # Subscribe to topics
        for topic in self.config.subscribe_topics:
            await self.event_bus.subscribe(topic, self._handle_event)

        # Build LangGraph
        self.graph = self.build_graph()

        # Update status
        self.state.status = AgentStatus.ACTIVE
        await self._publish_state_update()

    @abstractmethod
    def build_graph(self) -> StateGraph:
        """Build the LangGraph state machine for this agent."""
        pass

    @abstractmethod
    async def process_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Process incoming CloudEvent."""
        pass

    async def _handle_event(self, event: CloudEventMessage):
        """Internal event handler with telemetry."""
        # Telemetry is optional (enable_telemetry=False), so skip tracing when it is absent
        if self.telemetry is None:
            return await self.process_event(event)

        with self.telemetry.trace_span("handle_event") as span:
            span.set_attribute("event.type", event.type)
            span.set_attribute("event.source", event.source)

            try:
                result = await self.process_event(event)
                return result
            except Exception as e:
                self.logger.error(f"Event processing failed: {e}")
                span.record_exception(e)
                raise
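
For unit tests it can help to swap the NATS-backed bus for an in-memory stand-in. The sketch below assumes the `EventBus` interface consists of `connect`, `subscribe(topic, handler)`, and `publish(topic, event)`, as suggested by the calls in this document; `InMemoryEventBus` is a hypothetical name and does exact-match topic routing only.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Handler = Callable[[Any], Awaitable[Any]]


class InMemoryEventBus:
    """Hypothetical test double that delivers events to local handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    async def connect(self) -> None:
        """Nothing to connect to; present only to match the assumed interface."""

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, event: Any) -> None:
        # Deliver in subscription order; .get avoids creating empty topic entries
        for handler in list(self._handlers.get(topic, [])):
            await handler(event)


async def demo() -> List[Any]:
    """Subscribe one handler and publish to a matching and a non-matching topic."""
    received: List[Any] = []

    async def handler(event: Any) -> None:
        received.append(event)

    bus = InMemoryEventBus()
    await bus.connect()
    await bus.subscribe("security.events", handler)
    await bus.publish("security.events", {"type": "forced_entry"})
    await bus.publish("energy.events", {"type": "ignored"})
    return received
```

Calling `asyncio.run(demo())` returns only the event published on the subscribed topic.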

Inter-Agent Communication

Agents communicate via CloudEvents on the event bus:

Coordination Pattern: Escalation

# Security Agent escalates to Ops Agent
incident_event = CloudEvent(
    type="citadel.security.incident",
    source="spiffe://citadel.mesh/agent/security",
    subject="incident.forced_entry.001",
    data={
        "incident_type": "forced_entry",
        "threat_level": "critical",
        "location": "door.lobby.main",
        "actions_taken": ["lock_doors", "alert_security"],
        "requires_human": True
    }
)

await event_bus.publish("incidents.events.building_a", incident_event)
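
Because the bus is NATS, a subscriber such as the Ops Agent could use subject wildcards to receive incidents from every building. The matcher below sketches NATS subject semantics (`*` matches exactly one token, `>` matches one or more trailing tokens) for reasoning and tests; the function name is hypothetical, and real matching is done by the NATS server.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")

    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False

    # Without a trailing '>', token counts must be equal
    return len(p_tokens) == len(s_tokens)
```

With this rule, a subscription to `incidents.events.*` would receive the event published above on `incidents.events.building_a`.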

Coordination Pattern: State Query

# Energy Agent queries Twin Agent for zone state
async def get_zone_state(zone_id: str) -> dict:
    """Query digital twin for zone state via gRPC."""
    twin_client = TwinServiceStub(grpc_channel)

    request = GetEntityRequest(entity_id=zone_id)
    entity = await twin_client.GetEntity(request)

    return {
        "temp": entity.attributes["current_temp"],
        "setpoint": entity.attributes["target_setpoint"],
        "occupancy": entity.attributes["occupied"]
    }

Coordination Pattern: Command Chain

# Automation Agent triggers scene -> Energy Agent adjusts HVAC
scene_event = CloudEvent(
    type="citadel.automation.scene.activated",
    source="spiffe://citadel.mesh/agent/automation",
    subject="scene.meeting_mode",
    data={
        "scene_id": "meeting_mode",
        "zones": ["conference_room_a"],
        "lighting": {"preset": "presentation"},
        "hvac": {"temp_override": 70}
    }
)

# Energy Agent subscribes to scene events
async def handle_scene_event(event: CloudEventMessage):
    if event.type == "citadel.automation.scene.activated":
        hvac_override = event.data.get("hvac", {}).get("temp_override")
        if hvac_override is not None:
            # event.subject names the scene, so target the zones listed in the payload
            for zone_id in event.data.get("zones", []):
                await adjust_setpoint(zone_id, hvac_override)

Deterministic Execution and Replay

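Because each agent is an explicit LangGraph state machine, an execution can be traced and replayed. Replay reproduces the original result only when every node is deterministic for a given input state, so nondeterministic steps (LLM calls, adapter I/O) need to be stubbed or served from the recorded trace: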

# Record execution trace
execution_id = ulid()
execution_trace = {
    "execution_id": execution_id,
    "agent_id": self.config.agent_id,
    "graph_version": "v1.2.0",
    "initial_state": initial_state,
    "events": [],
    "nodes_executed": [],
    "final_state": None
}

# Execute graph with tracing
result = await self.graph.ainvoke(
    initial_state,
    config={"callbacks": [ExecutionTracer(execution_trace)]}
)

# Record the outcome so a later replay has something to compare against
execution_trace["final_state"] = result

# Store trace for replay
await store_execution_trace(execution_trace)

# Replay for debugging (assumes deterministic nodes)
replayed_result = await self.graph.ainvoke(
    execution_trace["initial_state"]
)

assert replayed_result == execution_trace["final_state"]
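
The record-and-replay idea can be shown without LangGraph by running pure node functions in sequence. This is a simplified sketch under the assumption that nodes are pure functions of state; `run_pipeline` and `replay_matches` are hypothetical helpers, not part of LangGraph or CitadelMesh.

```python
from typing import Any, Callable, Dict, List, Tuple

State = Dict[str, Any]
Node = Callable[[State], State]


def run_pipeline(nodes: List[Tuple[str, Node]], initial_state: State) -> Dict[str, Any]:
    """Run nodes in order and return a trace of what happened."""
    trace: Dict[str, Any] = {
        "initial_state": dict(initial_state),  # copy so later edits cannot alter it
        "nodes_executed": [],
        "final_state": None,
    }
    state = dict(initial_state)
    for name, node in nodes:
        # Each node gets its own copy, which keeps nodes from sharing mutations
        state = node(dict(state))
        trace["nodes_executed"].append(name)
    trace["final_state"] = state
    return trace


def replay_matches(nodes: List[Tuple[str, Node]], trace: Dict[str, Any]) -> bool:
    """Re-run from the recorded initial state and compare with the recorded result."""
    replayed = run_pipeline(nodes, trace["initial_state"])
    return (replayed["final_state"] == trace["final_state"]
            and replayed["nodes_executed"] == trace["nodes_executed"])
```

If a node reads a clock, a random source, or an external system, `replay_matches` can return `False`, which is the signal that the step needs to be stubbed for replay.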

Safety Integration

Every agent action is validated by OPA before it is executed:

async def execute_command(self, command: Command) -> CommandResult:
    """Execute command with OPA safety check."""

    # Evaluate policy
    decision = await opa_client.evaluate(
        policy=f"citadel.{self.config.agent_type}",
        input={
            "action": command.action,
            "target": command.target_id,
            "params": command.params,
            "issued_by": self.config.spiffe_id
        }
    )

    if not decision["allow"]:
        raise PolicyViolation(decision["deny_reason"])

    # Execute with safety token
    command.safety_token = decision["token"]
    result = await adapter.execute(command)

    return result
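
The `opa_client` above is not shown. As a rough sketch, a client could call OPA's REST Data API, which accepts `POST /v1/data/<path>` with an `{"input": ...}` body and returns the policy document under `result`. The helper names, the base URL, and the assumption that the policy exposes `allow` and `deny_reason` fields are illustrative only.

```python
import json
import urllib.request
from typing import Any, Dict, Tuple


def build_opa_request(base_url: str, package: str,
                      policy_input: Dict[str, Any]) -> Tuple[str, bytes]:
    """Build the URL and JSON body for an OPA Data API query."""
    # A package such as "citadel.security" maps to the path "citadel/security"
    path = package.replace(".", "/")
    url = f"{base_url.rstrip('/')}/v1/data/{path}"
    body = json.dumps({"input": policy_input}).encode("utf-8")
    return url, body


def parse_decision(response: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize an OPA response, denying by default when no result is defined."""
    result = response.get("result")
    if not isinstance(result, dict):
        return {"allow": False, "deny_reason": "policy result undefined"}
    return {
        "allow": bool(result.get("allow", False)),
        "deny_reason": result.get("deny_reason"),
    }


def evaluate(base_url: str, package: str, policy_input: Dict[str, Any],
             timeout: float = 2.0) -> Dict[str, Any]:
    """Send the query to OPA and return the normalized decision (blocking call)."""
    url, body = build_opa_request(base_url, package, policy_input)
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return parse_decision(json.load(resp))
```

Treating a missing `result` as a denial keeps the agent safe by default if a policy package is misnamed or not loaded.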

Agent Deployment

Agents deploy as Kubernetes pods:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: security-agent
  namespace: citadel-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      serviceAccountName: security-agent
      containers:
        - name: agent
          image: citadel/security-agent:v1.0.0
          env:
            - name: AGENT_ID
              value: "security-agent-building-a"
            - name: SPIFFE_ENDPOINT_SOCKET
              value: "unix:///run/spire/sockets/agent.sock"
            - name: NATS_URL
              value: "nats://nats.citadel.svc:4222"
          volumeMounts:
            - name: spire-agent-socket
              mountPath: /run/spire/sockets
              readOnly: true
      volumes:
        - name: spire-agent-socket
          hostPath:
            path: /run/spire/sockets

See Also