Vector Database (pgvector)

AuroraSOC uses pgvector — a PostgreSQL extension for vector similarity search — to power semantic memory. Agents can recall similar past investigations and search threat intelligence by meaning rather than exact match, all within the same PostgreSQL instance that stores relational data.

Why pgvector inside PostgreSQL?

Traditional databases search by exact values: WHERE ioc_value = '10.0.0.1'. But security analysts think in concepts:

"Find past cases similar to a lateral movement attack using PowerShell with DNS exfiltration"

Vector databases convert text into numerical embeddings and find similar items by geometric distance in high-dimensional space. By using pgvector rather than a separate vector database, AuroraSOC:

  • Eliminates an external service — no separate Qdrant/Pinecone/Weaviate to deploy, monitor, and secure.
  • Shares the existing PostgreSQL connection pool — vector queries reuse the same asyncpg sessions as relational queries.
  • Simplifies transactions — a case closure can atomically update the cases table and store a vector embedding in one database transaction.
  • Reduces attack surface — one fewer network service to protect in a SOC environment where data sensitivity is paramount.
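The "geometric distance" idea can be sketched in a few lines of plain Python. These are toy 3-dimensional vectors standing in for real 384-dimensional embeddings; the actual values an embedding model would produce are, of course, different:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Cosine similarity = dot(a, b) / (|a| * |b|); 1.0 means identical direction.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

# Toy embeddings: semantically related concepts land near each other in vector space.
lateral_movement = [0.9, 0.1, 0.2]
powershell_attack = [0.8, 0.2, 0.3]
phishing_email = [0.1, 0.9, 0.1]

print(cosine_similarity(lateral_movement, powershell_attack))  # high
print(cosine_similarity(lateral_movement, phishing_email))     # low
```

This is exactly the computation pgvector performs (at scale, with an index) when it orders rows by cosine distance.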

Architecture

The vector_embeddings Table

All vector data lives in a single table, partitioned logically by a collection column. This table is created by Alembic migration 004 using raw SQL (not an ORM model), because pgvector's vector(384) type is used directly.

CREATE TABLE IF NOT EXISTS vector_embeddings (
    id         UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    collection VARCHAR(100) NOT NULL,
    source_id  VARCHAR(200) NOT NULL,
    embedding  vector(384) NOT NULL,
    payload    JSONB DEFAULT '{}'::jsonb,
    created_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE (collection, source_id)
);

-- HNSW index for fast approximate nearest-neighbor search (cosine distance)
CREATE INDEX IF NOT EXISTS idx_vector_embeddings_hnsw
    ON vector_embeddings
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 200);
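Build-time parameters (m, ef_construction) are fixed at index creation, but query-time recall can still be tuned per session via pgvector's hnsw.ef_search setting (default 40); higher values trade latency for recall:

```sql
-- Optional per-session tuning: widen the HNSW candidate list for better recall
SET hnsw.ef_search = 100;
```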

Key design decisions:

| Decision | Rationale |
| --- | --- |
| Single table, collection column | Simpler schema; HNSW index works across all collections; easy to add new memory types |
| UNIQUE (collection, source_id) | Enables upsert semantics — re-storing a case updates its embedding |
| HNSW index (not IVFFlat) | Better recall at low latency; no need to periodically retrain centroids |
| vector(384) fixed dimension | Matches the all-MiniLM-L6-v2 model output; enforced at the database level |
| Raw SQL (not ORM) | pgvector's vector type doesn't require SQLAlchemy ORM integration — raw text() queries keep the code simple |

Text Embedder

All text is converted to vectors using a TextEmbedder class that supports two backends:

class TextEmbedder:
    """Shared text embedding model for all memory tiers."""

    _DEFAULT_VECTOR_SIZE = 384

    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        self._model = None
        self.vector_size = self._DEFAULT_VECTOR_SIZE
        self._fallback_mode = True

        preferred_backend = os.getenv("AURORASOC_EMBEDDINGS_BACKEND", "auto").lower()
        if preferred_backend != "hash" and SentenceTransformer is not None:
            try:
                self._model = SentenceTransformer(model_name)
                self.vector_size = self._model.get_sentence_embedding_dimension()
                self._fallback_mode = False
            except Exception:
                logger.exception("embedder_model_load_failed", fallback="hash")

Two Embedding Modes

| Mode | When | Method | Quality |
| --- | --- | --- | --- |
| ML embeddings | sentence-transformers is installed | SentenceTransformer.encode() | High — true semantic similarity |
| SHA-256 hash fallback | Missing dependency or AURORASOC_EMBEDDINGS_BACKEND=hash | Deterministic hash → normalized vector | Low — no semantic understanding |

The hash fallback ensures the system never crashes due to a missing ML dependency. It produces consistent vectors (same input → same output) but lacks semantic understanding. The is_degraded property exposes this state, and search results include an embedding_quality field so consumers know the confidence level.

def _hash_embed(self, text: str) -> list[float]:
    """Deterministic normalized embeddings without heavy ML deps."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    values = [
        ((digest[i % len(digest)] / 255.0) * 2.0) - 1.0
        for i in range(self.vector_size)
    ]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]
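A standalone version of the same logic (extracted from the class purely for illustration) demonstrates the fallback's two guarantees: determinism and unit length, which keeps cosine distance well defined:

```python
import hashlib
import math

def hash_embed(text: str, vector_size: int = 384) -> list[float]:
    # Map SHA-256 digest bytes onto [-1, 1], cycling through the 32-byte
    # digest to fill all 384 dimensions, then L2-normalize.
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    values = [
        ((digest[i % len(digest)] / 255.0) * 2.0) - 1.0
        for i in range(vector_size)
    ]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]

vec = hash_embed("dns tunneling")
assert vec == hash_embed("dns tunneling")          # deterministic: same input, same vector
assert abs(sum(v * v for v in vec) - 1.0) < 1e-9   # unit length (cosine-ready)
```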

Why all-MiniLM-L6-v2?

| Model | Dimensions | Speed | Quality | Size |
| --- | --- | --- | --- | --- |
| all-MiniLM-L6-v2 | 384 | Fast | Good | 80MB |
| all-mpnet-base-v2 | 768 | Medium | Best | 420MB |
| all-MiniLM-L12-v2 | 384 | Medium | Better | 120MB |

At 384 dimensions and 80MB, MiniLM-L6-v2 balances quality with the resource constraints of a SOC platform that also runs 16 AI agents, a Rust engine, and multiple database services.

pgvector SQL Helpers

The stores.py module uses three parameterized SQL templates built with SQLAlchemy text():

Upsert

_UPSERT_SQL = text("""
    INSERT INTO vector_embeddings (collection, source_id, embedding, payload)
    VALUES (:collection, :source_id, :embedding::vector, :payload::jsonb)
    ON CONFLICT (collection, source_id) DO UPDATE SET
        embedding = EXCLUDED.embedding,
        payload = EXCLUDED.payload,
        created_at = now()
""")

The ON CONFLICT ... DO UPDATE clause makes store operations idempotent — re-storing a case with the same source_id updates its embedding and payload without creating duplicates.

Search (Cosine Distance)

_SEARCH_SQL = text("""
    SELECT source_id, payload, 1 - (embedding <=> :query_vec::vector) AS score
    FROM vector_embeddings
    WHERE collection = :collection
    ORDER BY embedding <=> :query_vec::vector
    LIMIT :top_k
""")

The <=> operator computes cosine distance (0 = identical, 2 = opposite). We convert to a similarity score via 1 - distance, giving a range of -1.0 to 1.0 where higher is more similar.
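The conversion is simple arithmetic; a toy check of the endpoints:

```python
def to_score(cosine_distance: float) -> float:
    # pgvector's <=> returns cosine distance in [0, 2];
    # 1 - distance maps it to a similarity score in [-1, 1]
    return 1.0 - cosine_distance

assert to_score(0.0) == 1.0    # identical direction
assert to_score(1.0) == 0.0    # orthogonal (unrelated)
assert to_score(2.0) == -1.0   # opposite direction
```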

Filtered Search (JSONB)

_SEARCH_FILTERED_SQL = text("""
    SELECT source_id, payload, 1 - (embedding <=> :query_vec::vector) AS score
    FROM vector_embeddings
    WHERE collection = :collection
      AND payload @> :filter_json::jsonb
    ORDER BY embedding <=> :query_vec::vector
    LIMIT :top_k
""")

The @> operator checks if the stored payload JSONB contains the filter. For example, payload @> '{"severity": "high"}' filters to only high-severity cases before running the vector search.
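As a rough mental model for flat documents (the real @> operator also recurses into nested objects and arrays), the containment check behaves like:

```python
def contains(payload: dict, filter_doc: dict) -> bool:
    # Simplified Python analogue of PostgreSQL's @> for flat JSONB documents:
    # every key/value pair in the filter must be present in the payload.
    return all(payload.get(k) == v for k, v in filter_doc.items())

case = {"severity": "high", "summary": "DNS tunneling via iodine"}
assert contains(case, {"severity": "high"})
assert not contains(case, {"severity": "low"})
```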

Collection 1: Episodic Memory (aurora_cases)

Stores closed investigation cases for recall during future investigations.

Point Payload

{
    "summary": "DNS tunneling attack via iodine tool targeting HR workstations",
    "techniques": ["T1071.004", "T1048.001"],
    "iocs": [{"type": "domain", "value": "t1.evil.com"}],
    "cps_devices_involved": [],
    "outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
    "severity": "high",
    "confidence": 0.89,
    "stored_at": "2024-01-15T10:30:00Z"
}

Store a Case

async def store_case(self, case: ClosedCase) -> None:
    embedding = await self._embedder.embed(case.summary)

    payload = {
        "summary": case.summary,
        "techniques": case.mitre,
        "iocs": case.iocs,
        "cps_devices_involved": case.cps_devices,
        "outcome": case.outcome,
        "severity": case.severity,
        "confidence": case.confidence,
        "stored_at": datetime.now(UTC).isoformat(),
        **case.metadata,
    }

    async with get_session() as session:
        await session.execute(
            _UPSERT_SQL,
            {
                "collection": self.COLLECTION,
                "source_id": case.id,
                "embedding": _vec_literal(embedding),
                "payload": json.dumps(payload),
            },
        )

Recall Similar Cases

async def recall_similar(
    self, query: str, top_k: int = 5, severity_filter: str | None = None
) -> list[dict]:
    embedding = await self._embedder.embed(query)

    async with get_session() as session:
        if severity_filter:
            rows = await session.execute(
                _SEARCH_FILTERED_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "filter_json": json.dumps({"severity": severity_filter}),
                    "top_k": top_k,
                },
            )
        else:
            rows = await session.execute(
                _SEARCH_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "top_k": top_k,
                },
            )

    return [
        {"id": row.source_id, "score": float(row.score), **row.payload}
        for row in rows
    ]

Example usage by the Orchestrator:

# During investigation: "What did we do last time we saw DNS tunneling?"
similar_cases = await episodic_memory.recall_similar(
query="DNS tunneling data exfiltration corporate network",
top_k=3,
severity_filter="high"
)
# Returns past cases with outcomes to inform current investigation

Collection 2: Threat Intelligence (aurora_threat_intel)

Stores IOCs with semantic context for similarity search, supplemented by Redis TTL caching.

Dual-Layer Architecture

Every IOC is written to two layers: Redis provides a fast exact-match cache with a configurable TTL, while pgvector provides persistent semantic storage.

Store an IOC

async def store_ioc(self, ioc: dict) -> None:
    ioc_value = ioc.get("value", "")
    ioc_type = ioc.get("type", "unknown")

    # Redis: fast exact-match cache (configurable TTL)
    r = await get_redis()
    settings = get_settings()
    cache_key = f"{self.REDIS_IOC_PREFIX}{ioc_type}:{ioc_value}"
    await r.setex(cache_key, settings.redis.ioc_ttl_seconds, json.dumps(ioc))

    # pgvector: semantic storage (persistent)
    context_text = f"{ioc_type} {ioc_value} {ioc.get('context', '')}"
    embedding = await self._embedder.embed(context_text)

    source_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{ioc_type}:{ioc_value}"))
    async with get_session() as session:
        await session.execute(
            _UPSERT_SQL,
            {
                "collection": self.COLLECTION,
                "source_id": source_id,
                "embedding": _vec_literal(embedding),
                "payload": json.dumps({**ioc, "stored_at": datetime.now(UTC).isoformat()}),
            },
        )
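The uuid.uuid5 call above derives the source_id deterministically from the IOC's type and value, which is what turns a repeated store into an upsert (via the ON CONFLICT clause) rather than a duplicate row:

```python
import uuid

# uuid5 is a name-based UUID: the same namespace and name
# always produce the same identifier.
a = str(uuid.uuid5(uuid.NAMESPACE_URL, "domain:t1.evil.com"))
b = str(uuid.uuid5(uuid.NAMESPACE_URL, "domain:t1.evil.com"))
assert a == b  # same IOC → same row → ON CONFLICT updates it
```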

Enrich an IOC

The three-tier lookup (Redis → pgvector → External) minimizes latency:

async def enrich_ioc(self, ioc_value: str, ioc_type: str = "auto") -> dict:
    # 1. Redis cache (microseconds)
    r = await get_redis()
    cache_key = f"{self.REDIS_IOC_PREFIX}{ioc_type}:{ioc_value}"
    cached = await r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. pgvector semantic search (milliseconds)
    embedding = await self._embedder.embed(f"{ioc_type} {ioc_value}")
    async with get_session() as session:
        rows = await session.execute(
            _SEARCH_SQL,
            {
                "collection": self.COLLECTION,
                "query_vec": _vec_literal(embedding),
                "top_k": 3,
            },
        )
        for row in rows:
            if row.payload.get("value") == ioc_value:
                # Re-populate the Redis cache on a pgvector hit
                ttl = get_settings().redis.ioc_ttl_seconds
                await r.setex(cache_key, ttl, json.dumps(row.payload))
                return row.payload

    # 3. Not found — caller checks external feeds
    return {"value": ioc_value, "type": ioc_type, "found": False}

Configuration

# PostgreSQL (includes pgvector) — pgvector extension is installed via migration 004
PG_HOST=postgres
PG_PORT=5432
PG_DATABASE=aurorasoc
PG_USER=aurorasoc
PG_PASSWORD=<secure>

# Pool tuning (pgvector shares the same connection pool as relational queries)
PG_POOL_SIZE=20
PG_MAX_OVERFLOW=10
PG_POOL_PRE_PING=true
PG_SSLMODE=prefer

# Redis (IOC cache)
REDIS_URL=redis://redis:6379
REDIS_IOC_TTL_SECONDS=3600

# Embedding backend (optional)
AURORASOC_EMBEDDINGS_BACKEND=auto # "auto" | "hash"

Docker Image

AuroraSOC uses the pgvector/pgvector:pg16 Docker image, which ships with pgvector pre-installed. The Alembic migration simply runs CREATE EXTENSION IF NOT EXISTS vector to activate it.

Why pgvector over Alternatives?

| Feature | pgvector | Qdrant | Pinecone | ChromaDB |
| --- | --- | --- | --- | --- |
| Runs in PostgreSQL | Yes | No (separate service) | No (SaaS) | No (separate) |
| Self-hosted | Yes | Yes | No | Yes |
| Shared connection pool | Yes | No | No | No |
| Transactional with relational data | Yes | No | No | No |
| HNSW index | Yes | Yes | Managed | Yes |
| JSONB payload filtering | Native (@>) | Payload filters | Basic | Metadata |
| Operational overhead | None (it's PostgreSQL) | Moderate | None (SaaS) | Low |
| Production-ready | Yes (since PG 16) | Yes | Yes | Not recommended |

pgvector eliminates an external service from the deployment, reduces operational complexity, and enables true ACID transactions that span both relational and vector data — ideal for a SOC that values simplicity and security.