Vector Database (Qdrant)

AuroraSOC uses Qdrant as a vector database for semantic memory — enabling agents to recall similar past investigations and search threat intelligence by meaning rather than exact match.

Why a Vector Database?

Traditional databases search by exact values: WHERE ioc_value = '10.0.0.1'. But security analysts think in concepts:

"Find past cases similar to a lateral movement attack using PowerShell with DNS exfiltration"

Vector databases convert text into numerical embeddings and find similar items by geometric distance in high-dimensional space.
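As a minimal illustration of "similar by geometric distance", here is cosine similarity over toy 3-dimensional vectors (real embeddings are 384-dimensional, and the vectors below are invented for the example):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy "embeddings": the two DNS-related vectors point in similar directions
dns_tunneling = [0.9, 0.1, 0.2]
dns_exfil = [0.8, 0.2, 0.3]
phishing = [0.1, 0.9, 0.1]

# Semantically related concepts score higher than unrelated ones
assert cosine_similarity(dns_tunneling, dns_exfil) > cosine_similarity(dns_tunneling, phishing)
```

This is the geometric intuition behind every query in this document: embed the query text, then rank stored vectors by this score.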

Architecture

Text Embedder

All text is converted to vectors using Sentence Transformers:

```python
import asyncio

from sentence_transformers import SentenceTransformer


class TextEmbedder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        self._model = SentenceTransformer(model_name)
        self.vector_size = self._model.get_sentence_embedding_dimension()  # 384

    async def embed(self, text: str) -> list[float]:
        """Runs the sync model in a thread to avoid blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: self._model.encode(text, normalize_embeddings=True).tolist()
        )
```
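A side effect of `normalize_embeddings=True` worth noting: for unit-length vectors, cosine similarity reduces to a plain dot product, which is what makes cosine-distance search cheap. A sketch of that property with toy numbers (not real embeddings):

```python
import math

def l2_normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length (what normalize_embeddings=True does)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

a = l2_normalize([3.0, 4.0])  # [0.6, 0.8]
b = l2_normalize([4.0, 3.0])  # [0.8, 0.6]

# For unit vectors, the dot product *is* the cosine similarity
dot = sum(x * y for x, y in zip(a, b))
assert abs(dot - 0.96) < 1e-9
```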

Why all-MiniLM-L6-v2?

| Model | Dimensions | Speed | Quality | Size |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast | Good | 80 MB |
| all-mpnet-base-v2 | 768 | Medium | Best | 420 MB |
| all-MiniLM-L12-v2 | 384 | Medium | Better | 120 MB |

At 384 dimensions and 80MB, MiniLM-L6-v2 balances quality with the resource constraints of a SOC platform that also runs 16 AI agents, a Rust engine, and 3 databases.

Collection 1: Episodic Memory (aurora_cases)

Stores closed investigation cases for recall during future investigations.

Schema

```python
VectorParams(
    size=384,                  # MiniLM-L6-v2 dimensions
    distance=Distance.COSINE,  # similarity score: -1.0 (opposite) to 1.0 (identical)
)
```

Point Payload

```json
{
  "summary": "DNS tunneling attack via iodine tool targeting HR workstations",
  "techniques": ["T1071.004", "T1048.001"],
  "iocs": [{"type": "domain", "value": "t1.evil.com"}],
  "cps_devices_involved": [],
  "outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
  "severity": "high",
  "confidence": 0.89,
  "stored_at": "2024-01-15T10:30:00Z"
}
```
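For readers implementing against this payload, its shape can be written down as a `TypedDict` (the field names come from the example above; the `TypedDict` itself is our sketch, not part of AuroraSOC):

```python
from typing import TypedDict

class IOC(TypedDict):
    type: str
    value: str

class CasePayload(TypedDict):
    summary: str
    techniques: list[str]            # MITRE ATT&CK technique IDs
    iocs: list[IOC]
    cps_devices_involved: list[str]
    outcome: str
    severity: str
    confidence: float
    stored_at: str                   # ISO-8601 UTC timestamp

payload: CasePayload = {
    "summary": "DNS tunneling attack via iodine tool targeting HR workstations",
    "techniques": ["T1071.004", "T1048.001"],
    "iocs": [{"type": "domain", "value": "t1.evil.com"}],
    "cps_devices_involved": [],
    "outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
    "severity": "high",
    "confidence": 0.89,
    "stored_at": "2024-01-15T10:30:00Z",
}
```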

Store a Case

```python
async def store_case(self, case: ClosedCase) -> None:
    embedding = await self._embedder.embed(case.summary)
    point = PointStruct(
        id=case.id,
        vector=embedding,
        payload={
            "summary": case.summary,
            "techniques": case.mitre,
            "iocs": case.iocs,
            "cps_devices_involved": case.cps_devices,
            "outcome": case.outcome,
            "severity": case.severity,
            "confidence": case.confidence,
            "stored_at": datetime.now(timezone.utc).isoformat(),
        },
    )
    await client.upsert(collection_name="aurora_cases", points=[point])
```

Recall Similar Cases

```python
async def recall_similar(
    self, query: str, top_k: int = 5, severity_filter: str | None = None
) -> list[dict]:
    embedding = await self._embedder.embed(query)

    query_filter = None
    if severity_filter:
        query_filter = Filter(
            must=[FieldCondition(key="severity", match=MatchValue(value=severity_filter))]
        )

    results = await client.search(
        collection_name="aurora_cases",
        query_vector=embedding,
        limit=top_k,
        query_filter=query_filter,
    )
    return [{"id": r.id, "score": r.score, **r.payload} for r in results]
```

Example usage by the Orchestrator:

```python
# During investigation: "What did we do last time we saw DNS tunneling?"
similar_cases = await episodic_memory.recall_similar(
    query="DNS tunneling data exfiltration corporate network",
    top_k=3,
    severity_filter="high",
)
# Returns past cases with outcomes to inform the current investigation
```

Collection 2: Threat Intelligence (aurora_threat_intel)

Stores IOCs with semantic context for similarity search, supplemented by Redis TTL caching.

Dual-Layer Architecture

Redis answers exact-match lookups from a 1-hour TTL cache; Qdrant persistently stores every IOC with an embedding of its context for semantic search. Writes go to both layers, reads try Redis first.

Store an IOC

```python
async def store_ioc(self, ioc: dict) -> None:
    ioc_value = ioc.get("value", "")
    ioc_type = ioc.get("type", "unknown")

    # Redis: fast exact-match cache (1-hour TTL)
    cache_key = f"aurora:ioc:{ioc_type}:{ioc_value}"
    await redis.setex(cache_key, 3600, json.dumps(ioc))

    # Qdrant: semantic search (persistent)
    context_text = f"{ioc_type} {ioc_value} {ioc.get('context', '')}"
    embedding = await embedder.embed(context_text)
    point = PointStruct(id=str(uuid4()), vector=embedding, payload=ioc)
    await qdrant.upsert(collection_name="aurora_threat_intel", points=[point])
```

Enrich an IOC

The three-tier lookup (Redis → Qdrant → External) minimizes latency:

```python
async def enrich_ioc(self, ioc_value: str, ioc_type: str = "auto") -> dict:
    cache_key = f"aurora:ioc:{ioc_type}:{ioc_value}"

    # 1. Redis cache (microseconds)
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Qdrant semantic search (milliseconds)
    results = await qdrant.search(
        collection_name="aurora_threat_intel",
        query_vector=await embedder.embed(f"{ioc_type} {ioc_value}"),
        limit=3,
    )
    for result in results:
        if result.payload.get("value") == ioc_value:
            await redis.setex(cache_key, 3600, json.dumps(result.payload))
            return result.payload

    # 3. Not found — caller checks external feeds
    return {"value": ioc_value, "type": ioc_type, "found": False}
```

Search Similar IOCs

# Find IOCs related to a description
results = await threat_intel.search_similar_iocs(
query="C2 beacon domains used by APT29 in SolarWinds campaign",
top_k=10
)

Configuration

```shell
# Qdrant connection
AURORA_QDRANT__HOST=qdrant
AURORA_QDRANT__PORT=6333

# Default embedding model:
# set via the TextEmbedder constructor, defaults to "all-MiniLM-L6-v2"
```
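A sketch of how these variables might be read at startup (the variable names come from above; the `qdrant_settings` helper and the `localhost` fallback are our assumptions, not AuroraSOC code):

```python
import os

def qdrant_settings() -> tuple[str, int]:
    """Read the Qdrant host/port from the environment, with local-dev defaults."""
    host = os.environ.get("AURORA_QDRANT__HOST", "localhost")
    port = int(os.environ.get("AURORA_QDRANT__PORT", "6333"))
    return host, port

# With neither variable set, this falls back to ("localhost", 6333)
```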

Why Qdrant over Alternatives?

| Feature | Qdrant | Pinecone | Weaviate | ChromaDB |
|---|---|---|---|---|
| Self-hosted | Yes | No (SaaS) | Yes | Yes |
| Performance | Rust engine, fast | Managed | Go, slower | Python, slowest |
| Filtering | Built-in payload filters | Basic | GraphQL | Metadata filters |
| Persistence | Disk + WAL | Managed | Disk | Ephemeral by default |
| Memory | ~200 MB base | N/A | ~1 GB | ~100 MB |
| Production-ready | Yes | Yes | Yes | Not recommended |

Qdrant's Rust-based engine provides the performance needed while remaining self-hosted — critical for a SOC that cannot send security data to external SaaS services.