انتقل إلى المحتوى الرئيسي

Domain Models (Pydantic)

AuroraSOC defines its business domain using Pydantic BaseModel classes for validation, serialization, and type safety. These models are distinct from ORM models — they represent the transport layer used in API communication, agent messaging, and event payloads.

Why Two Model Layers?

LayerFrameworkPurpose
TransportPydanticValidation, serialization, API schemas
PersistenceSQLAlchemyDatabase CRUD, relationships, migrations

Separating them allows API schemas to evolve independently of the database schema. An alert sent over Redis Streams includes fields that don't exist in the database (like assigned_agent as an enum), while the database stores fields not needed in transport (like dedup_hash).

Enumerations

AuroraSOC defines 10 enumerations using Python's str, Enum pattern for JSON serialization:

Severity Levels

class Severity(str, Enum):
CRITICAL = "critical" # Immediate response required
HIGH = "high" # Response within 1 hour
MEDIUM = "medium" # Response within 4 hours
LOW = "low" # Response within 24 hours
INFO = "info" # Informational only

Alert Lifecycle

class AlertStatus(str, Enum):
NEW = "new" # Just ingested
TRIAGED = "triaged" # Severity assessed by orchestrator
INVESTIGATING = "investigating" # Agent actively analyzing
ESCALATED = "escalated" # Elevated to human analyst
RESOLVED = "resolved" # Investigation complete
FALSE_POSITIVE = "false_positive"

Case Lifecycle

