Investigation Workflow

AuroraSOC uses BeeAI's AgentWorkflow to compose multi-agent investigation pipelines. Defined in aurorasoc/workflows/investigation.py, the workflow orchestrates a sequence of agents to perform complete investigations.

Pipeline Architecture

Workflow Implementation

```python
from beeai import AgentWorkflow

class InvestigationWorkflow:
    def __init__(self, factory: AuroraAgentFactory):
        self.factory = factory

    async def run(self, alert: Alert) -> InvestigationReport:
        # Determine if CPS agents are needed
        involves_cps = self._check_cps_involvement(alert)

        # Build agent pipeline
        agents = [
            self.factory.create_security_analyst(),
            self.factory.create_threat_hunter(),
        ]

        if involves_cps:
            agents.append(self.factory.create_cps_security())

        agents.append(self.factory.create_incident_responder())

        # Create and execute workflow
        workflow = AgentWorkflow(agents=agents)
        result = await workflow.run(
            input=format_alert_for_investigation(alert)
        )

        return parse_investigation_result(result)

    def _check_cps_involvement(self, alert: Alert) -> bool:
        """Check if the alert involves CPS/IoT devices."""
        cps_indicators = [
            alert.source in ["mqtt", "cps_sensor", "ot_monitor"],
            any(d.family in DeviceFamily for d in alert.related_devices),
            alert.mitre_techniques and any(
                t.startswith("T08") for t in alert.mitre_techniques
            ),  # ICS-specific MITRE techniques
        ]
        return any(cps_indicators)
```

Dynamic Agent Inclusion

The workflow dynamically includes agents based on alert context.

Why dynamic inclusion? Running the CPS Security agent on purely IT alerts wastes LLM tokens and adds latency. By checking for CPS indicators first, we only invoke specialized agents when relevant.

Step-by-Step Execution

Step 1: Security Analyst (Triage)

  • Reads alert metadata and raw event
  • Queries SIEM for related events (15-minute window)
  • Extracts IOCs from event data
  • Maps to MITRE ATT&CK techniques
  • Assigns severity assessment
  • Output: Triage report with severity, IOCs, MITRE mapping
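
The triage output might be shaped like the following. This is an illustrative sketch only; the field names and the `TriageReport` name are assumptions, not AuroraSOC's actual schema:

```python
from dataclasses import dataclass, field

# Illustrative shape for the Security Analyst's output;
# the real schema in AuroraSOC may differ.
@dataclass
class TriageReport:
    severity: str                                              # e.g. "high", "medium", "low"
    iocs: list[str] = field(default_factory=list)              # extracted indicators
    mitre_techniques: list[str] = field(default_factory=list)  # e.g. ["T1059.001"]
    related_event_count: int = 0                               # SIEM hits in the 15-minute window
    summary: str = ""

report = TriageReport(
    severity="high",
    iocs=["198.51.100.7", "evil.example.com"],
    mitre_techniques=["T1059.001"],
    related_event_count=12,
    summary="PowerShell download cradle observed on host WS-042.",
)
```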

Step 2: Threat Hunter (Hunting)

  • Takes triage output as input
  • Searches for LOLBin usage patterns
  • Checks for baseline deviations
  • Looks for lateral movement indicators
  • Correlates with known threat campaigns
  • Output: Hunting findings with risk assessment
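
A minimal sketch of the LOLBin check, assuming observed events expose a process name. The binary list here is a small illustrative subset; a real deployment would rely on a maintained source such as the LOLBAS project:

```python
# Small illustrative subset of living-off-the-land binaries.
KNOWN_LOLBINS = {
    "certutil.exe", "mshta.exe", "regsvr32.exe",
    "rundll32.exe", "bitsadmin.exe",
}

def find_lolbin_usage(process_names: list[str]) -> list[str]:
    """Return observed processes matching known LOLBins (case-insensitive)."""
    return sorted(p.lower() for p in process_names if p.lower() in KNOWN_LOLBINS)

hits = find_lolbin_usage(["svchost.exe", "CertUtil.exe", "notepad.exe"])
# hits == ["certutil.exe"]
```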

Step 3 (optional): CPS Security (Physical-Cyber)

  • Queries CPS sensor telemetry
  • Verifies firmware attestation status
  • Checks for physical-cyber correlations
  • Assesses industrial protocol anomalies
  • Output: CPS risk assessment
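
One way to flag a physical-cyber correlation is to compare telemetry before and after a suspicious cyber event. The heuristic and threshold below are illustrative assumptions, not AuroraSOC's actual logic:

```python
def physical_cyber_anomaly(readings_before: list[float],
                           readings_after: list[float],
                           threshold: float = 0.2) -> bool:
    """Flag a correlation when mean telemetry shifts by more than
    `threshold` (relative) around a suspicious cyber event."""
    before = sum(readings_before) / len(readings_before)
    after = sum(readings_after) / len(readings_after)
    return abs(after - before) > threshold * abs(before)

# Pump RPM jumps right after an unauthorized register write: flagged.
flagged = physical_cyber_anomaly([1000, 1002, 998], [1400, 1390, 1410])
```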

Step 4: Incident Responder (Response)

  • Evaluates all previous steps
  • Determines required response actions
  • Creates case if warranted
  • Requests human approval for high-impact actions
  • Triggers playbook execution if applicable
  • Output: Response plan with actions taken
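
The human-approval gate for high-impact actions could be sketched as below. The action names and the approval callback are assumptions for illustration, not AuroraSOC's actual API:

```python
import asyncio

# Illustrative approval gate; action names and the approval
# mechanism are assumptions.
HIGH_IMPACT = {"isolate_host", "disable_account", "block_subnet"}

async def execute_action(action: str, approve) -> str:
    """Run an action, gating high-impact ones on an async approval check."""
    if action in HIGH_IMPACT and not await approve(action):
        return "skipped (approval denied)"
    return "executed"

async def _deny(action):    # stand-in for a human analyst declining
    return False

async def _approve(action): # stand-in for an auto-approved low-risk path
    return True

denied = asyncio.run(execute_action("isolate_host", _deny))
allowed = asyncio.run(execute_action("notify_analyst", _approve))
```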

Data Flow Between Steps

Each step's output becomes the next step's input context.

The BeeAI AgentWorkflow automatically accumulates context:

  • Each agent sees the original input plus all previous agent outputs
  • This provides full investigation context without manual state management
  • Memory tiers add additional context from past investigations
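
Conceptually, the accumulation behaves like the sequential fold below, a simplification of what BeeAI's AgentWorkflow does internally (the toy agents here are stand-ins, not real agent calls):

```python
def run_pipeline(agents, original_input: str) -> str:
    """Each 'agent' here is just a function of the full context so far."""
    context = [original_input]
    for agent in agents:
        # Each agent sees the original input plus all prior outputs.
        output = agent("\n".join(context))
        context.append(output)
    return context[-1]

# Toy agents that report how much context they received:
triage = lambda ctx: f"triage({len(ctx.splitlines())} lines seen)"
hunt = lambda ctx: f"hunt({len(ctx.splitlines())} lines seen)"

final = run_pipeline([triage, hunt], "alert: suspicious login")
# final == "hunt(2 lines seen)" — the hunter saw the alert plus triage output
```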

Error Handling

If any step fails:

  1. Error is logged with full context
  2. Workflow continues with remaining steps (best-effort)
  3. Final report includes error details
  4. Incomplete investigation flagged for human review

```python
try:
    result = await workflow.run(input=alert_context)
except AgentError as e:
    logger.error(
        f"Investigation workflow failed at step {e.step}",
        exc_info=True,
    )
    # Create case with partial results
    await create_partial_case(alert, e.partial_results)
```
Why Not Stop on First Error?

Security investigations benefit from partial results. If the Threat Hunter fails but the Security Analyst succeeded, the triage findings alone may be sufficient for an analyst to take action. Stopping entirely would lose valuable context.
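
The best-effort policy can be sketched as a log-and-continue runner. The step and result shapes below are illustrative assumptions, not AuroraSOC's internals:

```python
# Best-effort sketch: run each step, record failures, keep going.
def run_best_effort(steps, alert):
    results, errors = {}, {}
    for name, step in steps:
        try:
            results[name] = step(alert, dict(results))
        except Exception as exc:  # log-and-continue, per the policy above
            errors[name] = str(exc)
    return results, errors

def triage(alert, prior):
    return {"severity": "high"}

def hunt(alert, prior):
    raise RuntimeError("SIEM query timed out")

def respond(alert, prior):
    # Partial context is still available: triage succeeded.
    return {"action": "open_case", "based_on": sorted(prior)}

results, errors = run_best_effort(
    [("triage", triage), ("hunt", hunt), ("respond", respond)],
    alert={"id": "A-123"},
)
# The responder still acts on triage findings despite the hunting failure.
```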