انتقل إلى المحتوى الرئيسي

نظام الذاكرة (Memory System)

تُنفذ وتطبق منصة AuroraSOC هندسة ومعمارية ذاكرة ثلاثية المستويات (three-tier) تمنح وتوفر لوكلاء الذكاء الاصطناعي استدعاءً سياقياً وتذكراً للتحقيقات السابقة، واستخبارات التهديدات، وسجل المحادثات الأحدث والأخيرة. يتم تنفيذ ذلك في aurorasoc/memory/.

تصميم من ثلاث مستويات (Three-Tier Design)

لماذا ثلاث مستويات؟

بدون ذاكرة متدرجة أو مقسمة لمستويات (Without Tiered Memory)مع ذاكرة متدرجة ومقسمة لمستويات (With Tiered Memory)
يرى الوكيل ويطلع على المحادثة الحالية فقطيستدعي الوكيل ويتذكر الحالات السابقة المشابهة والمماثلة
يبدأ كل تحقيق من الصفريتعلم من تاريخ وسجل المؤسسة والميزة التراكمية التنظيمية
يتم التحقق من المؤشرات (IOCs) والمطابقة مقابل التغذيات (feeds) والموجزات الخارجية فقطقاعدة معرفة واطلاع محلية للمؤشرات (IOC) مع التشابه والمطابقة (similarity)
لا يوجد سياق أو خلفية حول الإيجابيات الكاذبة (false positives) السابقة"كان هذا النمط إيجابية كاذبة في الشهر الماضي"

المستوى 1: الذاكرة المنزلقة والمتجددة (Sliding Memory)

class SlidingMemory:
"""Recent conversation history with configurable window."""

def __init__(self, max_messages: int = 20):
self.max_messages = max_messages
self._messages: deque = deque(maxlen=max_messages)

def add(self, message: BaseMessage):
self._messages.append(message)

def get_all(self) -> List[BaseMessage]:
return list(self._messages)
  • التخزين (Storage): في الذاكرة قيد المعالجة، قائمة deque في بايثون
  • وقت الاستجابة (Latency): ~0 مللي ثانية (ms) (وصول للذاكرة)
  • الاستمرارية والبقاء (Persistence): لا يوجد (تُفقد وتضيع عند إعادة التشغيل)
  • الغرض والهدف (Purpose): الحفاظ والإبقاء على سياق المحادثة الأخيرة والحديثة للتعليل والتفكير متعدد الأدوار (multi-turn reasoning)
  • أحجام النوافذ (Window sizes): 20 (الافتراضي default)، 30 (صائد hunter)، 40 (مستجيب responder)، 50 (محلل analyst)، 100 (منسق orchestrator)

المستوى 2: الذاكرة العرضية (Episodic Memory)

class EpisodicMemoryStore:
"""Long-term case memory using vector similarity search."""

def __init__(self, qdrant_client, collection="aurora_cases"):
self.client = qdrant_client
self.collection = collection
self.embedder = TextEmbedder() # all-MiniLM-L6-v2

async def store(self, case_summary: str, metadata: dict):
"""Store a completed investigation as a memory."""
embedding = self.embedder.embed(case_summary)
await self.client.upsert(
collection_name=self.collection,
points=[PointStruct(
id=str(uuid4()),
vector=embedding,
payload={"summary": case_summary, **metadata}
)]
)

async def recall(self, query: str, limit: int = 10) -> List[dict]:
"""Find similar past cases by semantic search."""
query_embedding = self.embedder.embed(query)
results = await self.client.search(
collection_name=self.collection,
query_vector=query_embedding,
limit=limit
)
return [hit.payload for hit in results]
  • التخزين (Storage): قاعدة بيانات متجهات (vector database) Qdrant (مجموعة aurora_cases)
  • وقت الاستجابة (Latency): ~50 مللي ثانية (ms) (الشبكة + بحث ANN)
  • الاستمرارية والبقاء (Persistence): كامل وباقٍ (مدعوم بالقرص disk-backed في Qdrant)
  • الغرض والهدف (Purpose): استدعاء وتذكر التحقيقات السابقة المشابهة والمماثلة عند تحليل إنذارات جديدة
  • نموذج التضمين (Embedding model): نموذج all-MiniLM-L6-v2 (متجهات 384-dimensional vectors)

كيف تعمل الذاكرة العرضية

المستوى 3: ذاكرة واستخبارات التهديدات (Threat Intel Memory)

class ThreatIntelMemory:
"""IOC knowledge base with vector similarity + Redis cache."""

def __init__(self, qdrant_client, redis_client):
self.qdrant = qdrant_client
self.redis = redis_client
self.collection = "aurora_threat_intel"
self.embedder = TextEmbedder()

async def search(self, query: str, limit: int = 20) -> List[dict]:
"""Search for related threat intelligence."""
# Check Redis cache first
cache_key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)

