
Cloud Integration

The CitadelMesh cloud control plane provides centralized orchestration, analytics, long-term storage, and cross-site coordination while edge nodes maintain autonomous operation. This document describes the cloud architecture and edge-to-cloud integration patterns.

Cloud Architecture

graph TB
    subgraph "Edge Sites"
        Edge1[Building A Edge]
        Edge2[Building B Edge]
        Edge3[Building C Edge]
    end

    subgraph "Cloud Control Plane"
        subgraph "Orchestration"
            Orchestrator[Agent Orchestrator]
            GitOps[ArgoCD/Flux]
            Registry[Container Registry]
        end

        subgraph ".NET Services (Aspire)"
            Scheduler[Scheduler Service]
            Alarms[Alarm Service]
            Sessions[Session Manager]
            Orleans[Orleans Actors]
        end

        subgraph "Data Platform"
            Kafka[Kafka/Redpanda]
            Postgres[PostgreSQL]
            TimescaleCloud[Timescale Cloud]
            ObjectStore[S3/Blob Storage]
        end

        subgraph "Knowledge Layer"
            TwinCloud[Digital Twin Service]
            VectorDB[Vector Database]
            RAG[RAG Service]
        end

        subgraph "Observability"
            OTelCloud[OTel Collector]
            Prometheus[Prometheus]
            Grafana[Grafana]
            Tempo["Tempo (Traces)"]
        end

        subgraph "Gateway"
            A2A[A2A Gateway]
            MCP[MCP Registry]
        end
    end

    Edge1 --> Kafka
    Edge2 --> Kafka
    Edge3 --> Kafka

    Kafka --> TimescaleCloud
    Kafka --> TwinCloud
    Kafka --> Alarms

    Orchestrator --> Edge1
    Orchestrator --> Edge2
    Orchestrator --> Edge3

    OTelCloud --> Prometheus
    OTelCloud --> Tempo
    Prometheus --> Grafana

Agent Orchestrator

Centralized deployment and lifecycle management for edge agents.

Capabilities:

  • Deploy agent graph versions to edge sites
  • Feature flags and gradual rollouts
  • A/B testing for policies
  • Rollback on failures
  • Health monitoring

Implementation (.NET + Orleans):

// Agent deployment orchestration
public class AgentOrchestrator
{
    private readonly IKubernetesClient k8sClient;
    private readonly IArgoCD argoCD;

    public async Task DeployAgentVersion(
        string siteId,
        string agentType,
        string version,
        DeploymentStrategy strategy)
    {
        // Update ArgoCD application
        var app = await argoCD.GetApplication($"citadel-{siteId}");

        app.Spec.Source.TargetRevision = version;

        if (strategy == DeploymentStrategy.Canary)
        {
            // Roll out progressively, starting with 10% of pods
            app.Spec.SyncPolicy.Automated.Canary = new CanaryConfig
            {
                Steps = new[] { 10, 25, 50, 100 },
                Interval = TimeSpan.FromMinutes(15)
            };
        }

        await argoCD.UpdateApplication(app);

        // Monitor deployment health
        await MonitorDeployment(siteId, agentType, version);
    }
}

.NET Microservices (Aspire Composition)

.NET Aspire provides cloud-native composition for the control plane's .NET services and their backing resources (Redis, PostgreSQL, NATS).

Scheduler Service

Manages time-based triggers for agents (occupancy schedules, demand response).

// Aspire service host
var builder = DistributedApplication.CreateBuilder(args);

// Add backing services
var cache = builder.AddRedis("cache");
var postgres = builder.AddPostgres("postgres");
var nats = builder.AddNats("nats");

builder.AddProject<SchedulerService>("scheduler")
    .WithReference(cache)
    .WithReference(postgres)
    .WithReference(nats);

builder.AddProject<AlarmService>("alarms")
    .WithReference(postgres)
    .WithReference(nats);

builder.Build().Run();

Orleans Actors for Stateful Workflows

// Long-lived incident tracking actor
// Implements IRemindable so the grain can receive durable reminders
public class IncidentActor : Grain, IIncidentActor, IRemindable
{
    private IncidentState state = new();

    public async Task ReportIncident(IncidentReport report)
    {
        state.Severity = report.Severity;
        state.Actions = new List<string>();

        // Publish to event stream
        await PublishIncidentEvent(report);

        // Set reminder for follow-up
        await RegisterOrUpdateReminder(
            "followup",
            TimeSpan.FromMinutes(15),
            TimeSpan.FromMinutes(15));
    }

    public async Task ReceiveReminder(string reminderName, TickStatus status)
    {
        if (state.Status != IncidentStatus.Resolved)
        {
            // Escalate if not resolved
            await EscalateIncident(state);
        }
    }
}

Data Lake and Analytics

Event Streaming (Kafka/Redpanda)

# Kafka topics for cloud aggregation
citadel.telemetry.all-sites
citadel.incidents.all-sites
citadel.control.all-sites
citadel.audit.all-sites
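
As a concrete sketch, a cloud-side ingestion worker could bridge the aggregated telemetry topic into the telemetry_points hypertable defined in the next subsection. The broker address, connection string, and JSON point fields below are illustrative assumptions, not part of the platform contract.

import asyncio
import json
from datetime import datetime

import asyncpg
from aiokafka import AIOKafkaConsumer

async def ingest_telemetry():
    """Bridge citadel.telemetry.all-sites into the telemetry_points hypertable."""
    consumer = AIOKafkaConsumer(
        "citadel.telemetry.all-sites",
        bootstrap_servers="kafka.citadel.cloud:9092",  # assumed broker address
        group_id="timescale-ingest",
    )
    db = await asyncpg.connect(
        "postgresql://citadel@timescale.citadel.cloud/telemetry")  # assumed DSN
    await consumer.start()
    try:
        async for msg in consumer:
            point = json.loads(msg.value)  # assumed JSON point schema
            await db.execute(
                "INSERT INTO telemetry_points "
                "(time, site_id, entity_id, metric, value, unit, quality) "
                "VALUES ($1, $2, $3, $4, $5, $6, $7)",
                datetime.fromisoformat(point["time"]),
                point["site_id"], point["entity_id"], point["metric"],
                point["value"], point.get("unit"), point.get("quality"),
            )
    finally:
        await consumer.stop()
        await db.close()

asyncio.run(ingest_telemetry())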

Long-Term Storage

TimescaleDB (Cloud-hosted):

-- Hypertable for telemetry
CREATE TABLE telemetry_points (
    time       TIMESTAMPTZ NOT NULL,
    site_id    TEXT NOT NULL,
    entity_id  TEXT NOT NULL,
    metric     TEXT NOT NULL,
    value      DOUBLE PRECISION,
    unit       TEXT,
    quality    TEXT
);

SELECT create_hypertable('telemetry_points', 'time');

-- Retention policy: 2 years
SELECT add_retention_policy('telemetry_points', INTERVAL '2 years');

-- Continuous aggregates for analytics
CREATE MATERIALIZED VIEW telemetry_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    site_id,
    entity_id,
    metric,
    AVG(value) AS avg_value,
    MAX(value) AS max_value,
    MIN(value) AS min_value
FROM telemetry_points
GROUP BY hour, site_id, entity_id, metric;
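
For analytics on top of the hourly rollup, a client can query the continuous aggregate directly. The sketch below compares a metric across sites; the connection string and the metric name are hypothetical, and asyncpg is one possible client library.

import asyncpg

async def cross_site_metric(metric: str = "energy_kwh") -> dict[str, float]:
    """Compare a metric across sites using the telemetry_hourly continuous aggregate."""
    conn = await asyncpg.connect(
        "postgresql://citadel@timescale.citadel.cloud/telemetry")  # assumed DSN
    try:
        rows = await conn.fetch(
            """
            SELECT site_id, AVG(avg_value) AS value
            FROM telemetry_hourly
            WHERE metric = $1 AND hour > now() - INTERVAL '7 days'
            GROUP BY site_id
            """,
            metric,
        )
        return {r["site_id"]: r["value"] for r in rows}
    finally:
        await conn.close()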

Object Storage (S3/Azure Blob):

s3://citadel-data-lake/
├── telemetry/
│   └── year=2025/month=09/day=30/
│       └── building_a_telemetry_20250930.parquet
├── video/
│   └── building_a/camera_lobby_01/
│       └── 2025-09-30T15-30-00.mp4
└── audit/
    └── year=2025/month=09/
        └── audit_logs_20250930.json.gz
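
The Hive-style partitioning (year=/month=/day=) is what most analytics engines expect when scanning the lake. A minimal export sketch, assuming pyarrow and boto3 with a hypothetical local staging path and column layout:

from datetime import date

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def export_daily_telemetry(site_id: str, columns: dict, day: date) -> None:
    """Write one day of telemetry to Parquet and upload it into the partitioned layout above."""
    # columns is assumed to look like {"time": [...], "entity_id": [...], "metric": [...], "value": [...]}
    table = pa.table(columns)
    local_path = f"/tmp/{site_id}_telemetry_{day:%Y%m%d}.parquet"
    pq.write_table(table, local_path)

    key = (
        f"telemetry/year={day:%Y}/month={day:%m}/day={day:%d}/"
        f"{site_id}_telemetry_{day:%Y%m%d}.parquet"
    )
    boto3.client("s3").upload_file(local_path, "citadel-data-lake", key)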

Digital Twin (Cloud)

The cloud-hosted digital twin provides a global view across all sites.

# Digital Twin Service (gRPC)
from typing import Dict

class TwinService:
    def __init__(self, graph_db: Neo4jClient):
        self.graph = graph_db

    async def QueryEntities(self, request: QueryRequest) -> QueryResponse:
        """Query entities across all sites."""

        # Cypher query
        query = """
        MATCH (e:Entity)
        WHERE e.site_id IN $site_ids
          AND e.type = $entity_type
        RETURN e
        """

        results = await self.graph.run(query, {
            "site_ids": request.site_ids,
            "entity_type": request.entity_type
        })

        return QueryResponse(entities=[
            self._to_entity_proto(r["e"]) for r in results
        ])

    async def GetCrossSiteMetric(self, metric: str) -> Dict[str, float]:
        """Aggregate a metric across all buildings."""

        query = """
        MATCH (e:Entity)-[:HAS_METRIC]->(m:Metric {name: $metric})
        RETURN e.site_id AS site, AVG(m.value) AS avg_value
        """

        results = await self.graph.run(query, {"metric": metric})
        return {r["site"]: r["avg_value"] for r in results}

Knowledge Services

Vector Database (RAG)

from typing import List

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct

class KnowledgeService:
    def __init__(self):
        self.vector_db = AsyncQdrantClient(url="https://qdrant.citadel.cloud")

    async def index_building_docs(self, site_id: str, docs: List[str]):
        """Index building documentation for RAG."""

        # Generate embeddings
        embeddings = await self.embed_documents(docs)

        # Store in vector DB
        await self.vector_db.upsert(
            collection_name=f"building_{site_id}_docs",
            points=[
                PointStruct(
                    id=idx,
                    vector=emb,
                    payload={"text": doc, "site_id": site_id},
                )
                for idx, (doc, emb) in enumerate(zip(docs, embeddings))
            ],
        )

    async def search_knowledge(self, query: str, site_id: str) -> List[str]:
        """Search the building knowledge base."""

        query_embedding = await self.embed_query(query)

        results = await self.vector_db.search(
            collection_name=f"building_{site_id}_docs",
            query_vector=query_embedding,
            limit=5,
        )

        return [r.payload["text"] for r in results]
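
A hypothetical end-to-end use of the retrieval step: fetch the most relevant excerpts and hand them to whatever model the RAG Service fronts. llm_complete below is a placeholder for that model call, not a documented API.

async def answer_question(question: str, site_id: str) -> str:
    """Hypothetical RAG flow: retrieve building-doc excerpts, then generate an answer."""
    ks = KnowledgeService()
    excerpts = await ks.search_knowledge(question, site_id)
    prompt = (
        "Answer the question using only the excerpts below.\n\n"
        + "\n---\n".join(excerpts)
        + f"\n\nQuestion: {question}"
    )
    return await llm_complete(prompt)  # placeholder for the RAG Service's model call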

Edge-to-Cloud Sync

WireGuard Tunnel

# WireGuard config (edge)
[Interface]
PrivateKey = <edge-private-key>
Address = 10.100.1.10/24

[Peer]
PublicKey = <cloud-public-key>
Endpoint = cloud.citadel.io:51820
AllowedIPs = 10.100.0.0/16
PersistentKeepalive = 25

Event Sync Strategy

Real-time Sync (when connected):

  • Telemetry streamed to Kafka every 15 seconds (see the publisher sketch after this list)
  • Incidents pushed immediately
  • Command results published
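
A minimal edge-side publisher sketch for the 15-second telemetry stream. The broker address, the site_id keying convention, and the read_points callback are illustrative assumptions.

import asyncio
import json

from aiokafka import AIOKafkaProducer

async def stream_telemetry(site_id: str, read_points):
    """Publish a telemetry snapshot to the aggregated topic every 15 seconds."""
    producer = AIOKafkaProducer(bootstrap_servers="kafka.citadel.cloud:9092")  # assumed address
    await producer.start()
    try:
        while True:
            snapshot = await read_points()  # site-local point readings (assumed callback)
            await producer.send_and_wait(
                "citadel.telemetry.all-sites",
                key=site_id.encode(),          # key by site to keep per-site ordering
                value=json.dumps(snapshot).encode(),
            )
            await asyncio.sleep(15)
    finally:
        await producer.stop()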

Batch Sync (on reconnect):

async def sync_queued_events():
    """Sync queued NATS JetStream events to cloud Kafka."""

    # Pull queued messages from the JetStream stream via a durable pull consumer
    sub = await nats_js.pull_subscribe(
        "citadel.>", durable="cloud-sync", stream="citadel-events")  # subject filter is illustrative
    msgs = await sub.fetch(batch=1000, timeout=5)

    # Batch publish to Kafka (broker address is deployment-specific)
    producer = aiokafka.AIOKafkaProducer(
        bootstrap_servers="kafka.citadel.cloud:9092")
    await producer.start()
    try:
        for msg in msgs:
            await producer.send(
                topic=f"citadel.{msg.subject}",
                value=msg.data)
            await msg.ack()
    finally:
        await producer.stop()  # flushes pending messages before closing

Observability

Metrics (Prometheus)

# Prometheus scrape config
scrape_configs:
  - job_name: 'citadel-agents'
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['citadel-agents']
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: (security-agent|energy-agent)
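
For the scrape job above to find anything, each agent pod needs to expose a metrics endpoint. A minimal agent-side sketch using prometheus_client; the metric names and port are examples, not a fixed contract.

from prometheus_client import Counter, Gauge, start_http_server

# Illustrative agent metrics
COMMANDS_TOTAL = Counter(
    "citadel_agent_commands_total", "Control commands issued", ["site_id", "agent"])
ZONE_TEMP = Gauge(
    "citadel_zone_temperature_celsius", "Last observed zone temperature", ["site_id", "zone_id"])

# Expose /metrics for the Kubernetes pod-discovery scrape above (port is an assumption)
start_http_server(9464)

COMMANDS_TOTAL.labels(site_id="building_a", agent="energy-agent").inc()
ZONE_TEMP.labels(site_id="building_a", zone_id="zone1").set(21.5)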

Traces (Tempo)

OpenTelemetry traces from all agents are aggregated in Tempo:

# Trace edge-to-cloud flow
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("hvac_setpoint_optimization") as span:
    span.set_attribute("site.id", "building_a")
    span.set_attribute("zone.id", "zone1")

    # Edge calculation
    optimal_temp = await calculate_optimal_setpoint(zone_id)

    # Query cloud for weather forecast
    forecast = await cloud_api.get_weather_forecast(site_id)

    # Adjust based on forecast
    adjusted_temp = adjust_for_weather(optimal_temp, forecast)

    # Write setpoint (traced)
    await write_setpoint(zone_id, adjusted_temp)

A2A and MCP Gateways

A2A Gateway (Cross-Organization)

class A2AGateway:
    """Map CloudEvents to A2A protocol for cross-org collaboration."""

    async def handle_cloudevent(self, event: CloudEvent):
        # Convert to A2A envelope
        a2a_message = {
            "protocol": "agent-to-agent/v1",
            "from": event.source,  # spiffe://citadel.mesh/agent/energy
            "to": event.subject,
            "capability": self._extract_capability(event.type),
            "payload": event.data,
            "signature": self._sign(event.data)
        }

        # Route to external organization
        await self.send_a2a(a2a_message)
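
The signature scheme behind _sign is not pinned down here; one plausible implementation signs the serialized payload with the gateway's Ed25519 key and base64-encodes the result for transport in the envelope. The Signer class below is a hypothetical sketch, with key handling delegated to the mesh's identity layer.

import base64

from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

class Signer:
    """Illustrative payload signer for the A2A envelope."""

    def __init__(self, private_key: Ed25519PrivateKey):
        self._key = private_key

    def sign(self, payload: bytes) -> str:
        # payload must already be serialized to bytes (e.g., canonical JSON)
        return base64.b64encode(self._key.sign(payload)).decode()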

MCP Registry

Central registry for MCP tool servers:

# Register MCP server
POST /mcp/registry/servers
{
  "server_id": "citadel-ebo-server",
  "server_url": "https://mcp-ebo.citadel.cloud",
  "tools": [
    "ebo_read_point",
    "ebo_write_setpoint"
  ],
  "capabilities": {
    "spiffe_ids": ["spiffe://citadel.mesh/agent/energy"],
    "rate_limit": "100/minute"
  }
}
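
Registering a server is simply a POST of that manifest to the registry. A hypothetical client sketch using httpx; the registry base URL is an assumption.

import httpx

async def register_mcp_server(registry_url: str, manifest: dict) -> None:
    """POST a server manifest to the MCP registry endpoint shown above."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{registry_url}/mcp/registry/servers", json=manifest)
        resp.raise_for_status()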

See Also