انتقل إلى المحتوى الرئيسي

نماذج المجال (Pydantic)

يعرّف AuroraSOC مجاله الوظيفي باستخدام أصناف Pydantic من نوع BaseModel لأغراض التحقق والتسلسل وأمان الأنواع. هذه النماذج منفصلة عن نماذج ORM، إذ تمثل طبقة النقل المستخدمة في تواصل API ورسائل الوكلاء وحمولات الأحداث.

لماذا طبقتان للنماذج؟

الطبقةالإطارالغرض
النقلPydanticالتحقق والتسلسل ومخططات API
الاستمراريةSQLAlchemyعمليات CRUD والعلاقات والترحيلات في قاعدة البيانات

يسمح الفصل بينهما بتطور مخططات API بشكل مستقل عن مخطط قاعدة البيانات. قد يتضمن التنبيه المُرسل عبر Redis Streams حقولًا غير موجودة في قاعدة البيانات (مثل assigned_agent كتعداد)، بينما تخزن قاعدة البيانات حقولًا لا تحتاجها طبقة النقل (مثل dedup_hash).

التعدادات

يعرّف AuroraSOC عشرة تعدادات باستخدام نمط Python str, Enum لتسلسل JSON:

مستويات الشدة

class Severity(str, Enum):
CRITICAL = "critical" # Immediate response required
HIGH = "high" # Response within 1 hour
MEDIUM = "medium" # Response within 4 hours
LOW = "low" # Response within 24 hours
INFO = "info" # Informational only

دورة حياة التنبيه

class AlertStatus(str, Enum):
NEW = "new" # Just ingested
TRIAGED = "triaged" # Severity assessed by orchestrator
INVESTIGATING = "investigating" # Agent actively analyzing
ESCALATED = "escalated" # Elevated to human analyst
RESOLVED = "resolved" # Investigation complete
FALSE_POSITIVE = "false_positive"

دورة حياة القضية

