
Database Models (SQLAlchemy)

AuroraSOC uses SQLAlchemy 2.0 with async support and PostgreSQL 16. The ORM layer provides type safety, relationship management, and migration support through Alembic.

Why SQLAlchemy 2.0 Async?

| Requirement | Solution |
| --- | --- |
| Non-blocking I/O in FastAPI | AsyncSession with asyncpg driver |
| Type-safe column definitions | Mapped[T] + mapped_column() annotations |
| Complex JSONB queries | PostgreSQL dialect with JSONB type |
| Relationship loading | Lazy/eager loading via relationship() |
| Schema migrations | Alembic integration |

Entity Relationship Diagram

Base Class

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase

class Base(AsyncAttrs, DeclarativeBase):
    """SQLAlchemy declarative base for all AuroraSOC ORM models."""
    pass

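AsyncAttrs adds the awaitable_attrs accessor, which lets a lazy relationship load inside an async context: await case.awaitable_attrs.alerts. Plain attribute access such as case.alerts on an unloaded relationship still attempts implicit synchronous I/O and raises MissingGreenlet, so use awaitable_attrs or an eager-loading option such as selectinload().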

All Models

AlertModel

Represents a security alert from any source: SIEM, EDR, CPS devices, or manual creation.

class AlertModel(Base):
    __tablename__ = "alerts"

    id: Mapped[uuid.UUID]                # Primary key, auto-generated UUID
    title: Mapped[str]                   # Max 500 chars
    description: Mapped[str | None]      # Full alert description
    severity: Mapped[str]                # critical|high|medium|low|info
    status: Mapped[str]                  # new|triaged|investigating|escalated|resolved|false_positive
    source: Mapped[str]                  # Origin system (e.g., "suricata", "crowdstrike")
    asset_type: Mapped[str | None]       # server|workstation|iot_sensor|cps_controller|...
    affected_assets: Mapped[list]        # JSONB array of asset identifiers
    iocs: Mapped[list]                   # JSONB array of IOC objects
    mitre_techniques: Mapped[list]       # JSONB array (e.g., ["T1071.001", "T1048"])
    raw_log: Mapped[str | None]          # Original log entry
    assigned_agent: Mapped[str | None]   # Agent type handling this alert
    case_id: Mapped[uuid.UUID | None]    # FK → cases.id
    dedup_hash: Mapped[str | None]       # SHA-256 content hash for deduplication
    metadata_: Mapped[dict | None]       # JSONB, mapped to column "metadata"
    created_at: Mapped[datetime]         # Timezone-aware UTC
    updated_at: Mapped[datetime]         # Auto-updated on change

Key design decisions:

  • dedup_hash: Computed as SHA256(title|source|sorted_iocs). The scheduler runs deduplication every 5 minutes, merging alerts with matching hashes.
  • JSONB columns (iocs, mitre_techniques, affected_assets): PostgreSQL JSONB enables indexed queries on nested structures without schema changes.
  • metadata_ → "metadata": The Python attribute is named metadata_ and mapped to the column "metadata", because the Declarative API reserves the attribute name metadata on mapped classes.
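
The dedup_hash rule above can be sketched as follows. The text only gives the formula SHA256(title|source|sorted_iocs); the function name compute_dedup_hash and the way each IOC object is serialized (JSON with sorted keys) are assumptions made for illustration, not the project's confirmed implementation.

```python
import hashlib
import json


def compute_dedup_hash(title: str, source: str, iocs: list[dict]) -> str:
    """Hypothetical helper: SHA-256 over title|source|sorted IOCs."""
    # Assumption: each IOC is serialized as JSON with sorted keys so that
    # key order inside an IOC object does not change the hash.
    serialized = sorted(json.dumps(ioc, sort_keys=True) for ioc in iocs)
    # Join the parts with "|" to mirror the formula in the text.
    payload = "|".join([title, source, *serialized])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


iocs_a = [
    {"type": "ip", "value": "10.0.0.5"},
    {"type": "domain", "value": "evil.example"},
]
iocs_b = list(reversed(iocs_a))  # same IOCs, different order

hash_a = compute_dedup_hash("Beaconing detected", "suricata", iocs_a)
hash_b = compute_dedup_hash("Beaconing detected", "suricata", iocs_b)
```

Sorting the serialized IOCs before hashing means two alerts that list the same indicators in a different order still produce the same hash, which is what lets the scheduler merge them.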

Indexes:

| Index | Column(s) | Purpose |
| --- | --- | --- |
| idx_alerts_severity | severity | Filter by severity level |
| idx_alerts_status | status | Filter by workflow state |
| idx_alerts_created_at | created_at | Time-range queries |
| idx_alerts_dedup_hash | dedup_hash | Fast dedup lookup |

CaseModel

Represents an incident investigation case that groups related alerts.

class CaseModel(Base):
    __tablename__ = "cases"

    id: Mapped[uuid.UUID]
    title: Mapped[str]
    severity: Mapped[str]
    status: Mapped[str]                        # open|in_progress|pending_approval|contained|...
    description: Mapped[str | None]
    assignee: Mapped[str | None]               # Human analyst or agent name
    iocs: Mapped[list | None]                  # Aggregated IOCs from all alerts
    mitre_techniques: Mapped[list | None]      # Aggregated MITRE techniques
    affected_assets: Mapped[list | None]
    cps_devices_involved: Mapped[list | None]  # CPS device IDs if applicable
    physical_impact: Mapped[str | None]        # Physical safety assessment
    recommended_actions: Mapped[list | None]
    requires_human_approval: Mapped[bool]      # Gates high-risk actions
    confidence: Mapped[float]                  # AI confidence score 0.0-1.0
    outcome: Mapped[str | None]                # Post-resolution summary
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]
    closed_at: Mapped[datetime | None]

    # Relationships (class names are quoted so they resolve lazily,
    # because CaseTimelineModel is defined after this class)
    alerts: Mapped[list["AlertModel"]] = relationship("AlertModel", back_populates="case")
    timeline: Mapped[list["CaseTimelineModel"]] = relationship(
        "CaseTimelineModel", back_populates="case",
        order_by="CaseTimelineModel.timestamp"
    )

Why confidence as a float? AI agents provide a 0.0-1.0 confidence score for their assessment. Actions with confidence below 0.7 on critical assets auto-trigger requires_human_approval = True.
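
The approval rule described above can be sketched as a small predicate. The 0.7 threshold comes from the text; the function name needs_human_approval and the asset_is_critical flag are hypothetical, since the source does not say how an asset is classified as critical.

```python
CONFIDENCE_THRESHOLD = 0.7  # threshold stated in the text above


def needs_human_approval(confidence: float, asset_is_critical: bool) -> bool:
    """Hypothetical gate: low confidence on a critical asset needs a human."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be within 0.0-1.0")
    # Only the combination of a critical asset and a score strictly
    # below the threshold triggers the gate.
    return asset_is_critical and confidence < CONFIDENCE_THRESHOLD


gate_low_conf_critical = needs_human_approval(0.65, asset_is_critical=True)
gate_at_threshold = needs_human_approval(0.7, asset_is_critical=True)
gate_low_conf_non_critical = needs_human_approval(0.2, asset_is_critical=False)
```

Storing the score as a float rather than a coarse label keeps the threshold adjustable without a schema change.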

CaseTimelineModel

Audit trail of every agent action during an investigation.

class CaseTimelineModel(Base):
    __tablename__ = "case_timeline"

    id: Mapped[uuid.UUID]
    case_id: Mapped[uuid.UUID]       # FK → cases.id
    agent: Mapped[str]               # Which agent (e.g., "threat_hunter")
    action: Mapped[str]              # What it did (e.g., "Ran YARA scan")
    details: Mapped[str | None]      # Human-readable outcome
    tool_calls: Mapped[list | None]  # JSONB: which MCP tools were invoked
    timestamp: Mapped[datetime]

CPSDeviceModel

Registry of all cyber-physical system and IoT devices.

class CPSDeviceModel(Base):
    __tablename__ = "cps_devices"

    id: Mapped[uuid.UUID]
    device_id: Mapped[str]                  # Unique device identifier (e.g., "stm32_pac_01")
    device_type: Mapped[str]                # sensor|controller|gateway|plc
    name: Mapped[str | None]
    firmware_stack: Mapped[str]             # ada_spark|rust_embassy|zephyr_rtos|unknown
    firmware_version: Mapped[str]
    firmware_hash: Mapped[str]              # SHA-256 of firmware binary
    attestation_status: Mapped[str]         # valid|revoked|expired|unknown|failed
    risk_score: Mapped[float]               # 0.0-100.0 composite risk score
    location: Mapped[str | None]            # Physical location
    ip_address: Mapped[str | None]
    network_segment: Mapped[str | None]
    certificate_serial: Mapped[str | None]  # X.509 cert serial from Vault PKI
    last_seen: Mapped[datetime | None]
    last_attestation: Mapped[datetime | None]
    metadata_: Mapped[dict | None]

AttestationResultModel

Records every firmware attestation verification result.

class AttestationResultModel(Base):
    __tablename__ = "attestation_results"

    id: Mapped[uuid.UUID]
    device_id: Mapped[str]
    status: Mapped[str]            # valid|failed
    firmware_hash: Mapped[str]     # Reported hash
    expected_hash: Mapped[str]     # Known-good hash
    certificate_valid: Mapped[bool]
    board_family: Mapped[str]      # stm32|nrf52|esp32s3
    boot_count: Mapped[int]        # Monotonic counter (detects reflash attacks)
    nonce: Mapped[str | None]      # Challenge nonce for replay protection
    verified_at: Mapped[datetime]

PlaybookModel & PlaybookExecutionModel

SOAR playbook definitions and their execution records.

class PlaybookModel(Base):
    __tablename__ = "playbooks"

    id: Mapped[uuid.UUID]
    name: Mapped[str]
    description: Mapped[str | None]
    severity_filter: Mapped[str | None]  # Only trigger for matching severity
    steps: Mapped[list]                  # JSONB: ordered action steps
    enabled: Mapped[bool]
    requires_approval: Mapped[bool]      # Human approval before execution
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]


class PlaybookExecutionModel(Base):
    __tablename__ = "playbook_executions"

    id: Mapped[uuid.UUID]
    playbook_id: Mapped[uuid.UUID]       # FK → playbooks.id
    case_id: Mapped[uuid.UUID | None]    # FK → cases.id
    status: Mapped[str]                  # pending|running|completed|failed|rolled_back
    parameters: Mapped[dict | None]      # JSONB: runtime parameters
    target_assets: Mapped[list | None]
    steps_completed: Mapped[int]
    steps_total: Mapped[int]
    step_results: Mapped[list | None]    # JSONB: per-step outcome
    error_message: Mapped[str | None]
    dry_run: Mapped[bool]                # Preview mode
    started_at: Mapped[datetime]
    completed_at: Mapped[datetime | None]

IOCModel

Persistent indicator of compromise storage with threat intelligence enrichment.

class IOCModel(Base):
    __tablename__ = "iocs"

    id: Mapped[uuid.UUID]
    type: Mapped[str]                 # ip|domain|url|hash_md5|hash_sha256|email|cve|...
    value: Mapped[str]                # The actual indicator value
    confidence: Mapped[float]         # 0.0-1.0
    source: Mapped[str]               # Feed or agent that reported it
    reputation: Mapped[str | None]    # malicious|suspicious|clean
    threat_actor: Mapped[str | None]
    campaigns: Mapped[list | None]    # JSONB: associated campaigns
    tags: Mapped[list | None]         # JSONB: classification tags
    context: Mapped[dict | None]      # JSONB: arbitrary context
    enrichment: Mapped[dict | None]   # JSONB: external enrichment data
    first_seen: Mapped[datetime]
    last_seen: Mapped[datetime]

    __table_args__ = (
        Index("idx_iocs_type_value", "type", "value", unique=True),
    )

The unique composite index on (type, value) prevents duplicate IOCs. If the same IP arrives from multiple sources, the write path updates the existing record instead of inserting a second one.
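
One way to get that update-instead-of-duplicate behavior is a PostgreSQL upsert keyed on the unique index. The sketch below builds such a statement with SQLAlchemy's PostgreSQL insert construct and only compiles it to SQL, so it needs no database connection. The trimmed-down iocs table, the helper build_ioc_upsert, and the choice of which columns to overwrite are assumptions for illustration; the source does not show how AuroraSOC performs the update.

```python
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Float, Index, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# Reduced stand-in for the iocs table: only the columns the upsert touches.
iocs = Table(
    "iocs",
    metadata,
    Column("type", String, nullable=False),
    Column("value", String, nullable=False),
    Column("confidence", Float, nullable=False),
    Column("source", String, nullable=False),
    Column("last_seen", DateTime(timezone=True), nullable=False),
    Index("idx_iocs_type_value", "type", "value", unique=True),
)


def build_ioc_upsert(ioc_type: str, value: str, confidence: float, source: str):
    """Hypothetical helper: INSERT ... ON CONFLICT (type, value) DO UPDATE."""
    stmt = insert(iocs).values(
        type=ioc_type,
        value=value,
        confidence=confidence,
        source=source,
        last_seen=datetime.now(timezone.utc),
    )
    # On a (type, value) collision, overwrite selected columns with the
    # values from the row that failed to insert (the "excluded" row).
    return stmt.on_conflict_do_update(
        index_elements=["type", "value"],
        set_={
            "confidence": stmt.excluded.confidence,
            "source": stmt.excluded.source,
            "last_seen": stmt.excluded.last_seen,
        },
    )


upsert_sql = str(
    build_ioc_upsert("ip", "203.0.113.7", 0.9, "feed-a").compile(
        dialect=postgresql.dialect()
    )
)
```

In an application the statement would be passed to await session.execute(...) on an AsyncSession; first_seen is left out of set_ so that the original sighting time would survive later updates.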

Remaining Models

| Model | Table | Purpose |
| --- | --- | --- |
| AgentAuditModel | agent_audit_log | Every agent action with timing |
| HumanApprovalModel | human_approvals | Approval gates with 4-hour expiry |
| ReportModel | reports | Generated investigation reports (JSON/Markdown/PDF) |

Database Initialization

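# aurorasoc/core/database.py
from sqlalchemy.ext.asyncio import create_async_engine

# settings, logger, and Base are defined elsewhere in the package.

async def init_db():
    """Initialize database with graceful fallback."""
    try:
        engine = create_async_engine(settings.postgres.url, pool_size=20, max_overflow=10)
        async with engine.begin() as conn:
            # create_all is a synchronous API, so it is dispatched through run_sync
            await conn.run_sync(Base.metadata.create_all)
    except Exception:
        logger.warning("database_unavailable", msg="Running in degraded mode")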

The API runs in degraded mode when PostgreSQL is unavailable, falling back to in-memory data stores. This ensures the dashboard remains functional during database maintenance.

Alembic Migrations

# Generate a new migration
alembic revision --autogenerate -m "Add new column"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

The initial migration (001_initial_schema.py) creates all 11 tables with proper indexes, foreign keys, and PostgreSQL-specific types (JSONB, UUID).