MCP Adapter Architecture
Model Context Protocol (MCP) servers expose CitadelMesh building resources as tools that any AI agent, framework, or IDE can consume. This document explains the MCP adapter architecture, its implementation patterns, and how it integrates with the broader CitadelMesh system.
MCP in CitadelMesh
Purpose: Provide a universal, framework-agnostic interface to building systems
Benefits:
- Agent Framework Portability: LangGraph, AutoGen, and Semantic Kernel can all use the same tools
- IDE Integration: VS Code, Cursor, and Claude can call building tools directly (subject to OPA policy)
- Composability: Chain MCP tools into complex workflows
- Observability: Every tool call generates CloudEvents for audit
MCP Architecture
graph TB
    %% Clients: agent frameworks and IDEs that consume MCP tools.
    subgraph "MCP Clients"
        LangGraph[LangGraph Agent]
        AutoGen[AutoGen Agent]
        SK[Semantic Kernel]
        IDE[VS Code/Cursor]
    end

    %% Servers: one MCP server per building subsystem.
    subgraph "MCP Servers (CitadelMesh)"
        BACnet[BACnet MCP Server]
        OPCUA[OPC UA MCP Server]
        SecurityExpert[Security Expert MCP Server]
        Avigilon[Avigilon MCP Server]
        Twin[Digital Twin MCP Server]
        Policy[Policy MCP Server]
    end

    %% Backend services the servers depend on.
    subgraph "Backend"
        EventBus[CloudEvents Bus]
        OPA[OPA Engine]
        Adapters[Protocol Adapters]
    end

    %% Example client-to-server connections (any client may use any server).
    LangGraph --> BACnet
    AutoGen --> SecurityExpert
    SK --> Twin
    IDE --> Policy

    %% Server-to-backend connections.
    BACnet --> EventBus
    %% OPCUA was the only declared server node without an edge; it is wired
    %% to the event bus like the other device-facing servers.
    OPCUA --> EventBus
    SecurityExpert --> EventBus
    Avigilon --> EventBus
    Twin --> EventBus
    Policy --> OPA
    EventBus --> Adapters
MCP Server Implementation
Example: Security Expert MCP Server
import { MCPServer, Tool } from '@modelcontextprotocol/sdk';
/**
 * MCP server that exposes Schneider Security Expert door control as tools.
 *
 * Advertises three tools (door lock, door unlock, event subscription) and
 * routes each incoming call to a private handler after emitting an audit
 * event for it.
 *
 * NOTE(review): `ulid`, `this.spiffeId`, `this.opaClient`,
 * `this.emitAuditEvent`, `this.publishCloudEvent`, `this.unlockDoor` and
 * `this.subscribeEvents` are used below but not defined in this excerpt —
 * presumably inherited from MCPServer or omitted for brevity; confirm.
 */
class SecurityExpertMCPServer extends MCPServer {
  /**
   * @param adapter Adapter used to execute door commands against the
   *                access control system.
   */
  constructor(private adapter: SecurityExpertAdapter) {
    super({
      name: 'citadel-security-expert',
      version: '1.0.0',
      description: 'Schneider Security Expert building access control'
    });
  }

  /**
   * Describe the tools this server offers, including the JSON Schema for
   * each tool's arguments.
   *
   * @returns The static list of tool definitions.
   */
  async getTools(): Promise<Tool[]> {
    return [
      {
        name: 'securityexpert_door_lock',
        description: 'Lock a door immediately',
        inputSchema: {
          type: 'object',
          properties: {
            door_id: {
              type: 'string',
              description: 'Door identifier (e.g., door.lobby.main)'
            }
          },
          required: ['door_id']
        }
      },
      {
        name: 'securityexpert_door_unlock',
        description: 'Unlock a door temporarily (requires OPA approval)',
        inputSchema: {
          type: 'object',
          properties: {
            door_id: { type: 'string' },
            // Unlock window is bounded to 60-900; callTool applies the same
            // 300 default when the argument is absent.
            duration_seconds: {
              type: 'integer',
              minimum: 60,
              maximum: 900,
              default: 300
            },
            priority: {
              type: 'string',
              enum: ['PRIORITY_LOW', 'PRIORITY_NORMAL', 'PRIORITY_HIGH'],
              default: 'PRIORITY_NORMAL'
            }
          },
          required: ['door_id']
        }
      },
      {
        name: 'securityexpert_subscribe_events',
        description: 'Subscribe to access control events',
        inputSchema: {
          type: 'object',
          properties: {
            // No `required` list: event_types is optional in this schema.
            event_types: {
              type: 'array',
              items: {
                enum: ['access_granted', 'access_denied', 'door_forced', 'door_held']
              }
            }
          }
        }
      }
    ];
  }

