Security Agent Architecture
Status: 🚧 In Progress
Phase: Phase 1 - Foundation
Completion: Designing → Implementation
Executive Summary
The Security Agent is CitadelMesh's first autonomous intelligent agent. It demonstrates the complete integration of our foundational infrastructure: SPIFFE identity, OPA policies, NATS event mesh, Protobuf protocols, and LangGraph state machines.
Mission: Provide autonomous building security orchestration with human-level intelligence and machine-level precision, all while maintaining strict safety guarantees.
Design Principles
1. Safety-First Architecture
Every action flows through OPA policy validation:
Event → Analyze → Decide → OPA Policy Check → Execute
↓
Allow/Deny Decision
Critical Safety Rules:
- ✅ No door unlock without policy approval
- ✅ Maximum unlock duration: 300 seconds
- ✅ Rate limiting: Max 10 unlocks/hour
- ✅ Emergency override requires human approval
- ✅ All actions logged with SPIFFE identity
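A minimal illustration of how these limits might be enforced locally before any vendor call is made (the constants and the pre_check_unlock helper are illustrative sketches, not the actual implementation; OPA remains the authoritative check):
from dataclasses import dataclass

MAX_UNLOCK_DURATION_SECONDS = 300  # hard ceiling from the safety rules above
MAX_UNLOCKS_PER_HOUR = 10          # rate limit from the safety rules above

@dataclass
class UnlockRequest:
    door_id: str
    duration_seconds: int
    priority: str

def pre_check_unlock(request: UnlockRequest, unlocks_last_hour: int) -> tuple[bool, str]:
    """Cheap local guard that fails fast; the OPA policy check still runs afterwards."""
    if request.duration_seconds > MAX_UNLOCK_DURATION_SECONDS:
        return False, "unlock duration exceeds the 300 second maximum"
    if unlocks_last_hour >= MAX_UNLOCKS_PER_HOUR:
        return False, "rate limit of 10 unlocks per hour reached"
    return True, "ok"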
2. Deterministic State Machine
LangGraph ensures predictable behavior:
- Explicit state transitions - No hidden logic
- Full observability - Every decision traced
- Replay capability - Reproduce any scenario
- Test coverage - 100% state machine paths tested
3. Zero-Trust Identity
SPIFFE/SPIRE integration:
- Agent authenticates with X.509-SVID from SPIRE
- All MCP tool calls include SPIFFE identity
- Vendor systems verify agent identity
- Audit logs contain cryptographic proof
4. Event-Driven Architecture
CloudEvents + Protobuf + NATS:
- Subscribe to security events from vendors
- Publish decisions for audit trail
- Asynchronous, non-blocking processing
- Graceful degradation if event bus unavailable
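As a sketch of the graceful-degradation point (assuming the EventBus exposes an async publish method; the BufferedPublisher wrapper is illustrative), outbound events can be buffered locally while NATS is unreachable and flushed after reconnecting:
from collections import deque

class BufferedPublisher:
    """Queue outbound events while the event bus is unreachable; flush on reconnect."""
    def __init__(self, event_bus, max_buffer: int = 1000):
        self._event_bus = event_bus            # assumed to expose async publish()
        self._buffer: deque = deque(maxlen=max_buffer)

    async def publish(self, subject: str, payload: bytes) -> None:
        try:
            await self._event_bus.publish(subject, payload)
        except ConnectionError:
            # Degrade gracefully: keep the event locally instead of failing the agent
            self._buffer.append((subject, payload))

    async def flush(self) -> None:
        """Call once the connection to NATS is re-established."""
        while self._buffer:
            subject, payload = self._buffer.popleft()
            await self._event_bus.publish(subject, payload)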
System Architecture
High-Level Data Flow
graph TB
subgraph "Vendor Systems"
SSE[Schneider Security Expert]
ACC[Avigilon Control Center]
end
subgraph "Event Mesh"
NATS[NATS JetStream]
end
subgraph "Security Agent"
Monitor[Monitor State]
Analyze[Analyze State]
Decide[Decide State]
Respond[Respond State]
Escalate[Escalate State]
end
subgraph "Safety Layer"
OPA[OPA Policy Engine]
SPIRE[SPIRE Agent]
end
subgraph "Observability"
Jaeger[Jaeger Traces]
Grafana[Grafana Metrics]
end
SSE -->|Door Events| NATS
ACC -->|Camera Analytics| NATS
NATS -->|CloudEvent| Monitor
Monitor --> Analyze
Analyze --> Decide
Decide -->|Check Policy| OPA
OPA -->|Allow/Deny| Decide
Decide --> Respond
Respond -->|Execute| SSE
Respond -->|Execute| ACC
Decide -->|High Threat| Escalate
SPIRE -->|X.509-SVID| Monitor
Monitor -->|Traces| Jaeger
Respond -->|Metrics| Grafana
Component Interactions
# Pseudo-code showing integration points
class SecurityAgent(BaseAgent):
def __init__(self, config: AgentConfig):
super().__init__(config)
# 1. SPIFFE Identity
self.spiffe_client = WorkloadAPIClient()
self.svid = self.spiffe_client.fetch_x509_svid()
# 2. OPA Safety Client
self.safety_client = SafetyClient(
opa_url="http://localhost:8181"
)
# 3. Event Bus (NATS + CloudEvents)
self.event_bus = EventBus(
nats_url="nats://localhost:4222"
)
# 4. Telemetry (OpenTelemetry)
self.telemetry = TelemetryCollector(
service_name="security-agent"
)
async def process_security_event(self, event: CloudEventMessage):
# Trace the entire workflow
with self.telemetry.trace_span("process_security_event") as span:
# 1. Analyze threat
threat = await self._analyze_threat(event)
# 2. Determine response
response = await self._determine_response(threat)
# 3. Check safety policy
allowed = await self.safety_client.evaluate_policy(
action=response.action,
params=response.params,
agent_spiffe_id=self.svid.spiffe_id
)
if not allowed:
span.set_attribute("policy_denied", True)
await self._escalate_to_human(threat, response)
return
# 4. Execute response
result = await self._execute_response(response)
# 5. Publish audit event
await self.event_bus.publish_audit(
action=response.action,
result=result,
spiffe_id=self.svid.spiffe_id
)
State Machine Design
5-State Workflow
┌──────────┐
│ MONITOR │ ← Entry Point
└────┬─────┘
│ Security Event Received
▼
┌──────────┐
│ ANALYZE │ ← Assess Threat Level
└────┬─────┘
│ Threat Level: low/medium/high/critical
▼
┌──────────┐
│ DECIDE │ ← Determine Response Action
└────┬─────┘
│
├─ Low Threat ──────────► MONITOR (loop back)
│
├─ Medium/High Threat ───► RESPOND
│ │
│ ▼
│ ┌──────────┐
│ │ RESPOND │
│ └────┬─────┘
│ │
│ ├─ OPA Policy Check
│ │
│ ├─ Allowed ──► Execute Action ──► MONITOR
│ │
│ └─ Denied ───► ESCALATE
│
└─ Critical Threat ──────► ESCALATE
│
▼
┌──────────┐
│ ESCALATE │ ← Human-in-the-Loop
└────┬─────┘
│
└──► MONITOR (after human decision)
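A minimal sketch of how this workflow could be wired up with LangGraph (it assumes the node coroutines defined below and a SecurityState whose next_action field drives routing; each incoming CloudEvent triggers one pass through the graph, so "loop back to MONITOR" is modeled as ending the run):
from langgraph.graph import StateGraph, END

def build_security_graph(agent: "SecurityAgent"):
    graph = StateGraph(SecurityState)

    # One node per state in the diagram above
    graph.add_node("monitor", agent._monitor_events)
    graph.add_node("analyze", agent._analyze_threat)
    graph.add_node("decide", agent._determine_response)
    graph.add_node("respond", agent._execute_response)
    graph.add_node("escalate", agent._escalate_to_human)

    graph.set_entry_point("monitor")

    def route(state: SecurityState) -> str:
        # Each node writes its chosen transition into state.next_action
        return state.next_action

    graph.add_conditional_edges("monitor", route, {"analyze": "analyze", "monitor": END})
    graph.add_edge("analyze", "decide")
    graph.add_conditional_edges(
        "decide", route, {"monitor": END, "respond": "respond", "escalate": "escalate"}
    )
    graph.add_conditional_edges("respond", route, {"monitor": END, "escalate": "escalate"})
    graph.add_conditional_edges("escalate", route, {"respond": "respond", "monitor": END})

    return graph.compile()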
State Definitions
State 1: MONITOR
Purpose: Listen for security events from vendor systems
Inputs: CloudEvent messages from NATS topics:
- citadel.security.door.access_attempt
- citadel.security.camera.person_detected
- citadel.security.intrusion.alarm
Processing:
- Deserialize CloudEvent from NATS
- Validate Protobuf payload
- Enrich event with context (time, location, zone data)
- Add to agent memory
Outputs: Enriched SecurityState with events list
Transitions:
- If events > 0: → ANALYZE
- If no events: → MONITOR (loop)
Implementation:
async def _monitor_events(self, state: SecurityState) -> SecurityState:
"""Monitor incoming security events"""
# Subscribe to security event topics
events = []
async for cloud_event in self.event_bus.consume("citadel.security.*"):
# Deserialize Protobuf payload
if cloud_event.type == "citadel.security.door.access_attempt":
event_data = events_pb2.DoorAccessAttempt()
event_data.ParseFromString(cloud_event.data)
events.append(SecurityEvent(
event_id=cloud_event.id,
event_type="door_access",
source_system="schneider",
location=event_data.door_id,
timestamp=datetime.fromisoformat(cloud_event.time),
data=event_data,
confidence=1.0
))
return SecurityState(
events=events,
threat_level=ThreatLevel.LOW,
response_plan=[],
context={},
messages=[],
next_action="analyze" if events else "monitor"
)
State 2: ANALYZE
Purpose: Assess threat level based on events
Inputs: SecurityState with events list
Processing:
1. Threat Scoring Algorithm:
threat_score = 0.0
# Factor 1: Event type severity
severity_weights = {
    "unauthorized_access": 0.8,
    "forced_entry": 1.0,
    "loitering": 0.3,
    "tailgating": 0.6,
    "person_of_interest": 0.9
}
threat_score += severity_weights.get(event.type, 0.5)
# Factor 2: Time context
if is_after_hours():
    threat_score += 0.2
# Factor 3: Location sensitivity
if event.location in CRITICAL_ZONES:
    threat_score += 0.3
# Factor 4: Historical pattern
if recent_incidents_in_zone(event.location) > 3:
    threat_score += 0.2
# Factor 5: Confidence level
threat_score *= event.confidence
2. Classify Threat Level:
- score < 0.3: LOW
- 0.3 <= score < 0.6: MEDIUM
- 0.6 <= score < 0.8: HIGH
- score >= 0.8: CRITICAL
3. Incident Type Classification:
- Match event patterns to known incident types
- Use ML model for unknown patterns (future)
Outputs: Updated SecurityState with threat_level and incident_type
Transitions:
- Always → DECIDE
Implementation:
async def _analyze_threat(self, state: SecurityState) -> SecurityState:
"""Analyze threat level from security events"""
with self.telemetry.trace_span("analyze_threat") as span:
threat_score = 0.0
for event in state.events:
# Calculate threat score
score = self._calculate_event_threat_score(event)
threat_score = max(threat_score, score)
# Classify threat level
if threat_score < 0.3:
threat_level = ThreatLevel.LOW
elif threat_score < 0.6:
threat_level = ThreatLevel.MEDIUM
elif threat_score < 0.8:
threat_level = ThreatLevel.HIGH
else:
threat_level = ThreatLevel.CRITICAL
span.set_attribute("threat_score", threat_score)
span.set_attribute("threat_level", threat_level.value)
state.threat_level = threat_level
state.context["threat_score"] = threat_score
state.next_action = "decide"
self.logger.info(
"Threat analyzed",
threat_level=threat_level.value,
threat_score=threat_score,
event_count=len(state.events)
)
return state
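The _calculate_event_threat_score helper referenced above is not shown in this document; a sketch that applies the five scoring factors from the Processing section might look like this (is_after_hours, CRITICAL_ZONES, and recent_incidents_in_zone are the assumed helpers from the pseudocode above):
def _calculate_event_threat_score(self, event: SecurityEvent) -> float:
    """Combine the five scoring factors into a single score for one event."""
    severity_weights = {
        "unauthorized_access": 0.8,
        "forced_entry": 1.0,
        "loitering": 0.3,
        "tailgating": 0.6,
        "person_of_interest": 0.9,
    }
    # Factor 1: event type severity (unknown types default to 0.5)
    score = severity_weights.get(event.event_type, 0.5)
    # Factor 2: time context
    if is_after_hours():
        score += 0.2
    # Factor 3: location sensitivity
    if event.location in CRITICAL_ZONES:
        score += 0.3
    # Factor 4: recent incident history in the same zone
    if recent_incidents_in_zone(event.location) > 3:
        score += 0.2
    # Factor 5: scale by detection confidence
    return score * event.confidence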
State 3: DECIDE
Purpose: Determine response action based on threat level
Inputs: SecurityState with threat_level
Processing:
Decision Matrix:
| Threat Level | Incident Type | Response Action | Requires Approval |
|---|---|---|---|
| LOW | Any | MONITOR | No |
| MEDIUM | Unauthorized Access | LOCK_DOORS | No |
| MEDIUM | Loitering | TRACK_PERSON | No |
| HIGH | Forced Entry | LOCK_DOORS + ALERT_SECURITY | No |
| HIGH | Person of Interest | TRACK_PERSON + ALERT_SECURITY | No |
| CRITICAL | Any | ESCALATE_HUMAN | Yes |
Outputs: SecurityState with response_plan list
Transitions:
- If threat_level == LOW: → MONITOR
- If threat_level == MEDIUM and response_requires_approval == False: → RESPOND
- If threat_level == HIGH and response_requires_approval == False: → RESPOND
- If response_requires_approval == True: → ESCALATE
- If threat_level == CRITICAL: → ESCALATE
Implementation:
async def _determine_response(self, state: SecurityState) -> SecurityState:
"""Determine response actions based on threat level"""
response_plan = []
requires_approval = False
if state.threat_level == ThreatLevel.LOW:
response_plan = [ResponseAction.MONITOR]
state.next_action = "monitor"
elif state.threat_level == ThreatLevel.MEDIUM:
if state.incident_type == IncidentType.UNAUTHORIZED_ACCESS:
response_plan = [ResponseAction.LOCK_DOORS]
elif state.incident_type == IncidentType.LOITERING:
response_plan = [ResponseAction.TRACK_PERSON]
else:
response_plan = [ResponseAction.MONITOR]
state.next_action = "respond"
elif state.threat_level == ThreatLevel.HIGH:
response_plan = [
ResponseAction.LOCK_DOORS,
ResponseAction.ALERT_SECURITY,
ResponseAction.TRACK_PERSON
]
state.next_action = "respond"
elif state.threat_level == ThreatLevel.CRITICAL:
response_plan = [ResponseAction.ESCALATE_HUMAN]
requires_approval = True
state.next_action = "escalate"
state.response_plan = response_plan
state.context["requires_approval"] = requires_approval
self.logger.info(
"Response determined",
threat_level=state.threat_level.value,
response_plan=[a.value for a in response_plan],
requires_approval=requires_approval
)
return state
State 4: RESPOND
Purpose: Execute response actions with safety checks
Inputs: SecurityState with response_plan
Processing:
For each action in response_plan:
1. Check OPA Policy:
policy_input = {
    "action": action.value,
    "agent_spiffe_id": self.svid.spiffe_id,
    "door_id": door_id,
    "duration_seconds": 300,
    "priority": "PRIORITY_HIGH",
    "threat_level": state.threat_level.value
}
result = await self.safety_client.evaluate_policy(
    path="citadel/security/allow_door_unlock",
    input_data=policy_input
)
if not result["allow"]:
    self.logger.warning("Policy denied action", reason=result.get("reason"))
    state.next_action = "escalate"
    return state
2. Execute MCP Tool Call:
if action == ResponseAction.LOCK_DOORS:
    for door_id in affected_doors:
        result = await self.mcp_client.execute_door_action(
            action="lock",
            door_id=door_id
        )
elif action == ResponseAction.TRACK_PERSON:
    result = await self.mcp_client.execute_camera_action(
        action="track_person",
        person_id=person_id
    )
3. Publish Audit Event:
audit_event = events_pb2.AuditEvent(
    action=action.value,
    result=result,
    agent_spiffe_id=self.svid.spiffe_id,
    timestamp=datetime.utcnow().isoformat()
)
await self.event_bus.publish("citadel.audit", audit_event)
Outputs: Updated SecurityState with execution results
Transitions:
- If all actions executed successfully: → MONITOR
- If policy denied any action: → ESCALATE
- If execution error: → ESCALATE
Implementation:
async def _execute_response(self, state: SecurityState) -> SecurityState:
"""Execute response actions with safety checks"""
with self.telemetry.trace_span("execute_response") as span:
span.set_attribute("action_count", len(state.response_plan))
for action in state.response_plan:
# Skip monitor-only actions
if action == ResponseAction.MONITOR:
continue
# Check OPA policy
policy_input = self._build_policy_input(state, action)
policy_result = await self.safety_client.evaluate_policy(
path="citadel/security/allow_action",
input_data=policy_input
)
if not policy_result["allow"]:
self.logger.warning(
"Policy denied action",
action=action.value,
reason=policy_result.get("reason")
)
span.set_attribute("policy_denied", True)
state.next_action = "escalate"
return state
# Execute action via MCP
try:
result = await self._execute_mcp_action(action, state)
# Log success
self.logger.info(
"Action executed",
action=action.value,
result=result
)
# Publish audit event
await self._publish_audit_event(state, action, result)
except Exception as e:
self.logger.error(
"Action execution failed",
action=action.value,
error=str(e)
)
span.set_attribute("execution_error", str(e))
state.next_action = "escalate"
return state
state.next_action = "monitor"
return state
State 5: ESCALATE
Purpose: Human-in-the-loop for critical decisions
Inputs: SecurityState with context explaining escalation reason
Processing:
1. Create Incident Report:
incident = {
    "id": f"INC-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}",
    "threat_level": state.threat_level.value,
    "events": [asdict(e) for e in state.events],
    "proposed_response": [a.value for a in state.response_plan],
    "escalation_reason": state.context.get("escalation_reason"),
    "timestamp": datetime.utcnow().isoformat()
}
2. Notify Human Operators:
- Send to security operations center (SOC)
- Page on-call security team
- Display on living building interface
3. Wait for Human Decision:
# Poll for human response
decision = await self.wait_for_human_decision(
    incident_id=incident["id"],
    timeout_seconds=300
)
if decision == "approve":
    # Execute proposed response
    state.next_action = "respond"
elif decision == "deny":
    # Log denial and monitor
    state.next_action = "monitor"
elif decision == "timeout":
    # Default to safe action (monitor only)
    state.next_action = "monitor"
4. Log Human Decision:
await self.event_bus.publish_audit({
    "incident_id": incident["id"],
    "human_decision": decision,
    "operator_id": operator_id,
    "timestamp": datetime.utcnow().isoformat()
})
Outputs: SecurityState with human decision
Transitions:
- If approved: → RESPOND
- If denied or timeout: → MONITOR
Implementation:
async def _escalate_to_human(self, state: SecurityState) -> SecurityState:
"""Escalate to human operator for decision"""
with self.telemetry.trace_span("escalate_to_human") as span:
# Create incident report
incident_id = f"INC-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
incident_report = {
"id": incident_id,
"threat_level": state.threat_level.value,
"incident_type": state.incident_type.value if state.incident_type else "unknown",
"events_summary": self._summarize_events(state.events),
"proposed_response": [a.value for a in state.response_plan],
"escalation_reason": state.context.get("escalation_reason", "High threat level"),
"requires_decision_by": (datetime.utcnow() + timedelta(minutes=5)).isoformat()
}
span.set_attribute("incident_id", incident_id)
# Publish escalation event
escalation_event = events_pb2.IncidentEscalation(
incident_id=incident_id,
threat_level=state.threat_level.value,
requires_approval=True
)
await self.event_bus.publish("citadel.security.escalation", escalation_event)
self.logger.warning(
"Incident escalated to human",
incident_id=incident_id,
threat_level=state.threat_level.value
)
# Wait for human decision (with timeout)
decision = await self._wait_for_human_decision(
incident_id=incident_id,
timeout_seconds=300
)
span.set_attribute("human_decision", decision)
if decision == "approve":
state.next_action = "respond"
else:
# Default to safe action (monitor only)
state.response_plan = [ResponseAction.MONITOR]
state.next_action = "monitor"
return state
async def _wait_for_human_decision(
self,
incident_id: str,
timeout_seconds: int
) -> str:
"""Wait for human decision on escalated incident"""
# Subscribe to decision topic
decision_topic = f"citadel.security.decision.{incident_id}"
try:
decision_event = await asyncio.wait_for(
self.event_bus.receive_one(decision_topic),
timeout=timeout_seconds
)
# Parse decision
return decision_event.data.get("decision", "deny")
except asyncio.TimeoutError:
self.logger.warning(
"Human decision timeout",
incident_id=incident_id
)
return "timeout"
Integration Points
SPIFFE/SPIRE Identity
Workload Registration:
# Register security agent with SPIRE
spire-server entry create \
-parentID spiffe://citadel.mesh/agent/node1 \
-spiffeID spiffe://citadel.mesh/security-agent \
-selector unix:uid:1000 \
-dns security-agent \
-ttl 3600
SVID Usage in Agent:
from spiffe import WorkloadApiClient
class SecurityAgent(BaseAgent):
async def start(self):
# Fetch X.509-SVID from SPIRE agent
spiffe_client = WorkloadApiClient('/run/spire/sockets/agent.sock')
self.svid = spiffe_client.fetch_x509_svid()
self.logger.info(
"SPIFFE identity acquired",
spiffe_id=self.svid.spiffe_id.to_string(),
serial_number=self.svid.cert.serial_number
)
# Use SVID for mTLS connections
await super().start()
OPA Policy Integration
Policy File: policies/security.rego
package citadel.security
import rego.v1
# Default deny for safety
default allow := false
# Final decision: the action must match a permit rule and must not be rate limited
allow if {
    action_permitted
    not rate_limited
}
# Door lock policy
action_permitted if {
    input.action == "lock_doors"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
    input.threat_level in ["medium", "high", "critical"]
}
# Emergency unlock policy (bounded duration)
action_permitted if {
    input.action == "unlock_doors"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
    input.priority == "PRIORITY_EMERGENCY"
    input.duration_seconds <= 300
}
action_permitted if {
    input.action == "track_person"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
}
action_permitted if {
    input.action == "alert_security"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
}
# Rate limiting
rate_limited if {
    count(recent_actions) > 10
}
recent_actions contains action if {
    action := data.recent_actions[_]
    action.timestamp > (time.now_ns() - 3600000000000) # Last hour
}
# Violation reasons
violation_reason := "Rate limit exceeded: more than 10 actions in last hour" if {
    rate_limited
}
Policy Check in Agent:
async def _check_policy(self, action: ResponseAction, state: SecurityState) -> bool:
"""Check OPA policy before executing action"""
policy_input = {
"action": action.value,
"agent_spiffe_id": self.svid.spiffe_id.to_string(),
"threat_level": state.threat_level.value,
"timestamp": datetime.utcnow().isoformat()
}
if action == ResponseAction.LOCK_DOORS:
policy_input["door_ids"] = self._get_affected_doors(state)
result = await self.safety_client.evaluate_policy(
path="citadel/security/allow",
input_data=policy_input
)
if not result["allow"]:
self.logger.warning(
"Policy denied action",
action=action.value,
reason=result.get("violation_reason")
)
# Emit policy denial event
await self.telemetry.increment_counter(
"policy_denials",
tags={"action": action.value}
)
return result["allow"]
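The SafetyClient used throughout this document is assumed to wrap OPA's REST Data API (POST /v1/data/<path> with an "input" document). A minimal sketch with httpx, matching the evaluate_policy calls above, could look like this (the real client may differ):
import httpx

class SafetyClient:
    """Sketch of an OPA client that fails closed when the policy result is missing."""
    def __init__(self, opa_url: str = "http://localhost:8181"):
        self._opa_url = opa_url.rstrip("/")

    async def evaluate_policy(self, path: str, input_data: dict) -> dict:
        url = f"{self._opa_url}/v1/data/{path}"
        async with httpx.AsyncClient(timeout=2.0) as client:
            response = await client.post(url, json={"input": input_data})
            response.raise_for_status()
        result = response.json().get("result")
        # OPA returns the document at the queried path: a dict when the package is
        # queried, or a bare boolean when a single rule (e.g. .../allow) is queried.
        if isinstance(result, bool):
            return {"allow": result}
        result = result or {}
        return {"allow": result.get("allow", False),
                "violation_reason": result.get("violation_reason")}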
NATS Event Consumption
CloudEvent Subscription:
async def _start_event_loop(self):
"""Start consuming security events from NATS"""
# Subscribe to all security event topics
topics = [
"citadel.security.door.*",
"citadel.security.camera.*",
"citadel.security.intrusion.*"
]
async with self.event_bus.subscribe(topics) as subscription:
async for cloud_event in subscription:
# Process event through state machine
await self._process_cloud_event(cloud_event)
async def _process_cloud_event(self, cloud_event: CloudEventMessage):
"""Process incoming CloudEvent through state machine"""
with self.telemetry.trace_span("process_cloud_event") as span:
span.set_attribute("event_id", cloud_event.id)
span.set_attribute("event_type", cloud_event.type)
# Deserialize Protobuf payload
event_data = self._deserialize_event(cloud_event)
# Create SecurityEvent
security_event = SecurityEvent(
event_id=cloud_event.id,
event_type=cloud_event.type,
source_system=cloud_event.source,
location=event_data.location,
timestamp=datetime.fromisoformat(cloud_event.time),
data=event_data,
confidence=event_data.confidence if hasattr(event_data, 'confidence') else 1.0
)
# Run through state machine
initial_state = SecurityState(
events=[security_event],
threat_level=ThreatLevel.LOW,
response_plan=[],
context={},
messages=[]
)
final_state = await self.graph.ainvoke(initial_state)
span.set_attribute("threat_level", final_state.threat_level.value)
span.set_attribute("actions_executed", len(final_state.response_plan))
MCP Tool Invocation
Schneider Security Expert Adapter:
async def _execute_door_control(self, action: ResponseAction, door_ids: List[str]) -> Dict:
"""Execute door control via Schneider MCP adapter"""
results = []
for door_id in door_ids:
if action == ResponseAction.LOCK_DOORS:
result = await self.mcp_clients['schneider'].call_tool(
tool_name="lock_door",
arguments={
"door_id": door_id,
"force": False
}
)
elif action == ResponseAction.UNLOCK_EMERGENCY:
result = await self.mcp_clients['schneider'].call_tool(
tool_name="unlock_door",
arguments={
"door_id": door_id,
"duration_seconds": 300,
"priority": "PRIORITY_EMERGENCY"
}
)
results.append(result)
return {"door_actions": results}
Avigilon Camera Analytics:
async def _track_person(self, person_id: str) -> Dict:
"""Track person movement via Avigilon MCP adapter"""
result = await self.mcp_clients['avigilon'].call_tool(
tool_name="track_person_movement",
arguments={
"person_id": person_id,
"duration_seconds": 600,
"zones": ["entrance", "lobby", "corridor_a"]
}
)
return result
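Vendor adapters can fail transiently, so tool calls are worth wrapping in a retry before falling back to escalation. A sketch against the call_tool interface used above (the signature mirrors the examples in this document; the real MCP client may differ):
import asyncio

async def call_tool_with_retry(client, tool_name: str, arguments: dict,
                               attempts: int = 3, base_delay: float = 0.5):
    """Retry an MCP tool call with exponential backoff before escalating."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await client.call_tool(tool_name=tool_name, arguments=arguments)
        except Exception as exc:  # narrow to adapter-specific errors in practice
            last_error = exc
            await asyncio.sleep(base_delay * (2 ** attempt))
    # Give up and let the caller route the state machine to ESCALATE
    raise last_error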
Testing Strategy
Unit Tests
State Machine Logic:
@pytest.mark.asyncio
async def test_low_threat_monitoring():
"""Test that low threats continue monitoring"""
agent = SecurityAgent(config)
state = SecurityState(
events=[create_loitering_event()],
threat_level=ThreatLevel.LOW,
response_plan=[],
context={},
messages=[]
)
result = await agent._determine_response(state)
assert result.response_plan == [ResponseAction.MONITOR]
assert result.next_action == "monitor"
@pytest.mark.asyncio
async def test_high_threat_response():
    """Test that high threats trigger lockdown and security alerts"""
agent = SecurityAgent(config)
state = SecurityState(
events=[create_forced_entry_event()],
threat_level=ThreatLevel.HIGH,
response_plan=[],
context={},
messages=[]
)
result = await agent._determine_response(state)
assert ResponseAction.LOCK_DOORS in result.response_plan
assert ResponseAction.ALERT_SECURITY in result.response_plan
assert result.next_action == "respond"
OPA Policy Tests:
package citadel.security
test_allow_lock_doors_medium_threat if {
allow with input as {
"action": "lock_doors",
"agent_spiffe_id": "spiffe://citadel.mesh/security-agent",
"threat_level": "medium"
}
}
test_deny_unlock_without_emergency if {
not allow with input as {
"action": "unlock_doors",
"priority": "PRIORITY_NORMAL",
"duration_seconds": 600
}
}
Integration Tests
End-to-End Scenario:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_unauthorized_access_scenario():
"""Test complete unauthorized access incident response"""
# 1. Setup infrastructure
opa_server = await start_opa_server()
nats_server = await start_nats_server()
spire_agent = await start_spire_agent()
# 2. Start security agent
agent = SecurityAgent(config)
await agent.start()
# 3. Simulate unauthorized access event
event = create_cloud_event(
type="citadel.security.door.access_denied",
data=create_door_access_event(
door_id="DOOR_DATACENTER_01",
access_denied=True,
badge_id="UNKNOWN"
)
)
await nats_server.publish("citadel.security.door.access_denied", event)
# 4. Wait for agent processing
await asyncio.sleep(2)
# 5. Verify response
# - Check door was locked
door_status = await get_door_status("DOOR_DATACENTER_01")
assert door_status["locked"] == True
# - Check security alert was sent
alerts = await get_security_alerts()
assert len(alerts) == 1
assert alerts[0]["severity"] == "high"
# - Check audit trail
audit_logs = await get_audit_logs()
assert any(log["action"] == "lock_doors" for log in audit_logs)
# - Check OpenTelemetry traces
traces = await get_jaeger_traces(service_name="security-agent")
assert len(traces) > 0
assert traces[0].spans[0].name == "process_security_event"
Performance Tests
Throughput:
@pytest.mark.performance
@pytest.mark.asyncio
async def test_event_processing_throughput():
"""Test agent can handle 100 events/second"""
agent = SecurityAgent(config)
await agent.start()
# Publish 1000 test events as fast as possible
events = [create_test_event() for _ in range(1000)]
start_time = time.time()
for event in events:
await nats_server.publish("citadel.security.test", event)
# Wait for all events to be processed
await asyncio.sleep(5)
end_time = time.time()
duration = end_time - start_time
throughput = len(events) / duration
assert throughput >= 100 # 100 events/second minimum
Latency:
@pytest.mark.performance
@pytest.mark.asyncio
async def test_end_to_end_latency():
"""Test event-to-response latency < 200ms"""
agent = SecurityAgent(config)
await agent.start()
latencies = []
for _ in range(100):
event = create_test_event()
start_time = time.time()
await nats_server.publish("citadel.security.test", event)
# Wait for response
response = await wait_for_response(event.id, timeout=5)
end_time = time.time()
latency = (end_time - start_time) * 1000 # Convert to ms
latencies.append(latency)
# P95 latency should be < 200ms
p95_latency = np.percentile(latencies, 95)
assert p95_latency < 200
Observability
Metrics
Prometheus Metrics:
from prometheus_client import Counter, Histogram
# Events processed
security_events_total = Counter(
"citadel_security_events_total",
"Total security events processed",
["event_type", "source_system"]
)
# Threat levels detected
threat_level_total = Counter(
"citadel_security_threat_level_total",
"Threats detected by level",
["threat_level"]
)
# Actions executed
actions_executed_total = Counter(
"citadel_security_actions_executed_total",
"Response actions executed",
["action", "result"]
)
# Policy decisions
policy_decisions_total = Counter(
"citadel_security_policy_decisions_total",
"OPA policy decisions",
["decision"]
)
# Processing latency
event_processing_duration = Histogram(
"citadel_security_event_processing_duration_seconds",
"Event processing duration",
buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
)
Grafana Dashboard:
- Events per second
- Threat level distribution
- Action execution success rate
- Policy approval/denial rate
- P50/P95/P99 latency
Distributed Tracing
OpenTelemetry Spans:
with tracer.start_as_current_span("process_security_event") as span:
span.set_attribute("event.id", event.id)
span.set_attribute("event.type", event.type)
with tracer.start_as_current_span("analyze_threat"):
threat_level = await self._analyze_threat(state)
span.set_attribute("threat.level", threat_level.value)
with tracer.start_as_current_span("determine_response"):
response_plan = await self._determine_response(state)
span.set_attribute("response.action_count", len(response_plan))
with tracer.start_as_current_span("check_policy"):
allowed = await self._check_policy(action, state)
span.set_attribute("policy.allowed", allowed)
if allowed:
with tracer.start_as_current_span("execute_response"):
result = await self._execute_response(state)
span.set_attribute("response.status", result["status"])
Jaeger Trace Example:
process_security_event [200ms]
├─ analyze_threat [50ms]
│ ├─ calculate_threat_score [10ms]
│ └─ classify_threat_level [5ms]
├─ determine_response [20ms]
│ └─ build_response_plan [10ms]
├─ check_policy [30ms]
│ ├─ http_post [25ms]
│ └─ parse_response [2ms]
└─ execute_response [100ms]
├─ mcp_lock_door [40ms]
├─ mcp_alert_security [30ms]
└─ publish_audit_event [15ms]
Structured Logging
Log Format:
{
"timestamp": "2025-10-01T14:30:45.123Z",
"level": "info",
"service": "security-agent",
"agent_id": "security-agent-001",
"trace_id": "a7b3c9d4-e8f2-4a5b-9c7d-1e3f5a8b2c6d",
"span_id": "f5a8b2c6-1e3d-4a5b-9c7d-e8f2a7b3c9d4",
"event": "action_executed",
"action": "lock_doors",
"door_ids": ["DOOR_DATACENTER_01", "DOOR_SERVER_ROOM_01"],
"result": "success",
"duration_ms": 45
}
Deployment
Docker Container
Dockerfile:
FROM python:3.12-slim
WORKDIR /app
# Install curl, then the SPIRE agent binaries
RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \
    && rm -rf /var/lib/apt/lists/*
RUN curl -L https://github.com/spiffe/spire/releases/download/v1.9.6/spire-1.9.6-linux-amd64-musl.tar.gz | tar xz
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy agent code
COPY src/agents /app/agents
COPY src/proto_gen /app/proto_gen
# Run agent
CMD ["python", "-m", "agents.security.security_agent"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: security-agent
namespace: citadelmesh
spec:
replicas: 1
selector:
matchLabels:
app: security-agent
template:
metadata:
labels:
app: security-agent
spec:
serviceAccountName: security-agent
containers:
- name: security-agent
image: citadelmesh/security-agent:latest
env:
- name: NATS_URL
value: "nats://nats:4222"
- name: OPA_URL
value: "http://opa:8181"
- name: SPIFFE_SOCKET
value: "/run/spire/sockets/agent.sock"
volumeMounts:
- name: spire-agent-socket
mountPath: /run/spire/sockets
readOnly: true
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
volumes:
- name: spire-agent-socket
hostPath:
path: /run/spire/sockets
type: Directory
Success Criteria
Phase 1 Completion Checklist
- ✅ State machine implemented - All 5 states functional
- ✅ SPIFFE integration - Agent authenticates with X.509-SVID
- ✅ OPA policy checks - All actions validated before execution
- ✅ NATS event consumption - CloudEvents deserialized from Protobuf
- ✅ MCP tool invocation - Schneider and Avigilon adapters called
- ✅ Telemetry instrumented - OpenTelemetry traces and metrics
- ✅ Unit tests passing - 100% state machine coverage
- ✅ Integration tests passing - End-to-end scenarios validated
- ✅ Performance tests passing - <200ms P95 latency, >100 events/sec throughput
- ✅ Documentation complete - Architecture, API, and operations docs
Validation Tests
Test 1: Unauthorized Access Response
# Publish unauthorized access event
nats pub citadel.security.door.access_denied \
'{"door_id": "DOOR_DATACENTER_01", "badge_id": "UNKNOWN"}'
# Expected: Door locks within 200ms, security alert sent, audit log created
Test 2: Policy Denial
# Attempt action that violates policy (e.g., unlock for >300 seconds)
nats pub citadel.security.test \
'{"action": "unlock_door", "duration": 600}'
# Expected: Policy denies, escalation event published, no door action executed
Test 3: Human Escalation
# Publish critical threat event
nats pub citadel.security.intrusion.alarm \
'{"zone": "vault", "severity": "critical"}'
# Expected: Escalation event published, human decision requested, no auto-action
Next Steps
Implementation Plan
- Week 1: Core state machine (Monitor, Analyze, Decide states)
- Week 2: Safety integration (Respond state with OPA checks)
- Week 3: Escalation logic (Escalate state with human-in-the-loop)
- Week 4: Integration testing and observability
Future Enhancements
- Machine Learning: Anomaly detection for unknown threat patterns
- Predictive Analytics: Forecast security incidents before they occur
- Multi-agent Coordination: Collaborate with Energy Agent for emergency protocols
- Advanced Reasoning: Use LLMs for natural language incident reports
- Automated Response Tuning: RL to optimize response actions over time
🎯 This design document serves as the blueprint for implementing the Security Agent. Once approved, we'll proceed to Option A: Implementation.