Database Models (SQLAlchemy)

AuroraSOC uses SQLAlchemy 2.0 with async support and PostgreSQL 16. The ORM layer provides type safety, relationship management, and migration support through Alembic.

Why SQLAlchemy 2.0 Async?

| Requirement | Solution |
| --- | --- |
| Non-blocking I/O in FastAPI | `AsyncSession` with asyncpg driver |
| Type-safe column definitions | `Mapped[T]` + `mapped_column()` annotations |
| Complex JSONB queries | PostgreSQL dialect with `JSONB` type |
| Relationship loading | Lazy/eager loading via `relationship()` |
| Schema migrations | Alembic integration |

Entity Relationship Diagram

Base Class

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase

class Base(AsyncAttrs, DeclarativeBase):
    """SQLAlchemy declarative base for all AuroraSOC ORM models."""
    pass

AsyncAttrs adds the awaitable_attrs accessor so relationships can be lazy-loaded in async contexts (e.g., await case.awaitable_attrs.alerts); without it, lazily accessing case.alerts from async code raises a MissingGreenlet error instead of performing I/O.

All Models

AlertModel

Represents a security alert from any source — SIEM, EDR, CPS devices, or manual creation.

class AlertModel(Base):
    __tablename__ = "alerts"

    id: Mapped[uuid.UUID]  # Primary key, auto-generated UUID
    title: Mapped[str]  # Max 500 chars
    description: Mapped[str | None]  # Full alert description
    severity: Mapped[str]  # critical|high|medium|low|info
    status: Mapped[str]  # new|triaged|investigating|escalated|resolved|false_positive
    source: Mapped[str]  # Origin system (e.g., "suricata", "crowdstrike")
    asset_type: Mapped[str | None]  # server|workstation|iot_sensor|cps_controller|...
    affected_assets: Mapped[list]  # JSONB array of asset identifiers
    iocs: Mapped[list]  # JSONB array of IOC objects
    mitre_techniques: Mapped[list]  # JSONB array (e.g., ["T1071.001", "T1048"])
    raw_log: Mapped[str | None]  # Original log entry
    assigned_agent: Mapped[str | None]  # Agent type handling this alert
    case_id: Mapped[uuid.UUID | None]  # FK → cases.id
    dedup_hash: Mapped[str | None]  # SHA-256 content hash for deduplication
    metadata_: Mapped[dict | None]  # JSONB, mapped to column "metadata"
    created_at: Mapped[datetime]  # Timezone-aware UTC
    updated_at: Mapped[datetime]  # Auto-updated on change

Key design decisions:

  • dedup_hash: Computed as SHA256(title|source|sorted_iocs). The scheduler runs deduplication every 5 minutes, merging alerts with matching hashes.
  • JSONB columns (iocs, mitre_techniques, affected_assets): PostgreSQL JSONB enables indexed queries on nested structures without schema changes.
  • metadata_ → "metadata": Python attribute renamed to avoid conflict with SQLAlchemy's internal metadata attribute.
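
The dedup hash scheme described above can be sketched in a few lines (the exact canonical string format is an assumption; only the `SHA256(title|source|sorted_iocs)` recipe comes from this document):

```python
import hashlib


def dedup_hash(title: str, source: str, iocs: list[str]) -> str:
    """Content hash for alert deduplication: SHA256(title|source|sorted_iocs)."""
    # Sorting the IOCs makes the hash order-independent, so two alerts
    # reporting the same indicators in different orders still collide.
    canonical = "|".join([title, source, *sorted(iocs)])
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Because the IOC list is sorted before hashing, the scheduler's 5-minute dedup pass merges alerts regardless of the order in which indicators were reported.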

Indexes:

| Index | Column(s) | Purpose |
| --- | --- | --- |
| idx_alerts_severity | severity | Filter by severity level |
| idx_alerts_status | status | Filter by workflow state |
| idx_alerts_created_at | created_at | Time-range queries |
| idx_alerts_dedup_hash | dedup_hash | Fast dedup lookup |

CaseModel

Represents an incident investigation case that groups related alerts.

class CaseModel(Base):
    __tablename__ = "cases"

    id: Mapped[uuid.UUID]
    title: Mapped[str]
    severity: Mapped[str]
    status: Mapped[str]  # open|in_progress|pending_approval|contained|...
    description: Mapped[str | None]
    assignee: Mapped[str | None]  # Human analyst or agent name
    iocs: Mapped[list | None]  # Aggregated IOCs from all alerts
    mitre_techniques: Mapped[list | None]  # Aggregated MITRE techniques
    affected_assets: Mapped[list | None]
    cps_devices_involved: Mapped[list | None]  # CPS device IDs if applicable
    physical_impact: Mapped[str | None]  # Physical safety assessment
    recommended_actions: Mapped[list | None]
    requires_human_approval: Mapped[bool]  # Gates high-risk actions
    confidence: Mapped[float]  # AI confidence score 0.0-1.0
    outcome: Mapped[str | None]  # Post-resolution summary
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]
    closed_at: Mapped[datetime | None]

    # Relationships
    alerts: Mapped[list[AlertModel]] = relationship("AlertModel", back_populates="case")
    timeline: Mapped[list[CaseTimelineModel]] = relationship(
        "CaseTimelineModel", back_populates="case",
        order_by="CaseTimelineModel.timestamp"
    )

Why confidence as a float? AI agents provide a 0.0-1.0 confidence score for their assessment. Actions with confidence below 0.7 on critical assets auto-trigger requires_human_approval = True.
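
The approval gate described above can be sketched as follows (the "critical asset" test is simplified here to a boolean, and the function name is illustrative; the 0.7 threshold comes from this document):

```python
APPROVAL_CONFIDENCE_THRESHOLD = 0.7  # below this, critical-asset actions need a human


def needs_human_approval(confidence: float, asset_is_critical: bool) -> bool:
    """Return True when an AI-proposed action must wait for human sign-off."""
    return asset_is_critical and confidence < APPROVAL_CONFIDENCE_THRESHOLD
```

In the real model this would set `requires_human_approval = True` on the case before any action executes.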

CaseTimelineModel

Audit trail of every agent action during an investigation.

class CaseTimelineModel(Base):
    __tablename__ = "case_timeline"

    id: Mapped[uuid.UUID]
    case_id: Mapped[uuid.UUID]  # FK → cases.id
    agent: Mapped[str]  # Which agent (e.g., "threat_hunter")
    action: Mapped[str]  # What it did (e.g., "Ran YARA scan")
    details: Mapped[str | None]  # Human-readable outcome
    tool_calls: Mapped[list | None]  # JSONB: which MCP tools were invoked
    timestamp: Mapped[datetime]

SIEMHuntModel

Represents a persistent saved hunt definition owned by an analyst or operator.

class SIEMHuntModel(Base):
    __tablename__ = "siem_hunts"

    id: Mapped[uuid.UUID]
    title: Mapped[str]
    description: Mapped[str | None]
    query: Mapped[str | None]
    source: Mapped[str | None]
    severity: Mapped[str | None]
    host: Mapped[str | None]
    tags: Mapped[list[str]]
    sharing_mode: Mapped[str]  # private|team|roles
    shared_with_roles: Mapped[list[str]]
    created_by: Mapped[str | None]
    last_match_count: Mapped[int]
    last_run_at: Mapped[datetime | None]
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]

Saved hunts are durable workflow objects rather than transient dashboard state. Sharing is enforced at the API layer through owner checks, team visibility, and explicit role grants.
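
A minimal sketch of such a visibility check, assuming the `sharing_mode`, `shared_with_roles`, and `created_by` fields shown above (the function name and dict shapes are illustrative; real enforcement lives in the API layer):

```python
def can_view_hunt(hunt: dict, user: dict) -> bool:
    """Owner check, then team visibility, then explicit role grants."""
    if hunt["created_by"] == user["username"]:
        return True  # owners always see their own hunts
    mode = hunt["sharing_mode"]
    if mode == "team":
        return True  # visible to everyone on the team
    if mode == "roles":
        # Any overlap between the hunt's granted roles and the user's roles
        return bool(set(hunt["shared_with_roles"]) & set(user["roles"]))
    return False  # private
```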

DetectionRuleModel

Represents an analyst-authored detection rule derived from SIEM hunting context.

class DetectionRuleModel(Base):
    __tablename__ = "detection_rules"

    id: Mapped[uuid.UUID]
    name: Mapped[str]
    description: Mapped[str | None]
    query: Mapped[str | None]
    source: Mapped[str | None]
    match_severity: Mapped[str | None]
    host: Mapped[str | None]
    alert_severity: Mapped[str]
    alert_title_template: Mapped[str | None]
    alert_description_template: Mapped[str | None]
    auto_create_case: Mapped[bool]
    enabled: Mapped[bool]
    schedule_enabled: Mapped[bool]
    schedule_interval_minutes: Mapped[int | None]
    next_run_at: Mapped[datetime | None]
    tags: Mapped[list[str]]
    mitre_techniques: Mapped[list[str]]
    sharing_mode: Mapped[str]  # private|team|roles
    shared_with_roles: Mapped[list[str]]
    created_by: Mapped[str | None]
    last_match_count: Mapped[int]
    true_positive_count: Mapped[int]
    false_positive_count: Mapped[int]
    last_run_at: Mapped[datetime | None]
    last_triggered_at: Mapped[datetime | None]
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]

Rules capture both search pivots and operational outcome metadata. This lets AuroraSOC measure not only how often a rule matches, but also how often analysts confirm or reject those matches.

Scheduler metadata is kept on the rule itself so due work can be claimed safely by the background scheduler after process restarts. next_run_at is advanced after each scheduled execution.
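
A sketch of the claim/advance logic implied by those fields (helper names are hypothetical; only `schedule_enabled`, `next_run_at`, and `schedule_interval_minutes` come from the model above):

```python
from datetime import datetime, timedelta, timezone


def is_due(rule: dict, now: datetime) -> bool:
    """A rule is claimable when scheduling is on and next_run_at has passed."""
    return (
        rule["schedule_enabled"]
        and rule["next_run_at"] is not None
        and rule["next_run_at"] <= now
    )


def advance_next_run(now: datetime, interval_minutes: int) -> datetime:
    """Advance next_run_at after a scheduled execution completes."""
    return now + timedelta(minutes=interval_minutes)
```

Because `next_run_at` is persisted on the rule row, a restarted scheduler process can re-derive due work from the database alone.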

DetectionRuleRunModel

Persists every manual and scheduled detection rule execution.

class DetectionRuleRunModel(Base):
    __tablename__ = "detection_rule_runs"

    id: Mapped[uuid.UUID]
    rule_id: Mapped[uuid.UUID]  # FK → detection_rules.id
    trigger_mode: Mapped[str]  # manual|scheduled
    status: Mapped[str]  # completed|failed
    started_at: Mapped[datetime]
    completed_at: Mapped[datetime | None]
    match_count: Mapped[int]
    suppressed_count: Mapped[int]
    sample_log_ids: Mapped[list[str]]
    metadata_: Mapped[dict | None]
    error_message: Mapped[str | None]
    created_at: Mapped[datetime]

This model underpins the run-history API, operator timeline views in the SIEM workspace, and the 24-hour detection metrics endpoint.

DetectionSuppressionModel

Stores rule-scoped suppression windows that mute known-noisy patterns without deleting the rule itself.

class DetectionSuppressionModel(Base):
    __tablename__ = "detection_suppressions"

    id: Mapped[uuid.UUID]
    rule_id: Mapped[uuid.UUID]  # FK → detection_rules.id
    name: Mapped[str]
    reason: Mapped[str | None]
    criteria: Mapped[dict]
    starts_at: Mapped[datetime]
    suppress_until: Mapped[datetime | None]
    is_active: Mapped[bool]
    created_by: Mapped[str | None]
    created_at: Mapped[datetime]

The criteria JSON payload is intentionally flexible so suppressions can match combinations such as host, source, message fragments, or severity without introducing a new table per dimension.
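
For example, a matcher over that flexible payload might look like this (the field semantics, including the `message_contains` substring key, are assumptions for illustration, not the documented schema):

```python
def matches_suppression(log: dict, criteria: dict) -> bool:
    """A log is suppressed only when every criteria field matches."""
    for field, expected in criteria.items():
        if field == "message_contains":
            # Substring test against the log message
            if expected not in log.get("message", ""):
                return False
        elif log.get(field) != expected:
            # Exact match for dimensions like host, source, severity
            return False
    return True
```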

DetectionFeedbackModel

Tracks analyst verdicts for promoted detections and binds that verdict back to the originating workflow objects.

class DetectionFeedbackModel(Base):
    __tablename__ = "detection_feedback"

    id: Mapped[uuid.UUID]
    rule_id: Mapped[uuid.UUID]  # FK → detection_rules.id
    verdict: Mapped[str]  # true_positive|false_positive|needs_tuning
    notes: Mapped[str | None]
    metadata_: Mapped[dict | None]
    alert_id: Mapped[uuid.UUID | None]  # FK → alerts.id
    case_id: Mapped[uuid.UUID | None]  # FK → cases.id
    siem_log_id: Mapped[str | None]
    created_by: Mapped[str | None]
    created_at: Mapped[datetime]

This feedback loop gives AuroraSOC a durable audit trail for rule tuning. A false_positive verdict can update the linked alert status while also incrementing the rule’s false-positive counter for future reporting.

CPSDeviceModel

Registry of all cyber-physical system and IoT devices.

class CPSDeviceModel(Base):
    __tablename__ = "cps_devices"

    id: Mapped[uuid.UUID]
    device_id: Mapped[str]  # Unique device identifier (e.g., "stm32_pac_01")
    device_type: Mapped[str]  # sensor|controller|gateway|plc
    name: Mapped[str | None]
    firmware_stack: Mapped[str]  # ada_spark|rust_embassy|zephyr_rtos|unknown
    firmware_version: Mapped[str]
    firmware_hash: Mapped[str]  # SHA-256 of firmware binary
    attestation_status: Mapped[str]  # valid|revoked|expired|unknown|failed
    risk_score: Mapped[float]  # 0.0-100.0 composite risk score
    location: Mapped[str | None]  # Physical location
    ip_address: Mapped[str | None]
    network_segment: Mapped[str | None]
    certificate_serial: Mapped[str | None]  # x.509 cert serial from Vault PKI
    last_seen: Mapped[datetime | None]
    last_attestation: Mapped[datetime | None]
    metadata_: Mapped[dict | None]

AttestationResultModel

Records every firmware attestation verification result.

class AttestationResultModel(Base):
    __tablename__ = "attestation_results"

    id: Mapped[uuid.UUID]
    device_id: Mapped[str]
    status: Mapped[str]  # valid|failed
    firmware_hash: Mapped[str]  # Reported hash
    expected_hash: Mapped[str]  # Known-good hash
    certificate_valid: Mapped[bool]
    board_family: Mapped[str]  # stm32|nrf52|esp32s3
    boot_count: Mapped[int]  # Monotonic counter (detects reflash attacks)
    nonce: Mapped[str | None]  # Challenge nonce for replay protection
    verified_at: Mapped[datetime]
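
The decision logic implied by these fields can be sketched as follows (the function, input shape, and ordering of checks are illustrative; the `valid`/`failed` statuses and the monotonic boot-count rule come from the model above):

```python
def verify_attestation(report: dict, expected_hash: str, last_boot_count: int) -> str:
    """Classify a firmware attestation report as 'valid' or 'failed'."""
    if report["firmware_hash"] != expected_hash:
        return "failed"  # reported hash diverges from the known-good hash
    if report["boot_count"] <= last_boot_count:
        return "failed"  # non-monotonic counter: possible reflash or replay
    if not report["certificate_valid"]:
        return "failed"  # device certificate rejected by PKI
    return "valid"
```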

PlaybookModel & PlaybookExecutionModel

SOAR playbook definitions and their execution records.

class PlaybookModel(Base):
    __tablename__ = "playbooks"

    id: Mapped[uuid.UUID]
    name: Mapped[str]
    description: Mapped[str | None]
    severity_filter: Mapped[str | None]  # Only trigger for matching severity
    steps: Mapped[list]  # JSONB: ordered action steps
    enabled: Mapped[bool]
    requires_approval: Mapped[bool]  # Human approval before execution
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]


class PlaybookExecutionModel(Base):
    __tablename__ = "playbook_executions"

    id: Mapped[uuid.UUID]
    playbook_id: Mapped[uuid.UUID]  # FK → playbooks.id
    case_id: Mapped[uuid.UUID | None]  # FK → cases.id
    status: Mapped[str]  # pending|running|completed|failed|rolled_back
    parameters: Mapped[dict | None]  # JSONB: runtime parameters
    target_assets: Mapped[list | None]
    steps_completed: Mapped[int]
    steps_total: Mapped[int]
    step_results: Mapped[list | None]  # JSONB: per-step outcome
    error_message: Mapped[str | None]
    dry_run: Mapped[bool]  # Preview mode
    started_at: Mapped[datetime]
    completed_at: Mapped[datetime | None]
IOCModel

Persistent indicator of compromise storage with threat intelligence enrichment.

class IOCModel(Base):
    __tablename__ = "iocs"

    id: Mapped[uuid.UUID]
    type: Mapped[str]  # ip|domain|url|hash_md5|hash_sha256|email|cve|...
    value: Mapped[str]  # The actual indicator value
    confidence: Mapped[float]  # 0.0-1.0
    source: Mapped[str]  # Feed or agent that reported it
    reputation: Mapped[str | None]  # malicious|suspicious|clean
    threat_actor: Mapped[str | None]
    campaigns: Mapped[list | None]  # JSONB: associated campaigns
    tags: Mapped[list | None]  # JSONB: classification tags
    context: Mapped[dict | None]  # JSONB: arbitrary context
    enrichment: Mapped[dict | None]  # JSONB: external enrichment data
    first_seen: Mapped[datetime]
    last_seen: Mapped[datetime]

    __table_args__ = (
        Index("idx_iocs_type_value", "type", "value", unique=True),
    )

The unique composite index on (type, value) prevents duplicate IOCs — if the same IP appears from multiple sources, the existing record is updated rather than duplicated.
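
The resulting upsert semantics can be illustrated with a plain dict keyed the same way as the index (a sketch for intuition only, not the actual persistence code):

```python
def upsert_ioc(store: dict, ioc: dict) -> None:
    """Update-or-insert keyed on (type, value), mirroring the unique index."""
    key = (ioc["type"], ioc["value"])
    if key in store:
        store[key].update(ioc)  # existing record is updated, not duplicated
    else:
        store[key] = dict(ioc)
```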

Remaining Models

| Model | Table | Purpose |
| --- | --- | --- |
| AgentAuditModel | agent_audit_log | Every agent action with timing |
| HumanApprovalModel | human_approvals | Approval gates with 4-hour expiry |
| ReportModel | reports | Generated investigation reports (JSON/Markdown/PDF) |

vector_embeddings Table (Raw SQL — Not an ORM Model)

The vector_embeddings table stores all vector data for both Episodic Memory and Threat Intelligence. It is not an ORM model — it is managed via raw SQL through sqlalchemy.text() and created by migration 004.

Why Not an ORM Model?

  • pgvector's vector(384) type has limited SQLAlchemy ORM support. Raw SQL gives full control over the cosine distance operator (<=>), HNSW index parameters, and casting.
  • Upsert with ON CONFLICT is cleaner in raw SQL than in ORM merge semantics.
  • Performance: Raw text() queries avoid ORM overhead for high-throughput embedding operations.

Table Schema

CREATE TABLE IF NOT EXISTS vector_embeddings (
    id          UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    collection  VARCHAR(100) NOT NULL,  -- 'aurora_cases' or 'aurora_threat_intel'
    source_id   VARCHAR(200) NOT NULL,  -- case ID or IOC key
    embedding   vector(384)  NOT NULL,  -- all-MiniLM-L6-v2 dimension
    payload     JSONB NOT NULL DEFAULT '{}',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),

    CONSTRAINT uq_vec_collection_source UNIQUE (collection, source_id)
);

-- HNSW index for fast approximate nearest-neighbor search
CREATE INDEX IF NOT EXISTS idx_vec_emb_hnsw
    ON vector_embeddings
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 200);

SQL Helpers in stores.py

_UPSERT_SQL = text("""
    INSERT INTO vector_embeddings (collection, source_id, embedding, payload)
    VALUES (:collection, :source_id, :embedding::vector, :payload::jsonb)
    ON CONFLICT (collection, source_id)
    DO UPDATE SET embedding = EXCLUDED.embedding,
                  payload = EXCLUDED.payload,
                  created_at = now()
""")

_SEARCH_SQL = text("""
    SELECT source_id, payload,
           1 - (embedding <=> :query_vec::vector) AS score
    FROM vector_embeddings
    WHERE collection = :collection
    ORDER BY embedding <=> :query_vec::vector
    LIMIT :top_k
""")

_SEARCH_FILTERED_SQL = text("""
    SELECT source_id, payload,
           1 - (embedding <=> :query_vec::vector) AS score
    FROM vector_embeddings
    WHERE collection = :collection
      AND payload @> :filter_json::jsonb
    ORDER BY embedding <=> :query_vec::vector
    LIMIT :top_k
""")

| Operator | Meaning |
| --- | --- |
| `<=>` | pgvector cosine distance (0 = identical, 2 = opposite) |
| `@>` | JSONB containment — `payload @> '{"severity":"CRITICAL"}'` |
| `::vector` | Cast a text literal to pgvector's `vector` type |

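For intuition, the `<=>` distance and the `1 - distance` score used in the search queries above can be reproduced with plain math (a stdlib sketch; the real computation happens inside pgvector):

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """pgvector's <=> semantics: 1 - cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)


def score(a: list[float], b: list[float]) -> float:
    """The SELECT clause's similarity score: 1 - (embedding <=> query)."""
    return 1.0 - cosine_distance(a, b)
```

Identical vectors score 1.0, orthogonal vectors 0.0, and opposite vectors -1.0.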
Collection Convention

All vectors in one table, partitioned by the collection column. EpisodicMemoryStore uses aurora_cases; ThreatIntelMemory uses aurora_threat_intel. To add a new memory type, just use a new collection name — no migration needed.

Database Initialization

# aurorasoc/core/database.py
async def init_db():
    """Initialize database with graceful fallback."""
    try:
        engine = create_async_engine(settings.postgres.url, pool_size=20, max_overflow=10)
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    except Exception:
        logger.warning("database_unavailable", msg="Running in degraded mode")

The API runs in degraded mode when PostgreSQL is unavailable, falling back to in-memory data stores. This ensures the dashboard remains functional during database maintenance.

Alembic Migrations

# Generate a new migration
alembic revision --autogenerate -m "Add new column"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

The initial migration (001_initial_schema.py) creates all 11 tables with proper indexes, foreign keys, and PostgreSQL-specific types (JSONB, UUID).