Cloud Integration
The CitadelMesh cloud control plane provides centralized orchestration, analytics, long-term storage, and cross-site coordination while edge nodes maintain autonomous operation. This document describes the cloud architecture and edge-to-cloud integration patterns.
Cloud Architecture
graph TB
subgraph "Edge Sites"
Edge1[Building A Edge]
Edge2[Building B Edge]
Edge3[Building C Edge]
end
subgraph "Cloud Control Plane"
subgraph "Orchestration"
Orchestrator[Agent Orchestrator]
GitOps[ArgoCD/Flux]
Registry[Container Registry]
end
subgraph ".NET Services (Aspire)"
Scheduler[Scheduler Service]
Alarms[Alarm Service]
Sessions[Session Manager]
Orleans[Orleans Actors]
end
subgraph "Data Platform"
Kafka[Kafka/Redpanda]
Postgres[PostgreSQL]
TimescaleCloud[Timescale Cloud]
ObjectStore[S3/Blob Storage]
end
subgraph "Knowledge Layer"
TwinCloud[Digital Twin Service]
VectorDB[Vector Database]
RAG[RAG Service]
end
subgraph "Observability"
OTelCloud[OTel Collector]
Prometheus[Prometheus]
Grafana[Grafana]
Tempo["Tempo (Traces)"]
end
subgraph "Gateway"
A2A[A2A Gateway]
MCP[MCP Registry]
end
end
Edge1 --> Kafka
Edge2 --> Kafka
Edge3 --> Kafka
Kafka --> TimescaleCloud
Kafka --> TwinCloud
Kafka --> Alarms
Orchestrator --> Edge1
Orchestrator --> Edge2
Orchestrator --> Edge3
OTelCloud --> Prometheus
OTelCloud --> Tempo
Prometheus --> Grafana
Agent Orchestrator
Centralized deployment and lifecycle management for edge agents.
Capabilities:
- Deploy agent graph versions to edge sites
- Feature flags and gradual rollouts
- A/B testing for policies
- Rollback on failures
- Health monitoring
Implementation (.NET + Orleans):
// Agent deployment orchestration
public class AgentOrchestrator
{
    private readonly IKubernetesClient k8sClient;
    private readonly IArgoCD argoCD;

    public AgentOrchestrator(IKubernetesClient k8sClient, IArgoCD argoCD)
    {
        this.k8sClient = k8sClient;
        this.argoCD = argoCD;
    }

    public async Task DeployAgentVersion(
        string siteId,
        string agentType,
        string version,
        DeploymentStrategy strategy)
    {
        // Update the ArgoCD application that tracks this site
        var app = await argoCD.GetApplication($"citadel-{siteId}");
        app.Spec.Source.TargetRevision = version;

        if (strategy == DeploymentStrategy.Canary)
        {
            // Progressive rollout: 10% -> 25% -> 50% -> 100%, 15 minutes per step
            app.Spec.SyncPolicy.Automated.Canary = new CanaryConfig
            {
                Steps = new[] { 10, 25, 50, 100 },
                Interval = TimeSpan.FromMinutes(15)
            };
        }

        await argoCD.UpdateApplication(app);

        // Monitor deployment health (rollback on failure)
        await MonitorDeployment(siteId, agentType, version);
    }
}
.NET Microservices (Aspire Composition)
Aspire provides cloud-native service composition for .NET services.
Scheduler Service
Manages time-based triggers for agents (occupancy schedules, demand response).
// Aspire app host: composes backing resources and the .NET services
var builder = DistributedApplication.CreateBuilder(args);

// Backing resources
var cache = builder.AddRedis("cache");
var postgres = builder.AddPostgres("postgres");
var nats = builder.AddNats("nats");

// Services (Projects.* types are generated from the referenced project names)
builder.AddProject<Projects.SchedulerService>("scheduler")
    .WithReference(cache)
    .WithReference(postgres)
    .WithReference(nats);

builder.AddProject<Projects.AlarmService>("alarms")
    .WithReference(postgres)
    .WithReference(nats);

builder.Build().Run();
Orleans Actors for Stateful Workflows
// Long-lived incident tracking actor (IRemindable enables durable reminders)
public class IncidentActor : Grain, IIncidentActor, IRemindable
{
    private IncidentState state;

    public async Task ReportIncident(IncidentReport report)
    {
        state ??= new IncidentState();
        state.Severity = report.Severity;
        state.Actions = new List<string>();

        // Publish to event stream
        await PublishIncidentEvent(report);

        // Set a durable reminder for follow-up every 15 minutes
        await RegisterOrUpdateReminder(
            "followup",
            TimeSpan.FromMinutes(15),
            TimeSpan.FromMinutes(15));
    }

    public async Task ReceiveReminder(string reminderName, TickStatus status)
    {
        if (state.Status != IncidentStatus.Resolved)
        {
            // Escalate if not resolved
            await EscalateIncident(state);
        }
    }
}
Data Lake and Analytics
Event Streaming (Kafka/Redpanda)
# Kafka topics for cloud aggregation
citadel.telemetry.all-sites
citadel.incidents.all-sites
citadel.control.all-sites
citadel.audit.all-sites
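As a sketch of how a cloud-side service might consume one of these topics, assuming aiokafka and a hypothetical write_to_timescale() ingestion helper (the broker address and consumer group are illustrative):
from aiokafka import AIOKafkaConsumer

async def consume_telemetry():
    # Subscribe to the aggregated telemetry topic (broker/group names are illustrative)
    consumer = AIOKafkaConsumer(
        "citadel.telemetry.all-sites",
        bootstrap_servers="kafka.citadel.cloud:9092",
        group_id="timescale-ingest",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # msg.value carries the event payload published by an edge site
            await write_to_timescale(msg.value)  # hypothetical ingestion helper
            await consumer.commit()
    finally:
        await consumer.stop()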
Long-Term Storage
TimescaleDB (Cloud-hosted):
-- Hypertable for telemetry
CREATE TABLE telemetry_points (
time TIMESTAMPTZ NOT NULL,
site_id TEXT NOT NULL,
entity_id TEXT NOT NULL,
metric TEXT NOT NULL,
value DOUBLE PRECISION,
unit TEXT,
quality TEXT
);
SELECT create_hypertable('telemetry_points', 'time');
-- Retention policy: 2 years
SELECT add_retention_policy('telemetry_points', INTERVAL '2 years');
-- Continuous aggregates for analytics
CREATE MATERIALIZED VIEW telemetry_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
site_id,
entity_id,
metric,
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM telemetry_points
GROUP BY hour, site_id, entity_id, metric;
Object Storage (S3/Azure Blob):
s3://citadel-data-lake/
├── telemetry/
│   └── year=2025/month=09/day=30/
│       └── building_a_telemetry_20250930.parquet
├── video/
│   └── building_a/camera_lobby_01/
│       └── 2025-09-30T15-30-00.mp4
└── audit/
    └── year=2025/month=09/
        └── audit_logs_20250930.json.gz
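A sketch of how one daily telemetry partition might be written to this layout, assuming pyarrow with S3 support; the function name and bucket paths mirror the tree above but are illustrative:
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

def write_daily_partition(site_id: str, day: str, rows: list[dict]) -> None:
    """Write one site's telemetry for one day as a Parquet partition."""
    table = pa.Table.from_pylist(rows)
    s3 = fs.S3FileSystem()  # credentials resolved from the environment

    # Partition path mirrors the layout above: telemetry/year=/month=/day=/
    year, month, d = day.split("-")
    path = (
        f"citadel-data-lake/telemetry/year={year}/month={month}/day={d}/"
        f"{site_id}_telemetry_{year}{month}{d}.parquet"
    )
    pq.write_table(table, path, filesystem=s3)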
Digital Twin (Cloud)
The cloud-hosted digital twin provides a global view across all sites.
# Digital Twin Service (gRPC)
from typing import Dict

class TwinService:
    def __init__(self, graph_db: Neo4jClient):
        self.graph = graph_db

    async def QueryEntities(self, request: QueryRequest) -> QueryResponse:
        """Query entities across all sites."""
        # Cypher query
        query = """
        MATCH (e:Entity)
        WHERE e.site_id IN $site_ids
          AND e.type = $entity_type
        RETURN e
        """
        results = await self.graph.run(query, {
            "site_ids": request.site_ids,
            "entity_type": request.entity_type
        })
        return QueryResponse(entities=[
            self._to_entity_proto(r["e"]) for r in results
        ])

    async def GetCrossSiteMetric(self, metric: str) -> Dict[str, float]:
        """Aggregate a metric across all buildings."""
        query = """
        MATCH (e:Entity)-[:HAS_METRIC]->(m:Metric {name: $metric})
        RETURN e.site_id AS site, avg(m.value) AS avg_value
        """
        results = await self.graph.run(query, {"metric": metric})
        return {r["site"]: r["avg_value"] for r in results}
Knowledge Services
Vector Database (RAG)
from typing import List

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct

class KnowledgeService:
    def __init__(self):
        # Async client so the upsert/search calls below can be awaited
        self.vector_db = AsyncQdrantClient(host="qdrant.citadel.cloud")

    async def index_building_docs(self, site_id: str, docs: List[str]):
        """Index building documentation for RAG."""
        # Generate embeddings (embed_documents / embed_query provided elsewhere)
        embeddings = await self.embed_documents(docs)

        # Store in vector DB
        await self.vector_db.upsert(
            collection_name=f"building_{site_id}_docs",
            points=[
                PointStruct(
                    id=idx,
                    vector=emb,
                    payload={"text": doc, "site_id": site_id},
                )
                for idx, (doc, emb) in enumerate(zip(docs, embeddings))
            ],
        )

    async def search_knowledge(self, query: str, site_id: str) -> List[str]:
        """Search the building knowledge base."""
        query_embedding = await self.embed_query(query)
        results = await self.vector_db.search(
            collection_name=f"building_{site_id}_docs",
            query_vector=query_embedding,
            limit=5,
        )
        return [r.payload["text"] for r in results]
Edge-to-Cloud Sync
WireGuard Tunnel
# WireGuard config (edge)
[Interface]
PrivateKey = <edge-private-key>
Address = 10.100.1.10/24
[Peer]
PublicKey = <cloud-public-key>
Endpoint = cloud.citadel.io:51820
AllowedIPs = 10.100.0.0/16
PersistentKeepalive = 25
Event Sync Strategy
Real-time Sync (when connected); see the sketch after this list:
- Telemetry streamed to Kafka every 15 seconds
- Incidents pushed immediately
- Command results published
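A minimal sketch of the real-time path, assuming an aiokafka producer on the edge node and a hypothetical read_zone_telemetry() helper (the broker address and helper name are illustrative):
import asyncio
import json
from aiokafka import AIOKafkaProducer

TELEMETRY_INTERVAL_S = 15  # matches the 15-second cadence noted above

async def stream_telemetry(site_id: str):
    # Broker address is illustrative
    producer = AIOKafkaProducer(bootstrap_servers="kafka.citadel.cloud:9092")
    await producer.start()
    try:
        while True:
            # read_zone_telemetry() is a hypothetical edge-local helper
            points = await read_zone_telemetry(site_id)
            await producer.send_and_wait(
                "citadel.telemetry.all-sites",
                value=json.dumps({"site_id": site_id, "points": points}).encode(),
            )
            await asyncio.sleep(TELEMETRY_INTERVAL_S)
    finally:
        await producer.stop()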
Batch Sync (on reconnect):
from aiokafka import AIOKafkaProducer

async def sync_queued_events():
    """Sync queued NATS JetStream events to cloud Kafka."""
    # Pull queued messages from the edge stream (nats_js is the JetStream context)
    sub = await nats_js.pull_subscribe(
        "citadel.>", durable="cloud-sync", stream="citadel-events")
    msgs = await sub.fetch(batch=1000, timeout=5)

    # Batch publish to Kafka; ack each NATS message once it has been enqueued
    producer = AIOKafkaProducer(bootstrap_servers="kafka.citadel.cloud:9092")
    await producer.start()
    try:
        for msg in msgs:
            await producer.send(f"citadel.{msg.subject}", value=msg.data)
            await msg.ack()
        await producer.flush()
    finally:
        await producer.stop()
Observability
Metrics (Prometheus)
# Prometheus scrape config
scrape_configs:
  - job_name: 'citadel-agents'
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['citadel-agents']
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: (security-agent|energy-agent)
Traces (Tempo)
OpenTelemetry traces from all agents are aggregated in Tempo:
# Trace edge-to-cloud flow
with tracer.start_as_current_span("hvac_setpoint_optimization") as span:
    span.set_attribute("site.id", "building_a")
    span.set_attribute("zone.id", "zone1")

    # Edge calculation
    optimal_temp = await calculate_optimal_setpoint(zone_id)

    # Query cloud for weather forecast
    forecast = await cloud_api.get_weather_forecast(site_id)

    # Adjust based on forecast
    adjusted_temp = adjust_for_weather(optimal_temp, forecast)

    # Write setpoint (traced)
    await write_setpoint(zone_id, adjusted_temp)
A2A and MCP Gateways
A2A Gateway (Cross-Organization)
class A2AGateway:
    """Map CloudEvents to A2A protocol for cross-org collaboration."""

    async def handle_cloudevent(self, event: CloudEvent):
        # Convert to A2A envelope
        a2a_message = {
            "protocol": "agent-to-agent/v1",
            "from": event.source,  # spiffe://citadel.mesh/agent/energy
            "to": event.subject,
            "capability": self._extract_capability(event.type),
            "payload": event.data,
            "signature": self._sign(event.data),
        }
        # Route to external organization
        await self.send_a2a(a2a_message)
MCP Registry
Central registry for MCP tool servers:
# Register MCP server
POST /mcp/registry/servers
{
"server_id": "citadel-ebo-server",
"server_url": "https://mcp-ebo.citadel.cloud",
"tools": [
"ebo_read_point",
"ebo_write_setpoint"
],
"capabilities": {
"spiffe_ids": ["spiffe://citadel.mesh/agent/energy"],
"rate_limit": "100/minute"
}
}
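A hypothetical agent-side lookup against this registry could look like the following; the GET endpoint, query parameter, and response shape are assumptions that mirror the registration payload above:
import httpx

async def find_mcp_server(tool: str, spiffe_id: str) -> str | None:
    """Resolve which registered MCP server exposes a tool this agent may call."""
    async with httpx.AsyncClient(base_url="https://mcp-registry.citadel.cloud") as client:
        resp = await client.get("/mcp/registry/servers", params={"tool": tool})
        resp.raise_for_status()
        for server in resp.json()["servers"]:
            # Only return servers whose capability grant covers this agent's SPIFFE ID
            if spiffe_id in server["capabilities"]["spiffe_ids"]:
                return server["server_url"]
    return None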
Related Documentation
- Edge Architecture - Edge deployment details
- Observability - Cloud monitoring stack
- Protocol Strategy - Edge-cloud protocol design