  /**
   * Dispatch a tool call by name.
   *
   * @param name Tool name as advertised by getTools().
   * @param args Tool arguments; not validated against the schema here.
   * @returns Whatever the selected handler returns.
   * @throws Error when `name` does not match a known tool.
   */
  async callTool(name: string, args: any): Promise<any> {
    // Every tool call generates audit event
    // The audit event is emitted before the name is checked, so calls to
    // unknown tools are audited too. callId is reused as the command id.
    const callId = ulid();
    await this.emitAuditEvent(callId, name, args);
    switch (name) {
      case 'securityexpert_door_lock':
        return await this.lockDoor(args.door_id, callId);
      case 'securityexpert_door_unlock':
        // `||` falls back to the defaults whenever the argument is missing
        // or falsy; the fallbacks mirror the schema defaults above.
        return await this.unlockDoor(
          args.door_id,
          args.duration_seconds || 300,
          args.priority || 'PRIORITY_NORMAL',
          callId
        );
      case 'securityexpert_subscribe_events':
        return await this.subscribeEvents(args.event_types, callId);
      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  }

  /**
   * Lock a door: check policy, execute through the adapter, then publish
   * a CloudEvent describing the command and its result.
   *
   * @param doorId Identifier of the door to lock.
   * @param callId Id of the originating tool call; used as the command id.
   * @returns A summary object with `success`, `door_id`, `status` and an
   *          ISO-8601 `timestamp`.
   * @throws Error when the OPA decision does not allow the action.
   */
  private async lockDoor(doorId: string, callId: string): Promise<any> {
    // Create command
    // The command object is only used as CloudEvent payload below; the
    // adapter call itself takes just the door id.
    const command = {
      id: callId,
      target_id: doorId,
      action: 'lock_door',
      params: {},
      issued_by: this.spiffeId
    };
    // Validate with OPA
    // Policy is evaluated before the adapter is touched, so a denial has
    // no effect on the door.
    const decision = await this.opaClient.evaluate('citadel.security.door', {
      action: 'lock_door',
      target: doorId,
      issued_by: this.spiffeId
    });
    if (!decision.allow) {
      throw new Error(`Policy denied: ${decision.deny_reason}`);
    }
    // Execute via adapter
    const result = await this.adapter.lockDoor(doorId);
    // Publish CloudEvent
    await this.publishCloudEvent({
      type: 'citadel.control.command',
      source: this.spiffeId,
      subject: doorId,
      data: { command, result }
    });
    // NOTE(review): the response reports success/locked without inspecting
    // `result` — presumably adapter failures surface as exceptions; confirm.
    return {
      success: true,
      door_id: doorId,
      status: 'locked',
      timestamp: new Date().toISOString()
    };
  }
}
Python MCP Server Example
from mcp import MCPServer, Tool
from typing import List, Dict, Any
class EBOMCPServer(MCPServer):
    """MCP server for EcoStruxure Building Operation.

    Exposes two tools: ``ebo_read_point`` (read a point value) and
    ``ebo_write_setpoint`` (write an HVAC setpoint after OPA approval).

    NOTE(review): ``self.spiffe_id`` and ``self.event_bus`` are used by
    ``call_tool`` but are not assigned in this class, and ``opa_client`` is
    referenced as a module-level name — presumably supplied by ``MCPServer``
    and the enclosing module; confirm.
    """

