قاعدة البيانات المتجهية (Qdrant)
يستخدم AuroraSOC منصة Qdrant كقاعدة بيانات متجهية للذاكرة الدلالية، مما يمكّن الوكلاء من استدعاء التحقيقات السابقة المتشابهة والبحث في استخبارات التهديدات بالمعنى بدل المطابقة الحرفية.
لماذا قاعدة بيانات متجهية؟
تبحث قواعد البيانات التقليدية بالقيم المطابقة حرفيًا: WHERE ioc_value = '10.0.0.1'. لكن محللي الأمن يفكرون بالمفاهيم:
"ابحث عن قضايا سابقة تشبه هجوم حركة جانبية باستخدام PowerShell مع تهريب بيانات عبر DNS"
تحوّل قواعد البيانات المتجهية النصوص إلى تضمينات عددية وتبحث عن العناصر المتشابهة وفق المسافة الهندسية في فضاء عالي الأبعاد.
البنية
مولّد التضمين النصي
يتم تحويل جميع النصوص إلى متجهات باستخدام Sentence Transformers:
class TextEmbedder:
def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
self._model = SentenceTransformer(model_name)
self.vector_size = self._model.get_sentence_embedding_dimension() # 384
async def embed(self, text: str) -> list[float]:
"""Runs sync model in thread to avoid blocking the event loop."""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda: self._model.encode(text, normalize_embeddings=True).tolist()
)
return result
لماذا all-MiniLM-L6-v2؟
| النموذج | الأبعاد | السرعة | الجودة | الحجم |
|---|---|---|---|---|
all-MiniLM-L6-v2 | 384 | Fast | Good | 80MB |
all-mpnet-base-v2 | 768 | Medium | Best | 420MB |
all-MiniLM-L12-v2 | 384 | Medium | Better | 120MB |
عند 384 بُعدًا وحجم 80MB، يوازن MiniLM-L6-v2 بين الجودة وقيود الموارد في منصة SOC تشغّل أيضًا 16 وكيل ذكاء اصطناعي ومحرك Rust وثلاث قواعد بيانات.
المجموعة 1: الذاكرة العرضية (aurora_cases)
تخزن قضايا التحقيق المغلقة لاستدعائها في التحقيقات المستقبلية.
المخطط
VectorParams(
size=384, # MiniLM-L6-v2 dimensions
distance=Distance.COSINE, # Range: 0.0 (opposite) to 1.0 (identical)
)
حمولة النقطة
{
"summary": "DNS tunneling attack via iodine tool targeting HR workstations",
"techniques": ["T1071.004", "T1048.001"],
"iocs": [{"type": "domain", "value": "t1.evil.com"}],
"cps_devices_involved": [],
"outcome": "Contained. Blocked domain at DNS firewall. Reimaged 3 workstations.",
"severity": "high",
"confidence": 0.89,
"stored_at": "2024-01-15T10:30:00Z"
}
تخزين قضية
async def store_case(self, case: ClosedCase) -> None:
embedding = await self._embedder.embed(case.summary)
point = PointStruct(
id=case.id,
vector=embedding,
payload={
"summary": case.summary,
"techniques": case.mitre,
"iocs": case.iocs,
"cps_devices_involved": case.cps_devices,
"outcome": case.outcome,
"severity": case.severity,
"confidence": case.confidence,
"stored_at": datetime.now(timezone.utc).isoformat(),
},
)
await client.upsert(collection_name="aurora_cases", points=[point])
استدعاء القضايا المتشابهة
async def recall_similar(
self, query: str, top_k: int = 5, severity_filter: str | None = None
) -> list[dict]:
embedding = await self._embedder.embed(query)
query_filter = None
if severity_filter:
query_filter = Filter(
must=[FieldCondition(key="severity", match=MatchValue(value=severity_filter))]
)
results = await client.search(
collection_name="aurora_cases",
query_vector=embedding,
limit=top_k,
query_filter=query_filter,
)
return [{"id": r.id, "score": r.score, **r.payload} for r in results]
مثال استخدام من قبل المنسق:
# During investigation: "What did we do last time we saw DNS tunneling?"
similar_cases = await episodic_memory.recall_similar(
query="DNS tunneling data exfiltration corporate network",
top_k=3,
severity_filter="high"
)
# Returns past cases with outcomes to inform current investigation
المجموعة 2: استخبارات التهديدات (aurora_threat_intel)
تخزن مؤشرات الاختراق (IOCs) مع سياق دلالي لعمليات البحث عن التشابه، مع دعم التخزين المؤقت عبر Redis TTL.
بنية ثنائية الطبقات
تخزين IOC
async def store_ioc(self, ioc: dict) -> None:
ioc_value = ioc.get("value", "")
ioc_type = ioc.get("type", "unknown")
# Redis: fast exact-match cache (1-hour TTL)
cache_key = f"aurora:ioc:{ioc_type}:{ioc_value}"
await redis.setex(cache_key, 3600, json.dumps(ioc))
# Qdrant: semantic search (persistent)
context_text = f"{ioc_type} {ioc_value} {ioc.get('context', '')}"
embedding = await embedder.embed(context_text)
point = PointStruct(id=str(uuid4()), vector=embedding, payload=ioc)
await qdrant.upsert(collection_name="aurora_threat_intel", points=[point])
إثراء IOC
تقلّل آلية البحث ثلاثية المراحل (Redis ← Qdrant ← خارجي) زمن الاستجابة:
async def enrich_ioc(self, ioc_value: str, ioc_type: str = "auto") -> dict:
# 1. Redis cache (microseconds)
cached = await redis.get(f"aurora:ioc:{ioc_type}:{ioc_value}")
if cached:
return json.loads(cached)
# 2. Qdrant semantic search (milliseconds)
results = await qdrant.search(
collection_name="aurora_threat_intel",
query_vector=await embedder.embed(f"{ioc_type} {ioc_value}"),
limit=3,
)
for result in results:
if result.payload.get("value") == ioc_value:
await redis.setex(cache_key, 3600, json.dumps(result.payload))
return result.payload
# 3. Not found — caller checks external feeds
return {"value": ioc_value, "type": ioc_type, "found": False}
البحث عن IOCs متشابهة
# Find IOCs related to a description
results = await threat_intel.search_similar_iocs(
query="C2 beacon domains used by APT29 in SolarWinds campaign",
top_k=10
)
الإعدادات
# Qdrant connection
AURORA_QDRANT__HOST=qdrant
AURORA_QDRANT__PORT=6333
# Default embedding model
# Set via TextEmbedder constructor, defaults to "all-MiniLM-L6-v2"
لماذا Qdrant بدل البدائل؟
| الميزة | Qdrant | Pinecone | Weaviate | ChromaDB |
|---|---|---|---|---|
| Self-hosted | Yes | No (SaaS) | Yes | Yes |
| Performance | Rust engine, fast | Managed | Java, slower | Python, slowest |
| Filtering | Built-in payload filters | Basic | GraphQL | Metadata filters |
| Persistence | Disk + WAL | Managed | Disk | Ephemeral by default |
| Memory | ~200MB base | N/A | ~1GB | ~100MB |
| Production-ready | Yes | Yes | Yes | Not recommended |
يوفر محرك Qdrant المبني بـ Rust الأداء المطلوب مع بقاء الحل مستضافًا ذاتيًا، وهو أمر حرج لمنصة SOC لا يمكنها إرسال البيانات الأمنية إلى خدمات SaaS خارجية.