Agent Topology and Architecture
CitadelMesh uses a multi-agent architecture where specialized AI agents collaborate to manage building operations. Each agent is implemented as a LangGraph state machine backed by Orleans services, giving us deterministic execution, replayable intent, and clean domain ownership.
Why this matters: Traditional BMS stacks centralize logic inside one supervisory controller. Any change risks destabilizing the whole building. CitadelMesh lets us ship and verify new agent behaviours per domain while preserving shared guardrails and auditability.
Agent Design Philosophy
Specialists, Not Generalists: Rather than one monolithic "building AI", we deploy focused agents with clear responsibilities:
- Single Responsibility: Each agent manages one domain (security, energy, compliance, orchestration).
- Event-Driven: Agents react to CloudEvents from sensors, systems, and other agents so coordination stays asynchronous.
- Stateful: LangGraph state machines maintain context across interactions and snapshot execution for replay.
- Observable: OpenTelemetry traces plus CloudEvents deliver end-to-end storyboards of what the agent attempted and why.
- Safe by Default: Every action must present a safety token, issued only after OPA policy evaluation, before orchestration proceeds.
- Trust-Bound: Agents authenticate with SPIFFE/SPIRE identities and only see subjects permitted by the Identity Foundation.
Agent Mesh Overview
Agent Descriptions
Security Agent
Differentiator: closes the loop between access control, vision analytics, and policy guardrails without bypassing approvals. Security operators receive explainable automation instead of opaque alarms.
Responsibility: Physical security threat detection and response
Inputs:
- Access control events (Schneider Security Expert)
- Camera analytics (Avigilon person detection, loitering, intrusion)
- Door sensor state changes
- After-hours motion detection
Outputs:
- Security incidents (CloudEvents)
- Door lock/unlock commands (with OPA approval)
- Security officer alerts
- Camera tracking requests
State Machine:
Example Implementation:
from langgraph.graph import StateGraph, END
from typing import Any, List, Optional, TypedDict


class SecurityAgentState(TypedDict):
    """State for Security Agent."""
    events: List[SecurityEvent]
    threat_level: str  # low, medium, high, critical
    incident_type: Optional[str]
    response_plan: List[str]
    context: dict
    messages: List[Any]


class SecurityAgent(BaseAgent):
    """LangGraph-based security agent."""

    def build_graph(self) -> StateGraph:
        """Build the security agent state machine."""
        workflow = StateGraph(SecurityAgentState)

        # Add nodes
        workflow.add_node("assess_threat", self._assess_threat)
        workflow.add_node("classify_incident", self._classify_incident)
        workflow.add_node("plan_response", self._plan_response)
        workflow.add_node("execute_actions", self._execute_actions)
        workflow.add_node("escalate_human", self._escalate_human)

        # Define flow
        workflow.set_entry_point("assess_threat")
        workflow.add_conditional_edges(
            "assess_threat",
            self._should_classify,
            {
                "classify": "classify_incident",
                "monitor": END
            }
        )
        workflow.add_edge("classify_incident", "plan_response")
        workflow.add_conditional_edges(
            "plan_response",
            self._needs_escalation,
            {
                "escalate": "escalate_human",
                "execute": "execute_actions"
            }
        )
        workflow.add_edge("execute_actions", END)
        workflow.add_edge("escalate_human", END)
        return workflow.compile()

    async def _assess_threat(self, state: SecurityAgentState) -> SecurityAgentState:
        """Assess threat level from incoming events."""
        events = state["events"]

        # Analyze events (could use LLM for complex reasoning)
        threat_indicators = {
            "after_hours": any(e.event_type == "after_hours_access" for e in events),
            "forced_entry": any(e.event_type == "forced_entry" for e in events),
            "loitering": any(e.event_type == "loitering" for e in events),
        }

        # Calculate threat level; the most severe indicator wins
        if threat_indicators["forced_entry"]:
            state["threat_level"] = "critical"
        elif threat_indicators["after_hours"]:
            state["threat_level"] = "high"
        elif threat_indicators["loitering"]:
            state["threat_level"] = "medium"
        else:
            state["threat_level"] = "low"
        return state

    async def _execute_actions(self, state: SecurityAgentState) -> SecurityAgentState:
        """Execute planned response actions."""
        for action in state["response_plan"]:
            if action == "lock_doors":
                # Execute door lock via MCP adapter
                await self.mcp_client.call_tool(
                    "securityexpert_door_lock",
                    {"door_id": "door.lobby.main"}
                )
            elif action == "alert_security":
                # Publish security alert event
                await self.event_bus.publish(CloudEvent(
                    type="citadel.security.alert",
                    source=self.config.spiffe_id,
                    data={"incident": state["incident_type"], "level": state["threat_level"]}
                ))
        return state
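The graph above references two routing callbacks, `_should_classify` and `_needs_escalation`, that the excerpt does not show. The following is a minimal sketch of what they might look like, written as plain functions so it runs without LangGraph. The thresholds (classify anything above "low", escalate only "critical") and the `RoutingState` type are assumptions for illustration, not the project's actual policy. The returned strings match the keys of the conditional-edge mappings.

```python
from typing import List, TypedDict

# Ordered from least to most severe; mirrors the levels named in SecurityAgentState.
THREAT_ORDER = ["low", "medium", "high", "critical"]


class RoutingState(TypedDict):
    """Hypothetical subset of SecurityAgentState that the routers read."""
    threat_level: str
    response_plan: List[str]


def should_classify(state: RoutingState) -> str:
    """Route to classification for anything above 'low'; otherwise keep monitoring."""
    return "classify" if THREAT_ORDER.index(state["threat_level"]) > 0 else "monitor"


def needs_escalation(state: RoutingState) -> str:
    """Escalate critical threats to a human; execute everything else automatically."""
    return "escalate" if state["threat_level"] == "critical" else "execute"
```

Keeping routers free of side effects means the path through the graph depends only on the recorded state, which is what makes a run reproducible.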
Energy Agent
Differentiator: optimizes demand-response and comfort using tariff intelligence from the knowledge graph and policy-enforced setpoint limits.
Responsibility: Building energy optimization and HVAC control
Inputs:
- Zone temperature telemetry (EcoStruxure EBO)
- Occupancy sensors
- Weather forecasts
- Energy tariffs and grid carbon intensity
- Demand response (DR) events
Outputs:
- HVAC setpoint adjustments (within OPA-enforced temperature bounds)
- Lighting scene changes
- Pre-cooling/pre-heating schedules
- Energy KPI metrics
State Machine:
Example Implementation:
class EnergyAgentState(TypedDict):
    """State for Energy Agent."""
    zone_id: str
    current_temp: Optional[float]
    target_temp: Optional[float]
    occupancy: Optional[bool]
    action: Optional[str]
    setpoint_adjusted: bool


class EnergyAgent(BaseAgent):
    """Energy optimization agent."""

    def build_graph(self) -> StateGraph:
        """Build energy agent state machine."""
        workflow = StateGraph(EnergyAgentState)
        workflow.add_node("read_zone_state", self._read_zone_state)
        workflow.add_node("analyze_conditions", self._analyze_conditions)
        workflow.add_node("calculate_optimal_setpoint", self._calculate_optimal_setpoint)
        workflow.add_node("adjust_setpoint", self._adjust_setpoint)

        workflow.set_entry_point("read_zone_state")
        workflow.add_edge("read_zone_state", "analyze_conditions")
        workflow.add_edge("analyze_conditions", "calculate_optimal_setpoint")
        workflow.add_conditional_edges(
            "calculate_optimal_setpoint",
            self._should_adjust,
            {
                "adjust": "adjust_setpoint",
                "skip": END
            }
        )
        workflow.add_edge("adjust_setpoint", END)
        return workflow.compile()

    async def _calculate_optimal_setpoint(self, state: EnergyAgentState) -> EnergyAgentState:
        """Calculate optimal setpoint balancing comfort and energy."""
        current = state["current_temp"]
        occupancy = state["occupancy"]

        # Comfort-first if occupied (setpoints assumed to be in °F, cooling mode)
        if occupancy:
            target = 72.0
        else:
            # Energy-saving setback when unoccupied
            target = 76.0

        # Adjust for weather (future: use forecast)
        # Adjust for tariffs (future: shift load to off-peak)
        state["target_temp"] = target

        # current_temp is Optional: skip rather than compare against None
        if current is None:
            state["action"] = "skip"
        else:
            state["action"] = "adjust" if abs(current - target) > 2.0 else "skip"
        return state
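The energy graph routes on `_should_adjust`, which is not shown, and the outputs list says setpoint changes stay within policy bounds. The sketch below illustrates both ideas with plain functions. The 68–78 °F limits, the `SetpointState` type, and the helper names are hypothetical; in CitadelMesh the real limits would come from the OPA policy rather than constants in agent code.

```python
from typing import Optional, TypedDict

# Hypothetical policy bounds in °F; real limits would come from the OPA policy.
MIN_SETPOINT_F = 68.0
MAX_SETPOINT_F = 78.0


class SetpointState(TypedDict):
    """Hypothetical subset of EnergyAgentState used by the router."""
    target_temp: Optional[float]
    action: Optional[str]


def clamp_setpoint(target: float, low: float = MIN_SETPOINT_F, high: float = MAX_SETPOINT_F) -> float:
    """Pull a requested setpoint back inside the permitted range."""
    return max(low, min(high, target))


def should_adjust(state: SetpointState) -> str:
    """Router for the conditional edge: 'adjust' only when a target was computed."""
    if state.get("action") == "adjust" and state.get("target_temp") is not None:
        return "adjust"
    return "skip"
```

Clamping in the agent is a convenience only; the policy check remains the authority, so an out-of-range request that slips past the agent would still be denied.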
Automation Agent
Responsibility: Orchestrate scenes, schedules, and user-triggered actions
Inputs:
- User scene requests (mobile app, voice commands)
- Schedule triggers (morning routine, evening mode)
- Occupancy pattern detection
- Meeting room bookings
Outputs:
- Lighting scene activations (Home Assistant)
- HVAC mode changes
- Window blind adjustments
- AV system control
Twin Agent
Responsibility: Maintain synchronized digital twin of building state
Inputs:
- Telemetry from all adapters
- State change events
- Reconciliation requests
Outputs:
- Digital twin mutations (graph database)
- Derived KPIs (energy efficiency, comfort score)
- State snapshots for replay
Ops Agent
Responsibility: Human-in-the-loop operations and incident management
Inputs:
- Escalated incidents from Security/Energy/Automation agents
- User approval requests
- SLA violation alerts
Outputs:
- Approval/rejection decisions
- Incident reports
- Maintenance work orders
- SLA tracking metrics
DER/Grid Agent
Responsibility: Distributed energy resource and grid integration
Inputs:
- Battery state-of-charge
- Solar PV generation
- Grid carbon intensity signals
- Demand response events (OpenADR)
- EVSE charging requests (OCPP)
Outputs:
- Battery charge/discharge commands
- EV charging schedule optimization
- Demand limit enforcement
- DR participation confirmations
Compliance Agent
Responsibility: Continuous compliance monitoring and attestation
Inputs:
- Control actions from all agents
- Policy evaluation results
- Audit log events
Outputs:
- Compliance violation alerts
- Attestation reports (SOC 2, IEC 62443)
- Control effectiveness metrics
Base Agent Framework
All agents inherit from BaseAgent:
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any

from langgraph.graph import StateGraph


@dataclass
class AgentConfig:
    """Configuration for a CitadelMesh agent."""
    agent_id: str
    agent_type: str
    spiffe_id: str
    nats_url: str = "nats://localhost:4222"
    subscribe_topics: List[str] = field(default_factory=list)
    publish_topics: List[str] = field(default_factory=list)
    enable_telemetry: bool = True
    enable_safety_checks: bool = True
    log_level: str = "INFO"


class BaseAgent(ABC):
    """Base class for all CitadelMesh LangGraph agents."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.state = AgentState()
        self.logger = logging.getLogger(f"citadel.agent.{config.agent_type}")

        # Event bus integration
        self.event_bus: Optional[EventBus] = None

        # Telemetry collector
        self.telemetry: Optional[TelemetryCollector] = None
        if config.enable_telemetry:
            self.telemetry = TelemetryCollector(
                service_name=f"agent.{config.agent_type}",
                agent_id=config.agent_id
            )

        # LangGraph state machine
        self.graph: Optional[StateGraph] = None

    async def start(self):
        """Start agent and connect to infrastructure."""
        # Connect to NATS event bus
        self.event_bus = EventBus(
            nats_url=self.config.nats_url,
            agent_id=self.config.agent_id
        )
        await self.event_bus.connect()

        # Subscribe to topics
        for topic in self.config.subscribe_topics:
            await self.event_bus.subscribe(topic, self._handle_event)

        # Build LangGraph
        self.graph = self.build_graph()

        # Update status
        self.state.status = AgentStatus.ACTIVE
        await self._publish_state_update()

    @abstractmethod
    def build_graph(self) -> StateGraph:
        """Build the LangGraph state machine for this agent."""
        pass

    @abstractmethod
    async def process_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Process incoming CloudEvent."""
        pass

    async def _handle_event(self, event: CloudEventMessage):
        """Internal event handler with telemetry."""
        # Telemetry is optional: enable_telemetry=False leaves self.telemetry as None
        if self.telemetry is None:
            try:
                return await self.process_event(event)
            except Exception as e:
                self.logger.error(f"Event processing failed: {e}")
                raise

        with self.telemetry.trace_span("handle_event") as span:
            span.set_attribute("event.type", event.type)
            span.set_attribute("event.source", event.source)
            try:
                result = await self.process_event(event)
                return result
            except Exception as e:
                self.logger.error(f"Event processing failed: {e}")
                span.record_exception(e)
                raise
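Because `BaseAgent.start` only needs `connect`, `subscribe`, and `publish` from the bus, an agent's event handling can be exercised without a NATS server. The class below is a hypothetical in-process stand-in, not part of CitadelMesh; it assumes the `publish(topic, event)` call shape and uses plain dicts in place of `CloudEventMessage`.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[Any]]


class InMemoryEventBus:
    """Hypothetical stand-in for EventBus; delivers events in-process, no NATS."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    async def connect(self) -> None:
        """Nothing to connect to; present only to mirror the EventBus interface."""

    async def subscribe(self, topic: str, handler: Handler) -> None:
        """Register a coroutine handler for an exact topic name."""
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, event: Dict[str, Any]) -> int:
        """Deliver to every subscriber of the exact topic; return the delivery count."""
        handlers = self._handlers.get(topic, [])
        for handler in handlers:
            await handler(event)
        return len(handlers)


async def demo() -> List[str]:
    """Subscribe one handler and publish two events, only one of which matches."""
    seen: List[str] = []

    async def handle(event: Dict[str, Any]) -> None:
        seen.append(event["type"])

    bus = InMemoryEventBus()
    await bus.connect()
    await bus.subscribe("security.events", handle)
    await bus.publish("security.events", {"type": "citadel.security.alert"})
    await bus.publish("energy.events", {"type": "ignored"})  # no subscriber
    return seen
```

Running `asyncio.run(demo())` returns only the event types that reached a subscriber, which gives a quick way to check an agent's topic wiring in unit tests.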
Inter-Agent Communication
Agents communicate via CloudEvents on the event bus:
Coordination Pattern: Escalation
# Security Agent escalates to Ops Agent
incident_event = CloudEvent(
    type="citadel.security.incident",
    source="spiffe://citadel.mesh/agent/security",
    subject="incident.forced_entry.001",
    data={
        "incident_type": "forced_entry",
        "threat_level": "critical",
        "location": "door.lobby.main",
        "actions_taken": ["lock_doors", "alert_security"],
        "requires_human": True
    }
)
await event_bus.publish("incidents.events.building_a", incident_event)
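For readers unfamiliar with the envelope behind the `CloudEvent` constructor, here is a rough sketch of the same incident as a CloudEvents 1.0 JSON object built with the standard library. The `make_cloudevent` helper is hypothetical, and using `uuid4` for the `id` is an assumption (the project may use ULIDs, as in its execution traces). The four required attributes are `specversion`, `id`, `source`, and `type`.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def make_cloudevent(
    event_type: str,
    source: str,
    data: Dict[str, Any],
    subject: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a CloudEvents 1.0 envelope as a plain dict (hypothetical helper)."""
    event: Dict[str, Any] = {
        "specversion": "1.0",        # required
        "id": str(uuid.uuid4()),     # required; must be unique per source
        "source": source,            # required
        "type": event_type,          # required
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }
    if subject is not None:
        event["subject"] = subject   # optional attribute
    return event


incident = make_cloudevent(
    event_type="citadel.security.incident",
    source="spiffe://citadel.mesh/agent/security",
    subject="incident.forced_entry.001",
    data={"incident_type": "forced_entry", "threat_level": "critical"},
)
wire_bytes = json.dumps(incident).encode("utf-8")  # payload as it would go on the bus
```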
Coordination Pattern: State Query
# Energy Agent queries Twin Agent for zone state
async def get_zone_state(zone_id: str) -> dict:
    """Query digital twin for zone state via gRPC."""
    twin_client = TwinServiceStub(grpc_channel)
    request = GetEntityRequest(entity_id=zone_id)
    entity = await twin_client.GetEntity(request)
    return {
        "temp": entity.attributes["current_temp"],
        "setpoint": entity.attributes["target_setpoint"],
        "occupancy": entity.attributes["occupied"]
    }
Coordination Pattern: Command Chain
# Automation Agent triggers scene -> Energy Agent adjusts HVAC
scene_event = CloudEvent(
    type="citadel.automation.scene.activated",
    source="spiffe://citadel.mesh/agent/automation",
    subject="scene.meeting_mode",
    data={
        "scene_id": "meeting_mode",
        "zones": ["conference_room_a"],
        "lighting": {"preset": "presentation"},
        "hvac": {"temp_override": 70}
    }
)

# Energy Agent subscribes to scene events
async def handle_scene_event(event: CloudEventMessage):
    if event.type == "citadel.automation.scene.activated":
        hvac_override = event.data.get("hvac", {}).get("temp_override")
        if hvac_override is not None:
            # event.subject names the scene, so apply the override to each listed zone
            for zone_id in event.data.get("zones", []):
                await adjust_setpoint(zone_id, hvac_override)
Deterministic Execution and Replay
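LangGraph state machines are deterministic and replayable, provided each node's output depends only on its input state (no wall-clock reads, unseeded randomness, or unrecorded external calls):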
# Record execution trace
execution_id = ulid()
execution_trace = {
    "execution_id": execution_id,
    "agent_id": self.config.agent_id,
    "graph_version": "v1.2.0",
    "initial_state": initial_state,
    "events": [],
    "nodes_executed": [],
    "final_state": None
}

# Execute graph with tracing
result = await self.graph.ainvoke(
    initial_state,
    config={"callbacks": [ExecutionTracer(execution_trace)]}
)
# Record the outcome so the replay below has something to compare against
execution_trace["final_state"] = result

# Store trace for replay
await store_execution_trace(execution_trace)

# Replay for debugging
# Note: "deterministic" is not a built-in RunnableConfig key; treat it as a project-specific flag
replayed_result = await self.graph.ainvoke(
    execution_trace["initial_state"],
    config={"deterministic": True}
)
assert replayed_result == execution_trace["final_state"]
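The record-then-replay idea can be shown without LangGraph at all. The following sketch runs a fixed list of pure node functions, records the trace, and checks that a second run from the recorded initial state reproduces it. The node names and the two-step pipeline are invented for illustration and are far simpler than a compiled graph.

```python
import copy
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], State]


def assess(state: State) -> State:
    """Pure node: derive the threat level from event types only."""
    level = "critical" if "forced_entry" in state["events"] else "low"
    return {**state, "threat_level": level}


def plan(state: State) -> State:
    """Pure node: derive the response plan from the threat level only."""
    actions = ["lock_doors", "alert_security"] if state["threat_level"] == "critical" else []
    return {**state, "response_plan": actions}


PIPELINE: List[Node] = [assess, plan]


def run(initial_state: State) -> Dict[str, Any]:
    """Run every node in order and record a trace suitable for replay."""
    state = copy.deepcopy(initial_state)  # never mutate the recorded input
    executed: List[str] = []
    for node in PIPELINE:
        state = node(state)
        executed.append(node.__name__)
    return {
        "initial_state": copy.deepcopy(initial_state),
        "nodes_executed": executed,
        "final_state": state,
    }


def replay_matches(trace: Dict[str, Any]) -> bool:
    """Re-run from the recorded input and compare both path and outcome."""
    replayed = run(trace["initial_state"])
    return (
        replayed["final_state"] == trace["final_state"]
        and replayed["nodes_executed"] == trace["nodes_executed"]
    )
```

Comparing the executed node list as well as the final state catches cases where a changed router reaches the same result by a different path.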
Safety Integration
Every agent action is validated by OPA:
async def execute_command(self, command: Command) -> CommandResult:
    """Execute command with OPA safety check."""
    # Evaluate policy
    decision = await opa_client.evaluate(
        policy=f"citadel.{self.config.agent_type}",
        input={
            "action": command.action,
            "target": command.target_id,
            "params": command.params,
            "issued_by": self.config.spiffe_id
        }
    )
    if not decision["allow"]:
        raise PolicyViolation(decision["deny_reason"])

    # Execute with safety token
    command.safety_token = decision["token"]
    result = await adapter.execute(command)
    return result
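The `opa_client.evaluate` call hides the wire format. As a rough sketch, assuming the policy lives in a package named `citadel.<agent_type>` and returns an object with `allow`, `deny_reason`, and `token` fields (the shape implied by the code above), a query against OPA's Data API (`POST /v1/data/<path>` with an `input` document) could be assembled and interpreted as follows. Both helper names are hypothetical, and no network call is made here.

```python
import json
from typing import Any, Dict, Tuple


def build_opa_query(
    base_url: str, agent_type: str, command: Dict[str, Any], spiffe_id: str
) -> Tuple[str, bytes]:
    """Return the URL and JSON body for an OPA Data API query."""
    # Package citadel.<agent_type> maps to the path citadel/<agent_type> (assumed layout).
    url = f"{base_url.rstrip('/')}/v1/data/citadel/{agent_type}"
    body = {
        "input": {
            "action": command["action"],
            "target": command["target_id"],
            "params": command.get("params", {}),
            "issued_by": spiffe_id,
        }
    }
    return url, json.dumps(body).encode("utf-8")


def parse_decision(response_body: bytes) -> Dict[str, Any]:
    """Fail closed: a missing or malformed result is treated as a denial."""
    result = json.loads(response_body).get("result")
    if not isinstance(result, dict):
        # OPA omits "result" when the queried document is undefined.
        return {"allow": False, "deny_reason": "policy undefined"}
    if result.get("allow") is not True:
        return {"allow": False, "deny_reason": result.get("deny_reason", "policy denied")}
    return {"allow": True, "token": result.get("token")}
```

Treating an undefined result as a denial matters because a typo in the policy path would otherwise look like an empty, harmless response.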
Agent Deployment
Agents deploy as Kubernetes pods:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: security-agent
  namespace: citadel-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      serviceAccountName: security-agent
      containers:
        - name: agent
          image: citadel/security-agent:v1.0.0
          env:
            - name: AGENT_ID
              value: "security-agent-building-a"
            - name: SPIFFE_ENDPOINT_SOCKET
              value: "unix:///run/spire/sockets/agent.sock"
            - name: NATS_URL
              value: "nats://nats.citadel.svc:4222"
          volumeMounts:
            - name: spire-agent-socket
              mountPath: /run/spire/sockets
              readOnly: true
      volumes:
        - name: spire-agent-socket
          hostPath:
            path: /run/spire/sockets
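On the agent side, the environment variables set in the manifest have to be read at startup. One possible way to do that is sketched below, assuming the process should refuse to start when any of the three is missing. `RuntimeSettings` and `load_settings` are hypothetical names, not part of the `AgentConfig` shown earlier.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED_VARS = ("AGENT_ID", "NATS_URL", "SPIFFE_ENDPOINT_SOCKET")


@dataclass(frozen=True)
class RuntimeSettings:
    """Hypothetical settings populated from the pod environment."""
    agent_id: str
    nats_url: str
    spiffe_socket: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> RuntimeSettings:
    """Read the variables set in the Deployment; fail fast if one is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    return RuntimeSettings(
        agent_id=env["AGENT_ID"],
        nats_url=env["NATS_URL"],
        spiffe_socket=env["SPIFFE_ENDPOINT_SOCKET"],
    )
```

Accepting the environment as a parameter keeps the loader testable without touching the real process environment.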
Related Documentation
- Protocol Strategy - CloudEvents and event bus design
- Safety Guardrails - OPA policy integration
- Observability - Agent tracing and metrics
- Identity Foundation - SPIFFE identity for agents