    def __init__(self, adapter: EBOAdapter):
        """Register the server metadata and keep the EBO adapter.

        Args:
            adapter: Adapter used to read points and execute commands.
        """
        super().__init__(
            name="citadel-ebo",
            version="1.0.0",
            description="EcoStruxure HVAC and BMS control"
        )
        self.adapter = adapter

    async def list_tools(self) -> List[Tool]:
        """Return the tool definitions, with JSON Schema for their inputs."""
        return [
            Tool(
                name="ebo_read_point",
                description="Read a BACnet/OPC UA point value",
                input_schema={
                    "type": "object",
                    "properties": {
                        "point_id": {
                            "type": "string",
                            "description": "Point identifier (e.g., hvac.zone1.temp)"
                        }
                    },
                    "required": ["point_id"]
                }
            ),
            Tool(
                name="ebo_write_setpoint",
                description="Write HVAC setpoint (OPA validated)",
                input_schema={
                    "type": "object",
                    "properties": {
                        "entity_id": {"type": "string"},
                        "value": {"type": "number"},
                        "unit": {"type": "string", "default": "F"},
                        "priority": {"type": "integer", "minimum": 1, "maximum": 16, "default": 8}
                    },
                    "required": ["entity_id", "value"]
                }
            )
        ]

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the named tool and return a JSON-serialisable result.

        Args:
            name: Tool name as advertised by ``list_tools``.
            arguments: Tool arguments; required keys are read with ``[]``
                and are not validated against the schema here.

        Returns:
            For ``ebo_read_point``: the point's entity id, metric, value,
            unit, ISO timestamp and quality. For ``ebo_write_setpoint``:
            the execution ``success`` flag, ``message`` and ISO timestamp.

        Raises:
            PolicyViolationError: If OPA does not allow the setpoint write.
            KeyError: If a required argument is missing.
            ValueError: If ``name`` is not a tool this server provides.
        """
        # Used as the command id for writes, correlating the command with
        # the tool call that produced it.
        call_id = ulid()

        if name == "ebo_read_point":
            point = await self.adapter.read_point(arguments["point_id"])
            return {
                "entity_id": point.entity_id,
                "metric": point.metric,
                "value": point.value,
                "unit": point.unit,
                "timestamp": point.timestamp.isoformat(),
                "quality": point.quality
            }

        if name == "ebo_write_setpoint":
            # OPA validation happens before anything is sent to the adapter,
            # so a denied request never reaches the building system.
            decision = await opa_client.evaluate(
                "citadel.hvac.setpoint",
                {
                    "action": "write_setpoint",
                    "entity_id": arguments["entity_id"],
                    "value": arguments["value"]
                }
            )
            if not decision["allow"]:
                raise PolicyViolationError(decision["deny_reason"])

            # Execute command. Optional arguments fall back to the same
            # defaults the input schema advertises ("F" and 8); params are
            # passed to the command as strings.
            command = Command(
                id=call_id,
                target_id=arguments["entity_id"],
                action="write_setpoint",
                params={
                    "value": str(arguments["value"]),
                    "unit": arguments.get("unit", "F"),
                    "priority": str(arguments.get("priority", 8))
                },
                safety_token=decision["token"],
                issued_by=self.spiffe_id
            )
            result = await self.adapter.execute(command)

            # Publish CloudEvent
            await self.event_bus.publish(CloudEvent(
                type="citadel.control.command",
                source=self.spiffe_id,
                subject=arguments["entity_id"],
                data=command
            ))
            return {
                "success": result.success,
                "message": result.message,
                "timestamp": result.executed_at.isoformat()
            }

        # Previously an unrecognised name fell through and returned None,
        # contradicting the declared return type and hiding caller typos.
        raise ValueError(f"Unknown tool: {name}")
MCP Tool Catalog
Building Control Tools
| Tool | MCP Server | Description | Safety Level |
|---|---|---|---|
| bacnet_read_point | bacnet | Read BACnet object value | Read-only |
| bacnet_write_point | bacnet | Write BACnet point (OPA validated) | High |
| opcua_browse | opcua | Browse OPC UA node hierarchy | Read-only |
| opcua_read | opcua | Read OPC UA node value | Read-only |
| opcua_write | opcua | Write OPC UA node (OPA validated) | High |
Security Tools
| Tool | MCP Server | Description | Safety Level |
|---|---|---|---|
| securityexpert_door_lock | security-expert | Lock door | Medium |
| securityexpert_door_unlock | security-expert | Unlock door temporarily | High |
| avigilon_snapshot | avigilon | Capture camera snapshot | Low |
| avigilon_track_person | avigilon | Track person across cameras | Medium |
Energy Tools
| Tool | MCP Server | Description | Safety Level |
|---|---|---|---|
| ebo_read_point | ebo | Read HVAC point | Read-only |
| ebo_write_setpoint | ebo | Write HVAC setpoint | High |
| pme_query_power | pme | Query real-time power | Read-only |
Digital Twin Tools
| Tool | MCP Server | Description | Safety Level |
|---|---|---|---|
| twin_query | twin | Query digital twin entities | Read-only |
| twin_get_entity | twin | Get entity state | Read-only |
| twin_mutate | twin | Update entity (validated) | Medium |
Policy Tools
| Tool | MCP Server | Description | Safety Level |
|---|---|---|---|
| policy_evaluate | policy | Evaluate action against OPA policy | Read-only |
| policy_explain | policy | Explain policy decision | Read-only |
Agent Integration
LangGraph Agent with MCP Tools
from langchain_core.tools import StructuredTool
from mcp.client import MCPClient
class EnergyAgentWithMCP:
    """Energy agent whose LangChain tools are thin wrappers over MCP tools.

