انتقل إلى المحتوى الرئيسي

نماذج قاعدة البيانات (SQLAlchemy)

يستخدم AuroraSOC مكتبة SQLAlchemy 2.0 مع دعم async وقاعدة PostgreSQL 16. توفّر طبقة ORM أمانًا نوعيًا وإدارةً للعلاقات ودعمًا للترحيل عبر Alembic.

لماذا SQLAlchemy 2.0 Async؟

المتطلبالحل
إدخال/إخراج غير حاجب في FastAPIAsyncSession مع برنامج التشغيل asyncpg
تعريفات أعمدة آمنة نوعيًاتوضيحات Mapped[T] + mapped_column()
استعلامات JSONB معقدةلهجة PostgreSQL مع النوع JSONB
تحميل العلاقاتتحميل كسول/مبكر عبر relationship()
ترحيل المخططتكامل Alembic

مخطط علاقات الكيانات

الصنف الأساسي

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase

class Base(AsyncAttrs, DeclarativeBase):
"""SQLAlchemy declarative base for all AuroraSOC ORM models."""
pass

يتيح AsyncAttrs التحميل الكسول للعلاقات ضمن سياقات async. بدونه، سيؤدي الوصول إلى case.alerts إلى خطأ إدخال/إخراج متزامن.

جميع النماذج

AlertModel

يمثل تنبيهًا أمنيًا من أي مصدر: SIEM أو EDR أو أجهزة CPS أو الإنشاء اليدوي.

class AlertModel(Base):
__tablename__ = "alerts"

id: Mapped[uuid.UUID] # Primary key, auto-generated UUID
title: Mapped[str] # Max 500 chars
description: Mapped[str | None] # Full alert description
severity: Mapped[str] # critical|high|medium|low|info
status: Mapped[str] # new|triaged|investigating|escalated|resolved|false_positive
source: Mapped[str] # Origin system (e.g., "suricata", "crowdstrike")
asset_type: Mapped[str | None] # server|workstation|iot_sensor|cps_controller|...
affected_assets: Mapped[list] # JSONB array of asset identifiers
iocs: Mapped[list] # JSONB array of IOC objects
mitre_techniques: Mapped[list] # JSONB array (e.g., ["T1071.001", "T1048"])
raw_log: Mapped[str | None] # Original log entry
assigned_agent: Mapped[str | None] # Agent type handling this alert
case_id: Mapped[uuid.UUID | None] # FK → cases.id
dedup_hash: Mapped[str | None] # SHA-256 content hash for deduplication
metadata_: Mapped[dict | None] # JSONB, mapped to column "metadata"
created_at: Mapped[datetime] # Timezone-aware UTC
updated_at: Mapped[datetime] # Auto-updated on change

قرارات تصميم أساسية:

  • dedup_hash: يُحسب بصيغة SHA256(title|source|sorted_iocs). يشغّل المجدول إزالة التكرار كل 5 دقائق، ويدمج التنبيهات ذات البصمة المتطابقة.
  • أعمدة JSONB (iocs, mitre_techniques, affected_assets): يتيح PostgreSQL JSONB استعلامات مفهرسة على البنى المتداخلة دون تغيير المخطط.
  • metadata_"metadata": تمت إعادة تسمية خاصية Python لتجنب التعارض مع الخاصية الداخلية metadata في SQLAlchemy.

الفهارس:

الفهرسالعمود/الأعمدةالغرض
idx_alerts_severityseverityالتصفية حسب مستوى الشدة
idx_alerts_statusstatusالتصفية حسب حالة سير العمل
idx_alerts_created_atcreated_atاستعلامات نطاق زمني
idx_alerts_dedup_hashdedup_hashبحث سريع لإزالة التكرار

CaseModel

يمثل قضية تحقيق في حادثة تجمع التنبيهات المرتبطة.

class CaseModel(Base):
__tablename__ = "cases"

id: Mapped[uuid.UUID]
title: Mapped[str]
severity: Mapped[str]
status: Mapped[str] # open|in_progress|pending_approval|contained|...
description: Mapped[str | None]
assignee: Mapped[str | None] # Human analyst or agent name
iocs: Mapped[list | None] # Aggregated IOCs from all alerts
mitre_techniques: Mapped[list | None] # Aggregated MITRE techniques
affected_assets: Mapped[list | None]
cps_devices_involved: Mapped[list | None] # CPS device IDs if applicable
physical_impact: Mapped[str | None] # Physical safety assessment
recommended_actions: Mapped[list | None]
requires_human_approval: Mapped[bool] # Gates high-risk actions
confidence: Mapped[float] # AI confidence score 0.0-1.0
outcome: Mapped[str | None] # Post-resolution summary
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
closed_at: Mapped[datetime | None]

# Relationships
alerts: Mapped[list[AlertModel]] = relationship("AlertModel", back_populates="case")
timeline: Mapped[list[CaseTimelineModel]] = relationship(
"CaseTimelineModel", back_populates="case",
order_by="CaseTimelineModel.timestamp"
)

لماذا confidence كقيمة float؟ توفّر وكلاء الذكاء الاصطناعي درجة ثقة بين 0.0 و1.0 للتقييم. الإجراءات ذات الثقة الأقل من 0.7 على الأصول الحرجة تفعّل تلقائيًا requires_human_approval = True.

CaseTimelineModel

مسار تدقيق لكل إجراء يتخذه وكيل أثناء التحقيق.

class CaseTimelineModel(Base):
__tablename__ = "case_timeline"

id: Mapped[uuid.UUID]
case_id: Mapped[uuid.UUID] # FK → cases.id
agent: Mapped[str] # Which agent (e.g., "threat_hunter")
action: Mapped[str] # What it did (e.g., "Ran YARA scan")
details: Mapped[str | None] # Human-readable outcome
tool_calls: Mapped[list | None] # JSONB: which MCP tools were invoked
timestamp: Mapped[datetime]

