Database Models (SQLAlchemy)
AuroraSOC uses SQLAlchemy 2.0 with async support and PostgreSQL 16. The ORM layer provides type safety, relationship management, and migration support through Alembic.
Why SQLAlchemy 2.0 Async?
| Requirement | Solution |
|---|---|
| Non-blocking I/O in FastAPI | AsyncSession with asyncpg driver |
| Type-safe column definitions | Mapped[T] + mapped_column() annotations |
| Complex JSONB queries | PostgreSQL dialect with JSONB type |
| Relationship loading | Lazy/eager loading via relationship() |
| Schema migrations | Alembic integration |
Entity Relationship Diagram
Base Class
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
class Base(AsyncAttrs, DeclarativeBase):
"""SQLAlchemy declarative base for all AuroraSOC ORM models."""
pass
AsyncAttrs adds the awaitable_attrs accessor so relationships can be lazy-loaded from async code (await case.awaitable_attrs.alerts). Without it, lazily accessing case.alerts in an async context raises a MissingGreenlet error instead of performing synchronous I/O.
All Models
AlertModel
Represents a security alert from any source — SIEM, EDR, CPS devices, or manual creation.
class AlertModel(Base):
__tablename__ = "alerts"
id: Mapped[uuid.UUID] # Primary key, auto-generated UUID
title: Mapped[str] # Max 500 chars
description: Mapped[str | None] # Full alert description
severity: Mapped[str] # critical|high|medium|low|info
status: Mapped[str] # new|triaged|investigating|escalated|resolved|false_positive
source: Mapped[str] # Origin system (e.g., "suricata", "crowdstrike")
asset_type: Mapped[str | None] # server|workstation|iot_sensor|cps_controller|...
affected_assets: Mapped[list] # JSONB array of asset identifiers
iocs: Mapped[list] # JSONB array of IOC objects
mitre_techniques: Mapped[list] # JSONB array (e.g., ["T1071.001", "T1048"])
raw_log: Mapped[str | None] # Original log entry
assigned_agent: Mapped[str | None] # Agent type handling this alert
case_id: Mapped[uuid.UUID | None] # FK → cases.id
dedup_hash: Mapped[str | None] # SHA-256 content hash for deduplication
metadata_: Mapped[dict | None] # JSONB, mapped to column "metadata"
created_at: Mapped[datetime] # Timezone-aware UTC
updated_at: Mapped[datetime] # Auto-updated on change
Key design decisions:
- dedup_hash: Computed as SHA256(title|source|sorted_iocs). The scheduler runs deduplication every 5 minutes, merging alerts with matching hashes.
- JSONB columns (iocs, mitre_techniques, affected_assets): PostgreSQL JSONB enables indexed queries on nested structures without schema changes.
- metadata_ → "metadata": Python attribute renamed to avoid a conflict with SQLAlchemy's internal metadata attribute.
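The dedup hash scheme described above can be sketched in plain Python. The field order and `|` separator follow the formula in the text; the function name and IOC representation are illustrative:

```python
import hashlib

def compute_dedup_hash(title: str, source: str, iocs: list[str]) -> str:
    """Content hash for alert deduplication: SHA256(title|source|sorted_iocs).

    Sorting the IOCs makes the hash order-insensitive, so the same alert
    reported with its IOCs in a different order still collides on dedup_hash.
    """
    canonical = "|".join([title, source, *sorted(iocs)])
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Two alerts that differ only in IOC ordering hash identically
a = compute_dedup_hash("C2 beacon", "suricata", ["10.0.0.5", "evil.example"])
b = compute_dedup_hash("C2 beacon", "suricata", ["evil.example", "10.0.0.5"])
```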
Indexes:
| Index | Column(s) | Purpose |
|---|---|---|
| idx_alerts_severity | severity | Filter by severity level |
| idx_alerts_status | status | Filter by workflow state |
| idx_alerts_created_at | created_at | Time-range queries |
| idx_alerts_dedup_hash | dedup_hash | Fast dedup lookup |
CaseModel
Represents an incident investigation case that groups related alerts.
class CaseModel(Base):
__tablename__ = "cases"
id: Mapped[uuid.UUID]
title: Mapped[str]
severity: Mapped[str]
status: Mapped[str] # open|in_progress|pending_approval|contained|...
description: Mapped[str | None]
assignee: Mapped[str | None] # Human analyst or agent name
iocs: Mapped[list | None] # Aggregated IOCs from all alerts
mitre_techniques: Mapped[list | None] # Aggregated MITRE techniques
affected_assets: Mapped[list | None]
cps_devices_involved: Mapped[list | None] # CPS device IDs if applicable
physical_impact: Mapped[str | None] # Physical safety assessment
recommended_actions: Mapped[list | None]
requires_human_approval: Mapped[bool] # Gates high-risk actions
confidence: Mapped[float] # AI confidence score 0.0-1.0
outcome: Mapped[str | None] # Post-resolution summary
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
closed_at: Mapped[datetime | None]
# Relationships
alerts: Mapped[list[AlertModel]] = relationship("AlertModel", back_populates="case")
timeline: Mapped[list[CaseTimelineModel]] = relationship(
"CaseTimelineModel", back_populates="case",
order_by="CaseTimelineModel.timestamp"
)
Why confidence as a float? AI agents provide a 0.0-1.0 confidence score for their assessment. Actions with confidence below 0.7 on critical assets auto-trigger requires_human_approval = True.
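The approval gate can be sketched as follows. The 0.7 threshold and the critical-asset condition come from the text; the helper name and the specific set of asset types treated as critical are assumptions:

```python
def needs_human_approval(confidence: float, asset_types: list[str]) -> bool:
    """Gate: low-confidence AI assessments touching critical assets
    require human approval before any action runs.

    The critical-asset set here is assumed; in practice it would live
    in configuration.
    """
    CRITICAL_ASSETS = {"cps_controller", "plc"}  # assumed critical set
    touches_critical = any(a in CRITICAL_ASSETS for a in asset_types)
    return confidence < 0.7 and touches_critical
```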
CaseTimelineModel
Audit trail of every agent action during an investigation.
class CaseTimelineModel(Base):
__tablename__ = "case_timeline"
id: Mapped[uuid.UUID]
case_id: Mapped[uuid.UUID] # FK → cases.id
agent: Mapped[str] # Which agent (e.g., "threat_hunter")
action: Mapped[str] # What it did (e.g., "Ran YARA scan")
details: Mapped[str | None] # Human-readable outcome
tool_calls: Mapped[list | None] # JSONB: which MCP tools were invoked
timestamp: Mapped[datetime]
SIEMHuntModel
Represents a persistent saved hunt definition owned by an analyst or operator.
class SIEMHuntModel(Base):
__tablename__ = "siem_hunts"
id: Mapped[uuid.UUID]
title: Mapped[str]
description: Mapped[str | None]
query: Mapped[str | None]
source: Mapped[str | None]
severity: Mapped[str | None]
host: Mapped[str | None]
tags: Mapped[list[str]]
sharing_mode: Mapped[str] # private|team|roles
shared_with_roles: Mapped[list[str]]
created_by: Mapped[str | None]
last_match_count: Mapped[int]
last_run_at: Mapped[datetime | None]
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
Saved hunts are durable workflow objects rather than transient dashboard state. Sharing is enforced at the API layer through owner checks, team visibility, and explicit role grants.
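The API-layer sharing check described above can be sketched like this. The function name is illustrative, and the real check would also verify team membership for the team mode:

```python
def can_view_hunt(hunt: dict, username: str, roles: set[str]) -> bool:
    """Visibility check for a saved hunt (sketch).

    Owners always see their hunts; otherwise visibility follows
    sharing_mode: team-wide, explicit role grants, or private.
    """
    if hunt["created_by"] == username:  # owner check always wins
        return True
    mode = hunt["sharing_mode"]
    if mode == "team":                  # visible to the whole team
        return True
    if mode == "roles":                 # explicit role grants
        return bool(roles & set(hunt["shared_with_roles"]))
    return False                        # private
```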
DetectionRuleModel
Represents an analyst-authored detection rule derived from SIEM hunting context.
class DetectionRuleModel(Base):
__tablename__ = "detection_rules"
id: Mapped[uuid.UUID]
name: Mapped[str]
description: Mapped[str | None]
query: Mapped[str | None]
source: Mapped[str | None]
match_severity: Mapped[str | None]
host: Mapped[str | None]
alert_severity: Mapped[str]
alert_title_template: Mapped[str | None]
alert_description_template: Mapped[str | None]
auto_create_case: Mapped[bool]
enabled: Mapped[bool]
schedule_enabled: Mapped[bool]
schedule_interval_minutes: Mapped[int | None]
next_run_at: Mapped[datetime | None]
tags: Mapped[list[str]]
mitre_techniques: Mapped[list[str]]
sharing_mode: Mapped[str] # private|team|roles
shared_with_roles: Mapped[list[str]]
created_by: Mapped[str | None]
last_match_count: Mapped[int]
true_positive_count: Mapped[int]
false_positive_count: Mapped[int]
last_run_at: Mapped[datetime | None]
last_triggered_at: Mapped[datetime | None]
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
Rules capture both search pivots and operational outcome metadata. This lets AuroraSOC measure not only how often a rule matches, but also how often analysts confirm or reject those matches.
Scheduler metadata is kept on the rule itself so due work can be claimed safely by the background scheduler after process restarts. next_run_at is advanced after each scheduled execution.
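The claim-and-advance cycle can be sketched in plain Python. Here rules are plain dicts rather than ORM rows, and the claiming logic (filter on enabled flags, compare next_run_at against now, then advance by the interval) follows the description above:

```python
from datetime import datetime, timedelta, timezone

def claim_due_rules(rules: list[dict], now: datetime) -> list[dict]:
    """Select scheduled rules whose next_run_at is due, then advance it.

    Because next_run_at is persisted on the rule row, a restarted
    scheduler can recompute due work from the database alone.
    """
    due = []
    for rule in rules:
        if not (rule["enabled"] and rule["schedule_enabled"]):
            continue
        if rule["next_run_at"] is not None and rule["next_run_at"] <= now:
            due.append(rule)
            # advance after the scheduled execution is claimed
            rule["next_run_at"] = now + timedelta(
                minutes=rule["schedule_interval_minutes"])
    return due
```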
DetectionRuleRunModel
Persists every manual and scheduled detection rule execution.
class DetectionRuleRunModel(Base):
__tablename__ = "detection_rule_runs"
id: Mapped[uuid.UUID]
rule_id: Mapped[uuid.UUID] # FK → detection_rules.id
trigger_mode: Mapped[str] # manual|scheduled
status: Mapped[str] # completed|failed
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
match_count: Mapped[int]
suppressed_count: Mapped[int]
sample_log_ids: Mapped[list[str]]
metadata_: Mapped[dict | None]
error_message: Mapped[str | None]
created_at: Mapped[datetime]
This model underpins the run-history API, operator timeline views in the SIEM workspace, and the 24-hour detection metrics endpoint.
DetectionSuppressionModel
Stores rule-scoped suppression windows that mute known-noisy patterns without deleting the rule itself.
class DetectionSuppressionModel(Base):
__tablename__ = "detection_suppressions"
id: Mapped[uuid.UUID]
rule_id: Mapped[uuid.UUID] # FK → detection_rules.id
name: Mapped[str]
reason: Mapped[str | None]
criteria: Mapped[dict]
starts_at: Mapped[datetime]
suppress_until: Mapped[datetime | None]
is_active: Mapped[bool]
created_by: Mapped[str | None]
created_at: Mapped[datetime]
The criteria JSON payload is intentionally flexible so suppressions can match combinations such as host, source, message fragments, or severity without introducing a new table per dimension.
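Matching an event against such a criteria payload can be sketched as a generic field-by-field check. The substring rule for the message field and the exact field names are assumptions; the point is that one dict handles any combination of dimensions:

```python
def matches_criteria(log: dict, criteria: dict) -> bool:
    """Check a log event against a suppression's criteria payload (sketch).

    Every criteria key must match the event: substring matching for the
    message field (assumed), equality for everything else.
    """
    for field, expected in criteria.items():
        actual = log.get(field)
        if field == "message":
            if expected not in (actual or ""):
                return False
        elif actual != expected:
            return False
    return True
```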
DetectionFeedbackModel
Tracks analyst verdicts for promoted detections and binds that verdict back to the originating workflow objects.
class DetectionFeedbackModel(Base):
__tablename__ = "detection_feedback"
id: Mapped[uuid.UUID]
rule_id: Mapped[uuid.UUID] # FK → detection_rules.id
verdict: Mapped[str] # true_positive|false_positive|needs_tuning
notes: Mapped[str | None]
metadata_: Mapped[dict | None]
alert_id: Mapped[uuid.UUID | None] # FK → alerts.id
case_id: Mapped[uuid.UUID | None] # FK → cases.id
siem_log_id: Mapped[str | None]
created_by: Mapped[str | None]
created_at: Mapped[datetime]
This feedback loop gives AuroraSOC a durable audit trail for rule tuning. A false_positive verdict can update the linked alert status while also incrementing the rule’s false-positive counter for future reporting.
CPSDeviceModel
Registry of all cyber-physical system and IoT devices.
class CPSDeviceModel(Base):
__tablename__ = "cps_devices"
id: Mapped[uuid.UUID]
device_id: Mapped[str] # Unique device identifier (e.g., "stm32_pac_01")
device_type: Mapped[str] # sensor|controller|gateway|plc
name: Mapped[str | None]
firmware_stack: Mapped[str] # ada_spark|rust_embassy|zephyr_rtos|unknown
firmware_version: Mapped[str]
firmware_hash: Mapped[str] # SHA-256 of firmware binary
attestation_status: Mapped[str] # valid|revoked|expired|unknown|failed
risk_score: Mapped[float] # 0.0-100.0 composite risk score
location: Mapped[str | None] # Physical location
ip_address: Mapped[str | None]
network_segment: Mapped[str | None]
certificate_serial: Mapped[str | None] # X.509 cert serial from Vault PKI
last_seen: Mapped[datetime | None]
last_attestation: Mapped[datetime | None]
metadata_: Mapped[dict | None]
AttestationResultModel
Records every firmware attestation verification result.
class AttestationResultModel(Base):
__tablename__ = "attestation_results"
id: Mapped[uuid.UUID]
device_id: Mapped[str]
status: Mapped[str] # valid|failed
firmware_hash: Mapped[str] # Reported hash
expected_hash: Mapped[str] # Known-good hash
certificate_valid: Mapped[bool]
board_family: Mapped[str] # stm32|nrf52|esp32s3
boot_count: Mapped[int] # Monotonic counter (detects reflash attacks)
nonce: Mapped[str | None] # Challenge nonce for replay protection
verified_at: Mapped[datetime]
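The verification implied by these columns can be sketched as a series of checks. The check order and the previous-boot-count parameter are assumptions; the returned status strings match the model's comment:

```python
def verify_attestation(reported_hash: str, expected_hash: str,
                       boot_count: int, last_boot_count: int,
                       certificate_valid: bool) -> str:
    """Evaluate one attestation report (sketch).

    A boot_count that fails to advance past the last recorded value
    suggests a rollback or reflash of the device.
    """
    if not certificate_valid:
        return "failed"
    if reported_hash != expected_hash:      # firmware tampering
        return "failed"
    if boot_count <= last_boot_count:       # monotonic counter check
        return "failed"
    return "valid"
```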
PlaybookModel & PlaybookExecutionModel
SOAR playbook definitions and their execution records.
class PlaybookModel(Base):
__tablename__ = "playbooks"
id: Mapped[uuid.UUID]
name: Mapped[str]
description: Mapped[str | None]
severity_filter: Mapped[str | None] # Only trigger for matching severity
steps: Mapped[list] # JSONB: ordered action steps
enabled: Mapped[bool]
requires_approval: Mapped[bool] # Human approval before execution
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
class PlaybookExecutionModel(Base):
__tablename__ = "playbook_executions"
id: Mapped[uuid.UUID]
playbook_id: Mapped[uuid.UUID] # FK → playbooks.id
case_id: Mapped[uuid.UUID | None] # FK → cases.id
status: Mapped[str] # pending|running|completed|failed|rolled_back
parameters: Mapped[dict | None] # JSONB: runtime parameters
target_assets: Mapped[list | None]
steps_completed: Mapped[int]
steps_total: Mapped[int]
step_results: Mapped[list | None] # JSONB: per-step outcome
error_message: Mapped[str | None]
dry_run: Mapped[bool] # Preview mode
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
IOCModel
Persistent indicator of compromise storage with threat intelligence enrichment.
class IOCModel(Base):
__tablename__ = "iocs"
id: Mapped[uuid.UUID]
type: Mapped[str] # ip|domain|url|hash_md5|hash_sha256|email|cve|...
value: Mapped[str] # The actual indicator value
confidence: Mapped[float] # 0.0-1.0
source: Mapped[str] # Feed or agent that reported it
reputation: Mapped[str | None] # malicious|suspicious|clean
threat_actor: Mapped[str | None]
campaigns: Mapped[list | None] # JSONB: associated campaigns
tags: Mapped[list | None] # JSONB: classification tags
context: Mapped[dict | None] # JSONB: arbitrary context
enrichment: Mapped[dict | None] # JSONB: external enrichment data
first_seen: Mapped[datetime]
last_seen: Mapped[datetime]
__table_args__ = (
Index("idx_iocs_type_value", "type", "value", unique=True),
)
The unique composite index on (type, value) prevents duplicate IOCs — if the same IP appears from multiple sources, the existing record is updated rather than duplicated.
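The update-instead-of-duplicate semantics can be sketched with a dict keyed on (type, value) standing in for the unique index. The merge policy shown (keep the higher confidence, take the newer source and last_seen) is an assumption:

```python
def upsert_ioc(store: dict, ioc: dict) -> None:
    """Sketch of the (type, value) upsert: the same indicator reported
    by a new source updates the existing record instead of duplicating it."""
    key = (ioc["type"], ioc["value"])
    if key in store:
        existing = store[key]
        existing["source"] = ioc["source"]            # latest reporter wins
        existing["last_seen"] = ioc["last_seen"]
        existing["confidence"] = max(existing["confidence"], ioc["confidence"])
    else:
        store[key] = dict(ioc)
```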
Remaining Models
| Model | Table | Purpose |
|---|---|---|
| AgentAuditModel | agent_audit_log | Every agent action with timing |
| HumanApprovalModel | human_approvals | Approval gates with 4-hour expiry |
| ReportModel | reports | Generated investigation reports (JSON/Markdown/PDF) |
vector_embeddings Table (Raw SQL — Not an ORM Model)
The vector_embeddings table stores all vector data for both Episodic Memory and Threat Intelligence. It is not an ORM model — it is managed via raw SQL through sqlalchemy.text() and created by migration 004.
Why Not an ORM Model?
- pgvector's vector(384) type has limited SQLAlchemy ORM support. Raw SQL gives full control over the cosine distance operator (<=>), HNSW index parameters, and casting.
- Upsert with ON CONFLICT is cleaner in raw SQL than via ORM merge semantics.
- Performance: raw text() queries avoid ORM overhead for high-throughput embedding operations.
Table Schema
CREATE TABLE IF NOT EXISTS vector_embeddings (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
collection VARCHAR(100) NOT NULL, -- 'aurora_cases' or 'aurora_threat_intel'
source_id VARCHAR(200) NOT NULL, -- case ID or IOC key
embedding vector(384) NOT NULL, -- all-MiniLM-L6-v2 dimension
payload JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT uq_vec_collection_source UNIQUE (collection, source_id)
);
-- HNSW index for fast approximate nearest-neighbor search
CREATE INDEX IF NOT EXISTS idx_vec_emb_hnsw
ON vector_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
SQL Helpers in stores.py
_UPSERT_SQL = text("""
INSERT INTO vector_embeddings (collection, source_id, embedding, payload)
VALUES (:collection, :source_id, :embedding::vector, :payload::jsonb)
ON CONFLICT (collection, source_id)
DO UPDATE SET embedding = EXCLUDED.embedding,
payload = EXCLUDED.payload,
created_at = now()
""")
_SEARCH_SQL = text("""
SELECT source_id, payload,
1 - (embedding <=> :query_vec::vector) AS score
FROM vector_embeddings
WHERE collection = :collection
ORDER BY embedding <=> :query_vec::vector
LIMIT :top_k
""")
_SEARCH_FILTERED_SQL = text("""
SELECT source_id, payload,
1 - (embedding <=> :query_vec::vector) AS score
FROM vector_embeddings
WHERE collection = :collection
AND payload @> :filter_json::jsonb
ORDER BY embedding <=> :query_vec::vector
LIMIT :top_k
""")
| Operator | Meaning |
|---|---|
| <=> | pgvector cosine distance (0 = identical, 2 = opposite) |
| @> | JSONB containment — payload @> '{"severity":"CRITICAL"}' |
| ::vector | Cast text literal to pgvector's vector type |
All vectors in one table, partitioned by the collection column. EpisodicMemoryStore uses aurora_cases; ThreatIntelMemory uses aurora_threat_intel. To add a new memory type, just use a new collection name — no migration needed.
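The score returned by the search SQL is 1 − cosine distance, which is why identical embeddings score 1.0. The distance computation can be sketched in plain Python (pure stdlib; a stand-in for what pgvector's <=> operator computes):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity, matching pgvector's <=> semantics:
    0.0 for identical directions, 1.0 for orthogonal, 2.0 for opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm

def search_score(query: list[float], doc: list[float]) -> float:
    """Score as computed in _SEARCH_SQL: 1 - (embedding <=> :query_vec)."""
    return 1.0 - cosine_distance(query, doc)
```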
Database Initialization
# aurorasoc/core/database.py
async def init_db():
"""Initialize database with graceful fallback."""
try:
engine = create_async_engine(settings.postgres.url, pool_size=20, max_overflow=10)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
except Exception:
logger.warning("database_unavailable", msg="Running in degraded mode")
The API runs in degraded mode when PostgreSQL is unavailable, falling back to in-memory data stores. This ensures the dashboard remains functional during database maintenance.
Alembic Migrations
# Generate a new migration
alembic revision --autogenerate -m "Add new column"
# Apply migrations
alembic upgrade head
# Rollback one step
alembic downgrade -1
The initial migration (001_initial_schema.py) creates all 11 tables with proper indexes, foreign keys, and PostgreSQL-specific types (JSONB, UUID).