    Connects to the EBO and Digital Twin MCP servers and exposes three
    LangChain tools (``read_zone_temp``, ``write_setpoint``, ``query_twin``)
    that forward to ``ebo_read_point``, ``ebo_write_setpoint`` and
    ``twin_query`` respectively.
    """

    def __init__(self):
        # Connect to MCP servers
        self.mcp_ebo = MCPClient("http://mcp-ebo.citadel.svc:8080")
        self.mcp_twin = MCPClient("http://mcp-twin.citadel.svc:8080")
        # Wrap MCP tools as LangChain tools
        self.tools = self._create_tools()

    def _create_tools(self) -> List[StructuredTool]:
        """Build the LangChain tools that forward to the MCP servers.

        ``StructuredTool.from_function`` is used instead of calling the
        ``StructuredTool`` constructor directly: the constructor requires an
        explicit ``args_schema``, whereas ``from_function`` infers it from
        the wrapped callable's signature.

        NOTE(review): if ``MCPClient.call_tool`` is a coroutine function,
        these wrappers should be passed as ``coroutine=`` rather than
        ``func=`` — confirm against the client's API.
        """
        return [
            StructuredTool.from_function(
                func=lambda zone_id: self.mcp_ebo.call_tool(
                    "ebo_read_point",
                    {"point_id": f"hvac.{zone_id}.temp"}
                ),
                name="read_zone_temp",
                description="Read current zone temperature",
            ),
            StructuredTool.from_function(
                func=lambda zone_id, value: self.mcp_ebo.call_tool(
                    "ebo_write_setpoint",
                    {"entity_id": f"hvac.{zone_id}.setpoint", "value": value}
                ),
                name="write_setpoint",
                description="Write HVAC setpoint (validated by OPA)",
            ),
            StructuredTool.from_function(
                func=lambda selector: self.mcp_twin.call_tool(
                    "twin_query",
                    {"selector": selector}
                ),
                name="query_twin",
                description="Query digital twin for zone state",
            ),
        ]

    def build_graph(self) -> StateGraph:
        """Create the agent's state graph and return it compiled.