class CaseStatus(str, Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
PENDING_APPROVAL = "pending_approval" # Human gate
CONTAINED = "contained"
ERADICATED = "eradicated"
RECOVERED = "recovered"
CLOSED = "closed"

أنواع IOC

class IOCType(str, Enum):
IP = "ip"
DOMAIN = "domain"
URL = "url"
HASH_MD5 = "hash_md5"
HASH_SHA1 = "hash_sha1"
HASH_SHA256 = "hash_sha256"
EMAIL = "email"
CVE = "cve"
FILE_PATH = "file_path"
REGISTRY_KEY = "registry_key"
MUTEX = "mutex"
USER_AGENT = "user_agent"

أنواع الأصول

class AssetType(str, Enum):
SERVER = "server"
WORKSTATION = "workstation"
NETWORK_DEVICE = "network_device"
IOT_SENSOR = "iot_sensor"
CPS_CONTROLLER = "cps_controller"
OT_PLC = "ot_plc"
CLOUD_INSTANCE = "cloud_instance"
CONTAINER = "container"
MOBILE = "mobile"

تعدادات خاصة بـ CPS

class DeviceFamily(str, Enum):
STM32 = "stm32"
NRF52 = "nrf52"
ESP32S3 = "esp32s3"
UNKNOWN = "unknown"

class FirmwareStack(str, Enum):
ADA_SPARK = "ada_spark" # STM32 — formal verification
RUST_EMBASSY = "rust_embassy" # nRF52840 — memory safety
ZEPHYR_RTOS = "zephyr_rtos" # ESP32-S3 — real-time OS
UNKNOWN = "unknown"

class AttestationStatus(str, Enum):
VALID = "valid"
REVOKED = "revoked"
EXPIRED = "expired"
UNKNOWN = "unknown"
FAILED = "failed"

class PhysicalCyberCorrelation(str, Enum):
PHYSICAL_ONLY = "physical_only"
CYBER_ONLY = "cyber_only"
CORRELATED = "correlated" # Both physical + cyber events detected
ESCALATED = "escalated" # Correlated and escalated to human

أنواع الوكلاء

class AgentType(str, Enum):
ORCHESTRATOR = "orchestrator"
SECURITY_ANALYST = "security_analyst"
THREAT_HUNTER = "threat_hunter"
MALWARE_ANALYST = "malware_analyst"
INCIDENT_RESPONDER = "incident_responder"
NETWORK_SECURITY = "network_security"
WEB_SECURITY = "web_security"
CLOUD_SECURITY = "cloud_security"
THREAT_INTEL = "threat_intel"
CPS_SECURITY = "cps_security"
ENDPOINT_SECURITY = "endpoint_security"
UEBA_ANALYST = "ueba_analyst"
FORENSIC_ANALYST = "forensic_analyst"
COMPLIANCE_ANALYST = "compliance_analyst"
VULNERABILITY_MANAGER = "vulnerability_manager"
REPORT_GENERATOR = "report_generator"

نماذج النقل

IOC

class IOC(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
type: IOCType
value: str
confidence: float = Field(ge=0.0, le=1.0, default=0.5)
source: str = ""
first_seen: datetime = Field(default_factory=_utcnow)
last_seen: datetime = Field(default_factory=_utcnow)
tags: list[str] = Field(default_factory=list)
context: dict[str, Any] = Field(default_factory=dict)

Alert

class Alert(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str
description: str = ""
severity: Severity = Severity.MEDIUM
status: AlertStatus = AlertStatus.NEW
source: str = ""
asset_type: AssetType = AssetType.SERVER
affected_assets: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
mitre_techniques: list[str] = Field(default_factory=list)
raw_log: str = ""
timestamp: datetime = Field(default_factory=_utcnow)
assigned_agent: AgentType | None = None
case_id: str | None = None
metadata: dict[str, Any] = Field(default_factory=dict)

Case

class Case(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str
severity: Severity = Severity.MEDIUM
status: CaseStatus = CaseStatus.OPEN
alert_ids: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
mitre_techniques: list[str] = Field(default_factory=list)
affected_assets: list[str] = Field(default_factory=list)
cps_devices_involved: list[str] = Field(default_factory=list)
physical_impact_assessment: str = ""
timeline: list[CaseTimelineEntry] = Field(default_factory=list)
recommended_actions: list[str] = Field(default_factory=list)
requires_human_approval: bool = False
assigned_agents: list[AgentType] = Field(default_factory=list)
created_at: datetime = Field(default_factory=_utcnow)
updated_at: datetime = Field(default_factory=_utcnow)
closed_at: datetime | None = None
outcome: str = ""
confidence: float = Field(ge=0.0, le=1.0, default=0.5)

CPS Device

class CPSDevice(BaseModel):
id: str
name: str
family: DeviceFamily
firmware_stack: FirmwareStack
firmware_version: str = ""
firmware_hash: str = ""
location: str = ""
ip_address: str = ""
attestation_status: AttestationStatus = AttestationStatus.UNKNOWN
risk_score: int = Field(ge=0, le=100, default=0)
network_segment: str = ""
tags: list[str] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
registered_at: datetime = Field(default_factory=_utcnow)

تواصل الوكلاء

class AgentTask(BaseModel):
"""Task dispatched to a specialist agent."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
agent_type: AgentType
prompt: str
context: dict[str, Any] = Field(default_factory=dict)
priority: int = Field(ge=1, le=5, default=3) # 1=lowest, 5=highest
case_id: str | None = None
alert_id: str | None = None

class AgentResponse(BaseModel):
"""Response from a specialist agent."""
task_id: str
agent_type: AgentType
result: dict[str, Any] = Field(default_factory=dict)
confidence: float = Field(ge=0.0, le=1.0, default=0.5)
tool_calls: list[str] = Field(default_factory=list)
reasoning_steps: list[str] = Field(default_factory=list)
duration_ms: int = 0

نماذج CPS المتخصصة

class SensorTelemetry(BaseModel):
"""Telemetry data point from an edge sensor."""
device_id: str
metric: str # e.g., "temperature", "motion_count"
value: float
unit: str = "" # e.g., "celsius", "count"
timestamp: datetime = Field(default_factory=_utcnow)

class AttestationResult(BaseModel):
"""Hardware attestation verification result."""
device_id: str
status: AttestationStatus
firmware_hash: str
expected_hash: str
certificate_valid: bool
board_family: DeviceFamily
boot_count: int = 0

class PhysicalCyberEvent(BaseModel):
"""Correlated physical + cyber event."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
correlation_type: PhysicalCyberCorrelation
physical_event_id: str
cyber_event_ids: list[str] = Field(default_factory=list)
device_id: str
time_window_seconds: int = 120
risk_score: int = Field(ge=0, le=100, default=50)

class InvestigationReport(BaseModel):
"""Structured output from multi-agent investigation."""
case_id: str
severity: Severity
confidence: float = Field(ge=0.0, le=1.0)
mitre_techniques: list[str] = Field(default_factory=list)
iocs: list[IOC] = Field(default_factory=list)
recommended_actions: list[str] = Field(default_factory=list)
requires_human_approval: bool = False
agent_findings: dict[str, Any] = Field(default_factory=dict)

خريطة علاقات النماذج