انتقل إلى المحتوى الرئيسي

المنسق ومرسل المهام (Orchestrator & Dispatch)

المنسق (Orchestrator) هو مركز التنسيق والمحور الأساسي لنظام وكلاء AuroraSOC المتعدد. يقوم بتوجيه وإرسال المهام إلى الوكلاء المتخصصين باستخدام بروتوكول A2A، محتويًا ومزودًا بقواطع دوائر (circuit breakers)، وتجميع وتسريب الاتصالات (connection pooling)، ومنطق إعادة المحاولة (retry logic) لضمان الموثوقية والاعتمادية.

بنية ومعمارية الإرسال (Dispatch Architecture)

قاطع الدائرة (Circuit Breaker)

يمنع قاطع الدائرة الموجود في aurorasoc/agents/orchestrator/dispatch.py الأعطال والإخفاقات المتتالية (cascading failures):

class CircuitBreaker:
CLOSED = "closed" # تشغيل وعمل طبيعي (Normal operation)
OPEN = "open" # فاشل أو متعطل، ارفض الطلبات (Failing, reject requests)
HALF_OPEN = "half_open" # اختبار الاسترداد والتعافي (Testing recovery)

def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0
):
self.state = self.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None

التحولات والانتقالات في الحالة (State Transitions)

لماذا نستخدم قواطع الدوائر (Circuit Breakers)؟

بدون قواطع الدوائر، إذا تعطل وكيل محلل البرامج الضارة (Malware Analyst):

  1. يرسل المنسق الطلب، وينتظر حتى انتهاء المهلة (30 ثانية).
  2. يعيد المحاولة 3 مرات = ضياع وإهدار 90 ثانية.
  3. في هذه الأثناء، تتراكم وتصطف 50 طلباً إضافياً.
  4. كل طلب ينتهي وقته = ضياع 75 دقيقة من الانتظار المُهدر.
  5. استنفاد الذاكرة والاتصالات = فشل وعطل متتالٍ ومتصاعد (cascading failure).

مع قواطع الدوائر:

  1. بعد 5 إخفاقات، تُفتح وتقطع الدائرة.
  2. تفشل الطلبات اللاحقة والتالية على الفور (0 مللي ثانية) مع رسالة خطأ واضحة.
  3. بعد 60 ثانية، يقوم طلب فحص أو اختبار (probe) بالتحقق مما إذا كان الوكيل قد تعافى.
  4. إذا نجح طلب الفحص، تُغلق الدائرة ويُستأنف العمل والتشغيل الطبيعي.

تجميع الاتصالات (Connection Pooling)

_agent_pool: Dict[str, aiohttp.ClientSession] = {}

async def get_agent_connection(agent_url: str) -> aiohttp.ClientSession:
if agent_url not in _agent_pool:
_agent_pool[agent_url] = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
return _agent_pool[agent_url]

لماذا تجميع الاتصالات؟ يؤدي إنشاء اتصال HTTP جديد لكل طلب A2A إلى تكبد عبء مصافحة TCP أو TCP handshake (~1-5 مللي ثانية). من خلال التجميع (pooling)، تتم إعادة استخدام الاتصالات، وبقاء أو إبقاء HTTP حياً (keep-alive) يقلل وقت الاستجابة (latency) إلى أقل من مللي ثانية للطلبات اللاحقة والتالية.

منطق إعادة المحاولة (Retry Logic)

استخدام مكتبة tenacity لإعادة المحاولة المنظمة والمرتبة:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def dispatch_to_agent(agent_url: str, task: dict) -> dict:
"""Dispatch task with exponential backoff retry."""
circuit = get_circuit_breaker(agent_url)

if not circuit.can_execute():
raise CircuitOpenError(f"Circuit open for {agent_url}")

try:
session = await get_agent_connection(agent_url)
async with session.post(f"{agent_url}/task", json=task) as resp:
result = await resp.json()
circuit.record_success()
return result
except Exception as e:
circuit.record_failure()
raise

