Protocol Strategy
CitadelMesh takes a protocol-first approach to ensure durable, vendor-neutral interoperability across heterogeneous runtimes, frameworks, and building systems. This document explains our protocol choices, message formats, and integration patterns.
Design Philosophy
Protocols over Frameworks: We prioritize standardized, durable protocols over framework-specific abstractions. This ensures:
- Future-proof: Swap agent frameworks (LangGraph, AutoGen, Semantic Kernel) without rewriting integrations
- Polyglot: Python, .NET, TypeScript, and Rust can all participate equally
- Vendor-neutral: Building systems speak different protocols; we normalize to canonical models
- Debuggable: Standard protocols have excellent tooling (Wireshark, CloudEvents SDKs, protoc)
Protocol Stack
```mermaid
graph TB
    subgraph "Application Layer"
        Agents[Agents & Services]
        Tools[MCP Tool Servers]
    end
    subgraph "Messaging Layer"
        CE[CloudEvents Envelope]
        PB[Protobuf Payloads]
    end
    subgraph "Transport Layer"
        GRPC[gRPC]
        KAFKA[Kafka/NATS/MQTT]
    end
    subgraph "Security Layer"
        SPIFFE[SPIFFE/SPIRE mTLS]
        JWT[Capability JWTs]
    end
    Agents --> CE
    Tools --> CE
    CE --> PB
    PB --> GRPC
    PB --> KAFKA
    GRPC --> SPIFFE
    KAFKA --> SPIFFE
    CE --> JWT
```
Layer 1: Internal Eventing (Authoritative)
All asynchronous communication in CitadelMesh flows through CloudEvents.
CloudEvents 1.0 Envelope
Every event uses a standard CloudEvents envelope:
```json
{
  "specversion": "1.0",
  "type": "citadelmesh.security.incident",
  "source": "spiffe://citadel.mesh/security-agent",
  "subject": "door.lobby.main",
  "id": "01HQZXYZ9ABCDEFGHIJK",
  "time": "2025-09-30T15:30:00Z",
  "datacontenttype": "application/x-protobuf",
  "dataschema": "https://schemas.citadelmesh.io/citadel.v1.Incident",
  "data_base64": "CgYxMjM0NTY..."
}
```
Key Fields:
- type: Hierarchical event type (domain.subdomain.event)
- source: SPIFFE ID of the emitting agent/adapter
- subject: Target entity in the digital twin
- id: ULID for correlation and deduplication
- dataschema: Versioned protobuf schema URI
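For illustration, a minimal sketch of assembling these attributes in Python; `make_attributes` is a hypothetical helper (not part of the CitadelMesh SDK), and the ULID comes from the assumed python-ulid package:

```python
from datetime import datetime, timezone

from ulid import ULID  # assumption: the python-ulid package

SCHEMA_BASE = "https://schemas.citadelmesh.io"

def make_attributes(event_type: str, source: str, subject: str, schema: str) -> dict:
    """Build a CloudEvents 1.0 attribute dict following the conventions above."""
    return {
        "specversion": "1.0",
        "type": event_type,                       # domain.subdomain.event
        "source": source,                         # SPIFFE ID of the emitter
        "subject": subject,                       # target entity in the digital twin
        "id": str(ULID()),                        # ULID: sortable and dedup-friendly
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/x-protobuf",
        "dataschema": f"{SCHEMA_BASE}/{schema}",  # versioned protobuf schema URI
    }
```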
Transport Options
- Edge to Edge: MQTT (Mosquitto) for publish-subscribe at the building edge
- Edge to Cloud: NATS JetStream for reliable delivery with persistence
- Cloud: Kafka/Redpanda for high-throughput event streaming
- Abstraction: Dapr pub/sub provides a transport-agnostic abstraction where needed (see the sketch below)
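A minimal sketch of transport-agnostic publishing through the Dapr Python SDK. The `citadel-pubsub` component name is an assumption, and `payload` stands in for a serialized protobuf message (see Layer 2); swapping MQTT, NATS, or Kafka underneath is purely a Dapr component-configuration change:

```python
from dapr.clients import DaprClient

# The publishing code is identical at the edge and in the cloud because the
# concrete broker is selected by the Dapr component bound to "citadel-pubsub".
with DaprClient() as client:
    client.publish_event(
        pubsub_name="citadel-pubsub",
        topic_name="telemetry.canonical.building_a.hvac.zone1.temp",
        data=payload,
        data_content_type="application/x-protobuf",
    )
```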
Topic Naming Convention
Topics follow a hierarchical naming scheme:
```text
telemetry.raw.<adapter>.<site>.<device>
telemetry.canonical.<site>.<entity>.<metric>
control.cmd.<site>.<entity>
control.res.<site>.<command_id>
incidents.events.<site>
twin.mutations.<site>
policy.evaluations.<site>
```

Example: `telemetry.canonical.building_a.hvac.zone1.temp`
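A small hypothetical helper that keeps topic construction consistent with this scheme:

```python
def canonical_topic(site: str, entity: str, metric: str) -> str:
    """Build a canonical telemetry topic; illustrative only, not SDK API."""
    for segment in (site, entity, metric):
        if not segment or any(c in segment for c in " /#+"):
            raise ValueError(f"invalid topic segment: {segment!r}")
    return f"telemetry.canonical.{site}.{entity}.{metric}"

assert (canonical_topic("building_a", "hvac.zone1", "temp")
        == "telemetry.canonical.building_a.hvac.zone1.temp")
```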
Layer 2: Protobuf Payloads
CloudEvents carry protobuf-encoded payloads for efficiency and schema enforcement.
Why Protobuf?
- Compact: 3-10x smaller than JSON (see the measurement sketch after this list)
- Fast: Native deserialization in all languages
- Versioned: Field numbers enable backward/forward compatibility
- Code Generation: Auto-generate type-safe SDKs for Python, .NET, Node, Rust
- Schema Registry: Schemas as source of truth with CI validation
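A quick way to check the size claim for yourself, assuming the generated `citadel.v1` bindings; exact ratios vary with payload shape:

```python
from google.protobuf.json_format import MessageToJson

from citadel.v1 import telemetry_pb2

point = telemetry_pb2.Point(
    entity_id="hvac.zone1.temp", metric="temp.zone",
    value=72.5, unit="°F", quality="good",
)

wire = point.SerializeToString()         # compact binary encoding
as_json = MessageToJson(point).encode()  # equivalent JSON representation

# The binary form is typically a small fraction of the JSON size.
print(f"protobuf: {len(wire)} bytes, JSON: {len(as_json)} bytes")
```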
Core Protobuf Packages
```text
citadel/v1/
├── telemetry.proto   - Canonical metrics and status
├── commands.proto    - Actuation commands and results
├── incidents.proto   - Security and operational incidents
├── policy.proto      - Policy evaluations and decisions
├── twin.proto        - Digital twin mutations and snapshots
└── events.proto      - Event type definitions
```
Example: Telemetry Point
```protobuf
syntax = "proto3";

package citadel.v1;

import "google/protobuf/timestamp.proto";

message Point {
  string entity_id = 1;                 // Digital twin entity ID
  string metric = 2;                    // Metric name (e.g., "temp.zone")
  double value = 3;                     // Numeric value
  string unit = 4;                      // SI unit (°C, kW, lux)
  google.protobuf.Timestamp timestamp = 5;
  string quality = 6;                   // good|bad|uncertain
  map<string, string> attributes = 7;   // Extra metadata
}
```
Example: Control Command
```protobuf
message Command {
  string id = 1;                    // Unique command ID (ULID)
  string target_id = 2;             // Target entity (door, zone, light)
  string action = 3;                // Action (unlock, set_temp, dim)
  map<string, string> params = 4;   // Action parameters
  int32 ttl_seconds = 5;            // Time-to-live
  string safety_token = 6;          // OPA-issued safety token
  string issued_by = 7;             // Issuing agent SPIFFE ID
  google.protobuf.Timestamp issued_at = 8;
  Priority priority = 9;            // Command priority
}

enum Priority {
  PRIORITY_UNSPECIFIED = 0;
  PRIORITY_LOW = 1;
  PRIORITY_NORMAL = 2;
  PRIORITY_HIGH = 3;
  PRIORITY_EMERGENCY = 4;
}
```
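A sketch of constructing and TTL-checking a Command from Python, assuming the generated `commands_pb2` bindings; `is_expired` is an illustrative validator, not the shipped one:

```python
import time

from google.protobuf import timestamp_pb2
from citadel.v1 import commands_pb2

def is_expired(cmd: commands_pb2.Command, now: float | None = None) -> bool:
    """Reject commands whose ttl_seconds window has lapsed."""
    now = time.time() if now is None else now
    return now - cmd.issued_at.seconds > cmd.ttl_seconds

issued_at = timestamp_pb2.Timestamp()
issued_at.GetCurrentTime()

cmd = commands_pb2.Command(
    id="01HQZXYZ9ABCDEFGHIJK",            # ULID
    target_id="door.lobby.main",
    action="unlock",
    params={"duration_seconds": "5"},
    ttl_seconds=30,
    issued_by="spiffe://citadel.mesh/security-agent",
    issued_at=issued_at,
    priority=commands_pb2.PRIORITY_HIGH,
)
```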
Schema Evolution Strategy
- Never reuse field numbers: Reserved fields prevent conflicts
- Add optional fields: New fields must be optional with defaults
- Version packages: Major breaking changes get a new package version (v2)
- CI validation: Consumer-driven contract tests prevent breaking changes
- Dual writing: Rolling upgrades support old + new schemas temporarily (see the sketch below)
Layer 3: Cross-Runtime RPC
Synchronous calls between agents and services use gRPC with protobuf.
When to Use gRPC vs Events
Use Events (CloudEvents + Kafka/NATS):
- Asynchronous notifications (telemetry, incidents)
- Fan-out to multiple subscribers
- Replay and audit trail required
- Temporal decoupling (offline tolerance)
Use gRPC:
- Request-response patterns (read digital twin state)
- Low-latency requirements (< 10ms)
- Streaming (live camera feeds, continuous telemetry)
- Direct Python ↔ .NET calls
Example gRPC Service
```protobuf
service TwinService {
  // Query digital twin entities
  rpc QueryEntities(QueryRequest) returns (QueryResponse);

  // Get entity state
  rpc GetEntity(GetEntityRequest) returns (Entity);

  // Mutate entity (publishes twin.mutation event)
  rpc MutateEntity(MutationRequest) returns (MutationResponse);

  // Stream live entity updates
  rpc StreamEntity(StreamRequest) returns (stream Entity);
}
```
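A server-side skeleton for this service using grpc.aio; the generated module names and the async `store` interface are assumptions:

```python
import grpc

from citadel.v1 import twin_service_pb2_grpc

class TwinService(twin_service_pb2_grpc.TwinServiceServicer):
    def __init__(self, store):
        self._store = store  # assumed async twin-store interface

    async def GetEntity(self, request, context):
        entity = await self._store.get(request.entity_id)
        if entity is None:
            await context.abort(grpc.StatusCode.NOT_FOUND, request.entity_id)
        return entity

    async def StreamEntity(self, request, context):
        # Server streaming: yield each twin update as it arrives.
        async for entity in self._store.watch(request.entity_id):
            yield entity
```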
SDK Generation
Protobuf schemas generate SDKs for all languages:
```bash
# Python
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. citadel/v1/*.proto

# .NET
dotnet grpc add-file citadel/v1/commands.proto

# Node.js
grpc_tools_node_protoc --js_out=import_style=commonjs:. --grpc_out=. citadel/v1/*.proto
```
Layer 4: MCP Tool Interoperability
Model Context Protocol (MCP) exposes building resources as tool servers that any agent framework can consume.
MCP Architecture
```mermaid
graph LR
    Agent[LangGraph Agent] --> MCP[MCP Client]
    IDE[VS Code/Cursor] --> MCP
    SK[Semantic Kernel] --> MCP
    MCP --> BACnet[BACnet MCP Server]
    MCP --> OPCUA[OPC UA MCP Server]
    MCP --> Twin[Digital Twin MCP Server]
    MCP --> Policy[Policy MCP Server]
    BACnet --> Device[BACnet Devices]
    OPCUA --> PLC[Industrial PLCs]
    Twin --> Graph[Graph Database]
    Policy --> OPA[OPA Engine]
```
Example MCP Tool Definition
// BACnet MCP Server exposes building automation tools
const bacnetTools = [
{
name: "bacnet_read_point",
description: "Read a BACnet point value",
inputSchema: {
type: "object",
properties: {
point_id: { type: "string", description: "BACnet object identifier" }
},
required: ["point_id"]
}
},
{
name: "bacnet_write_point",
description: "Write to a BACnet point (requires OPA approval)",
inputSchema: {
type: "object",
properties: {
point_id: { type: "string" },
value: { type: "number" },
priority: { type: "integer", minimum: 1, maximum: 16 }
}
}
}
];
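The same tools can also be served from Python; a sketch using the MCP Python SDK's FastMCP helper, where `bacnet_client` is an assumed adapter interface:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("bacnet")

@mcp.tool()
async def bacnet_read_point(point_id: str) -> float:
    """Read a BACnet point value."""
    return await bacnet_client.read(point_id)

@mcp.tool()
async def bacnet_write_point(point_id: str, value: float, priority: int = 8) -> bool:
    """Write to a BACnet point (requires OPA approval)."""
    if not 1 <= priority <= 16:
        raise ValueError("BACnet write priority must be between 1 and 16")
    return await bacnet_client.write(point_id, value, priority)

if __name__ == "__main__":
    mcp.run()
```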
Benefits of MCP
- Framework-agnostic: LangGraph, AutoGen, Semantic Kernel can all use the same tools
- IDE integration: Copilot and Claude can directly control building systems
- Composability: Chain MCP tools into complex workflows
- Standardization: One tool server serves all consumers
MCP Correlation with CloudEvents
Every MCP tool call generates a CloudEvent for audit:
```python
# Agent calls the MCP tool
result = await mcp_client.call_tool("bacnet_write_point", {
    "point_id": "hvac.zone1.setpoint",
    "value": 72.0,
    "priority": 8,
})

# The MCP server emits a CloudEvent for the audit trail
command = Command(
    id=ulid(),
    target_id="hvac.zone1.setpoint",
    action="write_setpoint",
    params={"value": "72.0", "priority": "8"},
)
event = CloudEvent(
    {
        "type": "citadel.control.command",
        "source": "spiffe://citadel.mesh/bacnet-adapter",
        "subject": "hvac.zone1.setpoint",
        "datacontenttype": "application/x-protobuf",
    },
    command.SerializeToString(),
)
```
Layer 5: Agent-to-Agent (A2A) Gateway
For cross-organization agent collaboration, we provide an A2A gateway that maps CloudEvents to the A2A protocol.
A2A Use Cases
- Multi-tenant coordination: Energy aggregator coordinating multiple buildings
- Vendor collaboration: Security vendor agents collaborating with facility agents
- Cross-site workflows: Enterprise policies across multiple buildings
Capability Descriptors
Agents publish signed capability descriptors:
```json
{
  "id": "urn:citadel:agent:security:v1",
  "version": "1.0.0",
  "issuer": "spiffe://citadel.mesh/security-agent",
  "signature": "...",
  "actions": [
    {
      "name": "lock_door",
      "input_schema": "https://schemas.citadelmesh.io/LockDoorRequest",
      "output_schema": "https://schemas.citadelmesh.io/LockDoorResponse",
      "safety_level": "high",
      "rate_limit": "10/minute"
    }
  ],
  "resources": [
    {
      "uri": "citadel://building_a/doors/*",
      "verbs": ["read", "lock", "unlock"]
    }
  ],
  "constraints": [
    {
      "type": "policy",
      "policy_ids": ["citadel.security.door_access"]
    }
  ]
}
```
Discovery and Authorization
- Registration: Agents publish capability descriptors to discovery registry (signed)
- Discovery: Consumers query by action/resource tags
- Authorization: Exchange short-lived capability tokens via mTLS + JWT
- Invocation: CloudEvents mapped to A2A envelopes at gateway
- Audit: All cross-agent calls logged immutably
Security and Identity
All protocol layers integrate with SPIFFE/SPIRE for zero-trust identity:
mTLS Everywhere
- Every event source authenticated via SPIFFE ID
- Kafka/NATS clients present SVIDs for authentication
- gRPC connections use mutual TLS
- MCP servers verify caller identity
Capability JWTs
CloudEvents carry signed JWTs with scoped capabilities:
```json
{
  "sub": "spiffe://citadel.mesh/energy-agent",
  "iss": "spiffe://citadel.mesh/opa-policy-engine",
  "aud": ["citadel.control.hvac"],
  "exp": 1704067200,
  "capabilities": [
    "hvac:write_setpoint",
    "hvac:read_point"
  ],
  "constraints": {
    "temp_min": 65,
    "temp_max": 78
  }
}
```
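A hedged enforcement sketch using PyJWT; key distribution (`issuer_key`) is out of scope here, and the claim names mirror the example token above:

```python
import jwt  # PyJWT

def authorize_setpoint(token: str, issuer_key, value: float) -> None:
    """Verify a capability JWT and enforce its constraints before actuating."""
    claims = jwt.decode(
        token,
        key=issuer_key,
        algorithms=["ES256"],
        audience="citadel.control.hvac",
    )
    if "hvac:write_setpoint" not in claims.get("capabilities", []):
        raise PermissionError("capability hvac:write_setpoint not granted")

    limits = claims.get("constraints", {})
    lo = limits.get("temp_min", float("-inf"))
    hi = limits.get("temp_max", float("inf"))
    if not lo <= value <= hi:
        raise PermissionError(f"setpoint {value} outside permitted range [{lo}, {hi}]")
```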
Audit Trail
Every protocol interaction is auditable:
```python
# CloudEvent attributes provide a full audit trail
audit_event = CloudEvent(
    {
        "type": "citadel.audit.action",
        "source": agent.spiffe_id,
        "subject": entity_id,
    },
    {
        "action": "write_setpoint",
        "actor": agent.spiffe_id,
        "target": entity_id,
        "timestamp": now,
        "approved_by": opa_policy,
        "safety_token": token,
        "result": "success",
    },
)
```
Protocol Roadmap
v1.0 (Current)
- CloudEvents + Protobuf internal event bus
- gRPC for cross-runtime RPC
- MCP tool servers for adapters and twin
- SPIFFE/SPIRE identity foundation
v1.5 (Q2 2025)
- A2A gateway for agent-to-agent interop
- Discovery registry with signed capabilities
- Enhanced MCP servers (streaming, subscriptions)
v2.0 (Q4 2025)
- Policy-aware capability negotiation
- Cross-site federation with trust domains
- Vendor certification framework
Implementation Patterns
Publishing Events
```python
from cloudevents.http import CloudEvent

from citadel.v1 import telemetry_pb2

# Create the protobuf payload
point = telemetry_pb2.Point(
    entity_id="hvac.zone1.temp",
    metric="temp.zone",
    value=72.5,
    unit="°F",
    quality="good",
)

# Wrap it in a CloudEvents envelope (the Python SDK takes an attributes dict plus data)
event = CloudEvent(
    {
        "type": "citadel.telemetry.point",
        "source": "spiffe://citadel.mesh/ebo-adapter",
        "subject": "hvac.zone1.temp",
        "datacontenttype": "application/x-protobuf",
        "dataschema": "https://schemas.citadelmesh.io/citadel.v1.Point",
    },
    point.SerializeToString(),
)

# Publish to the event bus
await event_bus.publish("telemetry.canonical.building_a", event)
```
Consuming Events
```python
async def handle_telemetry(event: CloudEvent):
    # Verify the source identity before trusting the payload
    assert event["source"].startswith("spiffe://citadel.mesh/")

    # Deserialize the protobuf payload
    point = telemetry_pb2.Point()
    point.ParseFromString(event.data)

    # Dispatch on the event type
    if event["type"] == "citadel.telemetry.point":
        await process_telemetry_point(point)

# Subscribe to the topic
await event_bus.subscribe(
    "telemetry.canonical.building_a.hvac.>",
    handle_telemetry,
)
```
Calling gRPC Services
```python
import grpc

from citadel.v1 import twin_service_pb2, twin_service_pb2_grpc

# Create an asyncio gRPC channel with mTLS (grpc.aio is required for await)
credentials = grpc.ssl_channel_credentials(
    root_certificates=ca_cert,
    private_key=client_key,
    certificate_chain=client_cert,
)
channel = grpc.aio.secure_channel("twin-service:8443", credentials)
client = twin_service_pb2_grpc.TwinServiceStub(channel)

# Call the service
request = twin_service_pb2.GetEntityRequest(entity_id="hvac.zone1")
entity = await client.GetEntity(request)
```
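Streaming RPCs use the same stub; the response is an async iterator:

```python
# Stream live entity updates (server streaming)
stream_request = twin_service_pb2.StreamRequest(entity_id="hvac.zone1")
async for update in client.StreamEntity(stream_request):
    print(update.entity_id)
```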
Related Documentation
- Data Contracts - Detailed protobuf schema documentation
- Identity Foundation - SPIFFE/SPIRE architecture
- MCP Adapters - MCP server implementation patterns
- Safety Guardrails - Policy enforcement in protocol layer