انتقل إلى المحتوى الرئيسي

قاعدة البيانات المتجهية (Qdrant)

يستخدم AuroraSOC منصة Qdrant كقاعدة بيانات متجهية للذاكرة الدلالية، مما يمكّن الوكلاء من استدعاء التحقيقات السابقة المتشابهة والبحث في استخبارات التهديدات بالمعنى بدل المطابقة الحرفية.

لماذا قاعدة بيانات متجهية؟

تبحث قواعد البيانات التقليدية بالقيم المطابقة حرفيًا: WHERE ioc_value = '10.0.0.1'. لكن محللي الأمن يفكرون بالمفاهيم:

"ابحث عن قضايا سابقة تشبه هجوم حركة جانبية باستخدام PowerShell مع تهريب بيانات عبر DNS"

تحوّل قواعد البيانات المتجهية النصوص إلى تضمينات عددية وتبحث عن العناصر المتشابهة وفق المسافة الهندسية في فضاء عالي الأبعاد.

البنية

مولّد التضمين النصي

يتم تحويل جميع النصوص إلى متجهات باستخدام Sentence Transformers:

class TextEmbedder:
def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
self._model = SentenceTransformer(model_name)
self.vector_size = self._model.get_sentence_embedding_dimension() # 384

async def embed(self, text: str) -> list[float]:
"""Runs sync model in thread to avoid blocking the event loop."""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda: self._model.encode(text, normalize_embeddings=True).tolist()
)
return result

لماذا all-MiniLM-L6-v2؟

النموذجالأبعادالسرعةالجودةالحجم
all-MiniLM-L6-v2384FastGood80MB
all-mpnet-base-v2768MediumBest420MB
all-MiniLM-L12-v2384MediumBetter120MB

عند 384 بُعدًا وحجم 80MB، يوازن MiniLM-L6-v2 بين الجودة وقيود الموارد في منصة SOC تشغّل أيضًا 16 وكيل ذكاء اصطناعي ومحرك Rust وثلاث قواعد بيانات.

المجموعة 1: الذاكرة العرضية (aurora_cases)

تخزن قضايا التحقيق المغلقة لاستدعائها في التحقيقات المستقبلية.

المخطط

VectorParams(
size=384, # MiniLM-L6-v2 dimensions
distance=Distance.COSINE, # Range: 0.0 (opposite) to 1.0 (identical)
)

حمولة النقطة

{
"summary": "DNS tunneling attack via iodine tool targeting HR workstations",
"techniques": ["T1071.004", "T1048.001"],
"iocs": [{"type": "domain", "value": "t1.evil.com"}],
"cps_devices_involved": [],
"outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
"severity": "high",
"confidence": 0.89,
"stored_at": "2024-01-15T10:30:00Z"
}

تخزين قضية

async def store_case(self, case: ClosedCase) -> None:
embedding = await self._embedder.embed(case.summary)
point = PointStruct(
id=case.id,
vector=embedding,
payload={
"summary": case.summary,
"techniques": case.mitre,
"iocs": case.iocs,
"cps_devices_involved": case.cps_devices,
"outcome": case.outcome,
"severity": case.severity,
"confidence": case.confidence,
"stored_at": datetime.now(timezone.utc).isoformat(),
},
)
await client.upsert(collection_name="aurora_cases", points=[point])

استدعاء القضايا المتشابهة

async def recall_similar(
self, query: str, top_k: int = 5, severity_filter: str | None = None
) -> list[dict]:
embedding = await self._embedder.embed(query)

query_filter = None
if severity_filter:
query_filter = Filter(
must=[FieldCondition(key="severity", match=MatchValue(value=severity_filter))]
)

results = await client.search(
collection_name="aurora_cases",
query_vector=embedding,
limit=top_k,
query_filter=query_filter,
)
return [{"id": r.id, "score": r.score, **r.payload} for r in results]

مثال استخدام من قبل المنسق:

# During investigation: "What did we do last time we saw DNS tunneling?"
similar_cases = await episodic_memory.recall_similar(
query="DNS tunneling data exfiltration corporate network",
top_k=3,
severity_filter="high"
)
# Returns past cases with outcomes to inform current investigation

المجموعة 2: استخبارات التهديدات (aurora_threat_intel)

تخزن مؤشرات الاختراق (IOCs) مع سياق دلالي لعمليات البحث عن التشابه، مع دعم التخزين المؤقت عبر Redis TTL.

بنية ثنائية الطبقات

تخزين IOC

async def store_ioc(self, ioc: dict) -> None:
ioc_value = ioc.get("value", "")
ioc_type = ioc.get("type", "unknown")

# Redis: fast exact-match cache (1-hour TTL)
cache_key = f"aurora:ioc:{ioc_type}:{ioc_value}"
await redis.setex(cache_key, 3600, json.dumps(ioc))

# Qdrant: semantic search (persistent)
context_text = f"{ioc_type} {ioc_value} {ioc.get('context', '')}"
embedding = await embedder.embed(context_text)
point = PointStruct(id=str(uuid4()), vector=embedding, payload=ioc)
await qdrant.upsert(collection_name="aurora_threat_intel", points=[point])

إثراء IOC

تقلّل آلية البحث ثلاثية المراحل (Redis ← Qdrant ← خارجي) زمن الاستجابة:

async def enrich_ioc(self, ioc_value: str, ioc_type: str = "auto") -> dict:
# 1. Redis cache (microseconds)
cached = await redis.get(f"aurora:ioc:{ioc_type}:{ioc_value}")
if cached:
return json.loads(cached)

# 2. Qdrant semantic search (milliseconds)
results = await qdrant.search(
collection_name="aurora_threat_intel",
query_vector=await embedder.embed(f"{ioc_type} {ioc_value}"),
limit=3,
)
for result in results:
if result.payload.get("value") == ioc_value:
await redis.setex(cache_key, 3600, json.dumps(result.payload))
return result.payload

# 3. Not found — caller checks external feeds
return {"value": ioc_value, "type": ioc_type, "found": False}

البحث عن IOCs متشابهة

# Find IOCs related to a description
results = await threat_intel.search_similar_iocs(
query="C2 beacon domains used by APT29 in SolarWinds campaign",
top_k=10
)

الإعدادات

# Qdrant connection
AURORA_QDRANT__HOST=qdrant
AURORA_QDRANT__PORT=6333

# Default embedding model
# Set via TextEmbedder constructor, defaults to "all-MiniLM-L6-v2"

لماذا Qdrant بدل البدائل؟

الميزةQdrantPineconeWeaviateChromaDB
Self-hostedYesNo (SaaS)YesYes
PerformanceRust engine, fastManagedJava, slowerPython, slowest
FilteringBuilt-in payload filtersBasicGraphQLMetadata filters
PersistenceDisk + WALManagedDiskEphemeral by default
Memory~200MB baseN/A~1GB~100MB
Production-readyYesYesYesNot recommended

يوفر محرك Qdrant المبني بـ Rust الأداء المطلوب مع بقاء الحل مستضافًا ذاتيًا، وهو أمر حرج لمنصة SOC لا يمكنها إرسال البيانات الأمنية إلى خدمات SaaS خارجية.