Investigation Workflow
AuroraSOC uses BeeAI's AgentWorkflow to compose multi-agent investigation pipelines. Defined in aurorasoc/workflows/investigation.py, the workflow orchestrates a sequence of agents to perform complete investigations.
Pipeline Architecture
Workflow Implementation
```python
from beeai import AgentWorkflow

class InvestigationWorkflow:
    def __init__(self, factory: AuroraAgentFactory):
        self.factory = factory

    async def run(self, alert: Alert) -> InvestigationReport:
        # Determine if CPS agents are needed
        involves_cps = self._check_cps_involvement(alert)

        # Build agent pipeline
        agents = [
            self.factory.create_security_analyst(),
            self.factory.create_threat_hunter(),
        ]
        if involves_cps:
            agents.append(self.factory.create_cps_security())
        agents.append(self.factory.create_incident_responder())

        # Create and execute workflow
        workflow = AgentWorkflow(agents=agents)
        result = await workflow.run(
            input=format_alert_for_investigation(alert)
        )
        return parse_investigation_result(result)

    def _check_cps_involvement(self, alert: Alert) -> bool:
        """Check if the alert involves CPS/IoT devices."""
        cps_indicators = [
            alert.source in ["mqtt", "cps_sensor", "ot_monitor"],
            any(d.family in DeviceFamily for d in alert.related_devices),
            alert.mitre_techniques and any(
                t.startswith("T08") for t in alert.mitre_techniques
            ),  # ICS-specific MITRE techniques (ATT&CK for ICS uses T08xx IDs)
        ]
        return any(cps_indicators)
```
Dynamic Agent Inclusion
The workflow includes agents dynamically based on alert context.
Why dynamic inclusion? Running the CPS Security agent on purely IT alerts wastes LLM tokens and adds latency. By checking for CPS indicators first, we only invoke specialized agents when relevant.
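As a minimal, self-contained sketch, the gating decision reduces to the check below. The simplified `Alert` stand-in is hypothetical; the real project type carries more fields (such as `related_devices`):

```python
from dataclasses import dataclass, field

# Hypothetical, simplified stand-in for AuroraSOC's Alert type
@dataclass
class Alert:
    source: str
    mitre_techniques: list = field(default_factory=list)

def check_cps_involvement(alert: Alert) -> bool:
    """True if any CPS indicator fires; pure-IT alerts skip the CPS agent."""
    return (
        alert.source in ("mqtt", "cps_sensor", "ot_monitor")
        or any(t.startswith("T08") for t in alert.mitre_techniques)
    )

it_alert = Alert(source="edr", mitre_techniques=["T1059"])   # IT-only alert
ot_alert = Alert(source="mqtt", mitre_techniques=["T0886"])  # ICS technique + OT source

assert check_cps_involvement(it_alert) is False
assert check_cps_involvement(ot_alert) is True
```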
Step-by-Step Execution
Step 1: Security Analyst (Triage)
- Reads alert metadata and raw event
- Queries SIEM for related events (15-minute window)
- Extracts IOCs from event data
- Maps to MITRE ATT&CK techniques
- Assigns severity assessment
- Output: Triage report with severity, IOCs, MITRE mapping
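One way the triage output might be modeled is the dataclass below. Field names and the severity vocabulary are illustrative assumptions, not the project's actual schema; only the 15-minute correlation window comes from the step description:

```python
from dataclasses import dataclass, field
from datetime import timedelta

# SIEM correlation window from the triage step (15 minutes)
SIEM_CORRELATION_WINDOW = timedelta(minutes=15)

@dataclass
class TriageReport:
    """Illustrative shape of the Security Analyst's output."""
    severity: str                                              # e.g. "low" .. "critical"
    iocs: list[str] = field(default_factory=list)              # extracted indicators
    mitre_techniques: list[str] = field(default_factory=list)  # ATT&CK technique IDs
    related_event_count: int = 0                               # events in the window

report = TriageReport(
    severity="high",
    iocs=["185.0.2.44", "bad.example.com"],
    mitre_techniques=["T1059.001"],
    related_event_count=7,
)
assert report.severity == "high"
assert SIEM_CORRELATION_WINDOW.total_seconds() == 900
```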
Step 2: Threat Hunter (Hunting)
- Takes triage output as input
- Searches for LOLBin usage patterns
- Checks for baseline deviations
- Looks for lateral movement indicators
- Correlates with known threat campaigns
- Output: Hunting findings with risk assessment
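A simplified illustration of one hunting check, LOLBin detection over process events. The binary list and event shape are assumptions for the sketch, not AuroraSOC's actual detection logic:

```python
# Common living-off-the-land binaries frequently abused post-compromise
LOLBINS = {"certutil.exe", "mshta.exe", "regsvr32.exe", "rundll32.exe", "bitsadmin.exe"}

def find_lolbin_usage(process_events: list[dict]) -> list[dict]:
    """Return events whose process name matches a known LOLBin."""
    return [
        e for e in process_events
        if e.get("process_name", "").lower() in LOLBINS
    ]

events = [
    {"process_name": "chrome.exe", "cmdline": "chrome.exe --profile=default"},
    {"process_name": "certutil.exe", "cmdline": "certutil -urlcache -f http://..."},
]
hits = find_lolbin_usage(events)
assert len(hits) == 1 and hits[0]["process_name"] == "certutil.exe"
```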
Step 3 (optional): CPS Security (Physical-Cyber)
- Queries CPS sensor telemetry
- Verifies firmware attestation status
- Checks for physical-cyber correlations
- Assesses industrial protocol anomalies
- Output: CPS risk assessment
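The physical-cyber correlation step can be pictured as comparing commanded state against observed telemetry. The function name, units, and tolerance below are hypothetical illustrations, not the agent's real checks:

```python
def telemetry_deviates(commanded_value: float, observed_value: float,
                       tolerance: float = 0.05) -> bool:
    """Flag when observed sensor telemetry drifts beyond a relative
    tolerance of the commanded setpoint."""
    if commanded_value == 0:
        return abs(observed_value) > tolerance
    return abs(observed_value - commanded_value) / abs(commanded_value) > tolerance

# A pump commanded to 60.0 RPM but reporting 75.0 RPM warrants correlation
# with recent cyber events; a 1 RPM wobble does not.
assert telemetry_deviates(60.0, 75.0) is True
assert telemetry_deviates(60.0, 61.0) is False
```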
Step 4: Incident Responder (Response)
- Evaluates all previous steps
- Determines required response actions
- Creates case if warranted
- Requests human approval for high-impact actions
- Triggers playbook execution if applicable
- Output: Response plan with actions taken
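Human-approval gating for high-impact actions might look like the following sketch; the action names and the impact classification are assumptions, not AuroraSOC's actual playbook taxonomy:

```python
from dataclasses import dataclass

# Hypothetical impact ranking; real playbooks may classify actions differently
HIGH_IMPACT_ACTIONS = {"isolate_host", "disable_account", "block_ot_command"}

@dataclass
class ResponseAction:
    name: str
    target: str

def requires_human_approval(action: ResponseAction) -> bool:
    """High-impact actions pause the workflow pending analyst sign-off;
    low-impact enrichment actions execute automatically."""
    return action.name in HIGH_IMPACT_ACTIONS

assert requires_human_approval(ResponseAction("isolate_host", "ws-042")) is True
assert requires_human_approval(ResponseAction("enrich_ioc", "185.0.2.44")) is False
```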
Data Flow Between Steps
Each step's output becomes part of the next step's input context.
The BeeAI AgentWorkflow automatically accumulates context:
- Each agent sees the original input plus all previous agent outputs
- This provides full investigation context without manual state management
- Memory tiers add additional context from past investigations
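Conceptually, the accumulation works like the sketch below, which is a simplified model of the behavior, not BeeAI's internal implementation:

```python
def run_pipeline(agents, alert_input: str) -> list[str]:
    """Each agent receives the original input plus every prior agent's output."""
    context = [alert_input]
    for agent in agents:
        output = agent("\n".join(context))  # agent sees full accumulated context
        context.append(output)
    return context[1:]  # the per-agent outputs

# Toy agents that just report how much context they received
analyst = lambda ctx: f"triage({len(ctx.splitlines())} lines seen)"
hunter = lambda ctx: f"hunt({len(ctx.splitlines())} lines seen)"

outputs = run_pipeline([analyst, hunter], "alert: suspicious login")
assert outputs == ["triage(1 lines seen)", "hunt(2 lines seen)"]
```

The second agent sees two lines (the alert plus the analyst's output), which is the property that spares each step from manual state management.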
Error Handling
If any step fails:
- Error is logged with full context
- Workflow continues with remaining steps (best-effort)
- Final report includes error details
- Incomplete investigation flagged for human review
```python
try:
    result = await workflow.run(input=alert_context)
except AgentError as e:
    logger.error(
        f"Investigation workflow failed at step {e.step}",
        exc_info=True,
    )
    # Create case with partial results
    await create_partial_case(alert, e.partial_results)
```
Security investigations benefit from partial results. If the Threat Hunter fails but the Security Analyst succeeded, the triage findings alone may be sufficient for an analyst to take action. Stopping entirely would lose valuable context.