class CaseStatus(str, Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
PENDING_APPROVAL = "pending_approval" # Human gate
CONTAINED = "contained"
ERADICATED = "eradicated"
RECOVERED = "recovered"
CLOSED = "closed"

IOC Types

class IOCType(str, Enum):
IP = "ip"
DOMAIN = "domain"
URL = "url"
HASH_MD5 = "hash_md5"
HASH_SHA1 = "hash_sha1"
HASH_SHA256 = "hash_sha256"
EMAIL = "email"
CVE = "cve"
FILE_PATH = "file_path"
REGISTRY_KEY = "registry_key"
MUTEX = "mutex"
USER_AGENT = "user_agent"

Asset Types

class AssetType(str, Enum):
SERVER = "server"
WORKSTATION = "workstation"
NETWORK_DEVICE = "network_device"
IOT_SENSOR = "iot_sensor"
CPS_CONTROLLER = "cps_controller"
OT_PLC = "ot_plc"
CLOUD_INSTANCE = "cloud_instance"
CONTAINER = "container"
MOBILE = "mobile"

CPS-Specific Enums

class DeviceFamily(str, Enum):
STM32 = "stm32"
NRF52 = "nrf52"
ESP32S3 = "esp32s3"
UNKNOWN = "unknown"

class FirmwareStack(str, Enum):
ADA_SPARK = "ada_spark" # STM32 — formal verification
RUST_EMBASSY = "rust_embassy" # nRF52840 — memory safety
ZEPHYR_RTOS = "zephyr_rtos" # ESP32-S3 — real-time OS
UNKNOWN = "unknown"

class AttestationStatus(str, Enum):
VALID = "valid"
REVOKED = "revoked"
EXPIRED = "expired"
UNKNOWN = "unknown"
FAILED = "failed"

class PhysicalCyberCorrelation(str, Enum):
PHYSICAL_ONLY = "physical_only"
CYBER_ONLY = "cyber_only"
CORRELATED = "correlated" # Both physical + cyber events detected
ESCALATED = "escalated" # Correlated and escalated to human

Agent Types

class AgentType(str, Enum):
ORCHESTRATOR = "orchestrator"
SECURITY_ANALYST = "security_analyst"
THREAT_HUNTER = "threat_hunter"
MALWARE_ANALYST = "malware_analyst"
INCIDENT_RESPONDER = "incident_responder"
NETWORK_SECURITY = "network_security"
WEB_SECURITY = "web_security"
CLOUD_SECURITY = "cloud_security"
THREAT_INTEL = "threat_intel"
CPS_SECURITY = "cps_security"
ENDPOINT_SECURITY = "endpoint_security"
UEBA_ANALYST = "ueba_analyst"
FORENSIC_ANALYST = "forensic_analyst"
COMPLIANCE_ANALYST = "compliance_analyst"
VULNERABILITY_MANAGER = "vulnerability_manager"
REPORT_GENERATOR = "report_generator"

Transport Models

IOC

class IOC(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
type: IOCType
value: str
confidence: float = Field(ge=0.0, le=1.0, default=0.5)
source: str = ""
first_seen: datetime = Field(default_factory=_utcnow)
last_seen: datetime = Field(default_factory=_utcnow)
tags: list[str] = Field(default_factory=list)
context: dict[str, Any] = Field(default_factory=dict)

Alert

class Alert(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str
description: str = ""
severity: Severity = Severity.MEDIUM
status: AlertStatus = AlertStatus.NEW
source: str = ""
asset_type: AssetType = AssetType.SERVER
affected_assets: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
mitre_techniques: list[str] = Field(default_factory=list)
raw_log: str = ""
timestamp: datetime = Field(default_factory=_utcnow)
assigned_agent: AgentType | None = None
case_id: str | None = None
metadata: dict[str, Any] = Field(default_factory=dict)

Case

class Case(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str
severity: Severity = Severity.MEDIUM
status: CaseStatus = CaseStatus.OPEN
alert_ids: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
mitre_techniques: list[str] = Field(default_factory=list)
affected_assets: list[str] = Field(default_factory=list)
cps_devices_involved: list[str] = Field(default_factory=list)
physical_impact_assessment: str = ""
timeline: list[CaseTimelineEntry] = Field(default_factory=list)
recommended_actions: list[str] = Field(default_factory=list)
requires_human_approval: bool = False
assigned_agents: list[AgentType] = Field(default_factory=list)
created_at: datetime = Field(default_factory=_utcnow)
updated_at: datetime = Field(default_factory=_utcnow)
closed_at: datetime | None = None
outcome: str = ""
confidence: float = Field(ge=0.0, le=1.0, default=0.5)

CPS Device

class CPSDevice(BaseModel):
id: str
name: str
family: DeviceFamily
firmware_stack: FirmwareStack
firmware_version: str = ""
firmware_hash: str = ""
location: str = ""
ip_address: str = ""
attestation_status: AttestationStatus = AttestationStatus.UNKNOWN
risk_score: int = Field(ge=0, le=100, default=0)
network_segment: str = ""
tags: list[str] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
registered_at: datetime = Field(default_factory=_utcnow)

Agent Communication

class AgentTask(BaseModel):
"""Task dispatched to a specialist agent."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
agent_type: AgentType
prompt: str
context: dict[str, Any] = Field(default_factory=dict)
priority: int = Field(ge=1, le=5, default=3) # 1=lowest, 5=highest
case_id: str | None = None
alert_id: str | None = None

class AgentResponse(BaseModel):
"""Response from a specialist agent."""
task_id: str
agent_type: AgentType
result: dict[str, Any] = Field(default_factory=dict)
confidence: float = Field(ge=0.0, le=1.0, default=0.5)
tool_calls: list[str] = Field(default_factory=list)
reasoning_steps: list[str] = Field(default_factory=list)
duration_ms: int = 0

Specialized CPS Models

class SensorTelemetry(BaseModel):
"""Telemetry data point from an edge sensor."""
device_id: str
metric: str # e.g., "temperature", "motion_count"
value: float
unit: str = "" # e.g., "celsius", "count"
timestamp: datetime = Field(default_factory=_utcnow)

class AttestationResult(BaseModel):
"""Hardware attestation verification result."""
device_id: str
status: AttestationStatus
firmware_hash: str
expected_hash: str
certificate_valid: bool
board_family: DeviceFamily
boot_count: int = 0

class PhysicalCyberEvent(BaseModel):
"""Correlated physical + cyber event."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
correlation_type: PhysicalCyberCorrelation
physical_event_id: str
cyber_event_ids: list[str] = Field(default_factory=list)
device_id: str
time_window_seconds: int = 120
risk_score: int = Field(ge=0, le=100, default=50)

class InvestigationReport(BaseModel):
"""Structured output from multi-agent investigation."""
case_id: str
severity: Severity
confidence: float = Field(ge=0.0, le=1.0)
mitre_techniques: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
recommended_actions: list[str] = Field(default_factory=list)
requires_human_approval: bool = False
agent_findings: dict[str, Any] = Field(default_factory=dict)

Model Relationship Map