انتقل إلى المحتوى الرئيسي

مسار وسير عمل التحقيق (Investigation Workflow)

تستخدم منصة AuroraSOC فئة AgentWorkflow من مكتبة BeeAI لتكوين وإنشاء مسارات تحقيق تعتمد على وكلاء متعددين. يُعرف هذا المسار في aurorasoc/workflows/investigation.py، ويقوم بتنسيق وإدارة سلسلة من الوكلاء لإجراء تحقيقات كاملة وشاملة.

بنية ومعمارية المسار (Pipeline Architecture)

تنفيذ سير العمل (Workflow Implementation)

from beeai import AgentWorkflow

class InvestigationWorkflow:
def __init__(self, factory: AuroraAgentFactory):
self.factory = factory

async def run(self, alert: Alert) -> InvestigationReport:
# Determine if CPS agents are needed
involves_cps = self._check_cps_involvement(alert)

# Build agent pipeline
agents = [
self.factory.create_security_analyst(),
self.factory.create_threat_hunter(),
]

if involves_cps:
agents.append(self.factory.create_cps_security())

agents.append(self.factory.create_incident_responder())

# Create and execute workflow
workflow = AgentWorkflow(agents=agents)
result = await workflow.run(
input=format_alert_for_investigation(alert)
)

return parse_investigation_result(result)

def _check_cps_involvement(self, alert: Alert) -> bool:
"""Check if the alert involves CPS/IoT devices."""
cps_indicators = [
alert.source in ["mqtt", "cps_sensor", "ot_monitor"],
any(d.family in DeviceFamily for d in alert.related_devices),
alert.mitre_techniques and any(
t.startswith("T08") for t in alert.mitre_techniques
), # ICS-specific MITRE techniques
]
return any(cps_indicators)

الإدراج الديناميكي للوكلاء (Dynamic Agent Inclusion)

يتضمن سير العمل وكلاء بشكل ديناميكي بناءً على سياق الإنذار:

لماذا الإدراج الديناميكي؟ يؤدي تشغيل وكيل أمن الأنظمة السيبرانية الفيزيائية (CPS Security) على إنذارات تكنولوجيا المعلومات (IT) الخالصة إلى إهدار رموز تقييم النماذج اللغوية (LLM tokens) وزيادة وقت الاستجابة (latency). من خلال التحقق من مؤشرات CPS أولاً، فإننا نستدعي الوكلاء المتخصصين فقط في الحالات ذات الصلة.

التنفيذ خطوة بخطوة (Step-by-Step Execution)

الخطوة 1: المحلل الأمني (الفرز والتقييم المبدئي - Triage)

  • قراءة البيانات الوصفية (metadata) للإنذار والحدث الخام
  • الاستعلام من نظام SIEM عن الأحداث ذات الصلة (خلال فترة زمنية مدتها 15 دقيقة)
  • استخراج المؤشرات (IOCs) من بيانات الحدث
  • رسم الخرائط والمطابقة مع تقنيات MITRE ATT&CK
  • تعيين وتحديد تقييم الخطورة (severity)
  • المخرجات: تقرير الفرز والتقييم المبدئي مع تحديد الخطورة، والمؤشرات (IOCs)، وتعيين MITRE

الخطوة 2: صائد التهديدات (الاصطياد - Hunting)

  • يأخذ مخرجات الفرز كمدخلات
  • يبحث عن أنماط استخدام LOLBin
  • يتحقق من الانحرافات عن خط الأساس (baseline)
  • يبحث عن مؤشرات الحركة الجانبية (lateral movement)
  • يربط ويطابق مع حملات التهديد المعروفة
  • المخرجات: نتائج الاصطياد مع تقييم المخاطر

الخطوة 3 (اختياري): أمن الأنظمة السيبرانية الفيزيائية (الفيزيائي-السيبراني)

  • يستعلم من القياس عن بُعد (telemetry) لمستشعر CPS
  • يتحقق من حالة المصادقة والإثبات (attestation) للبرامج الثابتة
  • يتحقق ويبحث عن الارتباطات والعلاقات المتبادلة الفيزيائية-السيبرانية
  • يقيم الحالات الشاذة وغير الطبيعية في البروتوكول الصناعي
  • المخرجات: تقييم مخاطر CPS

الخطوة 4: المستجيب للحوادث (الاستجابة - Response)

  • يقيم ويراجع جميع الخطوات السابقة
  • يحدد إجراءات الاستجابة المطلوبة
  • ينشئ حالة (case) إذا لزم الأمر
  • يطلب موافقة بشرية للإجراءات ذات التأثير العالي
  • يُشغّل وينفذ دليل التشغيل (playbook) إذا كان قابلاً للتطبيق
  • المخرجات: خطة الاستجابة مع الإجراءات المتخذة

تدفق ومسار البيانات بين الخطوات (Data Flow Between Steps)

تصبح مخرجات كل خطوة سياق المُدخل للخطوة التالية:

يقوم مسار عمل وكيل BeeAI (AgentWorkflow) بتجميع البيانات والسياق تلقائياً:

  • يرى كل وكيل المُدخل الأصلي بالإضافة إلى جميع مخرجات الوكلاء السابقين
  • يوفر هذا سياق تحقيق كامل وشامل دون الحاجة إلى إدارة للحالة يدوياً
  • تضيف مستويات الذاكرة سياقاً ومعطيات إضافية من التحقيقات السابقة

معالجة وتسوية الأخطاء (Error Handling)

إذا فشلت أي خطوة:

  1. يتم تسجيل الخطأ مع السياق الكامل
  2. يستمر سير العمل مع تقدم الخطوات المتبقية بأفضل جهد (best-effort)
  3. يتضمن التقرير النهائي تفاصيل الخطأ
  4. يتم وضع علامة على التحقيق غير المكتمل وتسليمه للمراجعة البشرية
try:
result = await workflow.run(input=alert_context)
except AgentError as e:
logger.error(f"Investigation workflow failed at step {e.step}",
exc_info=True)
# Create case with partial results
await create_partial_case(alert, e.partial_results)
لماذا لا نتوقف عند الخطأ الأول؟

تستفيد التحقيقات الأمنية من المخرجات والنتائج الجزئية. إذا فشل صائد التهديدات (Threat Hunter) وظل المحلل الأمني (Security Analyst) ناجحاً، فقد تكون نتائج ومعطيات الفرز والتقييم المبدئي كافية وحدها لكي يتخذ المحلل إجراءً بشرياً. التوقف والتخلي بالكامل سيؤدي إلى فقدان سياق قيم للغاية.