
Orchestrator & Dispatch

The Orchestrator is the coordination hub of AuroraSOC's multi-agent system. It routes tasks to specialist agents using the A2A protocol, with circuit breakers, connection pooling, and retry logic for reliability.

Dispatch Architecture

Circuit Breaker

The circuit breaker in aurorasoc/agents/orchestrator/dispatch.py prevents cascading failures:

class CircuitBreaker:
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self.state = self.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None

State Transitions
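The three states can be sketched as methods on CircuitBreaker. This is a minimal illustration, not the exact dispatch.py implementation: the time.monotonic() bookkeeping is an assumption, though the method names can_execute, record_success, and record_failure match those used by dispatch_to_agent below.

```python
import time


class CircuitBreaker:
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.state = self.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None

    def can_execute(self) -> bool:
        """CLOSED and HALF_OPEN allow requests; OPEN allows one probe after the timeout."""
        if self.state == self.OPEN:
            if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                self.state = self.HALF_OPEN  # let a single probe through
                return True
            return False
        return True

    def record_success(self) -> None:
        """Any success closes the circuit and resets the failure count."""
        self.state = self.CLOSED
        self.failure_count = 0

    def record_failure(self) -> None:
        """Trip the circuit after failure_threshold failures, or on a failed probe."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.state == self.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = self.OPEN
```

A failed probe in HALF_OPEN re-opens the circuit immediately, so a still-broken agent only ever sees one request per recovery window.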

Why Circuit Breakers?

Without circuit breakers, if the Malware Analyst agent goes down:

  1. The Orchestrator sends a request and waits for the 30s timeout
  2. It makes 3 attempts at 30s each = 90 seconds wasted
  3. Meanwhile, 50 more requests queue up
  4. Each of those times out in turn: 50 × 90s = 75 minutes of wasted waiting
  5. Memory and connections are exhausted = cascading failure

With circuit breakers:

  1. After 5 failures, circuit opens
  2. Subsequent requests fail immediately (0ms) with a clear error
  3. After 60s, one probe tests if the agent recovered
  4. If probe succeeds, circuit closes and normal operation resumes

Connection Pooling

_agent_pool: Dict[str, aiohttp.ClientSession] = {}

async def get_agent_connection(agent_url: str) -> aiohttp.ClientSession:
    if agent_url not in _agent_pool:
        _agent_pool[agent_url] = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )
    return _agent_pool[agent_url]

Why connection pooling? Creating a new HTTP connection for every A2A request incurs TCP handshake overhead (~1-5ms). With pooling, connections are reused, and HTTP keep-alive reduces latency to sub-millisecond for subsequent requests.
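The reuse is easy to verify in isolation. In this sketch a stand-in class replaces aiohttp.ClientSession (FakeSession and get_connection are illustrative names, not part of the dispatch module):

```python
from typing import Dict


class FakeSession:
    """Stand-in for aiohttp.ClientSession, used only to count instantiations."""
    instances = 0

    def __init__(self) -> None:
        FakeSession.instances += 1


_pool: Dict[str, FakeSession] = {}


def get_connection(url: str) -> FakeSession:
    # Same lazy-create-then-reuse logic as get_agent_connection
    if url not in _pool:
        _pool[url] = FakeSession()
    return _pool[url]


a = get_connection("http://localhost:8001")
b = get_connection("http://localhost:8001")
c = get_connection("http://localhost:8002")
assert a is b                      # same URL -> same pooled connection
assert FakeSession.instances == 2  # one connection per distinct URL
```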

Retry Logic

Using the tenacity library for structured retry:

from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # Don't retry when the circuit is open -- fail fast instead
    retry=retry_if_not_exception_type(CircuitOpenError)
)
async def dispatch_to_agent(agent_url: str, task: dict) -> dict:
    """Dispatch task with exponential backoff retry."""
    circuit = get_circuit_breaker(agent_url)

    if not circuit.can_execute():
        raise CircuitOpenError(f"Circuit open for {agent_url}")

    try:
        session = await get_agent_connection(agent_url)
        async with session.post(f"{agent_url}/task", json=task) as resp:
            result = await resp.json()
            circuit.record_success()
            return result
    except Exception:
        circuit.record_failure()
        raise

Retry Timeline

Attempt 1: Immediate
└── Fails → Wait 1s
Attempt 2: After 1s
└── Fails → Wait 2s
Attempt 3: After 2s
└── Fails → Raise exception, record circuit failure
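The same schedule can be written as a plain loop without tenacity, which makes the backoff arithmetic explicit. This is an illustrative equivalent, not the dispatch module's code; call_with_backoff, attempts, and base_delay are hypothetical names:

```python
import asyncio


async def call_with_backoff(call, attempts: int = 3, base_delay: float = 1.0):
    """Retry an async callable with exponentially growing waits, capped at 10s.

    Re-raises the last exception once the attempts are exhausted, as the
    tenacity decorator does.
    """
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: propagate to the caller
            await asyncio.sleep(delay)
            delay = min(delay * 2, 10.0)  # 1s, 2s, 4s, ... capped at 10s
```

A call that fails twice and then succeeds completes on the third attempt, having waited base_delay and then 2 × base_delay in between.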

Parallel Dispatch

For complex investigations requiring multiple specialists:

async def dispatch_parallel(
    tasks: List[Tuple[str, dict]]
) -> List[dict]:
    """Dispatch to multiple agents concurrently."""
    results = await asyncio.gather(
        *[dispatch_to_agent(url, task) for url, task in tasks],
        return_exceptions=True
    )

    # Separate successes from failures
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]

    if failures:
        logger.warning(f"{len(failures)} agent(s) failed in parallel dispatch")

    return successes

Parallel vs. Sequential

Parallel dispatch reduces total investigation time from the sum of the individual agents' response times to the time of the slowest single agent.
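That claim is easy to check with stand-in agents (fake_agent, run_parallel, and the delays below are illustrative only):

```python
import asyncio
import time


async def fake_agent(delay: float) -> dict:
    # Stand-in for a specialist agent that takes `delay` seconds to respond
    await asyncio.sleep(delay)
    return {"delay": delay}


async def run_parallel(delays):
    start = time.monotonic()
    results = await asyncio.gather(*[fake_agent(d) for d in delays])
    return results, time.monotonic() - start


delays = [0.05, 0.08, 0.10]
results, elapsed = asyncio.run(run_parallel(delays))
# Wall time tracks the slowest agent (~0.10s), not the sum (~0.23s)
assert elapsed < sum(delays)
```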

Orchestrator Server

The orchestrator server (aurorasoc/agents/orchestrator/server.py) builds HandoffTools and deploys the A2A service:

# Build handoff tools for all specialist agents
handoff_tools = []
for agent_type in AgentType:
    if agent_type != AgentType.ORCHESTRATOR:
        port = getattr(settings.a2a, f"{agent_type.value}_port")
        tool = HandoffTool(
            agent=agent_type.value,
            url=f"http://localhost:{port}",
            description=f"Delegate task to {agent_type.value}"
        )
        handoff_tools.append(tool)

# Create and serve the orchestrator with all handoff tools.
# serve_agent() expects a factory callable, so the creation is wrapped in a lambda.
factory = AuroraAgentFactory()
serve_agent(
    lambda: factory.create_orchestrator(handoff_tools),
    port=settings.a2a.orchestrator_port,
    tags=["orchestrator"]
)

Server Builder

The generic serve_agent() function in aurorasoc/agents/server_builder.py:

def serve_agent(
    factory_method: Callable,
    port: int,
    tags: List[str]
):
    """Deploy an agent as an A2A HTTP server."""

    # Handle graceful shutdown
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    # Create agent from factory
    agent = factory_method()

    # Wrap in A2A server with LRU memory manager
    server = A2AServer(
        agent=agent,
        memory_manager=LRUMemoryManager(max_entries=1000),
        port=port,
        tags=tags
    )

    server.serve()

Why LRUMemoryManager? A2A servers maintain conversation state across multiple requests. The LRU cache bounds memory usage while keeping recent conversations accessible. The max_entries=1000 limit means the server remembers the 1000 most recent conversation threads.
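The eviction behavior can be illustrated with collections.OrderedDict. This LRUMemory class is a sketch of the idea, not the actual LRUMemoryManager API:

```python
from collections import OrderedDict
from typing import Optional


class LRUMemory:
    """Illustrative bounded LRU store keyed by conversation thread ID."""

    def __init__(self, max_entries: int = 1000):
        self.max_entries = max_entries
        self._store: "OrderedDict[str, object]" = OrderedDict()

    def get(self, thread_id: str) -> Optional[object]:
        if thread_id not in self._store:
            return None
        self._store.move_to_end(thread_id)  # mark as most recently used
        return self._store[thread_id]

    def put(self, thread_id: str, state: object) -> None:
        self._store[thread_id] = state
        self._store.move_to_end(thread_id)
        if len(self._store) > self.max_entries:
            self._store.popitem(last=False)  # evict least recently used
```

Because both get and put refresh an entry's position, an active conversation is never evicted ahead of a dormant one, regardless of which was created first.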