نماذج المجال (Pydantic)
يعرّف AuroraSOC مجاله الوظيفي باستخدام أصناف Pydantic من نوع BaseModel لأغراض التحقق والتسلسل وأمان الأنواع. هذه النماذج منفصلة عن نماذج ORM، إذ تمثل طبقة النقل المستخدمة في تواصل API ورسائل الوكلاء وحمولات الأحداث. تنبيه: التحقق الذي تفرضه هذه النماذج هو تحقق من النوع والنطاق العددي فقط (مثل حصر confidence بين 0 و1 وrisk_score بين 0 و100)؛ أما الحقول النصية مثل value في IOC وip_address وfirmware_hash وraw_log فهي سلاسل حرة بلا تحقق من الصيغة، وينبغي على المستهلك معاملتها كمدخلات غير موثوقة.
لماذا طبقتان للنماذج؟
| الطبقة | الإطار | الغرض |
|---|---|---|
| النقل | Pydantic | التحقق والتسلسل ومخططات API |
| الاستمرارية | SQLAlchemy | عمليات CRUD والعلاقات والترحيلات في قاعدة البيانات |
يسمح الفصل بينهما بتطور مخططات API بشكل مستقل عن مخطط قاعدة البيانات. قد يتضمن التنبيه المُرسل عبر Redis Streams حقولًا غير موجودة في قاعدة البيانات (مثل assigned_agent كتعداد)، بينما تخزن قاعدة البيانات حقولًا لا تحتاجها طبقة النقل (مثل dedup_hash).
التعدادات
يعرّف AuroraSOC عشرة تعدادات باستخدام نمط Python str, Enum لتسلسل JSON (كل عضو يساوي قيمته النصية ويُسلسل كسلسلة JSON عادية):
مستويات الشدة
class Severity(str, Enum):
    """Severity level shared by Alert, Case and InvestigationReport.

    The ``str`` mixin makes each member compare equal to its value
    (``Severity.HIGH == "high"``) and serialize as a plain JSON string.
    The response-time targets in the member comments are documentation
    only; nothing in this enum enforces them.
    """
    CRITICAL = "critical"  # Immediate response required
    HIGH = "high"  # Response within 1 hour
    MEDIUM = "medium"  # Response within 4 hours
    LOW = "low"  # Response within 24 hours
    INFO = "info"  # Informational only
دورة حياة التنبيه
class AlertStatus(str, Enum):
    """Alert lifecycle state (Alert.status defaults to NEW).

    The enum only names the states; allowed transitions (e.g. that
    ESCALATED cannot go back to NEW) are not encoded here and must be
    enforced by whatever updates the alert.
    """
    NEW = "new"  # Just ingested
    TRIAGED = "triaged"  # Severity assessed by orchestrator
    INVESTIGATING = "investigating"  # Agent actively analyzing
    ESCALATED = "escalated"  # Elevated to human analyst
    RESOLVED = "resolved"  # Investigation complete
    FALSE_POSITIVE = "false_positive"  # Closed without action
دورة حياة القضية
class CaseStatus(str, Enum):
    """Case lifecycle state (Case.status defaults to OPEN).

    PENDING_APPROVAL is the human gate before response actions.
    NOTE(review): the enum does not itself prevent a case from moving to
    CONTAINED/ERADICATED without passing through PENDING_APPROVAL; that
    check must live in the state-transition code — confirm it does.
    """
    OPEN = "open"
    IN_PROGRESS = "in_progress"
    PENDING_APPROVAL = "pending_approval"  # Human gate
    CONTAINED = "contained"
    ERADICATED = "eradicated"
    RECOVERED = "recovered"
    CLOSED = "closed"
أنواع IOC
class IOCType(str, Enum):
    """Indicator kinds accepted in IOC.type.

    The type only labels the indicator; the IOC model does not check that
    ``value`` matches the labelled format (valid IP, hash length, etc.).
    Hashes are split by algorithm so that consumers can pick the right
    comparison; hex case is not normalised anywhere in this module.
    """
    IP = "ip"
    DOMAIN = "domain"
    URL = "url"
    HASH_MD5 = "hash_md5"
    HASH_SHA1 = "hash_sha1"
    HASH_SHA256 = "hash_sha256"
    EMAIL = "email"
    CVE = "cve"
    FILE_PATH = "file_path"
    REGISTRY_KEY = "registry_key"
    MUTEX = "mutex"
    USER_AGENT = "user_agent"
أنواع الأصول
class AssetType(str, Enum):
    """Kind of asset an alert concerns.

    Alert.asset_type defaults to SERVER, so an ingested alert that omits
    the field is classified as a server rather than as unknown.
    """
    SERVER = "server"
    WORKSTATION = "workstation"
    NETWORK_DEVICE = "network_device"
    IOT_SENSOR = "iot_sensor"
    CPS_CONTROLLER = "cps_controller"
    OT_PLC = "ot_plc"
    CLOUD_INSTANCE = "cloud_instance"
    CONTAINER = "container"
    MOBILE = "mobile"
تعدادات خاصة بـ CPS
class DeviceFamily(str, Enum):
    """MCU family of a CPS device (CPSDevice.family, AttestationResult.board_family)."""
    STM32 = "stm32"
    NRF52 = "nrf52"
    ESP32S3 = "esp32s3"
    UNKNOWN = "unknown"  # Family could not be determined
class FirmwareStack(str, Enum):
    """Firmware technology running on a CPS device (CPSDevice.firmware_stack).

    The comments record which DeviceFamily each stack targets; the pairing is
    not validated by the models.
    """
    ADA_SPARK = "ada_spark"  # STM32 — formal verification
    RUST_EMBASSY = "rust_embassy"  # nRF52840 — memory safety
    ZEPHYR_RTOS = "zephyr_rtos"  # ESP32-S3 — real-time OS
    UNKNOWN = "unknown"
class AttestationStatus(str, Enum):
    """Outcome of hardware attestation; CPSDevice.attestation_status defaults to UNKNOWN.

    Treat UNKNOWN as "not yet verified", not as VALID. VALID is only as
    trustworthy as the component that set it (see AttestationResult).
    """
    VALID = "valid"
    REVOKED = "revoked"
    EXPIRED = "expired"
    UNKNOWN = "unknown"
    FAILED = "failed"
class PhysicalCyberCorrelation(str, Enum):
    """How a PhysicalCyberEvent relates physical and cyber observations."""
    PHYSICAL_ONLY = "physical_only"
    CYBER_ONLY = "cyber_only"
    CORRELATED = "correlated"  # Both physical + cyber events detected
    ESCALATED = "escalated"  # Correlated and escalated to human
أنواع الوكلاء
class AgentType(str, Enum):
    """Agent roles used for routing AgentTask/AgentResponse and Alert.assigned_agent.

    ORCHESTRATOR is the coordinator that triages alerts; the rest are
    specialists. Adding a member here is an API-visible change because
    the values appear in transport payloads.
    """
    ORCHESTRATOR = "orchestrator"
    SECURITY_ANALYST = "security_analyst"
    THREAT_HUNTER = "threat_hunter"
    MALWARE_ANALYST = "malware_analyst"
    INCIDENT_RESPONDER = "incident_responder"
    NETWORK_SECURITY = "network_security"
    WEB_SECURITY = "web_security"
    CLOUD_SECURITY = "cloud_security"
    THREAT_INTEL = "threat_intel"
    CPS_SECURITY = "cps_security"
    ENDPOINT_SECURITY = "endpoint_security"
    UEBA_ANALYST = "ueba_analyst"
    FORENSIC_ANALYST = "forensic_analyst"
    COMPLIANCE_ANALYST = "compliance_analyst"
    VULNERABILITY_MANAGER = "vulnerability_manager"
    REPORT_GENERATOR = "report_generator"
نماذج النقل
IOC
class IOC(BaseModel):
    """Indicator of compromise carried in alerts, cases and reports.

    Only ``confidence`` is range-checked. ``value`` is a free-form string:
    an ``IOCType.IP`` indicator is not guaranteed to be a valid address and
    a hash indicator is not guaranteed to have the right length or case.
    NOTE(review): normalise ``value`` (e.g. lower-case hashes and domains)
    before matching or de-duplicating — confirm where that happens.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))  # fresh random UUID per instance
    type: IOCType
    value: str  # raw indicator text; treat as untrusted input
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)  # inclusive 0..1
    source: str = ""  # originating feed/tool, free-form
    first_seen: datetime = Field(default_factory=_utcnow)  # _utcnow defined elsewhere in the module
    last_seen: datetime = Field(default_factory=_utcnow)  # separate call, so may differ from first_seen by a tick
    tags: list[str] = Field(default_factory=list)
    context: dict[str, Any] = Field(default_factory=dict)  # unconstrained extra data
Alert
class Alert(BaseModel):
    """Transport form of an alert (API, Redis Streams, agent messages).

    ``raw_log`` and nested IOC values are attacker-influenced text straight
    from the event source; nothing here sanitises them.
    NOTE(review): if they are interpolated into agent prompts or rendered
    in a UI, treat them as untrusted at that point — confirm with callers.
    ``mitre_techniques`` entries are not format-checked (e.g. ``T1059.001``).
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    title: str  # required; may be empty
    description: str = ""
    severity: Severity = Severity.MEDIUM
    status: AlertStatus = AlertStatus.NEW
    source: str = ""  # producing sensor/tool, free-form
    asset_type: AssetType = AssetType.SERVER  # default is SERVER, not "unknown"
    affected_assets: list[str] = Field(default_factory=list)
    iocs: list[IOC] = Field(default_factory=list)
    mitre_techniques: list[str] = Field(default_factory=list)
    raw_log: str = ""  # untrusted source text
    timestamp: datetime = Field(default_factory=_utcnow)
    assigned_agent: AgentType | None = None  # transport-only; not stored in the DB (see page intro)
    case_id: str | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)  # unconstrained extra data
Case
class Case(BaseModel):
    """Transport form of an incident case grouping one or more alerts.

    ``requires_human_approval`` is plain data: setting it does not block
    anything by itself, and it defaults to False.
    NOTE(review): the code that executes ``recommended_actions`` must check
    this flag / CaseStatus.PENDING_APPROVAL — confirm it does.
    ``timeline`` uses ``CaseTimelineEntry``, defined elsewhere in the module.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    title: str
    severity: Severity = Severity.MEDIUM
    status: CaseStatus = CaseStatus.OPEN
    alert_ids: list[str] = Field(default_factory=list)  # references Alert.id; not validated here
    iocs: list[IOC] = Field(default_factory=list)
    mitre_techniques: list[str] = Field(default_factory=list)
    affected_assets: list[str] = Field(default_factory=list)
    cps_devices_involved: list[str] = Field(default_factory=list)  # CPSDevice.id values
    physical_impact_assessment: str = ""
    timeline: list[CaseTimelineEntry] = Field(default_factory=list)
    recommended_actions: list[str] = Field(default_factory=list)  # free text, not an executable plan
    requires_human_approval: bool = False
    assigned_agents: list[AgentType] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=_utcnow)
    updated_at: datetime = Field(default_factory=_utcnow)  # not auto-bumped; callers must update it
    closed_at: datetime | None = None
    outcome: str = ""
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)
CPS Device
class CPSDevice(BaseModel):
    """Registered cyber-physical device.

    Unlike the other models ``id`` has no default and must be supplied.
    ``ip_address`` and ``firmware_hash`` are unvalidated strings (no
    address or hex-digest format check).
    """
    id: str  # required, caller-assigned
    name: str
    family: DeviceFamily
    firmware_stack: FirmwareStack
    firmware_version: str = ""
    firmware_hash: str = ""  # expected to be a digest; format not checked
    location: str = ""
    ip_address: str = ""  # not validated as an IP address
    attestation_status: AttestationStatus = AttestationStatus.UNKNOWN  # unverified until attested
    risk_score: int = Field(ge=0, le=100, default=0)  # 0..100, starts at 0
    network_segment: str = ""
    tags: list[str] = Field(default_factory=list)
    metadata: dict[str, Any] = Field(default_factory=dict)
    registered_at: datetime = Field(default_factory=_utcnow)
تواصل الوكلاء
class AgentTask(BaseModel):
    """Task dispatched to a specialist agent.

    NOTE(review): ``prompt`` and ``context`` are the text the agent reasons
    over. If alert fields such as ``raw_log`` or IOC values are copied into
    them, that content is attacker-influenced (prompt-injection surface) —
    confirm how the dispatcher builds the prompt.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: AgentType  # which specialist receives the task
    prompt: str
    context: dict[str, Any] = Field(default_factory=dict)  # unconstrained
    priority: int = Field(ge=1, le=5, default=3)  # 1=lowest, 5=highest
    case_id: str | None = None
    alert_id: str | None = None
class AgentResponse(BaseModel):
    """Response from a specialist agent.

    ``result`` is an unconstrained dict — its keys are agent-defined and not
    validated here. ``duration_ms`` is not bounded, so negative values are
    accepted. ``tool_calls`` entries are strings (presumably tool names —
    verify against the agent runtime).
    """
    task_id: str  # AgentTask.id this answers
    agent_type: AgentType
    result: dict[str, Any] = Field(default_factory=dict)
    confidence: float = Field(ge=0.0, le=1.0, default=0.5)
    tool_calls: list[str] = Field(default_factory=list)
    reasoning_steps: list[str] = Field(default_factory=list)  # free text from the agent
    duration_ms: int = 0
نماذج CPS المتخصصة
class SensorTelemetry(BaseModel):
    """Telemetry data point from an edge sensor.

    ``metric`` and ``unit`` are free-form strings; no range check on
    ``value``. ``timestamp`` is assigned at model construction, which is
    the receiver's clock unless the sender supplies it.
    """
    device_id: str  # CPSDevice.id
    metric: str  # e.g., "temperature", "motion_count"
    value: float
    unit: str = ""  # e.g., "celsius", "count"
    timestamp: datetime = Field(default_factory=_utcnow)
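class AttestationResult(BaseModel):
    """Hardware attestation verification result.

    ``status`` is asserted by the producer; the model does not derive it
    from ``firmware_hash == expected_hash`` or ``certificate_valid``, so a
    payload can claim VALID while the hashes differ. Consumers should
    cross-check those fields (note the comparison is case-sensitive on the
    hex digests) rather than trust ``status`` alone.
    ``boot_count`` is bounded to non-negative values, matching the
    ``Field(ge=...)`` convention used for the other integer fields.
    """
    device_id: str  # CPSDevice.id
    status: AttestationStatus
    firmware_hash: str  # measured digest, format not checked
    expected_hash: str  # reference digest, format not checked
    certificate_valid: bool
    board_family: DeviceFamily
    boot_count: int = Field(ge=0, default=0)  # a negative boot counter is rejected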
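class PhysicalCyberEvent(BaseModel):
    """Correlated physical + cyber event.

    ``time_window_seconds`` is the correlation window; it is now bounded
    to non-negative values, consistent with the ``Field(ge=...)`` checks on
    ``risk_score`` here and in CPSDevice. Note ``risk_score`` defaults to 50
    for events whereas CPSDevice.risk_score defaults to 0.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    correlation_type: PhysicalCyberCorrelation
    physical_event_id: str
    cyber_event_ids: list[str] = Field(default_factory=list)  # Alert.id values
    device_id: str  # CPSDevice.id
    time_window_seconds: int = Field(ge=0, default=120)  # negative windows are rejected
    risk_score: int = Field(ge=0, le=100, default=50)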
class InvestigationReport(BaseModel):
    """Structured output from multi-agent investigation.

    Unlike Case, ``confidence`` has no default and must be supplied.
    ``agent_findings`` is an unconstrained dict keyed by agent; it is
    produced by LLM agents and should be treated as unverified text.
    ``requires_human_approval`` is a data flag only (see Case).
    """
    case_id: str
    severity: Severity
    confidence: float = Field(ge=0.0, le=1.0)  # required, 0..1
    mitre_techniques: list[str] = Field(default_factory=list)  # not format-checked
    iocs: list[IOC] = Field(default_factory=list)
    recommended_actions: list[str] = Field(default_factory=list)
    requires_human_approval: bool = False
    agent_findings: dict[str, Any] = Field(default_factory=dict)