
Orchestrator & Dispatch

The Orchestrator is the coordination hub of AuroraSOC's multi-agent system. It routes tasks to specialist agents using the A2A protocol, with circuit breakers, connection pooling, and retry logic for reliability.

The API endpoints /api/v1/orchestrator/dispatch and /api/v1/network-analyzer/analyze route through this dispatch layer, ensuring investigations execute through agent-to-agent boundaries and MCP tool controls instead of direct model calls.

Task Envelope Contract

All new agent task envelopes are validated in aurorasoc/services/dispatch_router.py before they are published to Redis Streams. The contract keeps task dispatch auditable and prevents malformed payloads from reaching specialist agents.

Required fields:

| Field | Rule |
| --- | --- |
| task_id | Non-empty string. Generated by the router when the caller does not provide one. |
| agent_type | Non-empty specialist type or orchestrator type. |
| priority | Integer from 1 to 10; defaults to 5. |

Optional correlation fields such as assignment_id, assigned_agent_id, assigned_replica_id, preferred_site_id, and traceparent are stripped of surrounding whitespace and omitted when empty. Extra context fields, including case_id, alert_id, and network_attack_id, are preserved without allowing them to overwrite router-owned keys.

The dashboard can rely on priority always being present in router-built envelopes, and the worker can round-trip the correlation keys into result receipts for case and network attack timelines.
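As an illustration, the rules above could be enforced roughly like this (a sketch only, not the actual dispatch_router.py code; build_envelope and the key-set constants are hypothetical names):

```python
import uuid

# Hypothetical names for the contract's field groups
ROUTER_OWNED_KEYS = {"task_id", "agent_type", "priority"}
CORRELATION_KEYS = ("assignment_id", "assigned_agent_id", "assigned_replica_id",
                    "preferred_site_id", "traceparent")

def build_envelope(raw: dict) -> dict:
    """Validate and normalize a task envelope per the contract above."""
    envelope = {
        # Router generates task_id when the caller omits one
        "task_id": (raw.get("task_id") or "").strip() or str(uuid.uuid4()),
        "agent_type": (raw.get("agent_type") or "").strip(),
        "priority": int(raw.get("priority", 5)),
    }
    if not envelope["agent_type"]:
        raise ValueError("agent_type must be a non-empty string")
    if not 1 <= envelope["priority"] <= 10:
        raise ValueError("priority must be an integer from 1 to 10")

    # Correlation fields: strip surrounding whitespace, omit when empty
    for key in CORRELATION_KEYS:
        value = (raw.get(key) or "").strip()
        if value:
            envelope[key] = value

    # Extra context fields (case_id, alert_id, ...) are preserved,
    # but may not overwrite router-owned or correlation keys
    for key, value in raw.items():
        if key not in envelope and key not in ROUTER_OWNED_KEYS \
                and key not in CORRELATION_KEYS:
            envelope[key] = value
    return envelope
```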

Dispatch Architecture

Circuit Breaker

The circuit breaker in aurorasoc/agents/orchestrator/dispatch.py prevents cascading failures:

class CircuitBreaker:
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self.state = self.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None

State Transitions
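In code, the transitions can be sketched by completing the class above with the three methods the dispatch path calls (can_execute, record_success, record_failure). This is an illustration of the state machine, not the actual dispatch.py internals:

```python
import time

class CircuitBreaker:
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.state = self.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == self.OPEN:
            # After recovery_timeout, move to HALF_OPEN and allow a probe
            if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                self.state = self.HALF_OPEN
                return True
            return False
        return True  # CLOSED or HALF_OPEN

    def record_success(self) -> None:
        # Probe (or normal request) succeeded: close and reset the counter
        self.state = self.CLOSED
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed probe reopens immediately; otherwise open at the threshold
        if self.state == self.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = self.OPEN
```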

Why Circuit Breakers?

Without circuit breakers, if the Malware Analyst agent goes down:

  1. Orchestrator sends request, waits for timeout (30s)
  2. Retries 3 times = 90 seconds wasted
  3. Meanwhile, 50 more requests queue up
  4. Each waits the full 90 seconds: 50 × 90s = 75 minutes of wasted waiting
  5. Memory and connections exhausted = cascading failure

With circuit breakers:

  1. After 5 failures, circuit opens
  2. Subsequent requests fail immediately (0ms) with a clear error
  3. After 60s, one probe tests if the agent recovered
  4. If probe succeeds, circuit closes and normal operation resumes

Connection Pooling

_agent_pool: Dict[str, aiohttp.ClientSession] = {}

async def get_agent_connection(agent_url: str) -> aiohttp.ClientSession:
    if agent_url not in _agent_pool:
        _agent_pool[agent_url] = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )
    return _agent_pool[agent_url]

Why connection pooling? Creating a new HTTP connection for every A2A request incurs TCP handshake overhead (~1-5ms). With pooling, connections are reused, and HTTP keep-alive reduces latency to sub-millisecond for subsequent requests.

Retry Logic

Using the tenacity library for structured retry:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def dispatch_to_agent(agent_url: str, task: dict) -> dict:
    """Dispatch task with exponential backoff retry."""
    circuit = get_circuit_breaker(agent_url)

    if not circuit.can_execute():
        raise CircuitOpenError(f"Circuit open for {agent_url}")

    try:
        session = await get_agent_connection(agent_url)
        async with session.post(f"{agent_url}/task", json=task) as resp:
            result = await resp.json()
            circuit.record_success()
            return result
    except Exception:
        circuit.record_failure()
        raise

Retry Timeline

Attempt 1: Immediate
└── Fails → Wait 1s
Attempt 2: After 1s
└── Fails → Wait 2s
Attempt 3: After 2s
└── Fails → Raise exception, record circuit failure

Parallel Dispatch

For complex investigations requiring multiple specialists:

async def dispatch_parallel(
    tasks: List[Tuple[str, dict]]
) -> List[dict]:
    """Dispatch to multiple agents concurrently."""
    results = await asyncio.gather(
        *[dispatch_to_agent(url, task) for url, task in tasks],
        return_exceptions=True
    )

    # Separate successes from failures
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]

    if failures:
        logger.warning(f"{len(failures)} agent(s) failed in parallel dispatch")

    return successes

Parallel vs. Sequential

Parallel dispatch reduces investigation time from the sum of agent times to the maximum single agent time.
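A toy illustration of that difference, with sleeps standing in for real agent calls (the agent names and timings are made up):

```python
import asyncio
import time

async def fake_agent(name: str, seconds: float) -> str:
    """Stand-in for dispatch_to_agent: sleeps for the agent's 'work' time."""
    await asyncio.sleep(seconds)
    return name

async def run_both() -> tuple:
    agents = [("malware", 0.2), ("network", 0.2), ("threat_intel", 0.2)]

    # Sequential: total time is the sum of agent times (~0.6s here)
    start = time.monotonic()
    for name, secs in agents:
        await fake_agent(name, secs)
    sequential = time.monotonic() - start

    # Parallel: total time is roughly the slowest single agent (~0.2s here)
    start = time.monotonic()
    await asyncio.gather(*(fake_agent(n, s) for n, s in agents))
    parallel = time.monotonic() - start
    return sequential, parallel
```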

Orchestrator Server

The orchestrator server (aurorasoc/agents/orchestrator/server.py) builds HandoffTools and deploys the A2A service:

from aurorasoc.agents.catalog import LIVE_SPECIALIST_AGENT_TYPES

handoff_tools = []
for agent_type in LIVE_SPECIALIST_AGENT_TYPES:
    a2a_agent = A2AAgent(
        url=settings.a2a.get_agent_url(agent_type),
        memory=create_tiered_memory(agent_type, LIGHTWEIGHT_MEMORY),
    )
    handoff_tools.append(
        HandoffTool(
            target=a2a_agent,
            name=f"delegate_to_{agent_type}",
            description=f"Delegate investigation task to {agent_type} specialist agent",
        )
    )

server = await build_orchestrator_server()
server.serve()

settings.a2a.get_agent_url(...) applies A2A_DISCOVERY_MODE and optional host overrides (A2A_CLIENT_HOST, A2A_<AGENT_NAME>_HOST) so the same code works in both Docker Compose and Kubernetes.
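A plausible resolution order, sketched below. The port default, the mode names, and the fallback behavior are assumptions for illustration; the real settings.a2a.get_agent_url may differ:

```python
import os

def get_agent_url(agent_type: str, port: int = 8080) -> str:
    """Sketch of host resolution: per-agent override, then global client host,
    then a discovery-mode default (service DNS name vs. localhost)."""
    # 1. Per-agent override, e.g. A2A_MALWARE_ANALYST_HOST
    per_agent = os.getenv(f"A2A_{agent_type.upper()}_HOST")
    if per_agent:
        return f"http://{per_agent}:{port}"
    # 2. Global client host override
    global_host = os.getenv("A2A_CLIENT_HOST")
    if global_host:
        return f"http://{global_host}:{port}"
    # 3. Discovery-mode default: compose-style service name or localhost
    mode = os.getenv("A2A_DISCOVERY_MODE", "docker")
    host = agent_type.replace("_", "-") if mode == "docker" else "localhost"
    return f"http://{host}:{port}"
```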

The orchestrator no longer discovers specialists from AgentType directly. The runtime now derives its handoff list from the same shared live specialist catalog used by the factory, startup health probes, and fleet documentation, so consolidated legacy roles cannot drift back into the orchestrator mesh.

Server Builder

The generic serve_agent() function in aurorasoc/agents/server_builder.py:

def serve_agent(
    factory_method: Callable,
    port: int,
    tags: List[str]
):
    """Deploy an agent as an A2A HTTP server."""

    # Handle graceful shutdown
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    # Create agent from factory
    agent = factory_method()

    # Wrap in A2A server with LRU memory manager
    server = A2AServer(
        agent=agent,
        memory_manager=LRUMemoryManager(max_entries=1000),
        port=port,
        tags=tags
    )

    server.serve()

Why LRUMemoryManager? A2A servers maintain conversation state across multiple requests. The LRU cache bounds memory usage while keeping recent conversations accessible. The max_entries=1000 limit means the server remembers the 1000 most recent conversation threads.
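The bounding behavior can be sketched with an OrderedDict (an illustration of the LRU idea, not the LRUMemoryManager implementation):

```python
from collections import OrderedDict
from typing import Any, Optional

class LRUConversationCache:
    """Keep at most max_entries conversation threads; evict the least
    recently used thread when the bound is exceeded."""

    def __init__(self, max_entries: int = 1000):
        self.max_entries = max_entries
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, thread_id: str) -> Optional[Any]:
        if thread_id not in self._entries:
            return None
        self._entries.move_to_end(thread_id)  # mark as recently used
        return self._entries[thread_id]

    def put(self, thread_id: str, state: Any) -> None:
        self._entries[thread_id] = state
        self._entries.move_to_end(thread_id)
        if len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict the oldest thread
```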