Domain Models (Pydantic)
AuroraSOC defines its business domain using Pydantic BaseModel classes for validation, serialization, and type safety. These models are distinct from ORM models — they represent the transport layer used in API communication, agent messaging, and event payloads.
Why Two Model Layers?
| Layer | Framework | Purpose |
|---|---|---|
| Transport | Pydantic | Validation, serialization, API schemas |
| Persistence | SQLAlchemy | Database CRUD, relationships, migrations |
Separating them allows API schemas to evolve independently of the database schema. An alert sent over Redis Streams includes fields that don't exist in the database (like `assigned_agent` as an enum), while the database stores fields not needed in transport (like `dedup_hash`). Because inbound API requests, agent messages and event payloads are parsed into these transport models, the constraints declared on them (enum membership, numeric ranges, required fields) are the first validation an untrusted payload meets — anything not constrained here (free-text fields, list contents) arrives unchecked.
Enumerations
AuroraSOC defines 10 enumerations using the `str, Enum` mixin pattern, so that each member serializes to its plain string value in JSON and compares equal to that raw string on the way back in:
Severity Levels
class Severity(str, Enum):
    """Severity of an alert or case; serializes to its lowercase string value.

    NOTE: because of the ``str`` mixin, ``<``/``>`` on members compare the
    string values alphabetically ("critical" < "high" < "info" < "low" <
    "medium"), which is not severity order. Rank with an explicit table,
    never by comparing members directly.
    """

    CRITICAL = "critical"  # Immediate response required
    HIGH = "high"  # Response within 1 hour
    MEDIUM = "medium"  # Response within 4 hours
    LOW = "low"  # Response within 24 hours
    INFO = "info"  # Informational only
Alert Lifecycle
class AlertStatus(str, Enum):
    """Lifecycle of a single alert, from ingest to disposition.

    ESCALATED hands the alert to a human analyst; RESOLVED and FALSE_POSITIVE
    read as the terminal dispositions. The enum itself enforces no transition
    order — that is up to the orchestrator.
    """

    NEW = "new"  # Just ingested
    TRIAGED = "triaged"  # Severity assessed by orchestrator
    INVESTIGATING = "investigating"  # Agent actively analyzing
    ESCALATED = "escalated"  # Elevated to human analyst
    RESOLVED = "resolved"  # Investigation complete
    FALSE_POSITIVE = "false_positive"  # Closed as not a real threat
Case Lifecycle
class CaseStatus(str, Enum):
    """Lifecycle of a case (a grouping of related alerts).

    PENDING_APPROVAL is the human gate, listed between IN_PROGRESS and
    CONTAINED; CONTAINED, ERADICATED and RECOVERED are named after the
    containment / eradication / recovery incident-response phases. The enum
    does not enforce transition order.
    """

    OPEN = "open"
    IN_PROGRESS = "in_progress"
    PENDING_APPROVAL = "pending_approval"  # Human gate
    CONTAINED = "contained"
    ERADICATED = "eradicated"
    RECOVERED = "recovered"
    CLOSED = "closed"
IOC Types
class IOCType(str, Enum):
    """Kind of indicator carried in an IOC.

    The enum only tags the kind; it does not validate that an IOC's ``value``
    matches the format the kind implies (IOC.value is a plain ``str``), so
    matching code must normalise and check the value itself.
    """

    IP = "ip"
    DOMAIN = "domain"
    URL = "url"
    HASH_MD5 = "hash_md5"  # Presumably a hex digest; length not checked here
    HASH_SHA1 = "hash_sha1"
    HASH_SHA256 = "hash_sha256"
    EMAIL = "email"
    CVE = "cve"  # e.g. a CVE identifier; format not checked here
    FILE_PATH = "file_path"
    REGISTRY_KEY = "registry_key"
    MUTEX = "mutex"
    USER_AGENT = "user_agent"
Asset Types
class AssetType(str, Enum):
    """Kind of asset an alert concerns.

    Alert.asset_type defaults to SERVER when a producer omits it, so SERVER
    is also the "unspecified" value in practice.
    """

    SERVER = "server"
    WORKSTATION = "workstation"
    NETWORK_DEVICE = "network_device"
    IOT_SENSOR = "iot_sensor"
    CPS_CONTROLLER = "cps_controller"
    OT_PLC = "ot_plc"
    CLOUD_INSTANCE = "cloud_instance"
    CONTAINER = "container"
    MOBILE = "mobile"
CPS-Specific Enums
class DeviceFamily(str, Enum):
    """Microcontroller family of a CPS device; UNKNOWN for unidentified boards."""

    STM32 = "stm32"
    NRF52 = "nrf52"
    ESP32S3 = "esp32s3"
    UNKNOWN = "unknown"
class FirmwareStack(str, Enum):
    """Firmware stack running on a CPS device; each pairs with a DeviceFamily."""

    ADA_SPARK = "ada_spark"  # STM32 — formal verification
    RUST_EMBASSY = "rust_embassy"  # nRF52840 — memory safety
    ZEPHYR_RTOS = "zephyr_rtos"  # ESP32-S3 — real-time OS
    UNKNOWN = "unknown"
class AttestationStatus(str, Enum):
    """Outcome of the last hardware attestation check for a CPS device.

    UNKNOWN is the default on CPSDevice (no attestation seen yet) — it is not
    a pass. NOTE(review): anything that gates actions on attestation should
    treat every member except VALID as untrusted; confirm against consumers.
    """

    VALID = "valid"
    REVOKED = "revoked"
    EXPIRED = "expired"
    UNKNOWN = "unknown"
    FAILED = "failed"
class PhysicalCyberCorrelation(str, Enum):
    """How a PhysicalCyberEvent relates its physical and cyber observations."""

    PHYSICAL_ONLY = "physical_only"
    CYBER_ONLY = "cyber_only"
    CORRELATED = "correlated"  # Both physical + cyber events detected
    ESCALATED = "escalated"  # Correlated and escalated to human
Agent Types
class AgentType(str, Enum):
    """Specialist agent roles.

    Used by Alert.assigned_agent, Case.assigned_agents and AgentTask /
    AgentResponse to route work to the right specialist.
    """

    ORCHESTRATOR = "orchestrator"
    SECURITY_ANALYST = "security_analyst"
    THREAT_HUNTER = "threat_hunter"
    MALWARE_ANALYST = "malware_analyst"
    INCIDENT_RESPONDER = "incident_responder"
    NETWORK_SECURITY = "network_security"
    WEB_SECURITY = "web_security"
    CLOUD_SECURITY = "cloud_security"
    THREAT_INTEL = "threat_intel"
    CPS_SECURITY = "cps_security"
    ENDPOINT_SECURITY = "endpoint_security"
    UEBA_ANALYST = "ueba_analyst"
    FORENSIC_ANALYST = "forensic_analyst"
    COMPLIANCE_ANALYST = "compliance_analyst"
    VULNERABILITY_MANAGER = "vulnerability_manager"
    REPORT_GENERATOR = "report_generator"
Transport Models
IOC
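class IOC(BaseModel):
    """Indicator of compromise as carried in alerts, cases and reports.

    ``type`` only tags the kind of indicator; ``value`` is free text and is
    not checked against the format the kind implies (e.g. a 64-hex SHA-256),
    so consumers that match on it must normalise and validate it themselves.
    An empty ``value`` is rejected, since it could never match anything.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: IOCType
    # Still required (no default); just forbids the empty string.
    value: str = Field(min_length=1)
    # 0.0–1.0; 0.5 when the producer did not state a confidence.
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)
    source: str = ""  # Originating feed/tool; empty when unknown
    first_seen: datetime = Field(default_factory=_utcnow)
    # NOTE: not checked to be >= first_seen.
    last_seen: datetime = Field(default_factory=_utcnow)
    tags: list[str] = Field(default_factory=list)
    context: dict[str, Any] = Field(default_factory=dict)  # Free-form, producer-defined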
Alert
class Alert(BaseModel):
    """Alert in transport form (API, agent messaging, event payloads).

    ``raw_log``, ``description`` and the IOC values originate from the ingest
    source and should be treated as untrusted text by consumers.
    NOTE(review): that matters most for anything that folds them into an
    agent prompt — confirm how the orchestrator builds task context.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    title: str  # Required
    description: str = ""
    severity: Severity = Severity.MEDIUM  # Reassessed at triage (AlertStatus.TRIAGED)
    status: AlertStatus = AlertStatus.NEW
    source: str = ""  # Ingest source name; empty when not supplied
    # Defaults to SERVER when the producer omits it — a silent default, not a
    # detected classification.
    asset_type: AssetType = AssetType.SERVER
    affected_assets: list[str] = Field(default_factory=list)  # Asset identifiers; format not enforced
    iocs: list[IOC] = Field(default_factory=list)
    # Presumably ATT&CK technique IDs (e.g. "T1059"); format not validated here.
    mitre_techniques: list[str] = Field(default_factory=list)
    raw_log: str = ""  # Unbounded: no max_length is declared
    timestamp: datetime = Field(default_factory=_utcnow)
    assigned_agent: AgentType | None = None  # Transport-only; not a database column
    case_id: str | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)
Case
class Case(BaseModel):
    """Case (grouping of alerts) in transport form.

    ``requires_human_approval`` together with ``status == PENDING_APPROVAL``
    marks the human gate. NOTE(review): both are plain payload fields, so a
    consumer should not let an inbound message clear the gate merely because
    the flag is false — confirm where the flag is authoritative.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    title: str  # Required
    severity: Severity = Severity.MEDIUM
    status: CaseStatus = CaseStatus.OPEN
    alert_ids: list[str] = Field(default_factory=list)  # Alert.id values folded into this case
    iocs: list[IOC] = Field(default_factory=list)
    mitre_techniques: list[str] = Field(default_factory=list)  # Technique IDs; format not validated
    affected_assets: list[str] = Field(default_factory=list)
    cps_devices_involved: list[str] = Field(default_factory=list)  # Presumably CPSDevice.id values
    physical_impact_assessment: str = ""  # Free text
    timeline: list[CaseTimelineEntry] = Field(default_factory=list)  # CaseTimelineEntry defined elsewhere
    recommended_actions: list[str] = Field(default_factory=list)
    requires_human_approval: bool = False  # See class docstring
    assigned_agents: list[AgentType] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=_utcnow)
    updated_at: datetime = Field(default_factory=_utcnow)  # The model itself never bumps this
    closed_at: datetime | None = None  # Presumably set when status reaches CLOSED
    outcome: str = ""
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)
CPS Device
class CPSDevice(BaseModel):
    """Registered cyber-physical device in transport form.

    Unlike the other models, ``id`` has no generated default — the registrar
    supplies it. ``ip_address`` and ``firmware_hash`` are plain strings and
    are not parsed or format-checked here.
    """

    id: str  # Required; not auto-generated
    name: str  # Required
    family: DeviceFamily
    firmware_stack: FirmwareStack
    firmware_version: str = ""
    # Presumably the digest compared against AttestationResult.expected_hash — confirm.
    firmware_hash: str = ""
    location: str = ""
    ip_address: str = ""  # Free text; not validated as an IP address
    # A freshly registered device starts UNKNOWN, not VALID.
    attestation_status: AttestationStatus = AttestationStatus.UNKNOWN
    risk_score: int = Field(ge=0, le=100, default=0)
    network_segment: str = ""
    tags: list[str] = Field(default_factory=list)
    metadata: dict[str, Any] = Field(default_factory=dict)
    registered_at: datetime = Field(default_factory=_utcnow)
Agent Communication
class AgentTask(BaseModel):
    """Task dispatched to a specialist agent.

    NOTE(review): ``prompt`` and ``context`` are whatever the dispatcher put
    in them; if alert content (raw_log, IOC values) is copied in, the
    receiving agent is handling attacker-influenced text — confirm how they
    are assembled.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: AgentType  # Which specialist should pick this up
    prompt: str  # Required
    context: dict[str, Any] = Field(default_factory=dict)
    priority: int = Field(ge=1, le=5, default=3)  # 1=lowest, 5=highest
    case_id: str | None = None
    alert_id: str | None = None
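class AgentResponse(BaseModel):
    """Response from a specialist agent.

    ``task_id`` ties the response back to the AgentTask it answers; the
    model does not verify that such a task exists.
    """

    task_id: str
    agent_type: AgentType
    result: dict[str, Any] = Field(default_factory=dict)  # Agent-defined payload
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)
    tool_calls: list[str] = Field(default_factory=list)  # Presumably names of tools invoked
    reasoning_steps: list[str] = Field(default_factory=list)
    # Elapsed time cannot be negative; reject malformed values at the boundary.
    duration_ms: int = Field(ge=0, default=0)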
Specialized CPS Models
class SensorTelemetry(BaseModel):
    """Telemetry data point from an edge sensor.

    ``metric`` and ``unit`` are free text; nothing checks that the unit fits
    the metric or that ``value`` is within a plausible range for either.
    """

    device_id: str  # Presumably a CPSDevice.id
    metric: str  # e.g., "temperature", "motion_count"
    value: float
    unit: str = ""  # e.g., "celsius", "count"
    timestamp: datetime = Field(default_factory=_utcnow)
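class AttestationResult(BaseModel):
    """Hardware attestation verification result.

    ``status`` is asserted by the producer; the model does not derive it from
    ``firmware_hash == expected_hash`` and ``certificate_valid``.
    NOTE(review): consumers that act on VALID may want to re-check those two
    fields rather than trust the flag alone — confirm with the verifier.
    """

    device_id: str
    status: AttestationStatus
    firmware_hash: str  # Digest reported by the device
    expected_hash: str  # Digest the device was expected to report
    certificate_valid: bool
    board_family: DeviceFamily
    # Boot counter reported by the device; a negative value is malformed.
    boot_count: int = Field(ge=0, default=0)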
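class PhysicalCyberEvent(BaseModel):
    """Correlated physical + cyber event.

    ``physical_event_id`` is required even for CYBER_ONLY correlations; the
    model does not cross-check ``correlation_type`` against which ids are
    present.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    correlation_type: PhysicalCyberCorrelation
    physical_event_id: str
    cyber_event_ids: list[str] = Field(default_factory=list)
    device_id: str  # Presumably a CPSDevice.id
    # Width of the correlation window; a negative window is meaningless.
    time_window_seconds: int = Field(ge=0, default=120)
    risk_score: int = Field(ge=0, le=100, default=50)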
class InvestigationReport(BaseModel):
    """Structured output from multi-agent investigation.

    Unlike the other models, ``confidence`` has no default here: a report
    must state it explicitly.
    """

    case_id: str
    severity: Severity
    confidence: float = Field(ge=0.0, le=1.0)  # Required
    mitre_techniques: list[str] = Field(default_factory=list)  # Technique IDs; format not validated
    iocs: list[IOC] = Field(default_factory=list)
    recommended_actions: list[str] = Field(default_factory=list)
    requires_human_approval: bool = False  # Producer-asserted; see Case for the gate
    agent_findings: dict[str, Any] = Field(default_factory=dict)  # Keyed per agent, presumably