Digital Twin Architecture
The CitadelMesh digital twin provides a live, queryable representation of building state, synchronized across all vendor systems and agents. This document explains the twin architecture, ontology strategy, and state synchronization.
Digital Twin Concept
Definition: A digital twin is a virtual representation of physical building assets, spaces, and systems that mirrors their current state, relationships, and capabilities.
Purpose in CitadelMesh:
- Unified View: Single source of truth across heterogeneous vendor systems
- State Reconciliation: Resolve conflicts when multiple systems report different states
- Queryable Graph: Navigate building hierarchy and relationships
- Historical Context: Time-travel queries for incident investigation
- Semantic Layer: Standardized ontology (Brick/Haystack) over proprietary schemas
Architecture
graph TB
subgraph "Vendor Systems"
EBO[EcoStruxure EBO]
SSE[Security Expert]
Avigilon[Avigilon]
HA[Home Assistant]
end
subgraph "Adapters"
EBOAdapter[EBO Adapter]
SSEAdapter[SSE Adapter]
AVIAdapter[Avigilon Adapter]
HAAdapter[HA Adapter]
end
subgraph "Event Bus"
NATS[NATS JetStream]
end
subgraph "Twin Service"
Reconciler[State Reconciler]
Validator[Ontology Validator]
GraphDB[(Neo4j/<br/>JanusGraph)]
end
subgraph "Agents"
Security[Security Agent]
Energy[Energy Agent]
Automation[Automation Agent]
end
EBO --> EBOAdapter
SSE --> SSEAdapter
Avigilon --> AVIAdapter
HA --> HAAdapter
EBOAdapter --> NATS
SSEAdapter --> NATS
AVIAdapter --> NATS
HAAdapter --> NATS
NATS --> Reconciler
Reconciler --> Validator
Validator --> GraphDB
Security --> GraphDB
Energy --> GraphDB
Automation --> GraphDB
Ontology Foundation
Brick Schema
Base ontology for building systems and equipment.
# Brick example (Turtle RDF syntax)
@prefix brick: <https://brickschema.org/schema/Brick#> .
@prefix unit: <http://qudt.org/vocab/unit/> .
# HVAC Zone
:Zone1 a brick:HVAC_Zone ;
brick:hasPoint :Zone1_Temp ;
brick:hasPoint :Zone1_Setpoint ;
brick:hasPart :Zone1_VAV .
# Temperature sensor
:Zone1_Temp a brick:Temperature_Sensor ;
brick:hasUnit unit:DEG_F ;
brick:isPointOf :Zone1 .
# Setpoint
:Zone1_Setpoint a brick:Temperature_Setpoint ;
brick:hasUnit unit:DEG_F ;
brick:isPointOf :Zone1 .
# VAV box
:Zone1_VAV a brick:Variable_Air_Volume_Box ;
brick:feeds :Zone1 ;
brick:controls :Zone1_Temp .
Project Haystack
Tagging convention for building data.
{
"id": "@zone1",
"dis": "Conference Room Zone 1",
"hvac": "m:",
"zone": "m:",
"floor": "2",
"area": {"_kind": "number", "val": 1200, "unit": "ft²"}
}
{
"id": "@zone1.temp",
"dis": "Zone 1 Temperature Sensor",
"point": "m:",
"sensor": "m:",
"temp": "m:",
"cur": {"_kind": "number", "val": 72.5, "unit": "°F"},
"equipRef": "@zone1.vav",
"siteRef": "@building_a"
}
CitadelMesh Extensions
Custom extensions for security, energy, and AI agents.
@prefix citadel: <https://citadelmesh.io/ontology#> .
# Security domain
:LobbyDoor a brick:Door, citadel:AccessControlPoint ;
citadel:controlledBy :SecurityExpert ;
citadel:monitorsBy :CameraLobby01 ;
citadel:requiresCredential citadel:BadgeAccess ;
citadel:emergencyEgress true .
# Camera with analytics
:CameraLobby01 a brick:Camera, citadel:AnalyticsCamera ;
citadel:hasAnalytic citadel:PersonDetection ;
citadel:hasAnalytic citadel:LoiteringDetection ;
citadel:monitors :LobbyDoor .
# Energy tariff
:BuildingA_ElectricMeter a brick:Electric_Meter ;
citadel:hasTariff :TOU_Schedule ;
citadel:hasDemandLimit 500 ; # kW
citadel:demandLimitUnit unit:KiloW .
:TOU_Schedule a citadel:TimeOfUseTariff ;
citadel:peakRate 0.25 ; # $/kWh
citadel:offPeakRate 0.12 ;
citadel:peakHours "09:00-21:00" .
Graph Database Schema
Neo4j Implementation
Node Types:
Entity: Base type for all twin entitiesZone: HVAC zones, security zonesEquipment: VAV boxes, boilers, camerasPoint: Sensors, setpoints, commandsPolicy: OPA policies, constraintsAgent: AI agents in the system
Relationship Types:
FEEDS: Energy/air flowCONTROLS: Control relationshipsPART_OF: HierarchyMONITORS: SurveillanceLOCATED_IN: Spatial relationshipsDEPENDS_ON: Dependencies
Cypher Schema:
// Create entity nodes
CREATE (z:Entity:Zone {
entity_id: "hvac.zone1",
type: "HVAC_Zone",
site_id: "building_a",
floor: 2,
area_sqft: 1200
})
CREATE (t:Entity:Point {
entity_id: "hvac.zone1.temp",
type: "Temperature_Sensor",
metric: "temp.zone",
unit: "°F"
})
CREATE (s:Entity:Point {
entity_id: "hvac.zone1.setpoint",
type: "Temperature_Setpoint",
metric: "setpoint.current",
unit: "°F"
})
// Create relationships
CREATE (t)-[:PART_OF]->(z)
CREATE (s)-[:PART_OF]->(z)
CREATE (s)-[:CONTROLS]->(t)
// Add current state
SET t.current_value = 72.5
SET t.updated_at = datetime()
SET s.current_value = 72.0
SET s.updated_at = datetime()
State Synchronization
Mutation Events
Adapters publish twin.mutations events when vendor state changes:
from citadel.v1 import twin_pb2
mutation = twin_pb2.TwinMutation(
entity_id="hvac.zone1.temp",
op=twin_pb2.UPSERT,
ontology_model="brick",
payload=json.dumps({
"type": "Temperature_Sensor",
"value": 72.5,
"unit": "°F",
"quality": "good",
"timestamp": datetime.utcnow().isoformat()
}),
source="spiffe://citadel.mesh/adapter/ebo",
timestamp=Timestamp(seconds=int(time.time()))
)
# Publish to event bus
await event_bus.publish("twin.mutations.building_a", CloudEvent(
type="citadel.twin.mutation",
source="spiffe://citadel.mesh/adapter/ebo",
subject="hvac.zone1.temp",
data=mutation
))
Reconciliation Logic
Twin service reconciles conflicts when multiple sources report different states:
class TwinReconciler:
"""Reconcile state from multiple vendor sources."""
async def reconcile_entity_state(self, entity_id: str, mutations: List[TwinMutation]):
"""Reconcile entity state from multiple mutations."""
# Group mutations by source
by_source = defaultdict(list)
for mutation in mutations:
by_source[mutation.source].append(mutation)
# Determine authoritative source per attribute
resolved_state = {}
for attr in self._get_attributes(entity_id):
# Get values from each source
values = {
source: self._extract_value(muts, attr)
for source, muts in by_source.items()
}
# Apply reconciliation rules
resolved_value = await self._apply_reconciliation_rules(
entity_id, attr, values
)
resolved_state[attr] = resolved_value
# Update graph database
await self._update_entity(entity_id, resolved_state)
return resolved_state
async def _apply_reconciliation_rules(
self,
entity_id: str,
attr: str,
values: Dict[str, Any]
) -> Any:
"""Apply reconciliation rules based on entity and attribute."""
# Rule 1: Prefer most recent timestamp
if len(values) == 1:
return list(values.values())[0]
# Rule 2: Prefer authoritative source for entity type
auth_source = await self._get_authoritative_source(entity_id, attr)
if auth_source and auth_source in values:
return values[auth_source]
# Rule 3: Use most recent value
return max(values.items(), key=lambda x: x[1].get("timestamp", 0))[1]["value"]
Consistency Model
Eventually Consistent: Twin converges to correct state within seconds
Reconciliation Windows:
- Real-time: < 1 second for high-priority entities
- Standard: < 5 seconds for normal entities
- Batch: < 60 seconds for low-priority entities
Query API
gRPC Service
service TwinService {
// Query entities by selector
rpc QueryEntities(QueryRequest) returns (QueryResponse);
// Get single entity
rpc GetEntity(GetEntityRequest) returns (Entity);
// Mutate entity
rpc MutateEntity(MutationRequest) returns (MutationResponse);
// Stream live updates
rpc StreamEntity(StreamRequest) returns (stream Entity);
// Query relationships
rpc QueryRelationships(RelationshipQuery) returns (RelationshipResponse);
// Time-travel query
rpc GetEntityAtTime(EntityTimeQuery) returns (Entity);
}
Query Examples
Get all HVAC zones:
request = QueryRequest(
selector="type:HVAC_Zone",
site_id="building_a"
)
response = await twin_client.QueryEntities(request)
zones = response.entities
Get zone with relationships:
MATCH (z:Zone {entity_id: $entity_id})
OPTIONAL MATCH (z)<-[:PART_OF]-(p:Point)
OPTIONAL MATCH (z)<-[:FEEDS]-(e:Equipment)
RETURN z, collect(p) as points, collect(e) as equipment
request = GetEntityRequest(entity_id="hvac.zone1", include_relationships=True)
entity = await twin_client.GetEntity(request)
print(f"Zone: {entity.entity_id}")
print(f"Points: {len(entity.relationships)}")
Time-travel query (state at specific time):
request = EntityTimeQuery(
entity_id="door.lobby.main",
timestamp=Timestamp(seconds=int(incident_time.timestamp()))
)
# Get door state at time of incident
historical_entity = await twin_client.GetEntityAtTime(request)
door_state = historical_entity.attributes["status"] # "locked" or "unlocked"
Derived KPIs
Materialized Views
Compute aggregate metrics periodically:
// Energy efficiency score per zone
MATCH (z:Zone)<-[:PART_OF]-(m:Point {metric: "energy.consumption"})
WITH z, avg(m.current_value) as avg_consumption
MATCH (z)<-[:PART_OF]-(t:Point {metric: "temp.zone"})
WITH z, avg_consumption, avg(abs(t.current_value - t.setpoint_value)) as temp_deviation
SET z.efficiency_score = 100 - (temp_deviation * 5 + avg_consumption * 0.1)
RETURN z.entity_id, z.efficiency_score
Real-Time KPIs
Compute on query:
async def calculate_comfort_score(zone_id: str) -> float:
"""Calculate comfort score (0-100) for zone."""
# Get zone state
entity = await twin_client.GetEntity(GetEntityRequest(entity_id=zone_id))
temp = float(entity.attributes["current_temp"])
setpoint = float(entity.attributes["target_setpoint"])
humidity = float(entity.attributes.get("humidity", 50))
# Temperature deviation penalty
temp_penalty = abs(temp - setpoint) * 5
# Humidity penalty (ideal 40-60%)
humidity_penalty = max(0, abs(humidity - 50) - 10)
# Comfort score
score = 100 - temp_penalty - humidity_penalty
return max(0, min(100, score))
Simulation Integration
EnergyPlus Integration
class EnergyPlusSim:
"""Simulate HVAC scenarios with EnergyPlus."""
async def simulate_setpoint_change(
self,
zone_id: str,
target_temp: float,
duration_hours: int
) -> SimResult:
"""Simulate energy impact of setpoint change."""
# Get zone from twin
zone = await twin_client.GetEntity(GetEntityRequest(entity_id=zone_id))
# Build EnergyPlus model from twin
idf_model = self._build_idf_from_twin(zone)
# Run simulation
result = await energyplus.run_simulation(
idf_model,
setpoint=target_temp,
duration=duration_hours
)
return SimResult(
energy_kwh=result.total_energy,
cost_usd=result.total_energy * current_tariff,
comfort_score=result.comfort_metrics.avg_pmv
)
Related Documentation
- Protocol Strategy - Twin mutation CloudEvents
- Data Contracts - Twin protobuf schemas
- Integration Matrix - Vendor data mapping
- Agent Topology - Agents querying twin