CPSDeviceModel

سجل لجميع أجهزة الأنظمة السيبرانية-الفيزيائية وأجهزة IoT.

class CPSDeviceModel(Base):
__tablename__ = "cps_devices"

id: Mapped[uuid.UUID]
device_id: Mapped[str] # Unique device identifier (e.g., "stm32_pac_01")
device_type: Mapped[str] # sensor|controller|gateway|plc
name: Mapped[str | None]
firmware_stack: Mapped[str] # ada_spark|rust_embassy|zephyr_rtos|unknown
firmware_version: Mapped[str]
firmware_hash: Mapped[str] # SHA-256 of firmware binary
attestation_status: Mapped[str] # valid|revoked|expired|unknown|failed
risk_score: Mapped[float] # 0.0-100.0 composite risk score
location: Mapped[str | None] # Physical location
ip_address: Mapped[str | None]
network_segment: Mapped[str | None]
certificate_serial: Mapped[str | None] # x.509 cert serial from Vault PKI
last_seen: Mapped[datetime | None]
last_attestation: Mapped[datetime | None]
metadata_: Mapped[dict | None]

AttestationResultModel

يسجل كل نتيجة تحقق من Attestation للبرمجيات الثابتة.

class AttestationResultModel(Base):
__tablename__ = "attestation_results"

id: Mapped[uuid.UUID]
device_id: Mapped[str]
status: Mapped[str] # valid|failed
firmware_hash: Mapped[str] # Reported hash
expected_hash: Mapped[str] # Known-good hash
certificate_valid: Mapped[bool]
board_family: Mapped[str] # stm32|nrf52|esp32s3
boot_count: Mapped[int] # Monotonic counter (detects reflash attacks)
nonce: Mapped[str | None] # Challenge nonce for replay protection
verified_at: Mapped[datetime]

PlaybookModel & PlaybookExecutionModel

تعريفات Playbook الخاصة بـ SOAR وسجلات تنفيذها.

class PlaybookModel(Base):
__tablename__ = "playbooks"

id: Mapped[uuid.UUID]
name: Mapped[str]
description: Mapped[str | None]
severity_filter: Mapped[str | None] # Only trigger for matching severity
steps: Mapped[list] # JSONB: ordered action steps
enabled: Mapped[bool]
requires_approval: Mapped[bool] # Human approval before execution
created_at: Mapped[datetime]
updated_at: Mapped[datetime]


class PlaybookExecutionModel(Base):
__tablename__ = "playbook_executions"

id: Mapped[uuid.UUID]
playbook_id: Mapped[uuid.UUID] # FK → playbooks.id
case_id: Mapped[uuid.UUID | None] # FK → cases.id
status: Mapped[str] # pending|running|completed|failed|rolled_back
parameters: Mapped[dict | None] # JSONB: runtime parameters
target_assets: Mapped[list | None]
steps_completed: Mapped[int]
steps_total: Mapped[int]
step_results: Mapped[list | None] # JSONB: per-step outcome
error_message: Mapped[str | None]
dry_run: Mapped[bool] # Preview mode
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]

IOCModel

تخزين دائم لمؤشرات الاختراق مع إثراء استخباراتي للتهديدات.

class IOCModel(Base):
__tablename__ = "iocs"

id: Mapped[uuid.UUID]
type: Mapped[str] # ip|domain|url|hash_md5|hash_sha256|email|cve|...
value: Mapped[str] # The actual indicator value
confidence: Mapped[float] # 0.0-1.0
source: Mapped[str] # Feed or agent that reported it
reputation: Mapped[str | None] # malicious|suspicious|clean
threat_actor: Mapped[str | None]
campaigns: Mapped[list | None] # JSONB: associated campaigns
tags: Mapped[list | None] # JSONB: classification tags
context: Mapped[dict | None] # JSONB: arbitrary context
enrichment: Mapped[dict | None] # JSONB: external enrichment data
first_seen: Mapped[datetime]
last_seen: Mapped[datetime]

__table_args__ = (
Index("idx_iocs_type_value", "type", "value", unique=True),
)

يمنع الفهرس المركب الفريد على (type, value) تكرار مؤشرات الاختراق. إذا ظهر نفس IP من مصادر متعددة، يُحدَّث السجل القائم بدل إنشاء سجل مكرر.

النماذج المتبقية

النموذجالجدولالغرض
AgentAuditModelagent_audit_logكل إجراء وكيل مع التوقيت
HumanApprovalModelhuman_approvalsبوابات الموافقة مع انتهاء بعد 4 ساعات
ReportModelreportsتقارير التحقيق المُولدة (JSON/Markdown/PDF)

تهيئة قاعدة البيانات

# aurorasoc/core/database.py
async def init_db():
"""Initialize database with graceful fallback."""
try:
engine = create_async_engine(settings.postgres.url, pool_size=20, max_overflow=10)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
except Exception:
logger.warning("database_unavailable", msg="Running in degraded mode")

تعمل API في وضع متدهور عند عدم توفر PostgreSQL، مع الرجوع إلى مخازن بيانات داخل الذاكرة. يضمن ذلك بقاء لوحة التحكم عاملة أثناء صيانة قاعدة البيانات.

ترحيلات Alembic

# Generate a new migration
alembic revision --autogenerate -m "Add new column"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

ينشئ الترحيل الأولي (001_initial_schema.py) جميع الجداول الأحد عشر مع الفهارس والمفاتيح الخارجية وأنواع PostgreSQL الخاصة (JSONB وUUID) بصورة صحيحة.