Protocol Strategy

CitadelMesh takes a protocol-first approach to ensure durable, vendor-neutral interoperability across heterogeneous runtimes, frameworks, and building systems. This document explains our protocol choices, message formats, and integration patterns.

Design Philosophy

Protocols over Frameworks: We prioritize standardized, durable protocols over framework-specific abstractions. This ensures:

  • Future-proof: Swap agent frameworks (LangGraph, AutoGen, Semantic Kernel) without rewriting integrations
  • Polyglot: Python, .NET, TypeScript, and Rust can all participate equally
  • Vendor-neutral: Building systems speak different protocols; we normalize to canonical models
  • Debuggable: Standard protocols have excellent tooling (Wireshark, CloudEvents SDKs, protoc)

Protocol Stack

graph TB
    subgraph "Application Layer"
        Agents[Agents & Services]
        Tools[MCP Tool Servers]
    end

    subgraph "Messaging Layer"
        CE[CloudEvents Envelope]
        PB[Protobuf Payloads]
    end

    subgraph "Transport Layer"
        GRPC[gRPC]
        KAFKA[Kafka/NATS/MQTT]
    end

    subgraph "Security Layer"
        SPIFFE[SPIFFE/SPIRE mTLS]
        JWT[Capability JWTs]
    end

    Agents --> CE
    Tools --> CE
    CE --> PB
    PB --> GRPC
    PB --> KAFKA
    GRPC --> SPIFFE
    KAFKA --> SPIFFE
    CE --> JWT

Layer 1: Internal Eventing (Authoritative)

All asynchronous communication in CitadelMesh flows through CloudEvents.

CloudEvents 1.0 Envelope

Every event uses a standard CloudEvents envelope:

{
  "specversion": "1.0",
  "type": "citadelmesh.security.incident",
  "source": "spiffe://citadel.mesh/security-agent",
  "subject": "door.lobby.main",
  "id": "01HQZXYZ9ABCDEFGHIJK",
  "time": "2025-09-30T15:30:00Z",
  "datacontenttype": "application/x-protobuf",
  "dataschema": "https://schemas.citadelmesh.io/citadel.v1.Incident.v1",
  "data_base64": "CgYxMjM0NTY..."
}

Key Fields:

  • type: Hierarchical event type (domain.subdomain.event)
  • source: SPIFFE ID of the emitting agent/adapter
  • subject: Target entity in the digital twin
  • id: ULID for correlation and deduplication
  • dataschema: Versioned protobuf schema URI

Transport Options

  • Edge to Edge: MQTT (mosquitto) for publish-subscribe at the building edge
  • Edge to Cloud: NATS JetStream for reliable delivery with persistence
  • Cloud: Kafka/Redpanda for high-throughput event streaming
  • Abstraction: Dapr pub/sub provides transport-agnostic abstraction where needed
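
The edge-to-cloud path can be exercised with very little code. As a rough sketch (the broker URL and subject are illustrative, the payload is JSON for brevity rather than protobuf, and the nats-py and cloudevents packages are assumed), the snippet below publishes a structured CloudEvent to a NATS JetStream subject:

import asyncio

import nats
from cloudevents.http import CloudEvent, to_structured

async def publish_point() -> None:
    # Build a minimal CloudEvent: attributes dict plus a JSON payload.
    event = CloudEvent(
        {
            "type": "citadel.telemetry.point",
            "source": "spiffe://citadel.mesh/ebo-adapter",
            "subject": "hvac.zone1.temp",
            "datacontenttype": "application/json"
        },
        {"value": 22.5, "unit": "°C"}
    )

    # Structured mode serializes the whole envelope into the message body.
    _headers, body = to_structured(event)

    # Publish to a JetStream-backed subject (illustrative broker URL).
    nc = await nats.connect("nats://edge-broker:4222")
    js = nc.jetstream()
    await js.publish("telemetry.canonical.building_a.hvac.zone1.temp", body)
    await nc.drain()

asyncio.run(publish_point())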

Topic Naming Convention

Topics follow a hierarchical naming scheme:

telemetry.raw.<adapter>.<site>.<device>
telemetry.canonical.<site>.<entity>.<metric>
control.cmd.<site>.<entity>
control.res.<site>.<command_id>
incidents.events.<site>
twin.mutations.<site>
policy.evaluations.<site>

Example: telemetry.canonical.building_a.hvac.zone1.temp
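
For illustration, a tiny, hypothetical helper (not part of any CitadelMesh SDK) that assembles canonical telemetry topics from their parts:

def canonical_topic(site: str, entity: str, metric: str) -> str:
    # Assemble a canonical telemetry topic following the scheme above.
    return f"telemetry.canonical.{site}.{entity}.{metric}"

assert (
    canonical_topic("building_a", "hvac.zone1", "temp")
    == "telemetry.canonical.building_a.hvac.zone1.temp"
)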

Layer 2: Protobuf Payloads

CloudEvents carry protobuf-encoded payloads for efficiency and schema enforcement.

Why Protobuf?

  • Compact: 3-10x smaller than JSON
  • Fast: Native deserialization in all languages
  • Versioned: Field numbers enable backward/forward compatibility
  • Code Generation: Auto-generate type-safe SDKs for Python, .NET, Node, Rust
  • Schema Registry: Schemas as source of truth with CI validation

Core Protobuf Packages

citadel/v1/
├── telemetry.proto - Canonical metrics and status
├── commands.proto - Actuation commands and results
├── incidents.proto - Security and operational incidents
├── policy.proto - Policy evaluations and decisions
├── twin.proto - Digital twin mutations and snapshots
└── events.proto - Event type definitions

Example: Telemetry Point

syntax = "proto3";
package citadel.v1;

import "google/protobuf/timestamp.proto";

message Point {
  string entity_id = 1;                    // Digital twin entity ID
  string metric = 2;                       // Metric name (e.g., "temp.zone")
  double value = 3;                        // Numeric value
  string unit = 4;                         // SI unit (°C, kW, lux)
  google.protobuf.Timestamp timestamp = 5;
  string quality = 6;                      // good|bad|uncertain
  map<string,string> attributes = 7;       // Extra metadata
}
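
To make the compactness claim concrete, the following sketch serializes the same Point both as protobuf and as JSON and compares the sizes. It assumes the telemetry_pb2 module generated from telemetry.proto is importable:

import json

from google.protobuf.json_format import MessageToDict

from citadel.v1 import telemetry_pb2  # generated from telemetry.proto (assumed on the path)

point = telemetry_pb2.Point(
    entity_id="hvac.zone1.temp",
    metric="temp.zone",
    value=22.5,
    unit="°C",
    quality="good"
)

pb_bytes = point.SerializeToString()
json_bytes = json.dumps(MessageToDict(point)).encode("utf-8")

print(f"protobuf: {len(pb_bytes)} bytes, JSON: {len(json_bytes)} bytes")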

Example: Control Command

message Command {
  string id = 1;                           // Unique command ID (ULID)
  string target_id = 2;                    // Target entity (door, zone, light)
  string action = 3;                       // Action (unlock, set_temp, dim)
  map<string,string> params = 4;           // Action parameters
  int32 ttl_seconds = 5;                   // Time-to-live
  string safety_token = 6;                 // OPA-issued safety token
  string issued_by = 7;                    // Issuing agent SPIFFE ID
  google.protobuf.Timestamp issued_at = 8;
  Priority priority = 9;                   // Command priority
}

enum Priority {
  PRIORITY_UNSPECIFIED = 0;
  PRIORITY_LOW = 1;
  PRIORITY_NORMAL = 2;
  PRIORITY_HIGH = 3;
  PRIORITY_EMERGENCY = 4;
}

Schema Evolution Strategy

  1. Never reuse field numbers: Reserved fields prevent conflicts
  2. Add optional fields: New fields must be optional with defaults
  3. Version packages: Major breaking changes get new package version (v2)
  4. CI validation: Consumer-driven contract tests prevent breaking changes
  5. Dual writing: Rolling upgrades support old + new schemas temporarily

Layer 3: Cross-Runtime RPC

Synchronous calls between agents and services use gRPC with protobuf.

When to Use gRPC vs Events

Use Events (CloudEvents + Kafka/NATS):

  • Asynchronous notifications (telemetry, incidents)
  • Fan-out to multiple subscribers
  • Replay and audit trail required
  • Temporal decoupling (offline tolerance)

Use gRPC:

  • Request-response patterns (read digital twin state)
  • Low-latency requirements (< 10ms)
  • Streaming (live camera feeds, continuous telemetry)
  • Direct Python ↔ .NET calls

Example gRPC Service

service TwinService {
  // Query digital twin entities
  rpc QueryEntities(QueryRequest) returns (QueryResponse);

  // Get entity state
  rpc GetEntity(GetEntityRequest) returns (Entity);

  // Mutate entity (publishes twin.mutation event)
  rpc MutateEntity(MutationRequest) returns (MutationResponse);

  // Stream live entity updates
  rpc StreamEntity(StreamRequest) returns (stream Entity);
}
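
For reference, a minimal asyncio server sketch for this service, assuming the twin_service_pb2 / twin_service_pb2_grpc modules have been generated from the schema above (the Entity field names used here are placeholders):

import asyncio

import grpc

from citadel.v1 import twin_service_pb2, twin_service_pb2_grpc  # generated modules (assumed)

class TwinService(twin_service_pb2_grpc.TwinServiceServicer):
    async def GetEntity(self, request, context):
        # Placeholder lookup; a real implementation reads the twin store.
        return twin_service_pb2.Entity(entity_id=request.entity_id)

async def serve() -> None:
    server = grpc.aio.server()
    twin_service_pb2_grpc.add_TwinServiceServicer_to_server(TwinService(), server)
    # Plaintext for brevity; production deployments attach SPIFFE mTLS credentials.
    server.add_insecure_port("[::]:8443")
    await server.start()
    await server.wait_for_termination()

asyncio.run(serve())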

SDK Generation

Protobuf schemas generate SDKs for all languages:

# Python
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. citadel/v1/*.proto

# .NET
dotnet grpc add-file citadel/v1/commands.proto

# Node.js
grpc_tools_node_protoc --js_out=import_style=commonjs:. --grpc_out=. citadel/v1/*.proto

Layer 4: MCP Tool Interoperability

Model Context Protocol (MCP) exposes building resources as tool servers that any agent framework can consume.

MCP Architecture

graph LR
Agent[LangGraph Agent] --> MCP[MCP Client]
IDE[VS Code/Cursor] --> MCP
SK[Semantic Kernel] --> MCP

MCP --> BACnet[BACnet MCP Server]
MCP --> OPCUA[OPC UA MCP Server]
MCP --> Twin[Digital Twin MCP Server]
MCP --> Policy[Policy MCP Server]

BACnet --> Device[BACnet Devices]
OPCUA --> PLC[Industrial PLCs]
Twin --> Graph[Graph Database]
Policy --> OPA[OPA Engine]

Example MCP Tool Definition

// BACnet MCP Server exposes building automation tools
const bacnetTools = [
  {
    name: "bacnet_read_point",
    description: "Read a BACnet point value",
    inputSchema: {
      type: "object",
      properties: {
        point_id: { type: "string", description: "BACnet object identifier" }
      },
      required: ["point_id"]
    }
  },
  {
    name: "bacnet_write_point",
    description: "Write to a BACnet point (requires OPA approval)",
    inputSchema: {
      type: "object",
      properties: {
        point_id: { type: "string" },
        value: { type: "number" },
        priority: { type: "integer", minimum: 1, maximum: 16 }
      }
    }
  }
];
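
An equivalent server can be sketched in Python with the MCP Python SDK's FastMCP helper; the read below returns a placeholder value rather than talking to a real BACnet stack:

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("bacnet-adapter")

@mcp.tool()
def bacnet_read_point(point_id: str) -> float:
    """Read a BACnet point value (stubbed)."""
    # Placeholder; a real server would query the BACnet adapter here.
    return 22.5

if __name__ == "__main__":
    mcp.run()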

Benefits of MCP

  1. Framework-agnostic: LangGraph, AutoGen, Semantic Kernel can all use the same tools
  2. IDE integration: Copilot and Claude can directly control building systems
  3. Composability: Chain MCP tools into complex workflows
  4. Standardization: One tool server serves all consumers

MCP Correlation with CloudEvents

Every MCP tool call generates a CloudEvent for audit:

# Agent calls MCP tool
result = await mcp_client.call_tool("bacnet_write_point", {
    "point_id": "hvac.zone1.setpoint",
    "value": 72.0,
    "priority": 8
})

# MCP server emits CloudEvent with a protobuf Command payload
command = Command(
    id=ulid(),
    target_id="hvac.zone1.setpoint",
    action="write_setpoint",
    params={"value": "72.0", "priority": "8"}
)
event = CloudEvent(
    {
        "type": "citadel.control.command",
        "source": "spiffe://citadel.mesh/bacnet-adapter",
        "subject": "hvac.zone1.setpoint"
    },
    command.SerializeToString()
)

Layer 5: Agent-to-Agent (A2A) Gateway

For cross-organization agent collaboration, we provide an A2A gateway that maps CloudEvents to A2A protocol.

A2A Use Cases

  • Multi-tenant coordination: Energy aggregator coordinating multiple buildings
  • Vendor collaboration: Security vendor agents collaborating with facility agents
  • Cross-site workflows: Enterprise policies across multiple buildings

Capability Descriptors

Agents publish signed capability descriptors:

{
  "id": "urn:citadel:agent:security:v1",
  "version": "1.0.0",
  "issuer": "spiffe://citadel.mesh/security-agent",
  "signature": "...",
  "actions": [
    {
      "name": "lock_door",
      "input_schema": "https://schemas.citadelmesh.io/LockDoorRequest",
      "output_schema": "https://schemas.citadelmesh.io/LockDoorResponse",
      "safety_level": "high",
      "rate_limit": "10/minute"
    }
  ],
  "resources": [
    {
      "uri": "citadel://building_a/doors/*",
      "verbs": ["read", "lock", "unlock"]
    }
  ],
  "constraints": [
    {
      "type": "policy",
      "policy_ids": ["citadel.security.door_access"]
    }
  ]
}

Discovery and Authorization

  1. Registration: Agents publish capability descriptors to discovery registry (signed)
  2. Discovery: Consumers query by action/resource tags
  3. Authorization: Exchange short-lived capability tokens via mTLS + JWT
  4. Invocation: CloudEvents mapped to A2A envelopes at gateway
  5. Audit: All cross-agent calls logged immutably

Security and Identity

All protocol layers integrate with SPIFFE/SPIRE for zero-trust identity:

mTLS Everywhere

  • Every event source authenticated via SPIFFE ID
  • Kafka/NATS clients present SVIDs for authentication (see the sketch after this list)
  • gRPC connections use mutual TLS
  • MCP servers verify caller identity
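
A minimal sketch of a NATS client presenting an X.509 SVID, assuming the SPIRE agent has written the SVID, key, and trust bundle to disk (the paths below are illustrative; in practice they are usually obtained via the Workload API):

import asyncio
import ssl

import nats

async def connect_with_svid() -> None:
    # Trust the SPIFFE bundle and present the workload's SVID as the client certificate.
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile="/run/spire/bundle.pem")
    ctx.load_cert_chain(certfile="/run/spire/svid.pem", keyfile="/run/spire/svid_key.pem")

    nc = await nats.connect("tls://nats.citadel.mesh:4222", tls=ctx)
    await nc.drain()

asyncio.run(connect_with_svid())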

Capability JWTs

CloudEvents carry signed JWTs with scoped capabilities:

{
  "sub": "spiffe://citadel.mesh/energy-agent",
  "iss": "spiffe://citadel.mesh/opa-policy-engine",
  "aud": ["citadel.control.hvac"],
  "exp": 1704067200,
  "capabilities": [
    "hvac:write_setpoint",
    "hvac:read_point"
  ],
  "constraints": {
    "temp_min": 65,
    "temp_max": 78
  }
}
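
Verification on the receiving side can be done with any standard JWT library. A minimal sketch using PyJWT, assuming ES256-signed tokens and that the issuer's public key has been obtained out of band (for example from the SPIFFE trust bundle):

import jwt  # PyJWT

def verify_capability(token: str, issuer_public_key_pem: str) -> dict:
    # Validate signature, audience, and expiry, then check the granted capability.
    claims = jwt.decode(
        token,
        issuer_public_key_pem,
        algorithms=["ES256"],
        audience="citadel.control.hvac"
    )
    if "hvac:write_setpoint" not in claims.get("capabilities", []):
        raise PermissionError("token does not grant hvac:write_setpoint")
    return claims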

Audit Trail

Every protocol interaction is auditable:

# CloudEvent attributes provide the full audit trail
audit_event = CloudEvent(
    {
        "type": "citadel.audit.action",
        "source": agent.spiffe_id,
        "subject": entity_id
    },
    {
        "action": "write_setpoint",
        "actor": agent.spiffe_id,
        "target": entity_id,
        "timestamp": now,
        "approved_by": opa_policy,
        "safety_token": token,
        "result": "success"
    }
)

Protocol Roadmap

v1.0 (Current)

  • CloudEvents + Protobuf internal event bus
  • gRPC for cross-runtime RPC
  • MCP tool servers for adapters and twin
  • SPIFFE/SPIRE identity foundation

v1.5 (Q2 2025)

  • A2A gateway for agent-to-agent interop
  • Discovery registry with signed capabilities
  • Enhanced MCP servers (streaming, subscriptions)

v2.0 (Q4 2025)

  • Policy-aware capability negotiation
  • Cross-site federation with trust domains
  • Vendor certification framework

Implementation Patterns

Publishing Events

from cloudevents.http import CloudEvent
from citadel.v1 import telemetry_pb2

# Create protobuf payload
point = telemetry_pb2.Point(
    entity_id="hvac.zone1.temp",
    metric="temp.zone",
    value=72.5,
    unit="°F",
    quality="good"
)

# Wrap in CloudEvent (attributes dict + serialized protobuf payload)
event = CloudEvent(
    {
        "type": "citadel.telemetry.point",
        "source": "spiffe://citadel.mesh/ebo-adapter",
        "subject": "hvac.zone1.temp",
        "datacontenttype": "application/x-protobuf",
        "dataschema": "https://schemas.citadelmesh.io/citadel.v1.Point"
    },
    point.SerializeToString()
)

# Publish to event bus
await event_bus.publish("telemetry.canonical.building_a.hvac.zone1.temp", event)

Consuming Events

async def handle_telemetry(event: CloudEvent):
    # Verify source identity before trusting the payload
    assert event["source"].startswith("spiffe://citadel.mesh/")

    # Deserialize protobuf payload
    point = telemetry_pb2.Point()
    point.ParseFromString(event.data)

    # Process based on event type
    if event["type"] == "citadel.telemetry.point":
        await process_telemetry_point(point)

# Subscribe to topic
await event_bus.subscribe(
    "telemetry.canonical.building_a.hvac.>",
    handle_telemetry
)

Calling gRPC Services

import grpc
from citadel.v1 import twin_service_pb2_grpc, twin_service_pb2

# Create an asyncio gRPC channel with mTLS
credentials = grpc.ssl_channel_credentials(
    root_certificates=ca_cert,
    private_key=client_key,
    certificate_chain=client_cert
)
channel = grpc.aio.secure_channel("twin-service:8443", credentials)
client = twin_service_pb2_grpc.TwinServiceStub(channel)

# Call service
request = twin_service_pb2.GetEntityRequest(entity_id="hvac.zone1")
entity = await client.GetEntity(request)

See Also