        NOTE(review): the value returned is the result of
        ``workflow.compile()``, which may be a compiled-graph type rather
        than ``StateGraph`` itself — confirm the annotation.
        """
        workflow = StateGraph(EnergyAgentState)
        # ... use self.tools in graph nodes
        return workflow.compile()
Correlation with CloudEvents
Every MCP tool call generates audit CloudEvents:
async def emit_audit_event(self, call_id: str, tool_name: str, arguments: dict):
    """Emit CloudEvent for MCP tool call audit.

    Builds a ``citadel.mcp.tool_call`` event whose id is the tool call id
    and publishes it on the ``audit.mcp.tool_calls`` topic.

    Args:
        call_id: Unique id of the tool call; reused as the event id.
        tool_name: Name of the tool that was invoked.
        arguments: Arguments the tool was called with; included verbatim in
            the event payload.
    """
    # Function-scope import so the timezone-aware clock is available without
    # relying on what the enclosing module imports.
    from datetime import datetime, timezone

    event = CloudEvent(
        type="citadel.mcp.tool_call",
        source=self.spiffe_id,
        subject=f"tool.{tool_name}",
        id=call_id,
        data={
            "tool": tool_name,
            "arguments": arguments,
            "caller": self.caller_spiffe_id,
            # Timezone-aware UTC: the ISO string carries an explicit
            # "+00:00" offset, so audit consumers cannot misread it as
            # local time. datetime.utcnow() is deprecated and returns a
            # naive value with no offset.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    )
    await event_bus.publish("audit.mcp.tool_calls", event)
Security
Authentication
MCP servers authenticate callers via SPIFFE:
from spiffe import X509Source
class SecureMCPServer(MCPServer):
    """MCP server base that identifies callers by their SPIFFE X.509 SVID.

    NOTE(review): ``__init__`` does not call ``super().__init__()`` —
    presumably abbreviated for the example; confirm before reuse.
    """

    def __init__(self):
        self.x509_source = X509Source()

    async def authenticate_caller(self, request):
        """Return the caller's SPIFFE ID if the caller is an agent.

        The SVID is parsed from the client certificate on ``request`` and
        accepted only when the SPIFFE ID path begins with ``/agent/``.

        NOTE(review): only the path is inspected here; the trust domain is
        not compared — presumably enforced elsewhere; verify.

        Raises:
            Unauthorized: If the SPIFFE ID path is not under ``/agent/``.
        """
        caller_id = self.x509_source.parse_x509_svid(request.client_cert).spiffe_id
        is_agent = caller_id.path.startswith("/agent/")
        if is_agent:
            return caller_id
        # Anything that is not an agent identity is rejected.
        raise Unauthorized("Only agents can call MCP tools")
Authorization
OPA policies control tool access:
# Authorization policy for MCP tool calls.
#
# Input fields read by this policy:
#   input.spiffe_id - SPIFFE ID of the calling agent
#   input.tool      - name of the MCP tool being invoked
package citadel.mcp.authz

# Enables the `if` and `in` keywords used below on OPA releases before 1.0,
# where they are not available by default; accepted unchanged on OPA 1.x.
import rego.v1

# Deny unless one of the rules below matches.
default allow := false

# Energy agents can use EBO tools
allow if {
    input.spiffe_id == "spiffe://citadel.mesh/agent/energy"
    startswith(input.tool, "ebo_")
}

# Security agents can use security tools
# (only the two tools listed; door unlock is not granted here)
allow if {
    input.spiffe_id == "spiffe://citadel.mesh/agent/security"
    input.tool in ["securityexpert_door_lock", "avigilon_snapshot"]
}

# All agents can read twin
# (grants twin_query only, to any ID under the citadel.mesh agent prefix)
allow if {
    startswith(input.spiffe_id, "spiffe://citadel.mesh/agent/")
    input.tool == "twin_query"
}
Related Documentation
- Integration Matrix - All vendor system integrations
- Protocol Strategy - MCP in protocol layer
- Agent Topology - Agents consuming MCP tools
- Safety Guardrails - OPA policy integration