المخطط الزمني لإعادة المحاولة (Retry Timeline)

المحاولة 1: فورية (Immediate)
└── تفشل → انتظار ثانية واحدة
المحاولة 2: بعد ثانية واحدة (After 1s)
└── تفشل → انتظار ثانيتين
المحاولة 3: بعد ثانيتين (After 2s)
└── تفشل → إثارة وطرح استثناء (Raise exception)، وتسجيل فشل وعطل في الدائرة

الإرسال والتوجيه المتوازي (Parallel Dispatch)

لتحقيقات أكثر تعقيداً تتطلب عدة وكلاء متخصصين:

async def dispatch_parallel(
tasks: List[Tuple[str, dict]]
) -> List[dict]:
"""Dispatch to multiple agents concurrently."""
results = await asyncio.gather(
*[dispatch_to_agent(url, task) for url, task in tasks],
return_exceptions=True
)

# فصل وعزل حالات النجاح عن حالات الفشل
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]

if failures:
logger.warning(f"{len(failures)} agent(s) failed in parallel dispatch")

return successes

المتوازي مقابل التسلسلي (Parallel vs. Sequential)

يقلل التوجيه والإرسال المتوازي (Parallel dispatch) وقت التحقيق من مجموع أوقات الوكلاء إلى الحد الأقصى لوقت وكيل واحد.

خادم المنسق (Orchestrator Server)

يبني خادم المنسق (aurorasoc/agents/orchestrator/server.py) أدوات التسليم والإحالة (HandoffTools) وينشر خدمة وبروتوكول A2A:

# بناء أدوات التسليم والإحالة لجميع الوكلاء المتخصصين
handoff_tools = []
for agent_type in AgentType:
if agent_type == AgentType.ORCHESTRATOR:
continue

agent_name = agent_type.value
tool = HandoffTool(
agent=agent_name,
url=settings.a2a.get_agent_url(agent_name),
description=f"تفويض وتوكيل المهمة إلى {agent_name} (Delegate task to {agent_name})",
)
handoff_tools.append(tool)

# إنشاء وتشغيل المنسق بجميع أدوات التسليم والإحالة
factory = AuroraAgentFactory()
agent = factory.create_orchestrator(handoff_tools)
serve_agent(agent, port=settings.a2a.orchestrator_port)

الدالة settings.a2a.get_agent_url(...) تطبق A2A_DISCOVERY_MODE وتدعم تجاوزات المضيف (A2A_CLIENT_HOST و A2A_<AGENT_NAME>_HOST) بحيث يعمل نفس الكود في Docker Compose و Kubernetes.

بُناة ومُنشئو الخادم (Server Builder)

الدالة والطريقة العامة والأساسية serve_agent() الموجودة في aurorasoc/agents/server_builder.py:

def serve_agent(
factory_method: Callable,
port: int,
tags: List[str]
):
"""نشر الوكيل كخادم A2A HTTP."""

# التعامل مع الإغلاق وإيقاف التشغيل الآمن والسلس
signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)

# إنشاء وكيل من المصنع
agent = factory_method()

# التغليف في خادم A2A مع مدير الذاكرة LRU
server = A2AServer(
agent=agent,
memory_manager=LRUMemoryManager(max_entries=1000),
port=port,
tags=tags
)

server.serve()

لماذا LRUMemoryManager؟ تحافظ خوادم A2A على حالة المحادثة والفهم السياقي عبر طلبات متعددة. تقيد وتحدد ذاكرة التخزين المؤقت (LRU cache) استخدام الذاكرة بينما تبقي وتحتفظ بالمحادثات الأخيرة متاحة ويمكن الوصول إليها. يعني الحد max_entries=1000 أن الخادم سيتذكر آخر 1000 رسالة، أو سلاسل محادثات (conversation threads).