Database Models (SQLAlchemy)
AuroraSOC uses SQLAlchemy 2.0 with async support and PostgreSQL 16. The ORM layer provides type safety, relationship management, and migration support through Alembic.
Why SQLAlchemy 2.0 Async?
| Requirement | Solution |
|---|---|
| Non-blocking I/O in FastAPI | AsyncSession with asyncpg driver |
| Type-safe column definitions | Mapped[T] + mapped_column() annotations |
| Complex JSONB queries | PostgreSQL dialect with JSONB type |
| Relationship loading | Lazy/eager loading via relationship() |
| Schema migrations | Alembic integration |
Entity Relationship Diagram
Base Class
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
class Base(AsyncAttrs, DeclarativeBase):
"""SQLAlchemy declarative base for all AuroraSOC ORM models."""
pass
AsyncAttrs enables lazy-loading of relationships in async contexts — without it, accessing case.alerts would raise a synchronous I/O error.
All Models
AlertModel
Represents a security alert from any source — SIEM, EDR, CPS devices, or manual creation.
class AlertModel(Base):
__tablename__ = "alerts"
id: Mapped[uuid.UUID] # Primary key, auto-generated UUID
title: Mapped[str] # Max 500 chars
description: Mapped[str | None] # Full alert description
severity: Mapped[str] # critical|high|medium|low|info
status: Mapped[str] # new|triaged|investigating|escalated|resolved|false_positive
source: Mapped[str] # Origin system (e.g., "suricata", "crowdstrike")
asset_type: Mapped[str | None] # server|workstation|iot_sensor|cps_controller|...
affected_assets: Mapped[list] # JSONB array of asset identifiers
iocs: Mapped[list] # JSONB array of IOC objects
mitre_techniques: Mapped[list] # JSONB array (e.g., ["T1071.001", "T1048"])
raw_log: Mapped[str | None] # Original log entry
assigned_agent: Mapped[str | None] # Agent type handling this alert
case_id: Mapped[uuid.UUID | None] # FK → cases.id
dedup_hash: Mapped[str | None] # SHA-256 content hash for deduplication
metadata_: Mapped[dict | None] # JSONB, mapped to column "metadata"
created_at: Mapped[datetime] # Timezone-aware UTC
updated_at: Mapped[datetime] # Auto-updated on change
Key design decisions:
dedup_hash: Computed asSHA256(title|source|sorted_iocs). The scheduler runs deduplication every 5 minutes, merging alerts with matching hashes.- JSONB columns (
iocs,mitre_techniques,affected_assets): PostgreSQL JSONB enables indexed queries on nested structures without schema changes. metadata_→"metadata": Python attribute renamed to avoid conflict with SQLAlchemy's internalmetadataattribute.
Indexes:
| Index | Column(s) | Purpose |
|---|---|---|
idx_alerts_severity | severity | Filter by severity level |
idx_alerts_status | status | Filter by workflow state |
idx_alerts_created_at | created_at | Time-range queries |
idx_alerts_dedup_hash | dedup_hash | Fast dedup lookup |
CaseModel
Represents an incident investigation case that groups related alerts.
class CaseModel(Base):
__tablename__ = "cases"
id: Mapped[uuid.UUID]
title: Mapped[str]
severity: Mapped[str]
status: Mapped[str] # open|in_progress|pending_approval|contained|...
description: Mapped[str | None]
assignee: Mapped[str | None] # Human analyst or agent name
iocs: Mapped[list | None] # Aggregated IOCs from all alerts
mitre_techniques: Mapped[list | None] # Aggregated MITRE techniques
affected_assets: Mapped[list | None]
cps_devices_involved: Mapped[list | None] # CPS device IDs if applicable
physical_impact: Mapped[str | None] # Physical safety assessment
recommended_actions: Mapped[list | None]
requires_human_approval: Mapped[bool] # Gates high-risk actions
confidence: Mapped[float] # AI confidence score 0.0-1.0
outcome: Mapped[str | None] # Post-resolution summary
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
closed_at: Mapped[datetime | None]
# Relationships
alerts: Mapped[list[AlertModel]] = relationship("AlertModel", back_populates="case")
timeline: Mapped[list[CaseTimelineModel]] = relationship(
"CaseTimelineModel", back_populates="case",
order_by="CaseTimelineModel.timestamp"
)
Why confidence as a float? AI agents provide a 0.0-1.0 confidence score for their assessment. Actions with confidence below 0.7 on critical assets auto-trigger requires_human_approval = True.
CaseTimelineModel
Audit trail of every agent action during an investigation.
class CaseTimelineModel(Base):
__tablename__ = "case_timeline"
id: Mapped[uuid.UUID]
case_id: Mapped[uuid.UUID] # FK → cases.id
agent: Mapped[str] # Which agent (e.g., "threat_hunter")
action: Mapped[str] # What it did (e.g., "Ran YARA scan")
details: Mapped[str | None] # Human-readable outcome
tool_calls: Mapped[list | None] # JSONB: which MCP tools were invoked
timestamp: Mapped[datetime]
CPSDeviceModel
Registry of all cyber-physical system and IoT devices.
class CPSDeviceModel(Base):
__tablename__ = "cps_devices"
id: Mapped[uuid.UUID]
device_id: Mapped[str] # Unique device identifier (e.g., "stm32_pac_01")
device_type: Mapped[str] # sensor|controller|gateway|plc
name: Mapped[str | None]
firmware_stack: Mapped[str] # ada_spark|rust_embassy|zephyr_rtos|unknown
firmware_version: Mapped[str]
firmware_hash: Mapped[str] # SHA-256 of firmware binary
attestation_status: Mapped[str] # valid|revoked|expired|unknown|failed
risk_score: Mapped[float] # 0.0-100.0 composite risk score
location: Mapped[str | None] # Physical location
ip_address: Mapped[str | None]
network_segment: Mapped[str | None]
certificate_serial: Mapped[str | None] # x.509 cert serial from Vault PKI
last_seen: Mapped[datetime | None]
last_attestation: Mapped[datetime | None]
metadata_: Mapped[dict | None]
AttestationResultModel
Records every firmware attestation verification result.
class AttestationResultModel(Base):
__tablename__ = "attestation_results"
id: Mapped[uuid.UUID]
device_id: Mapped[str]
status: Mapped[str] # valid|failed
firmware_hash: Mapped[str] # Reported hash
expected_hash: Mapped[str] # Known-good hash
certificate_valid: Mapped[bool]
board_family: Mapped[str] # stm32|nrf52|esp32s3
boot_count: Mapped[int] # Monotonic counter (detects reflash attacks)
nonce: Mapped[str | None] # Challenge nonce for replay protection
verified_at: Mapped[datetime]
PlaybookModel & PlaybookExecutionModel
SOAR playbook definitions and their execution records.
class PlaybookModel(Base):
__tablename__ = "playbooks"
id: Mapped[uuid.UUID]
name: Mapped[str]
description: Mapped[str | None]
severity_filter: Mapped[str | None] # Only trigger for matching severity
steps: Mapped[list] # JSONB: ordered action steps
enabled: Mapped[bool]
requires_approval: Mapped[bool] # Human approval before execution
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
class PlaybookExecutionModel(Base):
__tablename__ = "playbook_executions"
id: Mapped[uuid.UUID]
playbook_id: Mapped[uuid.UUID] # FK → playbooks.id
case_id: Mapped[uuid.UUID | None] # FK → cases.id
status: Mapped[str] # pending|running|completed|failed|rolled_back
parameters: Mapped[dict | None] # JSONB: runtime parameters
target_assets: Mapped[list | None]
steps_completed: Mapped[int]
steps_total: Mapped[int]
step_results: Mapped[list | None] # JSONB: per-step outcome
error_message: Mapped[str | None]
dry_run: Mapped[bool] # Preview mode
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
IOCModel
Persistent indicator of compromise storage with threat intelligence enrichment.
class IOCModel(Base):
__tablename__ = "iocs"
id: Mapped[uuid.UUID]
type: Mapped[str] # ip|domain|url|hash_md5|hash_sha256|email|cve|...
value: Mapped[str] # The actual indicator value
confidence: Mapped[float] # 0.0-1.0
source: Mapped[str] # Feed or agent that reported it
reputation: Mapped[str | None] # malicious|suspicious|clean
threat_actor: Mapped[str | None]
campaigns: Mapped[list | None] # JSONB: associated campaigns
tags: Mapped[list | None] # JSONB: classification tags
context: Mapped[dict | None] # JSONB: arbitrary context
enrichment: Mapped[dict | None] # JSONB: external enrichment data
first_seen: Mapped[datetime]
last_seen: Mapped[datetime]
__table_args__ = (
Index("idx_iocs_type_value", "type", "value", unique=True),
)
The unique composite index on (type, value) prevents duplicate IOCs — if the same IP appears from multiple sources, the existing record is updated rather than duplicated.
Remaining Models
| Model | Table | Purpose |
|---|---|---|
AgentAuditModel | agent_audit_log | Every agent action with timing |
HumanApprovalModel | human_approvals | Approval gates with 4-hour expiry |
ReportModel | reports | Generated investigation reports (JSON/Markdown/PDF) |
Database Initialization
# aurorasoc/core/database.py
async def init_db():
"""Initialize database with graceful fallback."""
try:
engine = create_async_engine(settings.postgres.url, pool_size=20, max_overflow=10)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
except Exception:
logger.warning("database_unavailable", msg="Running in degraded mode")
The API runs in degraded mode when PostgreSQL is unavailable, falling back to in-memory data stores. This ensures the dashboard remains functional during database maintenance.
Alembic Migrations
# Generate a new migration
alembic revision --autogenerate -m "Add new column"
# Apply migrations
alembic upgrade head
# Rollback one step
alembic downgrade -1
The initial migration (001_initial_schema.py) creates all 11 tables with proper indexes, foreign keys, and PostgreSQL-specific types (JSONB, UUID).