Vector Database (pgvector)
AuroraSOC uses pgvector — a PostgreSQL extension for vector similarity search — to power semantic memory. Agents can recall similar past investigations and search threat intelligence by meaning rather than exact match, all within the same PostgreSQL instance that stores relational data.
Why pgvector inside PostgreSQL?
Traditional databases search by exact values: WHERE ioc_value = '10.0.0.1'. But security analysts think in concepts:
"Find past cases similar to a lateral movement attack using PowerShell with DNS exfiltration"
Vector databases convert text into numerical embeddings and find similar items by geometric distance in high-dimensional space. By using pgvector rather than a separate vector database, AuroraSOC:
- Eliminates an external service — no separate Qdrant/Pinecone/Weaviate to deploy, monitor, and secure.
- Shares the existing PostgreSQL connection pool — vector queries reuse the same asyncpg sessions as relational queries.
- Simplifies transactions — a case closure can atomically update the cases table and store a vector embedding in one database transaction.
- Reduces attack surface — one fewer network service to protect in a SOC environment where data sensitivity is paramount.
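The transactional point can be sketched with a stand-in session object. `RecordingSession` and `close_case_atomically` below are illustrative names, not AuroraSOC code; what matters is that both statements share one commit/rollback scope.

```python
import asyncio


class RecordingSession:
    """Stand-in for an async SQLAlchemy session that records statements."""

    def __init__(self) -> None:
        self.executed: list[str] = []
        self.committed = False

    async def execute(self, sql, params=None) -> None:
        self.executed.append(sql)

    async def __aenter__(self) -> "RecordingSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Commit only when the block exits cleanly; otherwise roll back.
        self.committed = exc_type is None
        return False


async def close_case_atomically(session: RecordingSession, case_id: str) -> None:
    # Both statements run in one transaction scope: if the embedding upsert
    # raised, the case-status update would roll back with it.
    async with session:
        await session.execute(
            "UPDATE cases SET status = 'closed' WHERE id = :id", {"id": case_id}
        )
        await session.execute(
            "INSERT INTO vector_embeddings (...) ON CONFLICT (...) DO UPDATE ...",
            {"source_id": case_id},
        )
```

With a real session, an exception between the two statements leaves neither change visible — the property a separate vector database cannot offer.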
Architecture
The vector_embeddings Table
All vector data lives in a single table, partitioned logically by a collection column. This table is created by Alembic migration 004 using raw SQL (not an ORM model), because pgvector's vector(384) type is used directly.
CREATE TABLE IF NOT EXISTS vector_embeddings (
    id          UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    collection  VARCHAR(100) NOT NULL,
    source_id   VARCHAR(200) NOT NULL,
    embedding   vector(384) NOT NULL,
    payload     JSONB DEFAULT '{}'::jsonb,
    created_at  TIMESTAMPTZ DEFAULT now(),
    UNIQUE (collection, source_id)
);
-- HNSW index for fast approximate nearest-neighbor search (cosine distance)
CREATE INDEX IF NOT EXISTS idx_vector_embeddings_hnsw
ON vector_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
Key design decisions:
| Decision | Rationale |
|---|---|
| Single table, collection column | Simpler schema; HNSW index works across all collections; easy to add new memory types |
| UNIQUE (collection, source_id) | Enables upsert semantics — re-storing a case updates its embedding |
| HNSW index (not IVFFlat) | Better recall at low latency; no need to periodically retrain centroids |
| vector(384) fixed dimension | Matches the all-MiniLM-L6-v2 model output; enforced at the database level |
| Raw SQL (not ORM) | pgvector's vector type doesn't require SQLAlchemy ORM integration — raw text() queries keep the code simple |
Text Embedder
All text is converted to vectors using a TextEmbedder class that supports two backends:
class TextEmbedder:
    """Shared text embedding model for all memory tiers."""

    _DEFAULT_VECTOR_SIZE = 384

    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        self._model = None
        self.vector_size = self._DEFAULT_VECTOR_SIZE
        self._fallback_mode = True
        preferred_backend = os.getenv("AURORASOC_EMBEDDINGS_BACKEND", "auto").lower()
        if preferred_backend != "hash" and SentenceTransformer is not None:
            try:
                self._model = SentenceTransformer(model_name)
                self.vector_size = self._model.get_sentence_embedding_dimension()
                self._fallback_mode = False
            except Exception:
                logger.exception("embedder_model_load_failed", fallback="hash")
Two Embedding Modes
| Mode | When | Method | Quality |
|---|---|---|---|
| ML embeddings | sentence-transformers is installed | SentenceTransformer.encode() | High — true semantic similarity |
| SHA-256 hash fallback | Missing dependency or AURORASOC_EMBEDDINGS_BACKEND=hash | Deterministic hash → normalized vector | Low — no semantic understanding |
The hash fallback ensures the system never crashes due to a missing ML dependency. It produces consistent vectors (same input → same output) but lacks semantic understanding. The is_degraded property exposes this state, and search results include an embedding_quality field so consumers know the confidence level.
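A minimal sketch of how that state might be surfaced to consumers. `EmbedderStub` and `tag_embedding_quality` are hypothetical helpers invented for illustration; only the `is_degraded` and `embedding_quality` names come from the text above.

```python
class EmbedderStub:
    """Minimal stand-in exposing the degraded-mode flag described above."""

    def __init__(self, fallback_mode: bool) -> None:
        self._fallback_mode = fallback_mode

    @property
    def is_degraded(self) -> bool:
        # True when the SHA-256 hash fallback is in use instead of ML embeddings.
        return self._fallback_mode


def tag_embedding_quality(embedder, results: list) -> list:
    # Consumers read this field to judge how much to trust similarity scores.
    quality = "hash-fallback" if embedder.is_degraded else "ml"
    return [{**r, "embedding_quality": quality} for r in results]
```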
def _hash_embed(self, text: str) -> list[float]:
    """Deterministic normalized embeddings without heavy ML deps."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    values = [
        ((digest[i % len(digest)] / 255.0) * 2.0) - 1.0
        for i in range(self.vector_size)
    ]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]
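The fallback's two guarantees (determinism and unit length) are easy to check directly; `hash_embed` below restates the method as a free function for that purpose.

```python
import hashlib
import math


def hash_embed(text: str, vector_size: int = 384) -> list[float]:
    # Same scheme as _hash_embed: cycle the 32-byte SHA-256 digest,
    # map each byte into [-1, 1], then L2-normalize.
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    values = [
        ((digest[i % len(digest)] / 255.0) * 2.0) - 1.0
        for i in range(vector_size)
    ]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]
```

Identical inputs always produce identical vectors, and every vector has unit norm, so cosine comparisons stay well-defined even in degraded mode.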
Why all-MiniLM-L6-v2?
| Model | Dimensions | Speed | Quality | Size |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast | Good | 80MB |
| all-mpnet-base-v2 | 768 | Medium | Best | 420MB |
| all-MiniLM-L12-v2 | 384 | Medium | Better | 120MB |
At 384 dimensions and 80MB, MiniLM-L6-v2 balances quality with the resource constraints of a SOC platform that also runs 16 AI agents, a Rust engine, and multiple database services.
pgvector SQL Helpers
The stores.py module uses three parameterized SQL templates built with SQLAlchemy text():
Upsert
_UPSERT_SQL = text("""
INSERT INTO vector_embeddings (collection, source_id, embedding, payload)
VALUES (:collection, :source_id, :embedding::vector, :payload::jsonb)
ON CONFLICT (collection, source_id) DO UPDATE SET
embedding = EXCLUDED.embedding,
payload = EXCLUDED.payload,
created_at = now()
""")
The ON CONFLICT ... DO UPDATE clause makes store operations idempotent — re-storing a case with the same source_id updates its embedding and payload without creating duplicates.
Search (Cosine Distance)
_SEARCH_SQL = text("""
SELECT source_id, payload, 1 - (embedding <=> :query_vec::vector) AS score
FROM vector_embeddings
WHERE collection = :collection
ORDER BY embedding <=> :query_vec::vector
LIMIT :top_k
""")
The <=> operator computes cosine distance (0 = identical, 2 = opposite). We convert to a similarity score via 1 - distance, giving a range of -1.0 to 1.0 where higher is more similar.
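The distance-to-score conversion can be checked in plain Python; `cosine_distance` and `similarity_score` below are illustrative re-implementations, not AuroraSOC code.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    # The same quantity pgvector's <=> operator computes: 1 - cos(theta).
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)


def similarity_score(distance: float) -> float:
    # Mirrors the 1 - (embedding <=> :query_vec) expression in the SQL.
    return 1.0 - distance
```

Identical vectors give distance 0 and score 1.0; opposite vectors give distance 2 and score -1.0.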
Filtered Search (JSONB)
_SEARCH_FILTERED_SQL = text("""
SELECT source_id, payload, 1 - (embedding <=> :query_vec::vector) AS score
FROM vector_embeddings
WHERE collection = :collection
AND payload @> :filter_json::jsonb
ORDER BY embedding <=> :query_vec::vector
LIMIT :top_k
""")
The @> operator checks if the stored payload JSONB contains the filter. For example, payload @> '{"severity": "high"}' filters to only high-severity cases before running the vector search.
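For intuition, here is a simplified Python analogue of flat containment; `jsonb_contains` is a hypothetical helper, and the real @> operator additionally handles nested documents and arrays.

```python
def jsonb_contains(payload: dict, filt: dict) -> bool:
    # Simplified analogue of PostgreSQL's @> for flat key/value filters
    # like {"severity": "high"}.
    return all(payload.get(k) == v for k, v in filt.items())
```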
Collection 1: Episodic Memory (aurora_cases)
Stores closed investigation cases for recall during future investigations.
Point Payload
{
  "summary": "DNS tunneling attack via iodine tool targeting HR workstations",
  "techniques": ["T1071.004", "T1048.001"],
  "iocs": [{"type": "domain", "value": "t1.evil.com"}],
  "cps_devices_involved": [],
  "outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
  "severity": "high",
  "confidence": 0.89,
  "stored_at": "2024-01-15T10:30:00Z"
}
Store a Case
async def store_case(self, case: ClosedCase) -> None:
    embedding = await self._embedder.embed(case.summary)
    payload = {
        "summary": case.summary,
        "techniques": case.mitre,
        "iocs": case.iocs,
        "cps_devices_involved": case.cps_devices,
        "outcome": case.outcome,
        "severity": case.severity,
        "confidence": case.confidence,
        "stored_at": datetime.now(UTC).isoformat(),
        **case.metadata,
    }
    async with get_session() as session:
        await session.execute(
            _UPSERT_SQL,
            {
                "collection": self.COLLECTION,
                "source_id": case.id,
                "embedding": _vec_literal(embedding),
                "payload": json.dumps(payload),
            },
        )
Recall Similar Cases
async def recall_similar(
    self, query: str, top_k: int = 5, severity_filter: str | None = None
) -> list[dict]:
    embedding = await self._embedder.embed(query)
    async with get_session() as session:
        if severity_filter:
            rows = await session.execute(
                _SEARCH_FILTERED_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "filter_json": json.dumps({"severity": severity_filter}),
                    "top_k": top_k,
                },
            )
        else:
            rows = await session.execute(
                _SEARCH_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "top_k": top_k,
                },
            )
        return [
            {"id": row.source_id, "score": float(row.score), **row.payload}
            for row in rows
        ]
Example usage by the Orchestrator:
# During investigation: "What did we do last time we saw DNS tunneling?"
similar_cases = await episodic_memory.recall_similar(
    query="DNS tunneling data exfiltration corporate network",
    top_k=3,
    severity_filter="high",
)
# Returns past cases with outcomes to inform current investigation
Collection 2: Threat Intelligence (aurora_threat_intel)
Stores IOCs with semantic context for similarity search, supplemented by Redis TTL caching.
Dual-Layer Architecture
Redis serves as a fast exact-match cache with a configurable TTL, while pgvector provides persistent semantic storage. Writes go to both layers; lookups try Redis first.
Store an IOC
async def store_ioc(self, ioc: dict) -> None:
    ioc_value = ioc.get("value", "")
    ioc_type = ioc.get("type", "unknown")

    # Redis: fast exact-match cache (configurable TTL)
    r = await get_redis()
    settings = get_settings()
    cache_key = f"{self.REDIS_IOC_PREFIX}{ioc_type}:{ioc_value}"
    await r.setex(cache_key, settings.redis.ioc_ttl_seconds, json.dumps(ioc))

    # pgvector: semantic storage (persistent)
    context_text = f"{ioc_type} {ioc_value} {ioc.get('context', '')}"
    embedding = await self._embedder.embed(context_text)
    source_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{ioc_type}:{ioc_value}"))
    async with get_session() as session:
        await session.execute(
            _UPSERT_SQL,
            {
                "collection": self.COLLECTION,
                "source_id": source_id,
                "embedding": _vec_literal(embedding),
                "payload": json.dumps({**ioc, "stored_at": datetime.now(UTC).isoformat()}),
            },
        )
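The uuid5-derived source_id is what makes IOC storage idempotent: uuid5 is a deterministic name-based hash, so re-storing the same IOC always targets the same row. A quick standalone check (`ioc_source_id` is an illustrative wrapper around the expression above):

```python
import uuid


def ioc_source_id(ioc_type: str, ioc_value: str) -> str:
    # uuid5 is deterministic: the same IOC always maps to the same source_id,
    # so re-storing it takes the ON CONFLICT upsert path instead of duplicating.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{ioc_type}:{ioc_value}"))
```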
Enrich an IOC
The three-tier lookup (Redis → pgvector → External) minimizes latency:
async def enrich_ioc(self, ioc_value: str, ioc_type: str = "auto") -> dict:
    # 1. Redis cache (microseconds)
    r = await get_redis()
    cache_key = f"{self.REDIS_IOC_PREFIX}{ioc_type}:{ioc_value}"
    cached = await r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. pgvector semantic search (milliseconds)
    embedding = await self._embedder.embed(f"{ioc_type} {ioc_value}")
    async with get_session() as session:
        rows = await session.execute(
            _SEARCH_SQL,
            {
                "collection": self.COLLECTION,
                "query_vec": _vec_literal(embedding),
                "top_k": 3,
            },
        )
    for row in rows:
        if row.payload.get("value") == ioc_value:
            # Refresh the Redis cache before returning the hit
            ttl = get_settings().redis.ioc_ttl_seconds
            await r.setex(cache_key, ttl, json.dumps(row.payload))
            return row.payload

    # 3. Not found — caller checks external feeds
    return {"value": ioc_value, "type": ioc_type, "found": False}
Configuration
# PostgreSQL (includes pgvector) — pgvector extension is installed via migration 004
PG_HOST=postgres
PG_PORT=5432
PG_DATABASE=aurorasoc
PG_USER=aurorasoc
PG_PASSWORD=<secure>
# Pool tuning (pgvector shares the same connection pool as relational queries)
PG_POOL_SIZE=20
PG_MAX_OVERFLOW=10
PG_POOL_PRE_PING=true
PG_SSLMODE=prefer
# Redis (IOC cache)
REDIS_URL=redis://redis:6379
REDIS_IOC_TTL_SECONDS=3600
# Embedding backend (optional)
AURORASOC_EMBEDDINGS_BACKEND=auto # "auto" | "hash"
AuroraSOC uses the pgvector/pgvector:pg16 Docker image, which ships with pgvector pre-installed. The Alembic migration simply runs CREATE EXTENSION IF NOT EXISTS vector to activate it.
Why pgvector over Alternatives?
| Feature | pgvector | Qdrant | Pinecone | ChromaDB |
|---|---|---|---|---|
| Runs in PostgreSQL | Yes | No (separate service) | No (SaaS) | No (separate) |
| Self-hosted | Yes | Yes | No | Yes |
| Shared connection pool | Yes | No | No | No |
| Transactional with relational data | Yes | No | No | No |
| HNSW index | Yes | Yes | Managed | Yes |
| JSONB payload filtering | Native (@>) | Payload filters | Basic | Metadata |
| Operational overhead | None (it's PostgreSQL) | Moderate | None (SaaS) | Low |
| Production-ready | Yes (since PG 16) | Yes | Yes | Not recommended |
pgvector eliminates an external service from the deployment, reduces operational complexity, and enables true ACID transactions that span both relational and vector data — ideal for a SOC that values simplicity and security.