Skip to main content

MCP Adapter Architecture

Model Context Protocol (MCP) servers expose CitadelMesh building resources as tool servers that any AI agent, framework, or IDE can consume. This document explains the MCP adapter architecture, implementation patterns, and integration with the broader CitadelMesh system.

MCP in CitadelMesh

Purpose: Provide a universal, framework-agnostic interface to building systems

Benefits:

  1. Agent Framework Portability: LangGraph, AutoGen, and Semantic Kernel can all use the same tools
  2. IDE Integration: VS Code, Cursor, and Claude can directly control buildings
  3. Composability: Chain MCP tools into complex workflows
  4. Observability: Every tool call generates CloudEvents for audit

MCP Architecture

graph TB
%% Layout: top-to-bottom graph with three groups - clients, MCP servers, backend.
%% Agent frameworks and IDEs that consume the MCP tool servers.
subgraph "MCP Clients"
LangGraph[LangGraph Agent]
AutoGen[AutoGen Agent]
SK[Semantic Kernel]
IDE[VS Code/Cursor]
end

%% MCP servers provided by CitadelMesh.
subgraph "MCP Servers (CitadelMesh)"
BACnet[BACnet MCP Server]
OPCUA[OPC UA MCP Server]
SecurityExpert[Security Expert MCP Server]
Avigilon[Avigilon MCP Server]
Twin[Digital Twin MCP Server]
Policy[Policy MCP Server]
end

%% Services the MCP servers talk to.
subgraph "Backend"
EventBus[CloudEvents Bus]
OPA[OPA Engine]
Adapters[Protocol Adapters]
end

%% Client-to-server edges; each client is drawn against a single server here.
LangGraph --> BACnet
AutoGen --> SecurityExpert
SK --> Twin
IDE --> Policy

%% Server-to-backend edges: four servers point at the event bus, Policy points at OPA.
%% NOTE(review): OPCUA has no outgoing edge in this diagram - confirm whether it should also connect to EventBus.
BACnet --> EventBus
SecurityExpert --> EventBus
Avigilon --> EventBus
Twin --> EventBus
Policy --> OPA

%% The event bus feeds the protocol adapters.
EventBus --> Adapters

MCP Server Implementation

Example: Security Expert MCP Server

import { MCPServer, Tool } from '@modelcontextprotocol/sdk';

/**
 * MCP server exposing Schneider Security Expert door control as tools.
 *
 * Declares three tools (door lock, door unlock, event subscription) and
 * routes incoming tool calls to the matching handler. An audit event is
 * emitted for every call before it is dispatched.
 */
class SecurityExpertMCPServer extends MCPServer {
  constructor(private adapter: SecurityExpertAdapter) {
    super({
      name: 'citadel-security-expert',
      version: '1.0.0',
      description: 'Schneider Security Expert building access control'
    });
  }

  /**
   * Describe the tools offered by this server.
   *
   * @returns The lock, unlock and subscribe tool definitions, in that order.
   */
  async getTools(): Promise<Tool[]> {
    const doorLockTool: Tool = {
      name: 'securityexpert_door_lock',
      description: 'Lock a door immediately',
      inputSchema: {
        type: 'object',
        properties: {
          door_id: {
            type: 'string',
            description: 'Door identifier (e.g., door.lobby.main)'
          }
        },
        required: ['door_id']
      }
    };

    const doorUnlockTool: Tool = {
      name: 'securityexpert_door_unlock',
      description: 'Unlock a door temporarily (requires OPA approval)',
      inputSchema: {
        type: 'object',
        properties: {
          door_id: { type: 'string' },
          duration_seconds: {
            type: 'integer',
            minimum: 60,
            maximum: 900,
            default: 300
          },
          priority: {
            type: 'string',
            enum: ['PRIORITY_LOW', 'PRIORITY_NORMAL', 'PRIORITY_HIGH'],
            default: 'PRIORITY_NORMAL'
          }
        },
        required: ['door_id']
      }
    };

    const subscribeEventsTool: Tool = {
      name: 'securityexpert_subscribe_events',
      description: 'Subscribe to access control events',
      inputSchema: {
        type: 'object',
        properties: {
          event_types: {
            type: 'array',
            items: {
              enum: ['access_granted', 'access_denied', 'door_forced', 'door_held']
            }
          }
        }
      }
    };

    return [doorLockTool, doorUnlockTool, subscribeEventsTool];
  }

  /**
   * Execute the named tool.
   *
   * @param name Tool name as declared by getTools().
   * @param args Tool arguments; their shape is not validated here.
   * @returns Whatever the selected handler returns.
   * @throws Error when `name` is not one of the declared tools.
   */
  async callTool(name: string, args: any): Promise<any> {
    // The audit event is emitted first, so unknown tool names are audited too.
    const callId = ulid();
    await this.emitAuditEvent(callId, name, args);

    if (name === 'securityexpert_door_lock') {
      const locked = await this.lockDoor(args.door_id, callId);
      return locked;
    }

    if (name === 'securityexpert_door_unlock') {
      // Fall back to the defaults advertised in the tool schema.
      const doorId = args.door_id;
      const durationSeconds = args.duration_seconds || 300;
      const priority = args.priority || 'PRIORITY_NORMAL';
      // NOTE(review): unlockDoor is not defined in this excerpt.
      const unlocked = await this.unlockDoor(doorId, durationSeconds, priority, callId);
      return unlocked;
    }

    if (name === 'securityexpert_subscribe_events') {
      // NOTE(review): subscribeEvents is not defined in this excerpt.
      const subscription = await this.subscribeEvents(args.event_types, callId);
      return subscription;
    }

    throw new Error(`Unknown tool: ${name}`);
  }

  /**
   * Lock a door: policy check, adapter call, then CloudEvent publication.
   *
   * @param doorId Identifier of the door to lock.
   * @param callId Identifier of the originating tool call; used as command id.
   * @returns A status object reporting the door as locked.
   * @throws Error when the OPA decision does not allow the action.
   */
  private async lockDoor(doorId: string, callId: string): Promise<any> {
    // Command record; it is only used as payload of the CloudEvent below.
    const lockCommand = {
      id: callId,
      target_id: doorId,
      action: 'lock_door',
      params: {},
      issued_by: this.spiffeId
    };

    // Ask OPA before touching the hardware.
    const policyInput = {
      action: 'lock_door',
      target: doorId,
      issued_by: this.spiffeId
    };
    const verdict = await this.opaClient.evaluate('citadel.security.door', policyInput);
    if (!verdict.allow) {
      throw new Error(`Policy denied: ${verdict.deny_reason}`);
    }

    // Execute via adapter.
    const adapterResult = await this.adapter.lockDoor(doorId);

    // Publish the command together with the adapter's result.
    await this.publishCloudEvent({
      type: 'citadel.control.command',
      source: this.spiffeId,
      subject: doorId,
      data: { command: lockCommand, result: adapterResult }
    });

    const lockedAt = new Date().toISOString();
    return {
      success: true,
      door_id: doorId,
      status: 'locked',
      timestamp: lockedAt
    };
  }
}

Python MCP Server Example

from mcp import MCPServer, Tool
from typing import List, Dict, Any

class EBOMCPServer(MCPServer):
    """MCP server for EcoStruxure Building Operation.

    Exposes two tools: ``ebo_read_point`` (read a point value) and
    ``ebo_write_setpoint`` (write an HVAC setpoint after OPA approval).
    """

    def __init__(self, adapter: EBOAdapter):
        """Register the server metadata and keep the EBO adapter.

        Args:
            adapter: Adapter used to read points and execute commands.
        """
        super().__init__(
            name="citadel-ebo",
            version="1.0.0",
            description="EcoStruxure HVAC and BMS control"
        )
        self.adapter = adapter

    async def list_tools(self) -> List[Tool]:
        """Return the tool definitions offered by this server."""
        return [
            Tool(
                name="ebo_read_point",
                description="Read a BACnet/OPC UA point value",
                input_schema={
                    "type": "object",
                    "properties": {
                        "point_id": {
                            "type": "string",
                            "description": "Point identifier (e.g., hvac.zone1.temp)"
                        }
                    },
                    "required": ["point_id"]
                }
            ),
            Tool(
                name="ebo_write_setpoint",
                description="Write HVAC setpoint (OPA validated)",
                input_schema={
                    "type": "object",
                    "properties": {
                        "entity_id": {"type": "string"},
                        "value": {"type": "number"},
                        "unit": {"type": "string", "default": "F"},
                        "priority": {"type": "integer", "minimum": 1, "maximum": 16, "default": 8}
                    },
                    "required": ["entity_id", "value"]
                }
            )
        ]

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the named tool and return its result as a dict.

        Args:
            name: Tool name as declared by ``list_tools``.
            arguments: Tool arguments; required keys are indexed directly.

        Returns:
            For ``ebo_read_point``, the point's fields; for
            ``ebo_write_setpoint``, the execution outcome.

        Raises:
            PolicyViolationError: If the OPA decision does not allow the write.
            KeyError: If a required argument is missing from ``arguments``.
            ValueError: If ``name`` is not a tool of this server.
        """
        call_id = ulid()

        if name == "ebo_read_point":
            point = await self.adapter.read_point(arguments["point_id"])
            return {
                "entity_id": point.entity_id,
                "metric": point.metric,
                "value": point.value,
                "unit": point.unit,
                "timestamp": point.timestamp.isoformat(),
                "quality": point.quality
            }

        if name == "ebo_write_setpoint":
            # OPA validation
            # NOTE(review): opa_client is referenced as a bare name, not as an
            # attribute of self - confirm it is in scope where this class lives.
            decision = await opa_client.evaluate(
                "citadel.hvac.setpoint",
                {
                    "action": "write_setpoint",
                    "entity_id": arguments["entity_id"],
                    "value": arguments["value"]
                }
            )

            if not decision["allow"]:
                raise PolicyViolationError(decision["deny_reason"])

            # Execute command; the schema defaults are re-applied here because
            # optional arguments may be absent from the call.
            command = Command(
                id=call_id,
                target_id=arguments["entity_id"],
                action="write_setpoint",
                params={
                    "value": str(arguments["value"]),
                    "unit": arguments.get("unit", "F"),
                    "priority": str(arguments.get("priority", 8))
                },
                safety_token=decision["token"],
                issued_by=self.spiffe_id
            )

            result = await self.adapter.execute(command)

            # Publish CloudEvent
            await self.event_bus.publish(CloudEvent(
                type="citadel.control.command",
                source=self.spiffe_id,
                subject=arguments["entity_id"],
                data=command
            ))

            return {
                "success": result.success,
                "message": result.message,
                "timestamp": result.executed_at.isoformat()
            }

        # Fail loudly instead of returning None for a tool this server does
        # not provide.
        raise ValueError(f"Unknown tool: {name}")

MCP Tool Catalog

Building Control Tools

| Tool | MCP Server | Description | Safety Level |
|------|------------|-------------|--------------|
| bacnet_read_point | bacnet | Read BACnet object value | Read-only |
| bacnet_write_point | bacnet | Write BACnet point (OPA validated) | High |
| opcua_browse | opcua | Browse OPC UA node hierarchy | Read-only |
| opcua_read | opcua | Read OPC UA node value | Read-only |
| opcua_write | opcua | Write OPC UA node (OPA validated) | High |

Security Tools

| Tool | MCP Server | Description | Safety Level |
|------|------------|-------------|--------------|
| securityexpert_door_lock | security-expert | Lock door | Medium |
| securityexpert_door_unlock | security-expert | Unlock door temporarily | High |
| avigilon_snapshot | avigilon | Capture camera snapshot | Low |
| avigilon_track_person | avigilon | Track person across cameras | Medium |

Energy Tools

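| Tool | MCP Server | Description | Safety Level |
|------|------------|-------------|--------------|
| ebo_read_point | ebo | Read HVAC point | Read-only |
| ebo_write_setpoint | ebo | Write HVAC setpoint | High |
| pme_query_power | pme | Query real-time power | Read-only |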

Digital Twin Tools

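| Tool | MCP Server | Description | Safety Level |
|------|------------|-------------|--------------|
| twin_query | twin | Query digital twin entities | Read-only |
| twin_get_entity | twin | Get entity state | Read-only |
| twin_mutate | twin | Update entity (validated) | Medium |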

Policy Tools

| Tool | MCP Server | Description | Safety Level |
|------|------------|-------------|--------------|
| policy_evaluate | policy | Evaluate action against OPA policy | Read-only |
| policy_explain | policy | Explain policy decision | Read-only |

Agent Integration

LangGraph Agent with MCP Tools

from langchain_core.tools import StructuredTool
from mcp.client import MCPClient

class EnergyAgentWithMCP:
    """Energy agent whose LangChain tools are backed by MCP servers."""

    def __init__(self):
        """Connect to the EBO and digital-twin MCP servers and build the tools."""
        # Connect to MCP servers
        self.mcp_ebo = MCPClient("http://mcp-ebo.citadel.svc:8080")
        self.mcp_twin = MCPClient("http://mcp-twin.citadel.svc:8080")

        # Wrap MCP tools as LangChain tools
        self.tools = self._create_tools()

    def _create_tools(self) -> List[StructuredTool]:
        """Wrap the MCP tool calls as LangChain ``StructuredTool`` objects.

        The tools are built with ``StructuredTool.from_function`` so that the
        argument schema is inferred from each wrapper's signature; the bare
        ``StructuredTool(...)`` constructor requires an explicit
        ``args_schema``.

        Returns:
            The ``read_zone_temp``, ``write_setpoint`` and ``query_twin``
            tools, in that order.
        """
        # NOTE(review): if MCPClient.call_tool is a coroutine function, these
        # wrappers should be passed via ``coroutine=`` rather than ``func=`` -
        # confirm against the client API.

        def read_zone_temp(zone_id: str):
            return self.mcp_ebo.call_tool(
                "ebo_read_point",
                {"point_id": f"hvac.{zone_id}.temp"}
            )

        def write_setpoint(zone_id: str, value: float):
            return self.mcp_ebo.call_tool(
                "ebo_write_setpoint",
                {"entity_id": f"hvac.{zone_id}.setpoint", "value": value}
            )

        # The selector's type is not visible here, so it is left unannotated.
        def query_twin(selector):
            return self.mcp_twin.call_tool(
                "twin_query",
                {"selector": selector}
            )

        return [
            StructuredTool.from_function(
                func=read_zone_temp,
                name="read_zone_temp",
                description="Read current zone temperature",
            ),
            StructuredTool.from_function(
                func=write_setpoint,
                name="write_setpoint",
                description="Write HVAC setpoint (validated by OPA)",
            ),
            StructuredTool.from_function(
                func=query_twin,
                name="query_twin",
                description="Query digital twin for zone state",
            ),
        ]

    def build_graph(self) -> StateGraph:
        """Build and compile the agent's state graph."""
        workflow = StateGraph(EnergyAgentState)
        # ... use self.tools in graph nodes
        return workflow.compile()

Correlation with CloudEvents

Every MCP tool call generates audit CloudEvents:

async def emit_audit_event(self, call_id: str, tool_name: str, arguments: dict) -> None:
    """Emit CloudEvent for MCP tool call audit.

    Args:
        call_id: Identifier of the tool call; used as the event id.
        tool_name: Name of the invoked tool.
        arguments: Arguments the tool was called with; stored verbatim.
    """
    # Function-scope import keeps this snippet self-contained.
    from datetime import datetime, timezone

    event = CloudEvent(
        type="citadel.mcp.tool_call",
        source=self.spiffe_id,
        subject=f"tool.{tool_name}",
        id=call_id,
        data={
            "tool": tool_name,
            "arguments": arguments,
            "caller": self.caller_spiffe_id,
            # Timezone-aware UTC timestamp: the audit record carries an
            # explicit "+00:00" offset instead of an ambiguous naive time.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    )

    # NOTE(review): event_bus is referenced as a bare name here, not as an
    # attribute of self - confirm it is in scope.
    await event_bus.publish("audit.mcp.tool_calls", event)

Security

Authentication

MCP servers authenticate callers via SPIFFE:

from spiffe import X509Source

class SecureMCPServer(MCPServer):
    """MCP server that identifies callers by their SPIFFE X.509 SVID."""

    def __init__(self):
        """Create the X.509 source used to parse caller certificates."""
        # NOTE(review): the MCPServer initializer is not invoked here -
        # confirm whether that is intentional.
        self.x509_source = X509Source()

    async def authenticate_caller(self, request):
        """Verify caller's SPIFFE identity.

        Args:
            request: Incoming request; its ``client_cert`` is parsed as an SVID.

        Returns:
            The caller's SPIFFE ID when its path starts with ``/agent/``.

        Raises:
            Unauthorized: If the SPIFFE ID path is not under ``/agent/``.
        """
        svid = self.x509_source.parse_x509_svid(request.client_cert)
        caller_id = svid.spiffe_id

        # Only identities under the /agent/ path count as authorized agents.
        # NOTE(review): only the path is checked here, not the trust domain -
        # confirm the trust domain is validated elsewhere.
        if caller_id.path.startswith("/agent/"):
            return caller_id

        raise Unauthorized("Only agents can call MCP tools")

Authorization

OPA policies control tool access:

# Authorization policy for MCP tool calls.
# A request is allowed when any one of the `allow` rules below matches
# input.spiffe_id and input.tool; otherwise the default (deny) applies.
# NOTE(review): the `if` and `in` keywords need `import rego.v1` (or
# future.keywords) on OPA releases before 1.0 - confirm the target OPA version.
package citadel.mcp.authz

# Deny unless a rule explicitly allows.
default allow := false

# Energy agents can use EBO tools
# (any tool whose name starts with "ebo_").
allow if {
input.spiffe_id == "spiffe://citadel.mesh/agent/energy"
startswith(input.tool, "ebo_")
}

# Security agents can use security tools
# NOTE(review): securityexpert_door_unlock is not in this list - presumably
# intentional; confirm.
allow if {
input.spiffe_id == "spiffe://citadel.mesh/agent/security"
input.tool in ["securityexpert_door_lock", "avigilon_snapshot"]
}

# All agents can read twin
# NOTE(review): only twin_query is matched; twin_get_entity is also listed as
# read-only in the tool catalog - confirm whether it should be allowed here.
allow if {
startswith(input.spiffe_id, "spiffe://citadel.mesh/agent/")
input.tool == "twin_query"
}

See Also