
Security Agent Architecture

Status: 🚧 In Progress
Phase: Phase 1 - Foundation
Completion: Designing → Implementation


Executive Summary

The Security Agent is CitadelMesh's first autonomous intelligent agent. It demonstrates the complete integration of our foundational infrastructure: SPIFFE identity, OPA policies, NATS event mesh, Protobuf protocols, and LangGraph state machines.

Mission: Provide autonomous building security orchestration with human-level intelligence and machine-level precision, all while maintaining strict safety guarantees.


Design Principles

1. Safety-First Architecture

Every action flows through OPA policy validation:

Event → Analyze → Decide → OPA Policy Check (Allow / Deny) → Execute

Critical Safety Rules:

  • ✅ No door unlock without policy approval
  • ✅ Maximum unlock duration: 300 seconds
  • ✅ Rate limiting: Max 10 unlocks/hour
  • ✅ Emergency override requires human approval
  • ✅ All actions logged with SPIFFE identity

2. Deterministic State Machine

LangGraph ensures predictable behavior:

  • Explicit state transitions - No hidden logic
  • Full observability - Every decision traced
  • Replay capability - Reproduce any scenario
  • Test coverage - 100% state machine paths tested

3. Zero-Trust Identity

SPIFFE/SPIRE integration:

  • Agent authenticates with X.509-SVID from SPIRE
  • All MCP tool calls include SPIFFE identity
  • Vendor systems verify agent identity
  • Audit logs contain cryptographic proof

4. Event-Driven Architecture

CloudEvents + Protobuf + NATS:

  • Subscribe to security events from vendors
  • Publish decisions for audit trail
  • Asynchronous, non-blocking processing
  • Graceful degradation if event bus unavailable
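
The `EventBus` wrapper used by the agent is internal and not defined in this document. As a rough sketch of the publish side, the snippet below wraps a Protobuf payload in a CloudEvents-style JSON envelope and publishes it over NATS JetStream with nats-py; the envelope fields and the helper name are illustrative assumptions, not the project's actual interface.

import base64
import json
import uuid
from datetime import datetime, timezone

import nats


async def publish_security_event(subject: str, event_type: str, proto_message) -> None:
    """Sketch: wrap a Protobuf payload in a CloudEvents-style JSON envelope and publish it."""
    envelope = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "type": event_type,
        "source": "citadel.mesh/security-agent",
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/protobuf",
        # Binary Protobuf bytes are base64-encoded for the JSON envelope (assumption)
        "data_base64": base64.b64encode(proto_message.SerializeToString()).decode(),
    }

    nc = await nats.connect("nats://localhost:4222")
    try:
        js = nc.jetstream()
        await js.publish(subject, json.dumps(envelope).encode())
    finally:
        await nc.drain()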

System Architecture

High-Level Data Flow

graph TB
subgraph "Vendor Systems"
SSE[Schneider Security Expert]
ACC[Avigilon Control Center]
end

subgraph "Event Mesh"
NATS[NATS JetStream]
end

subgraph "Security Agent"
Monitor[Monitor State]
Analyze[Analyze State]
Decide[Decide State]
Respond[Respond State]
Escalate[Escalate State]
end

subgraph "Safety Layer"
OPA[OPA Policy Engine]
SPIRE[SPIRE Agent]
end

subgraph "Observability"
Jaeger[Jaeger Traces]
Grafana[Grafana Metrics]
end

SSE -->|Door Events| NATS
ACC -->|Camera Analytics| NATS
NATS -->|CloudEvent| Monitor

Monitor --> Analyze
Analyze --> Decide
Decide -->|Check Policy| OPA
OPA -->|Allow/Deny| Decide
Decide --> Respond
Respond -->|Execute| SSE
Respond -->|Execute| ACC
Decide -->|High Threat| Escalate

SPIRE -->|X.509-SVID| Monitor
Monitor -->|Traces| Jaeger
Respond -->|Metrics| Grafana

Component Interactions

# Pseudo-code showing integration points

class SecurityAgent(BaseAgent):
    def __init__(self, config: AgentConfig):
        super().__init__(config)

        # 1. SPIFFE Identity
        self.spiffe_client = WorkloadApiClient()
        self.svid = self.spiffe_client.fetch_x509_svid()

        # 2. OPA Safety Client
        self.safety_client = SafetyClient(
            opa_url="http://localhost:8181"
        )

        # 3. Event Bus (NATS + CloudEvents)
        self.event_bus = EventBus(
            nats_url="nats://localhost:4222"
        )

        # 4. Telemetry (OpenTelemetry)
        self.telemetry = TelemetryCollector(
            service_name="security-agent"
        )

    async def process_security_event(self, event: CloudEventMessage):
        # Trace the entire workflow
        with self.telemetry.trace_span("process_security_event") as span:
            # 1. Analyze threat
            threat = await self._analyze_threat(event)

            # 2. Determine response
            response = await self._determine_response(threat)

            # 3. Check safety policy
            allowed = await self.safety_client.evaluate_policy(
                action=response.action,
                params=response.params,
                agent_spiffe_id=self.svid.spiffe_id
            )

            if not allowed:
                span.set_attribute("policy_denied", True)
                await self._escalate_to_human(threat, response)
                return

            # 4. Execute response
            result = await self._execute_response(response)

            # 5. Publish audit event
            await self.event_bus.publish_audit(
                action=response.action,
                result=result,
                spiffe_id=self.svid.spiffe_id
            )

State Machine Design

5-State Workflow

┌──────────┐
│ MONITOR  │ ← Entry Point
└────┬─────┘
     │ Security Event Received
     ▼
┌──────────┐
│ ANALYZE  │ ← Assess Threat Level
└────┬─────┘
     │ Threat Level: low/medium/high/critical
     ▼
┌──────────┐
│ DECIDE   │ ← Determine Response Action
└────┬─────┘
     │
     ├─ Low Threat ────────────► MONITOR (loop back)
     │
     ├─ Medium/High Threat ────► RESPOND
     │                              │
     │                              ▼
     │                         ┌──────────┐
     │                         │ RESPOND  │
     │                         └────┬─────┘
     │                              │
     │                              ├─ OPA Policy Check
     │                              │
     │                              ├─ Allowed ──► Execute Action ──► MONITOR
     │                              │
     │                              └─ Denied ───► ESCALATE
     │
     └─ Critical Threat ───────► ESCALATE
                                    │
                                    ▼
                               ┌──────────┐
                               │ ESCALATE │ ← Human-in-the-Loop
                               └────┬─────┘
                                    │
                                    └──► MONITOR (after human decision)
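
The wiring of this workflow in LangGraph is not shown elsewhere in this document. The sketch below is one way it could look, assuming a `StateGraph` over `SecurityState`, the node methods defined under each state below, and a router that reads the `next_action` field each node sets; treat it as illustrative rather than the final implementation.

from langgraph.graph import StateGraph, END

def build_security_graph(agent: "SecurityAgent"):
    """Sketch: compile the 5-state workflow into a LangGraph state machine."""
    graph = StateGraph(SecurityState)

    # One node per workflow state; each callable returns an updated SecurityState
    graph.add_node("monitor", agent._monitor_events)
    graph.add_node("analyze", agent._analyze_threat)
    graph.add_node("decide", agent._determine_response)
    graph.add_node("respond", agent._execute_response)
    graph.add_node("escalate", agent._escalate_to_human)

    graph.set_entry_point("monitor")

    def route(state: SecurityState) -> str:
        # Each node writes its successor into state.next_action
        return state.next_action

    # "monitor" maps to END so a single invocation terminates; the agent
    # re-invokes the graph per incoming event (see _process_cloud_event)
    graph.add_conditional_edges("monitor", route, {"analyze": "analyze", "monitor": END})
    graph.add_edge("analyze", "decide")
    graph.add_conditional_edges("decide", route,
                                {"respond": "respond", "escalate": "escalate", "monitor": END})
    graph.add_conditional_edges("respond", route, {"escalate": "escalate", "monitor": END})
    graph.add_conditional_edges("escalate", route, {"respond": "respond", "monitor": END})

    return graph.compile()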

State Definitions

State 1: MONITOR

Purpose: Listen for security events from vendor systems

Inputs: CloudEvent messages from NATS topics

  • citadel.security.door.access_attempt
  • citadel.security.camera.person_detected
  • citadel.security.intrusion.alarm

Processing:

  1. Deserialize CloudEvent from NATS
  2. Validate Protobuf payload
  3. Enrich event with context (time, location, zone data)
  4. Add to agent memory

Outputs: Enriched SecurityState with events list

Transitions:

  • If events > 0: → ANALYZE
  • If no events: → MONITOR (loop)

Implementation:

async def _monitor_events(self, state: SecurityState) -> SecurityState:
    """Monitor incoming security events"""

    # Subscribe to security event topics
    events = []

    async for cloud_event in self.event_bus.consume("citadel.security.*"):
        # Deserialize Protobuf payload
        if cloud_event.type == "citadel.security.door.access_attempt":
            event_data = events_pb2.DoorAccessAttempt()
            event_data.ParseFromString(cloud_event.data)

            events.append(SecurityEvent(
                event_id=cloud_event.id,
                event_type="door_access",
                source_system="schneider",
                location=event_data.door_id,
                timestamp=datetime.fromisoformat(cloud_event.time),
                data=event_data,
                confidence=1.0
            ))

    return SecurityState(
        events=events,
        threat_level=ThreatLevel.LOW,
        response_plan=[],
        context={},
        messages=[],
        next_action="analyze" if events else "monitor"
    )

State 2: ANALYZE

Purpose: Assess threat level based on events

Inputs: SecurityState with events list

Processing:

  1. Threat Scoring Algorithm:

     threat_score = 0.0

     # Factor 1: Event type severity
     severity_weights = {
         "unauthorized_access": 0.8,
         "forced_entry": 1.0,
         "loitering": 0.3,
         "tailgating": 0.6,
         "person_of_interest": 0.9
     }
     threat_score += severity_weights.get(event.type, 0.5)

     # Factor 2: Time context
     if is_after_hours():
         threat_score += 0.2

     # Factor 3: Location sensitivity
     if event.location in CRITICAL_ZONES:
         threat_score += 0.3

     # Factor 4: Historical pattern
     if recent_incidents_in_zone(event.location) > 3:
         threat_score += 0.2

     # Factor 5: Confidence level
     threat_score *= event.confidence

  2. Classify Threat Level:

    • score < 0.3: LOW
    • 0.3 <= score < 0.6: MEDIUM
    • 0.6 <= score < 0.8: HIGH
    • score >= 0.8: CRITICAL
  3. Incident Type Classification:

    • Match event patterns to known incident types
    • Use ML model for unknown patterns (future)

Outputs: Updated SecurityState with threat_level and incident_type

Transitions:

  • Always → DECIDE

Implementation:

async def _analyze_threat(self, state: SecurityState) -> SecurityState:
    """Analyze threat level from security events"""

    with self.telemetry.trace_span("analyze_threat") as span:
        threat_score = 0.0

        for event in state.events:
            # Calculate threat score
            score = self._calculate_event_threat_score(event)
            threat_score = max(threat_score, score)

        # Classify threat level
        if threat_score < 0.3:
            threat_level = ThreatLevel.LOW
        elif threat_score < 0.6:
            threat_level = ThreatLevel.MEDIUM
        elif threat_score < 0.8:
            threat_level = ThreatLevel.HIGH
        else:
            threat_level = ThreatLevel.CRITICAL

        span.set_attribute("threat_score", threat_score)
        span.set_attribute("threat_level", threat_level.value)

        state.threat_level = threat_level
        state.context["threat_score"] = threat_score
        state.next_action = "decide"

        self.logger.info(
            "Threat analyzed",
            threat_level=threat_level.value,
            threat_score=threat_score,
            event_count=len(state.events)
        )

    return state

State 3: DECIDE

Purpose: Determine response action based on threat level

Inputs: SecurityState with threat_level

Processing:

Decision Matrix:

| Threat Level | Incident Type | Response Action | Requires Approval |
| --- | --- | --- | --- |
| LOW | Any | MONITOR | No |
| MEDIUM | Unauthorized Access | LOCK_DOORS | No |
| MEDIUM | Loitering | TRACK_PERSON | No |
| HIGH | Forced Entry | LOCK_DOORS + ALERT_SECURITY | No |
| HIGH | Person of Interest | TRACK_PERSON + ALERT_SECURITY | No |
| CRITICAL | Any | ESCALATE_HUMAN | Yes |

Outputs: SecurityState with response_plan list

Transitions:

  • If threat_level == LOW: → MONITOR
  • If threat_level == MEDIUM and response_requires_approval == False: → RESPOND
  • If threat_level == HIGH and response_requires_approval == False: → RESPOND
  • If response_requires_approval == True: → ESCALATE
  • If threat_level == CRITICAL: → ESCALATE

Implementation:

async def _determine_response(self, state: SecurityState) -> SecurityState:
    """Determine response actions based on threat level"""

    response_plan = []
    requires_approval = False

    if state.threat_level == ThreatLevel.LOW:
        response_plan = [ResponseAction.MONITOR]
        state.next_action = "monitor"

    elif state.threat_level == ThreatLevel.MEDIUM:
        if state.incident_type == IncidentType.UNAUTHORIZED_ACCESS:
            response_plan = [ResponseAction.LOCK_DOORS]
        elif state.incident_type == IncidentType.LOITERING:
            response_plan = [ResponseAction.TRACK_PERSON]
        else:
            response_plan = [ResponseAction.MONITOR]
        state.next_action = "respond"

    elif state.threat_level == ThreatLevel.HIGH:
        response_plan = [
            ResponseAction.LOCK_DOORS,
            ResponseAction.ALERT_SECURITY,
            ResponseAction.TRACK_PERSON
        ]
        state.next_action = "respond"

    elif state.threat_level == ThreatLevel.CRITICAL:
        response_plan = [ResponseAction.ESCALATE_HUMAN]
        requires_approval = True
        state.next_action = "escalate"

    state.response_plan = response_plan
    state.context["requires_approval"] = requires_approval

    self.logger.info(
        "Response determined",
        threat_level=state.threat_level.value,
        response_plan=[a.value for a in response_plan],
        requires_approval=requires_approval
    )

    return state

State 4: RESPOND

Purpose: Execute response actions with safety checks

Inputs: SecurityState with response_plan

Processing:

For each action in response_plan:

  1. Check OPA Policy:

     policy_input = {
         "action": action.value,
         "agent_spiffe_id": self.svid.spiffe_id,
         "door_id": door_id,
         "duration_seconds": 300,
         "priority": "PRIORITY_HIGH",
         "threat_level": state.threat_level.value
     }

     result = await self.safety_client.evaluate_policy(
         path="citadel/security/allow_door_unlock",
         input_data=policy_input
     )

     if not result["allow"]:
         self.logger.warning("Policy denied action", reason=result.get("reason"))
         state.next_action = "escalate"
         return state

  2. Execute MCP Tool Call:

     if action == ResponseAction.LOCK_DOORS:
         for door_id in affected_doors:
             result = await self.mcp_client.execute_door_action(
                 action="lock",
                 door_id=door_id
             )

     elif action == ResponseAction.TRACK_PERSON:
         result = await self.mcp_client.execute_camera_action(
             action="track_person",
             person_id=person_id
         )

  3. Publish Audit Event:

     audit_event = events_pb2.AuditEvent(
         action=action.value,
         result=result,
         agent_spiffe_id=self.svid.spiffe_id,
         timestamp=datetime.utcnow().isoformat()
     )

     await self.event_bus.publish("citadel.audit", audit_event)

Outputs: Updated SecurityState with execution results

Transitions:

  • If all actions executed successfully: → MONITOR
  • If policy denied any action: → ESCALATE
  • If execution error: → ESCALATE

Implementation:

async def _execute_response(self, state: SecurityState) -> SecurityState:
    """Execute response actions with safety checks"""

    with self.telemetry.trace_span("execute_response") as span:
        span.set_attribute("action_count", len(state.response_plan))

        for action in state.response_plan:
            # Skip monitor-only actions
            if action == ResponseAction.MONITOR:
                continue

            # Check OPA policy
            policy_input = self._build_policy_input(state, action)

            policy_result = await self.safety_client.evaluate_policy(
                path="citadel/security/allow_action",
                input_data=policy_input
            )

            if not policy_result["allow"]:
                self.logger.warning(
                    "Policy denied action",
                    action=action.value,
                    reason=policy_result.get("reason")
                )
                span.set_attribute("policy_denied", True)
                state.next_action = "escalate"
                return state

            # Execute action via MCP
            try:
                result = await self._execute_mcp_action(action, state)

                # Log success
                self.logger.info(
                    "Action executed",
                    action=action.value,
                    result=result
                )

                # Publish audit event
                await self._publish_audit_event(state, action, result)

            except Exception as e:
                self.logger.error(
                    "Action execution failed",
                    action=action.value,
                    error=str(e)
                )
                span.set_attribute("execution_error", str(e))
                state.next_action = "escalate"
                return state

    state.next_action = "monitor"
    return state

State 5: ESCALATE

Purpose: Human-in-the-loop for critical decisions

Inputs: SecurityState with context explaining escalation reason

Processing:

  1. Create Incident Report:

     incident = {
         "id": f"INC-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}",
         "threat_level": state.threat_level.value,
         "events": [asdict(e) for e in state.events],
         "proposed_response": [a.value for a in state.response_plan],
         "escalation_reason": state.context.get("escalation_reason"),
         "timestamp": datetime.utcnow().isoformat()
     }

  2. Notify Human Operators:

    • Send to security operations center (SOC)
    • Page on-call security team
    • Display on living building interface
  3. Wait for Human Decision:

     # Poll for human response
     decision = await self.wait_for_human_decision(
         incident_id=incident["id"],
         timeout_seconds=300
     )

     if decision == "approve":
         # Execute proposed response
         state.next_action = "respond"
     elif decision == "deny":
         # Log denial and monitor
         state.next_action = "monitor"
     elif decision == "timeout":
         # Default to safe action (monitor only)
         state.next_action = "monitor"

  4. Log Human Decision:

     await self.event_bus.publish_audit({
         "incident_id": incident["id"],
         "human_decision": decision,
         "operator_id": operator_id,
         "timestamp": datetime.utcnow().isoformat()
     })

Outputs: SecurityState with human decision

Transitions:

  • If approved: → RESPOND
  • If denied or timeout: → MONITOR

Implementation:

async def _escalate_to_human(self, state: SecurityState) -> SecurityState:
    """Escalate to human operator for decision"""

    with self.telemetry.trace_span("escalate_to_human") as span:
        # Create incident report
        incident_id = f"INC-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"

        incident_report = {
            "id": incident_id,
            "threat_level": state.threat_level.value,
            "incident_type": state.incident_type.value if state.incident_type else "unknown",
            "events_summary": self._summarize_events(state.events),
            "proposed_response": [a.value for a in state.response_plan],
            "escalation_reason": state.context.get("escalation_reason", "High threat level"),
            "requires_decision_by": (datetime.utcnow() + timedelta(minutes=5)).isoformat()
        }

        span.set_attribute("incident_id", incident_id)

        # Publish escalation event
        escalation_event = events_pb2.IncidentEscalation(
            incident_id=incident_id,
            threat_level=state.threat_level.value,
            requires_approval=True
        )

        await self.event_bus.publish("citadel.security.escalation", escalation_event)

        self.logger.warning(
            "Incident escalated to human",
            incident_id=incident_id,
            threat_level=state.threat_level.value
        )

        # Wait for human decision (with timeout)
        decision = await self._wait_for_human_decision(
            incident_id=incident_id,
            timeout_seconds=300
        )

        span.set_attribute("human_decision", decision)

        if decision == "approve":
            state.next_action = "respond"
        else:
            # Default to safe action (monitor only)
            state.response_plan = [ResponseAction.MONITOR]
            state.next_action = "monitor"

    return state

async def _wait_for_human_decision(
    self,
    incident_id: str,
    timeout_seconds: int
) -> str:
    """Wait for human decision on escalated incident"""

    # Subscribe to decision topic
    decision_topic = f"citadel.security.decision.{incident_id}"

    try:
        decision_event = await asyncio.wait_for(
            self.event_bus.receive_one(decision_topic),
            timeout=timeout_seconds
        )

        # Parse decision
        return decision_event.data.get("decision", "deny")

    except asyncio.TimeoutError:
        self.logger.warning(
            "Human decision timeout",
            incident_id=incident_id
        )
        return "timeout"

Integration Points

SPIFFE/SPIRE Identity

Workload Registration:

# Register security agent with SPIRE
spire-server entry create \
-parentID spiffe://citadel.mesh/agent/node1 \
-spiffeID spiffe://citadel.mesh/security-agent \
-selector unix:uid:1000 \
-dns security-agent \
-ttl 3600

SVID Usage in Agent:

from spiffe import WorkloadApiClient

class SecurityAgent(BaseAgent):
    async def start(self):
        # Fetch X.509-SVID from SPIRE agent
        spiffe_client = WorkloadApiClient('/run/spire/sockets/agent.sock')
        self.svid = spiffe_client.fetch_x509_svid()

        self.logger.info(
            "SPIFFE identity acquired",
            spiffe_id=self.svid.spiffe_id.to_string(),
            serial_number=self.svid.cert.serial_number
        )

        # Use SVID for mTLS connections
        await super().start()

OPA Policy Integration

Policy File: policies/security.rego

package citadel.security

import rego.v1

# Default deny for safety
default allow := false

# Door control policies
allow if {
    input.action == "lock_doors"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
    input.threat_level in ["medium", "high", "critical"]
}

allow if {
    input.action == "unlock_doors"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
    input.priority == "PRIORITY_EMERGENCY"
    input.duration_seconds <= 300
}

allow if {
    input.action == "track_person"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
}

allow if {
    input.action == "alert_security"
    input.agent_spiffe_id == "spiffe://citadel.mesh/security-agent"
}

# Rate limiting
deny if {
    count(recent_actions) > 10
}

recent_actions contains action if {
    action := data.recent_actions[_]
    action.timestamp > (time.now_ns() - 3600000000000)  # Last hour
}

# Violation reasons
violation_reason := "Rate limit exceeded: more than 10 actions in last hour" if {
    deny
    count(recent_actions) > 10
}

Policy Check in Agent:

async def _check_policy(self, action: ResponseAction, state: SecurityState) -> bool:
    """Check OPA policy before executing action"""

    policy_input = {
        "action": action.value,
        "agent_spiffe_id": self.svid.spiffe_id.to_string(),
        "threat_level": state.threat_level.value,
        "timestamp": datetime.utcnow().isoformat()
    }

    if action == ResponseAction.LOCK_DOORS:
        policy_input["door_ids"] = self._get_affected_doors(state)

    result = await self.safety_client.evaluate_policy(
        path="citadel/security/allow",
        input_data=policy_input
    )

    if not result["allow"]:
        self.logger.warning(
            "Policy denied action",
            action=action.value,
            reason=result.get("violation_reason")
        )

        # Emit policy denial event
        await self.telemetry.increment_counter(
            "policy_denials",
            tags={"action": action.value}
        )

    return result["allow"]
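
The `SafetyClient` used throughout this document is an internal wrapper that is not defined here. A minimal sketch against OPA's standard Data API (`POST /v1/data/<path>`), assuming `httpx` and a fail-closed default when OPA is unreachable, might look like this:

import httpx


class SafetyClient:
    """Minimal sketch of an OPA client; the real wrapper may differ."""

    def __init__(self, opa_url: str = "http://localhost:8181"):
        self.opa_url = opa_url.rstrip("/")

    async def evaluate_policy(self, path: str, input_data: dict) -> dict:
        """POST the input document to OPA's Data API and return the decision."""
        url = f"{self.opa_url}/v1/data/{path}"

        async with httpx.AsyncClient(timeout=2.0) as client:
            try:
                response = await client.post(url, json={"input": input_data})
                response.raise_for_status()
            except httpx.HTTPError:
                # Fail closed: if the policy engine is unavailable, deny
                return {"allow": False, "violation_reason": "policy engine unreachable"}

        # OPA wraps the rule output under "result"
        result = response.json().get("result", {})
        if isinstance(result, bool):
            # Querying a single rule (e.g. .../allow) returns a bare boolean
            return {"allow": result}
        return result or {"allow": False}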

NATS Event Consumption

CloudEvent Subscription:

async def _start_event_loop(self):
    """Start consuming security events from NATS"""

    # Subscribe to all security event topics
    topics = [
        "citadel.security.door.*",
        "citadel.security.camera.*",
        "citadel.security.intrusion.*"
    ]

    async with self.event_bus.subscribe(topics) as subscription:
        async for cloud_event in subscription:
            # Process event through state machine
            await self._process_cloud_event(cloud_event)

async def _process_cloud_event(self, cloud_event: CloudEventMessage):
    """Process incoming CloudEvent through state machine"""

    with self.telemetry.trace_span("process_cloud_event") as span:
        span.set_attribute("event_id", cloud_event.id)
        span.set_attribute("event_type", cloud_event.type)

        # Deserialize Protobuf payload
        event_data = self._deserialize_event(cloud_event)

        # Create SecurityEvent
        security_event = SecurityEvent(
            event_id=cloud_event.id,
            event_type=cloud_event.type,
            source_system=cloud_event.source,
            location=event_data.location,
            timestamp=datetime.fromisoformat(cloud_event.time),
            data=event_data,
            confidence=event_data.confidence if hasattr(event_data, 'confidence') else 1.0
        )

        # Run through state machine
        initial_state = SecurityState(
            events=[security_event],
            threat_level=ThreatLevel.LOW,
            response_plan=[],
            context={},
            messages=[]
        )

        final_state = await self.graph.ainvoke(initial_state)

        span.set_attribute("threat_level", final_state.threat_level.value)
        span.set_attribute("actions_executed", len(final_state.response_plan))

MCP Tool Invocation

Schneider Security Expert Adapter:

async def _execute_door_control(self, action: ResponseAction, door_ids: List[str]) -> Dict:
    """Execute door control via Schneider MCP adapter"""

    results = []

    for door_id in door_ids:
        if action == ResponseAction.LOCK_DOORS:
            result = await self.mcp_clients['schneider'].call_tool(
                tool_name="lock_door",
                arguments={
                    "door_id": door_id,
                    "force": False
                }
            )
        elif action == ResponseAction.UNLOCK_EMERGENCY:
            result = await self.mcp_clients['schneider'].call_tool(
                tool_name="unlock_door",
                arguments={
                    "door_id": door_id,
                    "duration_seconds": 300,
                    "priority": "PRIORITY_EMERGENCY"
                }
            )

        results.append(result)

    return {"door_actions": results}

Avigilon Camera Analytics:

async def _track_person(self, person_id: str) -> Dict:
    """Track person movement via Avigilon MCP adapter"""

    result = await self.mcp_clients['avigilon'].call_tool(
        tool_name="track_person_movement",
        arguments={
            "person_id": person_id,
            "duration_seconds": 600,
            "zones": ["entrance", "lobby", "corridor_a"]
        }
    )

    return result
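
The RESPOND implementation calls `_execute_mcp_action`, which is not defined in this document. Below is a minimal dispatcher sketch over the two adapter helpers above; `_get_tracked_person` and `_build_alert` are hypothetical helpers, and routing `ALERT_SECURITY` through the event bus is an assumption rather than the settled design.

async def _execute_mcp_action(self, action: ResponseAction, state: SecurityState) -> Dict:
    """Sketch: dispatch a response action to the appropriate MCP adapter helper."""
    if action in (ResponseAction.LOCK_DOORS, ResponseAction.UNLOCK_EMERGENCY):
        # _get_affected_doors derives door IDs from the events in state (used in _check_policy)
        return await self._execute_door_control(action, self._get_affected_doors(state))

    if action == ResponseAction.TRACK_PERSON:
        # _get_tracked_person is a hypothetical helper pulling the person ID from camera analytics
        return await self._track_person(self._get_tracked_person(state))

    if action == ResponseAction.ALERT_SECURITY:
        # Assumption: alerting goes through the event bus rather than an MCP tool
        await self.event_bus.publish("citadel.security.alert", self._build_alert(state))
        return {"alert": "published"}

    raise ValueError(f"No MCP handler for action: {action}")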

Testing Strategy

Unit Tests

State Machine Logic:

@pytest.mark.asyncio
async def test_low_threat_monitoring():
    """Test that low threats continue monitoring"""
    agent = SecurityAgent(config)

    state = SecurityState(
        events=[create_loitering_event()],
        threat_level=ThreatLevel.LOW,
        response_plan=[],
        context={},
        messages=[]
    )

    result = await agent._determine_response(state)

    assert result.response_plan == [ResponseAction.MONITOR]
    assert result.next_action == "monitor"

@pytest.mark.asyncio
async def test_high_threat_response():
    """Test that high threats trigger an immediate multi-action response"""
    agent = SecurityAgent(config)

    state = SecurityState(
        events=[create_forced_entry_event()],
        threat_level=ThreatLevel.HIGH,
        response_plan=[],
        context={},
        messages=[]
    )

    result = await agent._determine_response(state)

    assert ResponseAction.LOCK_DOORS in result.response_plan
    assert ResponseAction.ALERT_SECURITY in result.response_plan
    assert result.next_action == "respond"

OPA Policy Tests:

package citadel.security

import rego.v1

test_allow_lock_doors_medium_threat if {
    allow with input as {
        "action": "lock_doors",
        "agent_spiffe_id": "spiffe://citadel.mesh/security-agent",
        "threat_level": "medium"
    }
}

test_deny_unlock_without_emergency if {
    not allow with input as {
        "action": "unlock_doors",
        "priority": "PRIORITY_NORMAL",
        "duration_seconds": 600
    }
}

Integration Tests

End-to-End Scenario:

@pytest.mark.integration
@pytest.mark.asyncio
async def test_unauthorized_access_scenario():
    """Test complete unauthorized access incident response"""

    # 1. Setup infrastructure
    opa_server = await start_opa_server()
    nats_server = await start_nats_server()
    spire_agent = await start_spire_agent()

    # 2. Start security agent
    agent = SecurityAgent(config)
    await agent.start()

    # 3. Simulate unauthorized access event
    event = create_cloud_event(
        type="citadel.security.door.access_denied",
        data=create_door_access_event(
            door_id="DOOR_DATACENTER_01",
            access_denied=True,
            badge_id="UNKNOWN"
        )
    )

    await nats_server.publish("citadel.security.door.access_denied", event)

    # 4. Wait for agent processing
    await asyncio.sleep(2)

    # 5. Verify response
    # - Check door was locked
    door_status = await get_door_status("DOOR_DATACENTER_01")
    assert door_status["locked"] is True

    # - Check security alert was sent
    alerts = await get_security_alerts()
    assert len(alerts) == 1
    assert alerts[0]["severity"] == "high"

    # - Check audit trail
    audit_logs = await get_audit_logs()
    assert any(log["action"] == "lock_doors" for log in audit_logs)

    # - Check OpenTelemetry traces
    traces = await get_jaeger_traces(service_name="security-agent")
    assert len(traces) > 0
    assert traces[0].spans[0].name == "process_security_event"

Performance Tests

Throughput:

@pytest.mark.performance
@pytest.mark.asyncio
async def test_event_processing_throughput():
    """Test agent can handle 100 events/second"""

    agent = SecurityAgent(config)
    await agent.start()

    # Publish 1,000 test events as fast as possible
    events = [create_test_event() for _ in range(1000)]

    start_time = time.time()

    for event in events:
        await nats_server.publish("citadel.security.test", event)

    # Allow time for all events to be processed
    await asyncio.sleep(5)

    end_time = time.time()
    duration = end_time - start_time

    # Approximate throughput over the publish + processing window
    throughput = len(events) / duration
    assert throughput >= 100  # 100 events/second minimum

Latency:

import numpy as np

@pytest.mark.performance
@pytest.mark.asyncio
async def test_end_to_end_latency():
    """Test event-to-response latency < 200ms"""

    agent = SecurityAgent(config)
    await agent.start()

    latencies = []

    for _ in range(100):
        event = create_test_event()

        start_time = time.time()
        await nats_server.publish("citadel.security.test", event)

        # Wait for response
        response = await wait_for_response(event.id, timeout=5)

        end_time = time.time()
        latency = (end_time - start_time) * 1000  # Convert to ms

        latencies.append(latency)

    # P95 latency should be < 200ms
    p95_latency = np.percentile(latencies, 95)
    assert p95_latency < 200

Observability

Metrics

Prometheus Metrics:

# Events processed
security_events_total = Counter(
"citadel_security_events_total",
"Total security events processed",
["event_type", "source_system"]
)

# Threat levels detected
threat_level_total = Counter(
"citadel_security_threat_level_total",
"Threats detected by level",
["threat_level"]
)

# Actions executed
actions_executed_total = Counter(
"citadel_security_actions_executed_total",
"Response actions executed",
["action", "result"]
)

# Policy decisions
policy_decisions_total = Counter(
"citadel_security_policy_decisions_total",
"OPA policy decisions",
["decision"]
)

# Processing latency
event_processing_duration = Histogram(
"citadel_security_event_processing_duration_seconds",
"Event processing duration",
buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
)

Grafana Dashboard:

  • Events per second
  • Threat level distribution
  • Action execution success rate
  • Policy approval/denial rate
  • P50/P95/P99 latency
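
Illustrative PromQL for the panels above, assuming the metric names from the Prometheus snippet; the actual panels would be provisioned as Grafana dashboard JSON.

# Illustrative PromQL for the Grafana panels above (assumes the metric names defined earlier)
DASHBOARD_QUERIES = {
    "events_per_second": 'sum(rate(citadel_security_events_total[1m]))',
    "threat_level_distribution": 'sum by (threat_level) (increase(citadel_security_threat_level_total[1h]))',
    "action_success_rate": (
        'sum(rate(citadel_security_actions_executed_total{result="success"}[5m]))'
        ' / sum(rate(citadel_security_actions_executed_total[5m]))'
    ),
    "policy_denial_rate": (
        'sum(rate(citadel_security_policy_decisions_total{decision="deny"}[5m]))'
        ' / sum(rate(citadel_security_policy_decisions_total[5m]))'
    ),
    "p95_latency_seconds": (
        'histogram_quantile(0.95, sum by (le) '
        '(rate(citadel_security_event_processing_duration_seconds_bucket[5m])))'
    ),
}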

Distributed Tracing

OpenTelemetry Spans:

with tracer.start_as_current_span("process_security_event") as span:
    span.set_attribute("event.id", event.id)
    span.set_attribute("event.type", event.type)

    with tracer.start_as_current_span("analyze_threat"):
        threat_level = await self._analyze_threat(state)
        span.set_attribute("threat.level", threat_level.value)

    with tracer.start_as_current_span("determine_response"):
        response_plan = await self._determine_response(state)
        span.set_attribute("response.action_count", len(response_plan))

    with tracer.start_as_current_span("check_policy"):
        allowed = await self._check_policy(action, state)
        span.set_attribute("policy.allowed", allowed)

    if allowed:
        with tracer.start_as_current_span("execute_response"):
            result = await self._execute_response(state)
            span.set_attribute("response.status", result["status"])

Jaeger Trace Example:

process_security_event [200ms]
├─ analyze_threat [50ms]
│  ├─ calculate_threat_score [10ms]
│  └─ classify_threat_level [5ms]
├─ determine_response [20ms]
│  └─ build_response_plan [10ms]
├─ check_policy [30ms]
│  ├─ http_post [25ms]
│  └─ parse_response [2ms]
└─ execute_response [100ms]
   ├─ mcp_lock_door [40ms]
   ├─ mcp_alert_security [30ms]
   └─ publish_audit_event [15ms]

Structured Logging

Log Format:

{
  "timestamp": "2025-10-01T14:30:45.123Z",
  "level": "info",
  "service": "security-agent",
  "agent_id": "security-agent-001",
  "trace_id": "a7b3c9d4-e8f2-4a5b-9c7d-1e3f5a8b2c6d",
  "span_id": "f5a8b2c6-1e3d-4a5b-9c7d-e8f2a7b3c9d4",
  "event": "action_executed",
  "action": "lock_doors",
  "door_ids": ["DOOR_DATACENTER_01", "DOOR_SERVER_ROOM_01"],
  "result": "success",
  "duration_ms": 45
}
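
The agent code in this document logs with keyword arguments (for example, `self.logger.info("Threat analyzed", threat_level=...)`). A minimal configuration sketch that would produce JSON records in the shape above, assuming structlog as the logging library, is:

import structlog

def configure_logging(service_name: str, agent_id: str):
    """Sketch: emit JSON log lines with service/agent context bound to every record."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,   # picks up trace/span IDs if bound elsewhere
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True, key="timestamp"),
            structlog.processors.JSONRenderer(),
        ]
    )
    return structlog.get_logger().bind(service=service_name, agent_id=agent_id)

# Usage sketch:
# logger = configure_logging("security-agent", "security-agent-001")
# logger.info("action_executed", action="lock_doors", result="success", duration_ms=45)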

Deployment

Docker Container

Dockerfile:

FROM python:3.12-slim

WORKDIR /app

# curl is not included in the slim base image; needed to fetch the SPIRE agent
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install SPIRE agent
RUN curl -L https://github.com/spiffe/spire/releases/download/v1.9.6/spire-1.9.6-linux-amd64-musl.tar.gz | tar xz

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy agent code
COPY src/agents /app/agents
COPY src/proto_gen /app/proto_gen

# Run agent
CMD ["python", "-m", "agents.security.security_agent"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: security-agent
  namespace: citadelmesh
spec:
  replicas: 1
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      serviceAccountName: security-agent
      containers:
        - name: security-agent
          image: citadelmesh/security-agent:latest
          env:
            - name: NATS_URL
              value: "nats://nats:4222"
            - name: OPA_URL
              value: "http://opa:8181"
            - name: SPIFFE_SOCKET
              value: "/run/spire/sockets/agent.sock"
          volumeMounts:
            - name: spire-agent-socket
              mountPath: /run/spire/sockets
              readOnly: true
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
      volumes:
        - name: spire-agent-socket
          hostPath:
            path: /run/spire/sockets
            type: Directory

Success Criteria

Phase 1 Completion Checklist

  • State machine implemented - All 5 states functional
  • SPIFFE integration - Agent authenticates with X.509-SVID
  • OPA policy checks - All actions validated before execution
  • NATS event consumption - CloudEvents deserialized from Protobuf
  • MCP tool invocation - Schneider and Avigilon adapters called
  • Telemetry instrumented - OpenTelemetry traces and metrics
  • Unit tests passing - 100% state machine coverage
  • Integration tests passing - End-to-end scenarios validated
  • Performance tests passing - <200ms P95 latency, >100 events/sec throughput
  • Documentation complete - Architecture, API, and operations docs

Validation Tests

Test 1: Unauthorized Access Response

# Publish unauthorized access event
nats pub citadel.security.door.access_denied \
'{"door_id": "DOOR_DATACENTER_01", "badge_id": "UNKNOWN"}'

# Expected: Door locks within 200ms, security alert sent, audit log created

Test 2: Policy Denial

# Attempt action that violates policy (e.g., unlock for >300 seconds)
nats pub citadel.security.test \
'{"action": "unlock_door", "duration": 600}'

# Expected: Policy denies, escalation event published, no door action executed

Test 3: Human Escalation

# Publish critical threat event
nats pub citadel.security.intrusion.alarm \
'{"zone": "vault", "severity": "critical"}'

# Expected: Escalation event published, human decision requested, no auto-action

Next Steps

Implementation Plan

  1. Week 1: Core state machine (Monitor, Analyze, Decide states)
  2. Week 2: Safety integration (Respond state with OPA checks)
  3. Week 3: Escalation logic (Escalate state with human-in-the-loop)
  4. Week 4: Integration testing and observability

Future Enhancements

  • Machine Learning: Anomaly detection for unknown threat patterns
  • Predictive Analytics: Forecast security incidents before they occur
  • Multi-agent Coordination: Collaborate with Energy Agent for emergency protocols
  • Advanced Reasoning: Use LLMs for natural language incident reports
  • Automated Response Tuning: RL to optimize response actions over time

🎯 This design document serves as the blueprint for implementing the Security Agent. Once approved, we'll proceed to Option A: Implementation.