
Agent Topology and Architecture

CitadelMesh uses a multi-agent architecture where specialized AI agents collaborate to manage building operations. Each agent is implemented as a LangGraph state machine backed by Orleans services, giving us deterministic execution, replayable intent, and clean domain ownership.

Why this matters: Traditional building management system (BMS) stacks centralize logic inside one supervisory controller, so any change risks destabilizing the whole building. CitadelMesh lets us ship and verify new agent behaviours per domain while preserving shared guardrails and auditability.

Agent Design Philosophy

Specialists, Not Generalists: Rather than one monolithic "building AI", we deploy focused agents with clear responsibilities:

  • Single Responsibility: Each agent manages one domain (for example security, energy, compliance, or orchestration).
  • Event-Driven: Agents react to CloudEvents from sensors, systems, and other agents so coordination stays asynchronous.
  • Stateful: LangGraph state machines maintain context across interactions and snapshot execution for replay.
  • Observable: OpenTelemetry traces plus CloudEvents deliver end-to-end storyboards of what the agent attempted and why.
  • Safe by Default: Every action must carry a safety token, obtained from an OPA policy allow decision, before orchestration proceeds.
  • Trust-Bound: Agents authenticate with SPIFFE/SPIRE identities and only see subjects permitted by Identity Foundation.
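The event-driven and trust-bound principles both depend on a consistent event envelope. As a minimal, standard-library sketch of the CloudEvents 1.0 JSON event format, the hypothetical `CloudEventEnvelope` below enforces the four required context attributes (`id`, `source`, `specversion`, `type`). It is not the `CloudEvent` class used in this page's examples; using a SPIFFE ID as `source` simply follows those examples.

```python
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class CloudEventEnvelope:
    """Minimal CloudEvents 1.0 envelope (hypothetical helper)."""

    type: str
    source: str
    data: Any = None
    subject: Optional[str] = None
    # id and time are generated per event unless supplied explicitly
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    time: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    specversion: str = "1.0"

    def __post_init__(self) -> None:
        # id, source, specversion and type are the REQUIRED context attributes
        if not (self.id and self.source and self.type and self.specversion):
            raise ValueError("id, source, specversion and type must be non-empty")

    def to_json(self) -> str:
        """Encode using the CloudEvents JSON event format (structured mode)."""
        body = {
            "specversion": self.specversion,
            "id": self.id,
            "source": self.source,
            "type": self.type,
            "time": self.time,
        }
        if self.subject is not None:
            body["subject"] = self.subject
        if self.data is not None:
            body["datacontenttype"] = "application/json"
            body["data"] = self.data
        return json.dumps(body)
```

For example, `CloudEventEnvelope(type="citadel.security.alert", source="spiffe://citadel.mesh/agent/security", data={"level": "high"}).to_json()` yields a JSON document whose top-level keys are the context attributes, with the payload under `data`.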

Agent Mesh Overview

Agent Descriptions

Security Agent

Differentiator: closes the loop between access control, vision analytics, and policy guardrails without bypassing approvals. Security operators receive explainable automation instead of opaque alarms.

Responsibility: Physical security threat detection and response

Inputs:

  • Access control events (Schneider Security Expert)
  • Camera analytics (Avigilon person detection, loitering, intrusion)
  • Door sensor state changes
  • After-hours motion detection

Outputs:

  • Security incidents (CloudEvents)
  • Door lock/unlock commands (with OPA approval)
  • Security officer alerts
  • Camera tracking requests

State Machine:

Example Implementation:

```python
from typing import Any, List, Optional, TypedDict

from langgraph.graph import END, StateGraph

# SecurityEvent, CloudEvent and BaseAgent are defined elsewhere in the CitadelMesh codebase.


class SecurityAgentState(TypedDict):
    """State for Security Agent."""
    events: List[SecurityEvent]
    threat_level: str  # low, medium, high, critical
    incident_type: Optional[str]
    response_plan: List[str]
    context: dict
    messages: List[Any]


class SecurityAgent(BaseAgent):
    """LangGraph-based security agent."""

    # _classify_incident, _plan_response, _escalate_human, _should_classify
    # and _needs_escalation are omitted for brevity.

    def build_graph(self) -> StateGraph:
        """Build the security agent state machine."""
        workflow = StateGraph(SecurityAgentState)

        # Add nodes
        workflow.add_node("assess_threat", self._assess_threat)
        workflow.add_node("classify_incident", self._classify_incident)
        workflow.add_node("plan_response", self._plan_response)
        workflow.add_node("execute_actions", self._execute_actions)
        workflow.add_node("escalate_human", self._escalate_human)

        # Define flow
        workflow.set_entry_point("assess_threat")
        workflow.add_conditional_edges(
            "assess_threat",
            self._should_classify,
            {
                "classify": "classify_incident",
                "monitor": END,
            },
        )
        workflow.add_edge("classify_incident", "plan_response")
        workflow.add_conditional_edges(
            "plan_response",
            self._needs_escalation,
            {
                "escalate": "escalate_human",
                "execute": "execute_actions",
            },
        )
        workflow.add_edge("execute_actions", END)
        workflow.add_edge("escalate_human", END)

        return workflow.compile()

    async def _assess_threat(self, state: SecurityAgentState) -> SecurityAgentState:
        """Assess threat level from incoming events."""
        events = state["events"]

        # Analyze events (could use LLM for complex reasoning)
        threat_indicators = {
            "after_hours": any(e.event_type == "after_hours_access" for e in events),
            "forced_entry": any(e.event_type == "forced_entry" for e in events),
            "loitering": any(e.event_type == "loitering" for e in events),
        }

        # Calculate threat level: the most severe indicator wins
        if threat_indicators["forced_entry"]:
            state["threat_level"] = "critical"
        elif threat_indicators["after_hours"]:
            state["threat_level"] = "high"
        elif threat_indicators["loitering"]:
            state["threat_level"] = "medium"
        else:
            state["threat_level"] = "low"

        return state

    async def _execute_actions(self, state: SecurityAgentState) -> SecurityAgentState:
        """Execute planned response actions."""
        for action in state["response_plan"]:
            if action == "lock_doors":
                # Execute door lock via MCP adapter (door hard-coded for illustration)
                await self.mcp_client.call_tool(
                    "securityexpert_door_lock",
                    {"door_id": "door.lobby.main"},
                )

            elif action == "alert_security":
                # Publish security alert event
                await self.event_bus.publish(CloudEvent(
                    type="citadel.security.alert",
                    source=self.config.spiffe_id,
                    data={"incident": state["incident_type"], "level": state["threat_level"]},
                ))

        return state
```
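The routing functions `_should_classify` and `_needs_escalation` are not shown above. A sketch of how the threat rules and routers could be kept as pure functions, so they can be unit-tested without LangGraph or NATS; `SecurityEvent` here is a hypothetical stand-in, and the routing thresholds (anything above `low` is classified, only `critical` escalates) are assumptions, not CitadelMesh policy.

```python
from dataclasses import dataclass
from typing import Iterable, Mapping


@dataclass(frozen=True)
class SecurityEvent:
    """Hypothetical minimal event record; only event_type is used here."""

    event_type: str
    location: str = ""


# Ordered from most to least severe, mirroring the if/elif chain in _assess_threat
SEVERITY_RULES = (
    ("forced_entry", "critical"),
    ("after_hours_access", "high"),
    ("loitering", "medium"),
)


def assess_threat(events: Iterable[SecurityEvent]) -> str:
    """Return the level of the most severe indicator present, else 'low'."""
    seen = {e.event_type for e in events}
    for event_type, level in SEVERITY_RULES:
        if event_type in seen:
            return level
    return "low"


def should_classify(state: Mapping[str, object]) -> str:
    """Assumed routing rule: anything above 'low' goes on to classification."""
    return "monitor" if state["threat_level"] == "low" else "classify"


def needs_escalation(state: Mapping[str, object]) -> str:
    """Assumed routing rule: critical incidents always go to a human."""
    return "escalate" if state["threat_level"] == "critical" else "execute"
```

In the agent, the methods passed to `add_conditional_edges` could simply delegate to these functions with the current state.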

Energy Agent

Differentiator: optimizes demand-response participation and occupant comfort using tariff intelligence from the knowledge graph, within policy-enforced setpoint limits.

Responsibility: Building energy optimization and HVAC control

Inputs:

  • Zone temperature telemetry (EcoStruxure EBO)
  • Occupancy sensors
  • Weather forecasts
  • Energy tariffs and grid carbon intensity
  • Demand response (DR) events

Outputs:

  • HVAC setpoint adjustments (with OPA temp bounds)
  • Lighting scene changes
  • Pre-cooling/pre-heating schedules
  • Energy KPI metrics

State Machine:

Example Implementation:

```python
from typing import Optional, TypedDict

from langgraph.graph import END, StateGraph


class EnergyAgentState(TypedDict):
    """State for Energy Agent."""
    zone_id: str
    current_temp: Optional[float]
    target_temp: Optional[float]
    occupancy: Optional[bool]
    action: Optional[str]
    setpoint_adjusted: bool


class EnergyAgent(BaseAgent):
    """Energy optimization agent."""

    # _read_zone_state, _analyze_conditions and _adjust_setpoint are omitted for brevity.

    def build_graph(self) -> StateGraph:
        """Build energy agent state machine."""
        workflow = StateGraph(EnergyAgentState)

        workflow.add_node("read_zone_state", self._read_zone_state)
        workflow.add_node("analyze_conditions", self._analyze_conditions)
        workflow.add_node("calculate_optimal_setpoint", self._calculate_optimal_setpoint)
        workflow.add_node("adjust_setpoint", self._adjust_setpoint)

        workflow.set_entry_point("read_zone_state")
        workflow.add_edge("read_zone_state", "analyze_conditions")
        workflow.add_edge("analyze_conditions", "calculate_optimal_setpoint")
        workflow.add_conditional_edges(
            "calculate_optimal_setpoint",
            self._should_adjust,
            {
                "adjust": "adjust_setpoint",
                "skip": END,
            },
        )
        workflow.add_edge("adjust_setpoint", END)

        return workflow.compile()

    def _should_adjust(self, state: EnergyAgentState) -> str:
        """Route on the decision recorded by _calculate_optimal_setpoint."""
        return "adjust" if state["action"] == "adjust" else "skip"

    async def _calculate_optimal_setpoint(self, state: EnergyAgentState) -> EnergyAgentState:
        """Calculate optimal setpoint balancing comfort and energy."""
        current = state["current_temp"]
        occupancy = state["occupancy"]

        # Comfort-first if occupied (setpoints in °F)
        if occupancy:
            target = 72.0
        else:
            # Energy-saving setback when unoccupied (or occupancy unknown)
            target = 76.0

        # Adjust for weather (future: use forecast)
        # Adjust for tariffs (future: shift load to off-peak)

        state["target_temp"] = target
        if current is None:
            # No telemetry yet: skip rather than act on a missing reading
            state["action"] = "skip"
        else:
            # 2 °F deadband avoids chasing small fluctuations
            state["action"] = "adjust" if abs(current - target) > 2.0 else "skip"

        return state
```
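The same decision can be written as a pure function that also respects the OPA setpoint bounds before a request is ever issued. This is a sketch assuming degrees Fahrenheit (as the 72/76 targets suggest) and hypothetical bounds of 65 to 80 °F; OPA remains the authority, and the local clamp only avoids proposing setpoints the policy would reject. `plan_setpoint` is a hypothetical name.

```python
from typing import Optional, Tuple

OCCUPIED_TARGET_F = 72.0
SETBACK_TARGET_F = 76.0
DEADBAND_F = 2.0
# Hypothetical comfort bounds; the authoritative limits are enforced by OPA
DEFAULT_BOUNDS_F: Tuple[float, float] = (65.0, 80.0)


def plan_setpoint(
    current_temp: Optional[float],
    occupied: Optional[bool],
    bounds: Tuple[float, float] = DEFAULT_BOUNDS_F,
) -> Tuple[float, str]:
    """Return (target setpoint, 'adjust' | 'skip') for one zone."""
    low, high = bounds
    target = OCCUPIED_TARGET_F if occupied else SETBACK_TARGET_F
    # Clamp before proposing, so the request is already inside policy bounds
    target = min(max(target, low), high)
    if current_temp is None:
        # No reading: never act on missing telemetry
        return target, "skip"
    action = "adjust" if abs(current_temp - target) > DEADBAND_F else "skip"
    return target, action
```

Keeping the deadband and clamp in one function makes the comfort/energy trade-off easy to test in isolation from the graph.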

Automation Agent

Responsibility: Orchestrate scenes, schedules, and user-triggered actions

Inputs:

  • User scene requests (mobile app, voice commands)
  • Schedule triggers (morning routine, evening mode)
  • Occupancy pattern detection
  • Meeting room bookings

Outputs:

  • Lighting scene activations (Home Assistant)
  • HVAC mode changes
  • Window blind adjustments
  • AV system control
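One way to keep scene orchestration auditable is to treat a scene as data and expand it into per-zone commands, each of which then goes through the normal OPA check. A minimal sketch; `Scene`, `Command`, `expand_scene`, and the blind position percentage are hypothetical names, not the Home Assistant API.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Scene:
    scene_id: str
    zones: List[str]
    lighting_preset: Optional[str] = None
    temp_override: Optional[float] = None
    blinds_position: Optional[int] = None  # percent open (hypothetical)


@dataclass(frozen=True)
class Command:
    domain: str   # which adapter family handles it
    target: str   # zone the command applies to
    action: str
    value: object


def expand_scene(scene: Scene) -> List[Command]:
    """Fan a scene out into one command per (zone, configured domain)."""
    commands: List[Command] = []
    for zone in scene.zones:
        if scene.lighting_preset is not None:
            commands.append(Command("lighting", zone, "set_preset", scene.lighting_preset))
        if scene.temp_override is not None:
            commands.append(Command("hvac", zone, "set_setpoint", scene.temp_override))
        if scene.blinds_position is not None:
            commands.append(Command("blinds", zone, "set_position", scene.blinds_position))
    return commands
```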

Twin Agent

Responsibility: Maintain a synchronized digital twin of the building state

Inputs:

  • Telemetry from all adapters
  • State change events
  • Reconciliation requests

Outputs:

  • Digital twin mutations (graph database)
  • Derived KPIs (energy efficiency, comfort score)
  • State snapshots for replay
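The "state snapshots for replay" output implies that twin mutations are ordered and versioned. A minimal in-memory sketch of that idea, assuming a simple mutation log rather than the graph database the Twin Agent actually writes to; `TwinStore` and its methods are hypothetical.

```python
import copy
from typing import Any, Dict, List, Tuple


class TwinStore:
    """Hypothetical in-memory twin: versioned mutations plus snapshots."""

    def __init__(self) -> None:
        self.version = 0
        self._entities: Dict[str, Dict[str, Any]] = {}
        # Append-only log of (version, entity_id, attributes changed)
        self._log: List[Tuple[int, str, Dict[str, Any]]] = []

    def apply(self, entity_id: str, attributes: Dict[str, Any]) -> int:
        """Merge attributes into an entity and return the new global version."""
        self.version += 1
        self._entities.setdefault(entity_id, {}).update(attributes)
        self._log.append((self.version, entity_id, dict(attributes)))
        return self.version

    def snapshot(self) -> Dict[str, Any]:
        """Deep copy of current state, safe to store alongside an execution trace."""
        return {"version": self.version, "entities": copy.deepcopy(self._entities)}

    def state_at(self, version: int) -> Dict[str, Dict[str, Any]]:
        """Rebuild entity state as of `version` by replaying the mutation log."""
        entities: Dict[str, Dict[str, Any]] = {}
        for v, entity_id, attrs in self._log:
            if v > version:
                break
            entities.setdefault(entity_id, {}).update(attrs)
        return entities
```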

Ops Agent

Responsibility: Human-in-the-loop operations and incident management

Inputs:

  • Escalated incidents from Security/Energy/Automation agents
  • User approval requests
  • SLA violation alerts

Outputs:

  • Approval/rejection decisions
  • Incident reports
  • Maintenance work orders
  • SLA tracking metrics
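Approval requests need an explicit lifecycle so that a late decision cannot silently authorize a stale action. A sketch assuming a fail-closed rule (a decision arriving after the SLA window marks the request expired instead of approving it); `ApprovalRequest` and `ApprovalStatus` are hypothetical, and `now` is passed in explicitly to keep the logic deterministic and testable.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"


@dataclass
class ApprovalRequest:
    request_id: str
    action: str
    created_at: float   # epoch seconds
    sla_seconds: float
    status: ApprovalStatus = ApprovalStatus.PENDING
    decided_by: Optional[str] = None

    def sla_breached(self, now: float) -> bool:
        """True while the request is still pending past its SLA window."""
        return self.status is ApprovalStatus.PENDING and now > self.created_at + self.sla_seconds

    def decide(self, approve: bool, operator: str, now: float) -> ApprovalStatus:
        """Record an operator decision; decisions after the SLA fail closed."""
        if self.status is not ApprovalStatus.PENDING:
            raise ValueError(f"request {self.request_id} already {self.status.value}")
        if now > self.created_at + self.sla_seconds:
            self.status = ApprovalStatus.EXPIRED
            return self.status
        self.status = ApprovalStatus.APPROVED if approve else ApprovalStatus.REJECTED
        self.decided_by = operator
        return self.status
```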

DER/Grid Agent

Responsibility: Distributed energy resource and grid integration

Inputs:

  • Battery state-of-charge
  • Solar PV generation
  • Grid carbon intensity signals
  • Demand response events (OpenADR)
  • EVSE charging requests (OCPP)

Outputs:

  • Battery charge/discharge commands
  • EV charging schedule optimization
  • Demand limit enforcement
  • DR participation confirmations
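Demand limit enforcement reduces to simple arithmetic: discharge just enough to keep net grid import at or below the limit, within the battery's reserve and inverter rating. A rule-based sketch; the function name, the 20 % reserve, and the 100 kW cap are illustrative assumptions, and a real dispatcher would also weigh tariffs, carbon intensity, and DR commitments.

```python
def battery_discharge_kw(
    load_kw: float,
    pv_kw: float,
    demand_limit_kw: float,
    soc_pct: float,
    reserve_pct: float = 20.0,
    max_discharge_kw: float = 100.0,
) -> float:
    """kW the battery should discharge to hold grid import at or below the limit."""
    grid_import_kw = load_kw - pv_kw
    excess_kw = grid_import_kw - demand_limit_kw
    if excess_kw <= 0 or soc_pct <= reserve_pct:
        # Under the limit, or battery already at its protected reserve
        return 0.0
    # Shave only the excess, capped by the inverter rating
    return min(excess_kw, max_discharge_kw)
```

For a 600 kW load against a 400 kW limit, the cap leaves 100 kW of excess that would have to be handled by another measure, such as load curtailment.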

Compliance Agent

Responsibility: Continuous compliance monitoring and attestation

Inputs:

  • Control actions from all agents
  • Policy evaluation results
  • Audit log events

Outputs:

  • Compliance violation alerts
  • Attestation reports (SOC 2, IEC 62443)
  • Control effectiveness metrics
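Because every control action should carry a safety token from an OPA allow decision, compliance can be checked by joining the action log against the decision log. A sketch with hypothetical record types; defining control effectiveness as the share of actions backed by a matching allow token is an assumption for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class ControlAction:
    action_id: str
    agent: str
    safety_token: Optional[str]


@dataclass(frozen=True)
class PolicyDecision:
    action_id: str
    allow: bool
    token: Optional[str] = None


def audit(actions: Iterable[ControlAction], decisions: Iterable[PolicyDecision]) -> Dict[str, object]:
    """Flag actions that lack a matching OPA allow decision."""
    # Only allow decisions that actually issued a token count as authorization
    allowed = {d.action_id: d.token for d in decisions if d.allow and d.token}
    action_list = list(actions)
    violations: List[str] = [
        a.action_id
        for a in action_list
        if a.safety_token is None or allowed.get(a.action_id) != a.safety_token
    ]
    total = len(action_list)
    # Assumed metric: share of actions backed by a valid allow decision
    effectiveness = 1.0 if total == 0 else (total - len(violations)) / total
    return {"violations": violations, "control_effectiveness": effectiveness}
```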

Base Agent Framework

All agents inherit from BaseAgent:

```python
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from langgraph.graph import StateGraph

# AgentState, AgentStatus, EventBus, TelemetryCollector and CloudEventMessage
# are defined elsewhere in the CitadelMesh codebase.


@dataclass
class AgentConfig:
    """Configuration for a CitadelMesh agent."""
    agent_id: str
    agent_type: str
    spiffe_id: str
    nats_url: str = "nats://localhost:4222"
    subscribe_topics: List[str] = field(default_factory=list)
    publish_topics: List[str] = field(default_factory=list)
    enable_telemetry: bool = True
    enable_safety_checks: bool = True
    log_level: str = "INFO"


class BaseAgent(ABC):
    """Base class for all CitadelMesh LangGraph agents."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.state = AgentState()
        self.logger = logging.getLogger(f"citadel.agent.{config.agent_type}")

        # Event bus integration
        self.event_bus: Optional[EventBus] = None

        # Telemetry collector (None when telemetry is disabled)
        self.telemetry: Optional[TelemetryCollector] = None
        if config.enable_telemetry:
            self.telemetry = TelemetryCollector(
                service_name=f"agent.{config.agent_type}",
                agent_id=config.agent_id,
            )

        # LangGraph state machine
        self.graph: Optional[StateGraph] = None

    async def start(self):
        """Start agent and connect to infrastructure."""
        # Build the LangGraph first so no event is handled before the graph exists
        self.graph = self.build_graph()

        # Connect to NATS event bus
        self.event_bus = EventBus(
            nats_url=self.config.nats_url,
            agent_id=self.config.agent_id,
        )
        await self.event_bus.connect()

        # Subscribe to topics
        for topic in self.config.subscribe_topics:
            await self.event_bus.subscribe(topic, self._handle_event)

        # Update status
        self.state.status = AgentStatus.ACTIVE
        await self._publish_state_update()

    @abstractmethod
    def build_graph(self) -> StateGraph:
        """Build the LangGraph state machine for this agent."""
        pass

    @abstractmethod
    async def process_event(self, event: CloudEventMessage) -> Optional[Dict[str, Any]]:
        """Process incoming CloudEvent."""
        pass

    async def _handle_event(self, event: CloudEventMessage):
        """Internal event handler with telemetry."""
        if self.telemetry is None:
            # Telemetry disabled (enable_telemetry=False): process without a span
            try:
                return await self.process_event(event)
            except Exception:
                self.logger.exception("Event processing failed")
                raise

        with self.telemetry.trace_span("handle_event") as span:
            span.set_attribute("event.type", event.type)
            span.set_attribute("event.source", event.source)

            try:
                return await self.process_event(event)
            except Exception as e:
                self.logger.exception("Event processing failed")
                span.record_exception(e)
                raise
```
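For unit-testing agents without a NATS server, an in-memory stand-in for `EventBus` can implement the same `subscribe(topic, handler)` and `publish(topic, event)` shape used on this page. A sketch; `InMemoryEventBus` and `subject_matches` are hypothetical and only mimic NATS subject wildcards (`*` matches exactly one token, `>` matches one or more trailing tokens), not NATS delivery semantics such as queue groups or JetStream acknowledgements.

```python
from typing import Any, Awaitable, Callable, List, Tuple

Handler = Callable[[Any], Awaitable[None]]


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match on dot-separated tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one more subject token
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


class InMemoryEventBus:
    """Test double: delivers each published event to every matching subscriber."""

    def __init__(self) -> None:
        self._subs: List[Tuple[str, Handler]] = []

    async def connect(self) -> None:
        # Nothing to connect to; kept for interface parity
        return None

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._subs.append((topic, handler))

    async def publish(self, topic: str, event: Any) -> int:
        """Await each matching handler in subscription order; return how many ran."""
        handlers = [h for pattern, h in self._subs if subject_matches(pattern, topic)]
        for handler in handlers:
            await handler(event)
        return len(handlers)
```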

Inter-Agent Communication

Agents communicate via CloudEvents on the event bus:

Coordination Pattern: Escalation

```python
# Security Agent escalates to Ops Agent
incident_event = CloudEvent(
    type="citadel.security.incident",
    source="spiffe://citadel.mesh/agent/security",
    subject="incident.forced_entry.001",
    data={
        "incident_type": "forced_entry",
        "threat_level": "critical",
        "location": "door.lobby.main",
        "actions_taken": ["lock_doors", "alert_security"],
        "requires_human": True,
    },
)

await event_bus.publish("incidents.events.building_a", incident_event)
```
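On the receiving side, the Ops Agent has to decide which escalation a human sees first. A sketch of one possible inbox, assuming incidents are ordered by `threat_level` and then by arrival; `EscalationQueue` is hypothetical, and treating an unknown threat level as critical is a fail-safe assumption.

```python
import heapq
import itertools
from typing import Any, Dict, List, Optional, Tuple

# Lower number = handled first
THREAT_PRIORITY = {"critical": 0, "high": 1, "medium": 2, "low": 3}


class EscalationQueue:
    """Hypothetical Ops Agent inbox ordered by threat level, then arrival."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Dict[str, Any]]] = []
        # Monotonic sequence breaks ties so dicts are never compared
        self._seq = itertools.count()

    def offer(self, data: Dict[str, Any]) -> bool:
        """Queue an incident payload if it asks for a human; return whether it was queued."""
        if not data.get("requires_human"):
            return False
        # Unknown levels are treated as critical (fail safe)
        priority = THREAT_PRIORITY.get(data.get("threat_level"), 0)
        heapq.heappush(self._heap, (priority, next(self._seq), data))
        return True

    def next_incident(self) -> Optional[Dict[str, Any]]:
        """Pop the most urgent incident, or None when the inbox is empty."""
        return heapq.heappop(self._heap)[2] if self._heap else None
```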

Coordination Pattern: State Query

```python
# Energy Agent queries Twin Agent for zone state
async def get_zone_state(zone_id: str) -> dict:
    """Query digital twin for zone state via gRPC."""
    # grpc_channel must be a grpc.aio channel for the stub call to be awaitable
    twin_client = TwinServiceStub(grpc_channel)

    request = GetEntityRequest(entity_id=zone_id)
    entity = await twin_client.GetEntity(request)

    return {
        "temp": entity.attributes["current_temp"],
        "setpoint": entity.attributes["target_setpoint"],
        "occupancy": entity.attributes["occupied"],
    }
```
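A slow twin query should not stall an agent's graph. With a real `grpc.aio` stub, the idiomatic fix is to pass a per-call deadline (the `timeout=` argument), which the server also sees; the sketch below shows the same idea with `asyncio.wait_for` around a hypothetical `fetch` coroutine so it runs without a Twin service. `get_zone_state_with_deadline` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Fetch = Callable[[str], Awaitable[Any]]


async def get_zone_state_with_deadline(
    fetch: Fetch,
    zone_id: str,
    timeout_s: float = 0.5,
    fallback: Optional[Any] = None,
) -> Any:
    """Await fetch(zone_id), returning `fallback` if it exceeds timeout_s."""
    try:
        return await asyncio.wait_for(fetch(zone_id), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the pending fetch at this point
        return fallback
```

A caller can treat the fallback as "no data" and skip the adjustment rather than act on a stale reading.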

Coordination Pattern: Command Chain

```python
# Automation Agent triggers scene -> Energy Agent adjusts HVAC
scene_event = CloudEvent(
    type="citadel.automation.scene.activated",
    source="spiffe://citadel.mesh/agent/automation",
    subject="scene.meeting_mode",
    data={
        "scene_id": "meeting_mode",
        "zones": ["conference_room_a"],
        "lighting": {"preset": "presentation"},
        "hvac": {"temp_override": 70},
    },
)

# Energy Agent subscribes to scene events
async def handle_scene_event(event: CloudEventMessage):
    if event.type != "citadel.automation.scene.activated":
        return
    hvac_override = event.data.get("hvac", {}).get("temp_override")
    if hvac_override is None:
        return
    # The subject names the scene, not a zone: apply the override to each listed zone
    for zone_id in event.data.get("zones", []):
        await adjust_setpoint(zone_id, hvac_override)
```

Deterministic Execution and Replay

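LangGraph state machines make agent executions replayable, provided each node is deterministic given its inputs (LLM and adapter responses must be recorded and served back rather than called live):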

```python
# Record execution trace
execution_id = ulid()
execution_trace = {
    "execution_id": execution_id,
    "agent_id": self.config.agent_id,
    "graph_version": "v1.2.0",
    "initial_state": initial_state,
    "events": [],
    "nodes_executed": [],
    "final_state": None,
}

# Execute graph with tracing
result = await self.graph.ainvoke(
    initial_state,
    config={"callbacks": [ExecutionTracer(execution_trace)]},
)
execution_trace["final_state"] = result

# Store trace for replay
await store_execution_trace(execution_trace)

# Replay for debugging. This reproduces the result only if every node is
# deterministic given its inputs; recorded LLM/adapter responses must be
# injected rather than called live.
replayed_result = await self.graph.ainvoke(execution_trace["initial_state"])

assert replayed_result == execution_trace["final_state"]
```
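The replay assertion holds only if every non-deterministic input (LLM responses, adapter reads, clocks) is captured during the live run. A common pattern is record/replay: route such calls through a shim that records results live and serves them back in order during replay, failing loudly if the call sequence diverges. A synchronous sketch; `CallRecorder` and `decide_node` are hypothetical, with `random.uniform` standing in for a sensor read or LLM call.

```python
import random
from typing import Any, Callable, List, Optional, Tuple


class CallRecorder:
    """Hypothetical record/replay shim for non-deterministic calls."""

    def __init__(self, recorded: Optional[List[Tuple[str, Any]]] = None) -> None:
        # Passing a recording switches the shim into replay mode
        self.replaying = recorded is not None
        self._entries: List[Tuple[str, Any]] = list(recorded or [])
        self._cursor = 0

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        key = getattr(fn, "__qualname__", repr(fn))
        if not self.replaying:
            # Live mode: run the real call and remember its result
            value = fn(*args, **kwargs)
            self._entries.append((key, value))
            return value
        # Replay mode: serve recorded results in order, detecting divergence
        if self._cursor >= len(self._entries):
            raise RuntimeError("replay diverged: more calls than were recorded")
        recorded_key, value = self._entries[self._cursor]
        if recorded_key != key:
            raise RuntimeError(f"replay diverged: expected {recorded_key}, got {key}")
        self._cursor += 1
        return value

    @property
    def entries(self) -> List[Tuple[str, Any]]:
        return list(self._entries)


def decide_node(state: dict, recorder: CallRecorder) -> dict:
    """Toy node: random.uniform stands in for a sensor read or LLM call."""
    reading = recorder.call(random.uniform, 60.0, 80.0)
    return {**state, "reading": reading, "action": "adjust" if reading > 74.0 else "skip"}
```

The recorded entries would be stored with the execution trace, so a replay can reconstruct the exact inputs each node saw.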

Safety Integration

Every agent action is validated by OPA before it reaches an adapter:

```python
async def execute_command(self, command: Command) -> CommandResult:
    """Execute command with OPA safety check."""

    # Evaluate policy
    decision = await opa_client.evaluate(
        policy=f"citadel.{self.config.agent_type}",
        input={
            "action": command.action,
            "target": command.target_id,
            "params": command.params,
            "issued_by": self.config.spiffe_id,
        },
    )

    if not decision["allow"]:
        raise PolicyViolation(decision["deny_reason"])

    # Execute with safety token
    command.safety_token = decision["token"]
    result = await adapter.execute(command)

    return result
```
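If `opa_client` talks to an OPA server over HTTP, the request is a `POST` to OPA's Data API at `/v1/data/<package path>` with the input wrapped in an `input` key, and the answer comes back under `result` (the key is absent when the queried document is undefined). A sketch using only `urllib`, assuming the `citadel.<agent_type>` package exposes `allow`, `deny_reason`, and `token` rules as the code above expects; `build_opa_request` and `parse_decision` are hypothetical, the request is built but not sent, and missing results or tokens fail closed.

```python
import json
import urllib.request
from typing import Any, Dict


def build_opa_request(opa_url: str, package: str, input_doc: Dict[str, Any]) -> urllib.request.Request:
    """POST /v1/data/<package path> with {"input": ...}."""
    path = package.replace(".", "/")
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return urllib.request.Request(
        url=f"{opa_url.rstrip('/')}/v1/data/{path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_decision(response_body: bytes) -> Dict[str, Any]:
    """Normalize an OPA response into the decision shape used by execute_command."""
    doc = json.loads(response_body)
    result = doc.get("result")
    # OPA omits "result" when the queried document is undefined
    if not isinstance(result, dict):
        return {"allow": False, "deny_reason": "policy result undefined"}
    if result.get("allow") is not True:
        return {"allow": False, "deny_reason": result.get("deny_reason", "denied by policy")}
    if not result.get("token"):
        # An allow without a safety token is treated as a deny (fail closed)
        return {"allow": False, "deny_reason": "allow without safety token"}
    return {"allow": True, "token": result["token"]}
```

Sending the request with `urllib.request.urlopen(req)` and passing the response body to `parse_decision` would complete the round trip; 8181 is OPA's default server port.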

Agent Deployment

Agents deploy as Kubernetes pods:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: security-agent
  namespace: citadel-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      serviceAccountName: security-agent
      containers:
        - name: agent
          image: citadel/security-agent:v1.0.0
          env:
            - name: AGENT_ID
              value: "security-agent-building-a"
            - name: SPIFFE_ENDPOINT_SOCKET
              value: "unix:///run/spire/sockets/agent.sock"
            - name: NATS_URL
              value: "nats://nats.citadel.svc:4222"
          volumeMounts:
            - name: spire-agent-socket
              mountPath: /run/spire/sockets
              readOnly: true
      volumes:
        - name: spire-agent-socket
          hostPath:
            path: /run/spire/sockets
```
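The container receives its identity and bus settings through environment variables. A sketch of reading and validating them at startup; `RuntimeSettings` and `settings_from_env` are hypothetical, the default NATS URL mirrors `AgentConfig`, and only `unix://` sockets are accepted because this Deployment mounts the SPIRE agent socket from the host.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class RuntimeSettings:
    agent_id: str
    nats_url: str
    spiffe_socket: str


def settings_from_env(env: Optional[Mapping[str, str]] = None) -> RuntimeSettings:
    """Read the Deployment's env vars; pass a mapping to test without os.environ."""
    env = os.environ if env is None else env
    agent_id = env.get("AGENT_ID")
    if not agent_id:
        raise ValueError("AGENT_ID is required")
    socket = env.get("SPIFFE_ENDPOINT_SOCKET", "")
    if not socket.startswith("unix://"):
        # Fail fast: without the Workload API socket the agent cannot get an SVID
        raise ValueError("SPIFFE_ENDPOINT_SOCKET must be a unix:// URI")
    return RuntimeSettings(
        agent_id=agent_id,
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        spiffe_socket=socket,
    )
```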

See Also