# Vector search in Qdrant
embedding = self.embedder.embed(query)
results = await self.qdrant.search(
collection_name=self.collection,
query_vector=embedding,
limit=limit
)

# Cache for 1 hour
payload = [hit.payload for hit in results]
await self.redis.setex(cache_key, 3600, json.dumps(payload))

return payload
  • التخزين (Storage): قاعدة Qdrant (مجموعة aurora_threat_intel) + ذاكرة تخزين مؤقت Redis (Redis cache)
  • وقت الاستجابة (Latency): ~1 مللي ثانية (ms) (عند إيجاد الذاكرة المؤقتة cache hit) أو ~100 مللي ثانية (عند عدم الإيجاد cache miss)
  • الاستمرارية والبقاء (Persistence): كامل وباقٍ (في Qdrant) + ذاكرة مؤقتة لمدة ساعة واحدة (في Redis)
  • الغرض والهدف (Purpose): العثور وإيجاد المؤشرات (IOCs) وحملات التهديد المشابهة أو المماثلة لسياق التحليل الحالي

الذاكرة المتدرجة للوكيل TieredAgentMemory: الواجهة الموحدة الشاملة

class TieredAgentMemory(BaseMemory):
"""BeeAI-compatible memory combining all three tiers."""

def __init__(self, sliding, episodic, threat_intel):
self.sliding = sliding # المستوى 1 (Tier 1)
self.episodic = episodic # المستوى 2 (Tier 2)
self.threat_intel = threat_intel # المستوى 3 (Tier 3)

async def add(self, message: BaseMessage):
"""Add to sliding window."""
self.sliding.add(message)

async def get_context(self, query: str) -> str:
"""Combine all tiers for LLM context."""
context_parts = []

# المستوى 1: المحادثة الأخيرة والحديثة
context_parts.append(self.sliding.get_formatted())

# المستوى 2: حالات سابقة مشابهة
if self.episodic:
cases = await self.episodic.recall(query)
if cases:
context_parts.append(format_cases(cases))

# المستوى 3: استخبارات التهديدات ذات الصلة
if self.threat_intel:
intel = await self.threat_intel.search(query)
if intel:
context_parts.append(format_intel(intel))

return "\n\n".join(context_parts)

إعدادات الذاكرة المسبقة والجاهزة (Memory Presets)

PRESETS = {
"default": {"sliding": 20, "episodic": None, "intel": None},
"analyst": {"sliding": 50, "episodic": 20, "intel": True},
"hunter": {"sliding": 30, "episodic": 30, "intel": True},
"responder": {"sliding": 40, "episodic": 10, "intel": None},
"intel": {"sliding": 20, "episodic": None, "intel": 50},
"orchestrator": {"sliding": 100,"episodic": 5, "intel": None},
"cps": {"sliding": 30, "episodic": 15, "intel": True},
}
مبررات وحيثيات التصميم (Design Rationale)
  • يحتاج المنسق (Orchestrator) إلى سياق منزلق طويل (100 رسالة) لأنه يُنسق ويُدير تحقيقات متعددة الخطوات تشمل الكثير من تسليم وإحالة المهام (handoffs) ذهاباً وإياباً
  • يحتاج وكيل استخبارات التهديدات (Threat Intel agent) إلى استدعاء استخبارات بحجم كبير (50 نتيجة) ولكن دون ذاكرة عرضية (episodic memory) — فهو يعمل مع ونحو المؤشرات (IOCs)، وليس الحالات السابقة
  • يحافظ الوكلاء ذوو الإعدادات الافتراضية (Default) الذين لا يمتلكون ذاكرة عرضية أو استخباراتية، على موارد Qdrant وتوفيرها للوكلاء الذين يستفيدون بشكل حقيقي من عملية الاستدعاء (recall)

مُضمّن ومُحوّل النص (TextEmbedder)

class TextEmbedder:
"""Generate text embeddings using sentence-transformers."""

def __init__(self, model_name="all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)

def embed(self, text: str) -> List[float]:
return self.model.encode(text).tolist()

لماذا all-MiniLM-L6-v2؟

  • نموذج صغير الحجم (80 ميغابايت) يعمل على وحدة المعالجة المركزية (CPU)
  • متجهات مكونة من 384 بُعداً (384-dimensional vectors) (تكلفة تخزين منخفضة)
  • جودة ممتازة واستثنائية لمهام التشابه والمطابقة الدلالية (semantic similarity)
  • يعالج ويعمل على حوالي 10,000 جملة/ثانية على وحدات المعالجة المركزية الحديثة
  • لا يتطلب ولا يحتاج أي وحدة معالجة رسومات (GPU) — مناسب وملاءم لعمليات النشر الطرفية (edge) وفي الحاويات (container)