
Digital Twin Architecture

The CitadelMesh digital twin provides a live, queryable representation of building state, synchronized across all vendor systems and agents. This document explains the twin architecture, ontology strategy, and state synchronization.

Digital Twin Concept

Definition: A digital twin is a virtual representation of physical building assets, spaces, and systems that mirrors their current state, relationships, and capabilities.

Purpose in CitadelMesh:

  1. Unified View: Single source of truth across heterogeneous vendor systems
  2. State Reconciliation: Resolve conflicts when multiple systems report different states
  3. Queryable Graph: Navigate building hierarchy and relationships
  4. Historical Context: Time-travel queries for incident investigation
  5. Semantic Layer: Standardized ontology (Brick/Haystack) over proprietary schemas

Architecture

graph TB
subgraph "Vendor Systems"
EBO[EcoStruxure EBO]
SSE[Security Expert]
Avigilon[Avigilon]
HA[Home Assistant]
end

subgraph "Adapters"
EBOAdapter[EBO Adapter]
SSEAdapter[SSE Adapter]
AVIAdapter[Avigilon Adapter]
HAAdapter[HA Adapter]
end

subgraph "Event Bus"
NATS[NATS JetStream]
end

subgraph "Twin Service"
Reconciler[State Reconciler]
Validator[Ontology Validator]
GraphDB[(Neo4j/<br/>JanusGraph)]
end

subgraph "Agents"
Security[Security Agent]
Energy[Energy Agent]
Automation[Automation Agent]
end

EBO --> EBOAdapter
SSE --> SSEAdapter
Avigilon --> AVIAdapter
HA --> HAAdapter

EBOAdapter --> NATS
SSEAdapter --> NATS
AVIAdapter --> NATS
HAAdapter --> NATS

NATS --> Reconciler
Reconciler --> Validator
Validator --> GraphDB

Security --> GraphDB
Energy --> GraphDB
Automation --> GraphDB
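
The adapter-to-graph flow in the diagram can be sketched with in-memory stand-ins. This is a minimal illustration, not the CitadelMesh implementation: `Mutation`, `TwinStore`, `validate`, and `process` are hypothetical names, a plain list stands in for NATS JetStream, a dict stands in for the graph database, and the reconciliation step is omitted for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Hypothetical subset of entity types the ontology validator accepts.
KNOWN_TYPES = {"Temperature_Sensor", "Temperature_Setpoint", "HVAC_Zone"}


@dataclass
class Mutation:
    """Illustrative stand-in for a twin mutation event."""
    entity_id: str
    source: str
    payload: Dict[str, Any]


@dataclass
class TwinStore:
    """In-memory stand-in for the graph database."""
    entities: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def upsert(self, entity_id: str, state: Dict[str, Any]) -> None:
        # Merge new attributes into whatever is already stored.
        self.entities.setdefault(entity_id, {}).update(state)


def validate(mutation: Mutation) -> bool:
    """Ontology check: accept only payloads with a known entity type."""
    return mutation.payload.get("type") in KNOWN_TYPES


def process(bus: List[Mutation], store: TwinStore) -> int:
    """Read every mutation on the bus, validate it, and store accepted ones."""
    accepted = 0
    for mutation in bus:
        if validate(mutation):
            store.upsert(mutation.entity_id, mutation.payload)
            accepted += 1
    return accepted


bus = [
    Mutation("hvac.zone1.temp", "adapter/ebo", {"type": "Temperature_Sensor", "value": 72.5}),
    Mutation("hvac.zone1.fan", "adapter/ebo", {"type": "Unknown_Thing", "value": 1}),
]
store = TwinStore()
accepted = process(bus, store)
```

Only the first mutation reaches the store; the second is rejected because its type is not in the assumed ontology subset.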

Ontology Foundation

Brick Schema

Base ontology for building systems and equipment.

# Brick example (Turtle RDF syntax)
@prefix brick: <https://brickschema.org/schema/Brick#> .
@prefix unit: <http://qudt.org/vocab/unit/> .

# HVAC Zone
:Zone1 a brick:HVAC_Zone ;
brick:hasPoint :Zone1_Temp ;
brick:hasPoint :Zone1_Setpoint ;
brick:hasPart :Zone1_VAV .

# Temperature sensor
:Zone1_Temp a brick:Temperature_Sensor ;
brick:hasUnit unit:DEG_F ;
brick:isPointOf :Zone1 .

# Setpoint
:Zone1_Setpoint a brick:Temperature_Setpoint ;
brick:hasUnit unit:DEG_F ;
brick:isPointOf :Zone1 .

# VAV box
:Zone1_VAV a brick:Variable_Air_Volume_Box ;
brick:feeds :Zone1 ;
brick:controls :Zone1_Temp .
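
Brick declares `isPointOf` as the inverse of `hasPoint`, and the example above states both directions explicitly. A validator could check that the two stay in sync. The sketch below does this over plain tuples rather than an RDF library; `missing_inverses` and the tuple representation are assumptions made for illustration.

```python
from typing import List, Set, Tuple

Triple = Tuple[str, str, str]  # (subject, predicate, object)

# Brick declares isPointOf as the inverse of hasPoint.
INVERSE = {"brick:hasPoint": "brick:isPointOf"}

# The point relationships from the Turtle example, as plain tuples.
triples: Set[Triple] = {
    (":Zone1", "brick:hasPoint", ":Zone1_Temp"),
    (":Zone1", "brick:hasPoint", ":Zone1_Setpoint"),
    (":Zone1_Temp", "brick:isPointOf", ":Zone1"),
    (":Zone1_Setpoint", "brick:isPointOf", ":Zone1"),
}


def missing_inverses(graph: Set[Triple]) -> List[Triple]:
    """Return inverse triples that the graph implies but does not state."""
    missing = []
    for subject, predicate, obj in graph:
        inverse = INVERSE.get(predicate)
        if inverse and (obj, inverse, subject) not in graph:
            missing.append((obj, inverse, subject))
    return sorted(missing)


complete = missing_inverses(triples)
partial = missing_inverses(triples - {(":Zone1_Temp", "brick:isPointOf", ":Zone1")})
```

The full model yields no gaps, while dropping one `isPointOf` triple reports exactly that triple as missing.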

Project Haystack

Tagging convention for building data.

{
"id": "@zone1",
"dis": "Conference Room Zone 1",
"hvac": "m:",
"zone": "m:",
"floor": "2",
"area": {"_kind": "number", "val": 1200, "unit": "ft²"}
}

{
"id": "@zone1.temp",
"dis": "Zone 1 Temperature Sensor",
"point": "m:",
"sensor": "m:",
"temp": "m:",
"cur": {"_kind": "number", "val": 72.5, "unit": "°F"},
"equipRef": "@zone1.vav",
"siteRef": "@building_a"
}
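
Because Haystack models meaning through marker tags, a lookup is a filter over tag names. The sketch below assumes records are held as Python dicts with markers encoded as `"m:"`, as in the records above; `has_markers` and `find` are hypothetical helpers, not part of any Haystack library.

```python
from typing import Any, Dict, List

MARKER = "m:"  # marker encoding used in the records above

records: List[Dict[str, Any]] = [
    {"id": "@zone1", "dis": "Conference Room Zone 1", "hvac": MARKER, "zone": MARKER},
    {"id": "@zone1.temp", "dis": "Zone 1 Temperature Sensor",
     "point": MARKER, "sensor": MARKER, "temp": MARKER},
]


def has_markers(record: Dict[str, Any], *tags: str) -> bool:
    """True when the record carries every given tag as a marker."""
    return all(record.get(tag) == MARKER for tag in tags)


def find(recs: List[Dict[str, Any]], *tags: str) -> List[str]:
    """Return the ids of records that carry all of the given marker tags."""
    return [rec["id"] for rec in recs if has_markers(rec, *tags)]


temp_sensors = find(records, "point", "sensor", "temp")
zones = find(records, "zone")
```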

CitadelMesh Extensions

Custom extensions for security, energy, and AI agents.

@prefix citadel: <https://citadelmesh.io/ontology#> .

# Security domain
:LobbyDoor a brick:Door, citadel:AccessControlPoint ;
citadel:controlledBy :SecurityExpert ;
citadel:monitoredBy :CameraLobby01 ;
citadel:requiresCredential citadel:BadgeAccess ;
citadel:emergencyEgress true .

# Camera with analytics
:CameraLobby01 a brick:Camera, citadel:AnalyticsCamera ;
citadel:hasAnalytic citadel:PersonDetection ;
citadel:hasAnalytic citadel:LoiteringDetection ;
citadel:monitors :LobbyDoor .

# Energy tariff
:BuildingA_ElectricMeter a brick:Electric_Meter ;
citadel:hasTariff :TOU_Schedule ;
citadel:hasDemandLimit 500 ; # kW
citadel:demandLimitUnit unit:KiloW .

:TOU_Schedule a citadel:TimeOfUseTariff ;
citadel:peakRate 0.25 ; # $/kWh
citadel:offPeakRate 0.12 ;
citadel:peakHours "09:00-21:00" .
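
As a worked example of how an agent might use the tariff, the sketch below prices hourly consumption against the `:TOU_Schedule` values. It rests on assumptions the ontology does not state: the peak window is treated as start-inclusive and end-exclusive, it does not cross midnight, and each reading is billed at the rate in effect at the start of its hour. The function names are illustrative.

```python
from datetime import time
from typing import Dict, Tuple

PEAK_RATE = 0.25      # $/kWh, from :TOU_Schedule
OFF_PEAK_RATE = 0.12  # $/kWh, from :TOU_Schedule
PEAK_HOURS = "09:00-21:00"


def parse_window(window: str) -> Tuple[time, time]:
    """Split an "HH:MM-HH:MM" string into start and end times."""
    start, end = window.split("-")
    return time.fromisoformat(start), time.fromisoformat(end)


def rate_at(moment: time, window: str = PEAK_HOURS) -> float:
    """Rate in effect at `moment` (assumed: start inclusive, end exclusive)."""
    start, end = parse_window(window)
    return PEAK_RATE if start <= moment < end else OFF_PEAK_RATE


def energy_cost(hourly_kwh: Dict[int, float]) -> float:
    """Cost of consumption keyed by the hour of day in which it occurred."""
    return sum(kwh * rate_at(time(hour=hour)) for hour, kwh in hourly_kwh.items())


# 10 kWh in each of four hours: 08:00 and 21:00 are off-peak, 09:00 and 20:00 are peak.
cost = energy_cost({8: 10.0, 9: 10.0, 20: 10.0, 21: 10.0})
```

Two off-peak hours cost 2 × 10 × 0.12 = 2.40 and two peak hours cost 2 × 10 × 0.25 = 5.00, for a total of 7.40.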

Graph Database Schema

Neo4j Implementation

Node Types:

  • Entity: Base type for all twin entities
  • Zone: HVAC zones, security zones
  • Equipment: VAV boxes, boilers, cameras
  • Point: Sensors, setpoints, commands
  • Policy: OPA policies, constraints
  • Agent: AI agents in the system

Relationship Types:

  • FEEDS: Energy/air flow
  • CONTROLS: Control relationships
  • PART_OF: Hierarchy
  • MONITORS: Surveillance
  • LOCATED_IN: Spatial relationships
  • DEPENDS_ON: Dependencies

Cypher Schema:

// Create entity nodes
CREATE (z:Entity:Zone {
entity_id: "hvac.zone1",
type: "HVAC_Zone",
site_id: "building_a",
floor: 2,
area_sqft: 1200
})

CREATE (t:Entity:Point {
entity_id: "hvac.zone1.temp",
type: "Temperature_Sensor",
metric: "temp.zone",
unit: "°F"
})

CREATE (s:Entity:Point {
entity_id: "hvac.zone1.setpoint",
type: "Temperature_Setpoint",
metric: "setpoint.current",
unit: "°F"
})

// Create relationships
CREATE (t)-[:PART_OF]->(z)
CREATE (s)-[:PART_OF]->(z)
CREATE (s)-[:CONTROLS]->(t)

// Add current state
SET t.current_value = 72.5
SET t.updated_at = datetime()
SET s.current_value = 72.0
SET s.updated_at = datetime()
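
The direction of each relationship matters when querying: points are `PART_OF` the zone, so a zone's points are found by following incoming edges. The sketch below mirrors the three relationships created above as plain tuples; `incoming` is a hypothetical helper used only to illustrate the traversal direction.

```python
from typing import List, Tuple

Edge = Tuple[str, str, str]  # (source entity_id, relationship type, target entity_id)

# The relationships created by the Cypher statements above.
edges: List[Edge] = [
    ("hvac.zone1.temp", "PART_OF", "hvac.zone1"),
    ("hvac.zone1.setpoint", "PART_OF", "hvac.zone1"),
    ("hvac.zone1.setpoint", "CONTROLS", "hvac.zone1.temp"),
]


def incoming(target: str, rel: str) -> List[str]:
    """Entities with a `rel` edge pointing at `target`, i.e. (x)-[:rel]->(target)."""
    return sorted(src for src, kind, dst in edges if kind == rel and dst == target)


zone_points = incoming("hvac.zone1", "PART_OF")
controllers = incoming("hvac.zone1.temp", "CONTROLS")
```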

State Synchronization

Mutation Events

Adapters publish twin.mutations events when vendor state changes:

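import json
import time
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp

from citadel.v1 import twin_pb2

# Describe the new vendor state as a twin mutation
mutation = twin_pb2.TwinMutation(
    entity_id="hvac.zone1.temp",
    op=twin_pb2.UPSERT,
    ontology_model="brick",
    payload=json.dumps({
        "type": "Temperature_Sensor",
        "value": 72.5,
        "unit": "°F",
        "quality": "good",
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
        "timestamp": datetime.now(timezone.utc).isoformat()
    }),
    source="spiffe://citadel.mesh/adapter/ebo",
    timestamp=Timestamp(seconds=int(time.time()))
)

# Publish to event bus. event_bus and CloudEvent are provided by the
# surrounding adapter code; the call must run inside an async function.
await event_bus.publish("twin.mutations.building_a", CloudEvent(
    type="citadel.twin.mutation",
    source="spiffe://citadel.mesh/adapter/ebo",
    subject="hvac.zone1.temp",
    data=mutation
))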

Reconciliation Logic

The twin service reconciles conflicts when multiple sources report different states:

from collections import defaultdict
from typing import Any, Dict, List

from citadel.v1.twin_pb2 import TwinMutation


class TwinReconciler:
    """Reconcile state from multiple vendor sources."""

    async def reconcile_entity_state(self, entity_id: str, mutations: List[TwinMutation]):
        """Reconcile entity state from multiple mutations."""

        # Group mutations by source
        by_source = defaultdict(list)
        for mutation in mutations:
            by_source[mutation.source].append(mutation)

        # Determine authoritative source per attribute
        resolved_state = {}

        for attr in self._get_attributes(entity_id):
            # Get values from each source. Each value is assumed to be a dict
            # of the form {"value": ..., "timestamp": ...}, as Rule 3 requires.
            values = {
                source: self._extract_value(muts, attr)
                for source, muts in by_source.items()
            }

            # Apply reconciliation rules
            resolved_value = await self._apply_reconciliation_rules(
                entity_id, attr, values
            )

            resolved_state[attr] = resolved_value

        # Update graph database
        await self._update_entity(entity_id, resolved_state)

        return resolved_state

    async def _apply_reconciliation_rules(
        self,
        entity_id: str,
        attr: str,
        values: Dict[str, Any]
    ) -> Any:
        """Apply reconciliation rules based on entity and attribute."""

        # Rule 1: Only one source reported, so there is nothing to reconcile
        if len(values) == 1:
            return next(iter(values.values()))["value"]

        # Rule 2: Prefer authoritative source for entity type
        auth_source = await self._get_authoritative_source(entity_id, attr)
        if auth_source and auth_source in values:
            return values[auth_source]["value"]

        # Rule 3: Otherwise use the most recent value
        return max(values.values(), key=lambda v: v.get("timestamp", 0))["value"]
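
The rule order (single source, then authoritative source, then most recent) can be exercised in isolation. The sketch below is a standalone, synchronous version under two assumptions: each reading is a dict with `value` and `timestamp` keys, and the authoritative source is passed in rather than looked up. `resolve` is a hypothetical name, not part of the twin service.

```python
from typing import Any, Dict, Optional

# Assumed shape of one reading: {"value": ..., "timestamp": <epoch seconds>}
Reading = Dict[str, Any]


def resolve(values: Dict[str, Reading], authoritative: Optional[str] = None) -> Any:
    """Pick one value from per-source readings using the three-rule order."""
    if not values:
        raise ValueError("no readings to reconcile")
    # Rule 1: a single source needs no reconciliation.
    if len(values) == 1:
        return next(iter(values.values()))["value"]
    # Rule 2: the authoritative source wins when it reported a value.
    if authoritative in values:
        return values[authoritative]["value"]
    # Rule 3: otherwise the newest reading wins.
    newest = max(values.values(), key=lambda reading: reading.get("timestamp", 0))
    return newest["value"]


readings = {
    "adapter/ebo": {"value": 72.5, "timestamp": 100},
    "adapter/ha": {"value": 73.1, "timestamp": 160},
}
by_authority = resolve(readings, authoritative="adapter/ebo")
by_recency = resolve(readings)
```

With an authoritative source configured, the older EBO reading wins. Without one, the newer Home Assistant reading is chosen.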

Consistency Model

Eventually Consistent: The twin converges on the reconciled state within the reconciliation window assigned to each entity's priority class.

Reconciliation Windows:

  • Real-time: < 1 second for high-priority entities
  • Standard: < 5 seconds for normal entities
  • Batch: < 60 seconds for low-priority entities
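
One way to make these windows checkable is to express them as per-priority bounds and compare them against observed lag. This is a sketch under assumptions: the `Priority` enum, the `is_within_window` helper, and the idea of measuring lag as the gap between the vendor event time and the time the twin applied it are illustrative, not documented behavior.

```python
from enum import Enum


class Priority(Enum):
    REAL_TIME = "real-time"
    STANDARD = "standard"
    BATCH = "batch"


# Upper bounds in seconds, taken from the list above.
WINDOW_SECONDS = {
    Priority.REAL_TIME: 1.0,
    Priority.STANDARD: 5.0,
    Priority.BATCH: 60.0,
}


def is_within_window(priority: Priority, event_ts: float, applied_ts: float) -> bool:
    """True when the twin applied the change inside the priority's window."""
    return (applied_ts - event_ts) < WINDOW_SECONDS[priority]


# A door event applied after 0.4 s meets the real-time bound.
door_ok = is_within_window(Priority.REAL_TIME, event_ts=10.0, applied_ts=10.4)
# A meter reading applied after 6 s misses the standard bound.
meter_late = is_within_window(Priority.STANDARD, event_ts=10.0, applied_ts=16.0)
```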

Query API

gRPC Service

service TwinService {
// Query entities by selector
rpc QueryEntities(QueryRequest) returns (QueryResponse);

// Get single entity
rpc GetEntity(GetEntityRequest) returns (Entity);

// Mutate entity
rpc MutateEntity(MutationRequest) returns (MutationResponse);

// Stream live updates
rpc StreamEntity(StreamRequest) returns (stream Entity);

// Query relationships
rpc QueryRelationships(RelationshipQuery) returns (RelationshipResponse);

// Time-travel query
rpc GetEntityAtTime(EntityTimeQuery) returns (Entity);
}

Query Examples

Get all HVAC zones:

request = QueryRequest(
    selector="type:HVAC_Zone",
    site_id="building_a"
)

response = await twin_client.QueryEntities(request)
zones = response.entities

Get zone with relationships:

MATCH (z:Zone {entity_id: $entity_id})
OPTIONAL MATCH (z)<-[:PART_OF]-(p:Point)
OPTIONAL MATCH (z)<-[:FEEDS]-(e:Equipment)
// DISTINCT avoids duplicates from the cross product of the two OPTIONAL MATCH clauses
RETURN z, collect(DISTINCT p) AS points, collect(DISTINCT e) AS equipment

The equivalent request through the gRPC client:

request = GetEntityRequest(entity_id="hvac.zone1", include_relationships=True)
entity = await twin_client.GetEntity(request)

print(f"Zone: {entity.entity_id}")
print(f"Relationships: {len(entity.relationships)}")

Time-travel query (state at specific time):

request = EntityTimeQuery(
    entity_id="door.lobby.main",
    timestamp=Timestamp(seconds=int(incident_time.timestamp()))
)

# Get door state at time of incident
historical_entity = await twin_client.GetEntityAtTime(request)
door_state = historical_entity.attributes["status"] # "locked" or "unlocked"
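
A time-travel lookup amounts to finding the last recorded state at or before the requested time. How the twin service stores history is not described here, so the sketch below assumes a simple sorted list of `(timestamp, state)` pairs and uses binary search. `state_at` and the sample data are hypothetical.

```python
from bisect import bisect_right
from typing import Any, Dict, List, Optional, Tuple

# (epoch seconds, state) pairs, kept sorted by time. Illustrative data only.
History = List[Tuple[int, Dict[str, Any]]]

door_history: History = [
    (1000, {"status": "locked"}),
    (1600, {"status": "unlocked"}),
    (1900, {"status": "locked"}),
]


def state_at(history: History, ts: int) -> Optional[Dict[str, Any]]:
    """Return the latest recorded state at or before `ts`, or None if none exists."""
    times = [recorded_at for recorded_at, _ in history]
    idx = bisect_right(times, ts)
    return history[idx - 1][1] if idx else None


during_incident = state_at(door_history, 1700)
before_records = state_at(door_history, 500)
```

A query at t=1700 falls between the second and third records, so it returns the unlocked state. A query before the first record returns `None`.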

Derived KPIs

Materialized Views

Compute aggregate metrics periodically:

// Energy efficiency score per zone
MATCH (z:Zone)<-[:PART_OF]-(m:Point {metric: "energy.consumption"})
WITH z, avg(m.current_value) as avg_consumption
MATCH (z)<-[:PART_OF]-(t:Point {metric: "temp.zone"})
WITH z, avg_consumption, avg(abs(t.current_value - t.setpoint_value)) as temp_deviation
SET z.efficiency_score = 100 - (temp_deviation * 5 + avg_consumption * 0.1)
RETURN z.entity_id, z.efficiency_score
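
To make the scoring formula concrete, the sketch below recomputes it in Python for one zone with made-up readings. `efficiency_score` is a hypothetical helper that mirrors the arithmetic of the query; the consumption units follow whatever the `energy.consumption` points report.

```python
from statistics import mean
from typing import List, Tuple


def efficiency_score(consumption: List[float], temps: List[Tuple[float, float]]) -> float:
    """Mirror of the Cypher formula: 100 - (temp_deviation * 5 + avg_consumption * 0.1).

    `temps` holds (current_value, setpoint_value) pairs, one per temperature point.
    """
    avg_consumption = mean(consumption)
    temp_deviation = mean(abs(current - setpoint) for current, setpoint in temps)
    return 100 - (temp_deviation * 5 + avg_consumption * 0.1)


# Average consumption 50 and average deviation 1.0 give 100 - (5 + 5).
score = efficiency_score(consumption=[40.0, 60.0], temps=[(73.0, 72.0), (71.0, 72.0)])
```

The formula is not clamped, so a zone with large deviation or very high consumption can score below zero.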

Real-Time KPIs

Compute on query:

async def calculate_comfort_score(zone_id: str) -> float:
    """Calculate comfort score (0-100) for zone."""

    # Get zone state
    entity = await twin_client.GetEntity(GetEntityRequest(entity_id=zone_id))

    temp = float(entity.attributes["current_temp"])
    setpoint = float(entity.attributes["target_setpoint"])
    humidity = float(entity.attributes.get("humidity", 50))

    # Temperature deviation penalty
    temp_penalty = abs(temp - setpoint) * 5

    # Humidity penalty (ideal 40-60%)
    humidity_penalty = max(0, abs(humidity - 50) - 10)

    # Comfort score
    score = 100 - temp_penalty - humidity_penalty

    return max(0, min(100, score))
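
The scoring arithmetic in `calculate_comfort_score` can be checked without a twin client by isolating it in a pure function. `comfort_score` below is a hypothetical refactoring of the same formula, with a few sample inputs.

```python
def comfort_score(temp: float, setpoint: float, humidity: float = 50.0) -> float:
    """Same formula as calculate_comfort_score, without the twin lookup."""
    # 5 points per degree of deviation from the setpoint.
    temp_penalty = abs(temp - setpoint) * 5
    # 1 point per percentage point of humidity outside the 40-60% band.
    humidity_penalty = max(0.0, abs(humidity - 50.0) - 10.0)
    return max(0.0, min(100.0, 100.0 - temp_penalty - humidity_penalty))


ideal = comfort_score(72.0, 72.0, 45.0)       # on setpoint, humidity inside the band
warm_humid = comfort_score(74.0, 72.0, 65.0)  # penalties: 10 for temperature, 5 for humidity
far_off = comfort_score(95.0, 72.0)           # penalty exceeds 100 and is clamped to 0
```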

Simulation Integration

EnergyPlus Integration

class EnergyPlusSim:
    """Simulate HVAC scenarios with EnergyPlus."""

    async def simulate_setpoint_change(
        self,
        zone_id: str,
        target_temp: float,
        duration_hours: int
    ) -> SimResult:
        """Simulate energy impact of setpoint change."""

        # Get zone from twin
        zone = await twin_client.GetEntity(GetEntityRequest(entity_id=zone_id))

        # Build EnergyPlus model from twin
        idf_model = self._build_idf_from_twin(zone)

        # Run simulation
        result = await energyplus.run_simulation(
            idf_model,
            setpoint=target_temp,
            duration=duration_hours
        )

        # current_tariff is not defined in this snippet; it is assumed to be
        # the applicable $/kWh rate, resolved elsewhere (for example from the
        # meter's tariff in the twin)
        return SimResult(
            energy_kwh=result.total_energy,
            cost_usd=result.total_energy * current_tariff,
            comfort_score=result.comfort_metrics.avg_pmv
        )

See Also