Skip to main content

Event Normalizer (Rust)

The Rust Core Engine's normalizer converts raw security events from multiple formats into a unified NormalizedEvent structure. It handles JSON, CEF, LEEF, Syslog, and MQTT payloads at 100K+ events per second.

Why Rust for Normalization?

| Metric | Python | Rust |
|---|---|---|
| Events/second | ~5K | 100K+ |
| Memory per event | ~2KB (GC overhead) | ~200 bytes (stack) |
| Latency p99 | ~5ms | <1ms |
| Safety | Runtime errors | Compile-time guarantees |

For a SOC processing events from thousands of devices, Python's GIL and garbage collector become bottlenecks. Rust's zero-cost abstractions provide the throughput needed.

Architecture

Data Structures

RawEvent (Input)

/// Wire-format event as received by the ingestion endpoints, prior to
/// normalization. Deserialized directly from the HTTP request body.
#[derive(Deserialize)]
pub struct RawEvent {
pub source: String, // Origin system (e.g., "suricata")
pub format: String, // "json" | "cef" | "leef" | "syslog" | "mqtt"
pub payload: String, // Raw event string
pub source_ip: Option<String>, // Sender IP, when the transport provides one
pub timestamp_ms: Option<i64>, // Event time (Unix ms); parsers receive a resolved `ts` when absent — TODO confirm against caller
}

NormalizedEvent (Output)

/// Unified event produced by every format parser; this is the structure
/// published to Redis Streams and federated over NATS.
#[derive(Serialize, Clone)]
pub struct NormalizedEvent {
pub id: String, // UUID v4
pub source: String, // Copied from RawEvent.source
pub severity: String, // critical | high | medium | low | info
pub category: String, // cps | network | endpoint | identity | cloud ("unknown" when the classifier matches nothing)
pub title: String,
pub description: String,
pub fields: HashMap<String, String>, // Arbitrary key-value pairs
pub iocs: Vec<String>, // Extracted indicators, tagged "ip:" / "sha256:" / "cve:"
pub mitre_techniques: Vec<String>, // MITRE ATT&CK IDs
pub timestamp_ms: i64, // Unix timestamp in milliseconds
pub device_id: Option<String>, // CPS device ID if applicable
}

Format Parsers

JSON Parser

Handles structured JSON payloads from SIEM integrations:

/// Normalizes a structured-JSON payload into a NormalizedEvent.
/// Unparseable payloads degrade to an empty object, so every lookup
/// below falls back to its default instead of erroring.
fn normalize_json(raw: &RawEvent, ts: i64) -> NormalizedEvent {
// Parse failure yields an empty map; indexing a Value that lacks the
// key returns Value::Null, so the unwrap_or defaults kick in.
let parsed: Value = serde_json::from_str(&raw.payload)
.unwrap_or(Value::Object(Map::new()));

// Missing or non-string severity defaults to "medium"; lowercased to
// match the severity vocabulary used elsewhere in the pipeline.
let severity = parsed["severity"]
.as_str()
.unwrap_or("medium")
.to_lowercase();

// Prefer "title", fall back to "message", then a fixed placeholder.
let title = parsed["title"]
.as_str()
.or(parsed["message"].as_str())
.unwrap_or("Unknown Event")
.to_string();

// Extract device_id for CPS events
let device_id = parsed["device_id"]
.as_str()
.map(|s| s.to_string());

// Build fields map from all JSON keys
// Note: v.to_string() is serde_json's rendering, so string values keep
// their surrounding quotes (e.g. "\"admin\"").
let mut fields = HashMap::new();
if let Value::Object(map) = &parsed {
for (k, v) in map {
fields.insert(k.clone(), v.to_string());
}
}

// Remaining fields (description, category, iocs, timestamp_ms from `ts`, ...)
// are elided in this excerpt.
NormalizedEvent { id: Uuid::new_v4().to_string(), source: raw.source.clone(), severity, title, fields, device_id, /* ... */ }
}

CEF (Common Event Format) Parser

Parses ArcSight-style CEF logs:

CEF:0|Vendor|Product|Version|SignatureID|Name|Severity|Extension
/// Normalizes an ArcSight-style CEF record. The header is '|'-separated;
/// splitn(8, ...) stops after 7 separators so the extension segment
/// (which may itself contain '|') survives intact as parts[7].
fn normalize_cef(raw: &RawEvent, ts: i64) -> NormalizedEvent {
let parts: Vec<&str> = raw.payload.splitn(8, '|').collect();
// parts[5] = Name (title)
// parts[6] = Severity (0-10, mapped to low/medium/high/critical)
// parts[7] = Extension key=value pairs

// Malformed headers (too few segments) fall back to severity "5"
// (=> "medium") and a generic title.
let severity = map_cef_severity(parts.get(6).unwrap_or(&"5"));
let title = parts.get(5).unwrap_or(&"CEF Event").to_string();
// ... (extension parsing and struct construction elided in this excerpt)
}

/// Maps a CEF severity token onto the engine's severity scale.
///
/// The CEF specification allows the severity field to be either an
/// integer 0-10 or one of the named levels Low / Medium / High /
/// Very-High; the original implementation only handled the numeric
/// form, so named levels all fell through `parse()` to "medium".
/// Numeric behavior is unchanged; anything unrecognized still
/// defaults to "medium".
fn map_cef_severity(cef_sev: &str) -> String {
    // Named severities first (case-insensitive, surrounding whitespace ignored).
    match cef_sev.trim().to_ascii_lowercase().as_str() {
        "low" => return "low".to_string(),
        "medium" => return "medium".to_string(),
        "high" => return "high".to_string(),
        "very-high" => return "critical".to_string(),
        _ => {}
    }
    // Numeric 0-10 scale; parse failures default to 5 (=> "medium").
    match cef_sev.trim().parse::<u8>().unwrap_or(5) {
        0..=3 => "low",
        4..=6 => "medium",
        7..=8 => "high",
        9..=10 => "critical",
        _ => "medium", // 11-255 is outside the CEF range; treat as default
    }
    .to_string()
}

LEEF (Log Event Extended Format) Parser

IBM QRadar format, similar to CEF but tab-delimited extensions:

LEEF:2.0|Vendor|Product|Version|EventID|key=value\tkey=value

Syslog Parser

Extracts facility/severity from RFC 5424 PRI header:

/// Normalizes an RFC 5424 syslog line by decoding the leading PRI header.
/// Only the severity extraction is shown in this excerpt; `severity` is
/// presumably consumed by the elided struct construction — TODO confirm.
fn normalize_syslog(raw: &RawEvent, ts: i64) -> NormalizedEvent {
// Parse PRI: <priority> where priority = facility * 8 + severity
// Severity 0-7 maps to: emergency..debug
// Fallback 13 = facility 1 (user) * 8 + severity 5 (notice) => "low".
let priority = raw.payload.trim_start_matches('<')
.split('>')
.next()
.and_then(|p| p.parse::<u8>().ok())
.unwrap_or(13);

// Low 3 bits of the priority are the syslog severity.
let syslog_severity = priority % 8;
let severity = match syslog_severity {
0..=2 => "critical", // Emergency, Alert, Critical
3 => "high", // Error
4 => "medium", // Warning
5..=7 => "low", // Notice, Info, Debug
// Unreachable for u8 % 8, but the match must be exhaustive.
_ => "medium",
};
}

MQTT Parser

CPS/IoT device payloads always produce category = "cps":

/// Normalizes an MQTT (CPS/IoT) payload. Every MQTT event is hard-wired
/// to category "cps" regardless of content.
fn normalize_mqtt(raw: &RawEvent, ts: i64) -> NormalizedEvent {
// Value::default() is Value::Null, and Null["device_id"] is Null,
// so a bad payload cleanly yields device_id = None.
let parsed = serde_json::from_str::<Value>(&raw.payload)
.unwrap_or_default();

NormalizedEvent {
category: "cps".to_string(),
device_id: parsed["device_id"].as_str().map(String::from),
// ... (remaining fields elided in this excerpt)
}
}

IOC Extraction

The normalizer automatically extracts indicators of compromise from event text using regex:

lazy_static! {
// Dotted-quad IPv4 with per-octet range check (0-255); \b word
// boundaries stop matches inside longer digit runs.
static ref IP_RE: Regex = Regex::new(
r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
).unwrap();

// Exactly 64 hex chars => SHA-256 only; MD5 (32) / SHA-1 (40) digests
// are intentionally not extracted by this pattern.
static ref HASH_RE: Regex = Regex::new(r"\b[a-fA-F0-9]{64}\b").unwrap();
// CVE-YYYY-NNNN…, with 4+ digits in the sequence number (post-2014 IDs
// may exceed four digits).
static ref CVE_RE: Regex = Regex::new(r"CVE-\d{4}-\d{4,}").unwrap();
}

/// True when `ip` (a dotted-quad string) lies in an RFC 1918 private
/// range: 10.0.0.0/8, 172.16.0.0/12, or 192.168.0.0/16.
fn is_rfc1918_ip(ip: &str) -> bool {
    // IP_RE only matches syntactically valid dotted quads, but parse
    // defensively anyway; Ipv4Addr::is_private implements exactly RFC 1918.
    ip.parse::<std::net::Ipv4Addr>()
        .map(|addr| addr.is_private())
        .unwrap_or(false)
}

/// Extracts indicators of compromise — public IPv4 addresses, SHA-256
/// hashes, and CVE identifiers — from free text, each prefixed with a
/// type tag ("ip:", "sha256:", "cve:").
fn extract_iocs(text: &str) -> Vec<String> {
    let mut iocs = Vec::new();

    // Extract IPs, filtering RFC 1918 private ranges.
    // BUG FIX: the previous `!ip.starts_with("172.")` test excluded the
    // entire 172.0.0.0/8 block, silently dropping public addresses such
    // as 172.32.1.1; RFC 1918 reserves only 172.16.0.0/12.
    for m in IP_RE.find_iter(text) {
        let ip = m.as_str();
        if !is_rfc1918_ip(ip) {
            iocs.push(format!("ip:{}", ip));
        }
    }

    // Extract SHA-256 hashes (64 hex chars).
    for m in HASH_RE.find_iter(text) {
        iocs.push(format!("sha256:{}", m.as_str()));
    }

    // Extract CVE identifiers.
    for m in CVE_RE.find_iter(text) {
        iocs.push(format!("cve:{}", m.as_str()));
    }

    iocs
}

Category Classification

Events are classified into categories based on their content:

/// Buckets an event into a category by scanning its field values for
/// category-specific keywords (case-insensitive substring match).
///
/// Rules are checked in priority order — CPS keywords win over network,
/// network over endpoint, and so on — and an event matching no rule
/// falls back to "unknown".
///
/// FIX: the original `if`/`else` chain yielded `&'static str` branches
/// from a function declared `-> String` (missing `.to_string()`); the
/// returned values themselves are unchanged.
fn classify_category(fields: &HashMap<String, String>) -> String {
    // Ordered rule table: (category, trigger keywords). First match wins.
    const RULES: [(&str, &[&str]); 5] = [
        ("cps", &["sensor", "plc", "scada", "firmware", "attestation"]),
        ("network", &["firewall", "dns", "proxy"]),
        ("endpoint", &["process", "registry", "edr"]),
        ("identity", &["login", "auth", "user"]),
        ("cloud", &["aws", "azure", "gcp"]),
    ];

    // Lowercase each value once up front so every rule scans cheaply.
    let haystacks: Vec<String> = fields.values().map(|v| v.to_lowercase()).collect();

    for (category, keywords) in RULES {
        if keywords
            .iter()
            .any(|&kw| haystacks.iter().any(|h| h.contains(kw)))
        {
            return category.to_string();
        }
    }
    "unknown".to_string()
}

HTTP Ingestion Endpoints

Single Event

/// HTTP handler for single-event ingestion: normalize, publish to Redis
/// Streams, conditionally federate over NATS, and acknowledge.
///
/// NOTE(review): the visible signature returns `impl IntoResponse`, yet
/// the body uses `?` — the elided portions presumably return a Result
/// type whose error converts into a response; confirm in the full source.
pub async fn ingest_event(
Json(req): Json<IngestRequest>,
redis: RedisPublisher,
nats: NatsPublisher,
) -> impl IntoResponse {
// Normalize the raw payload (RawEvent construction elided in this excerpt).
let event = normalizer::normalize(&RawEvent { /* ... */ });

// Publish to Redis Streams
redis.publish_event(&event).await?;

// Federate high/critical to NATS
if event.severity == "high" || event.severity == "critical" {
nats.federate_alert(&event).await?;
}

// Share extracted IOCs via NATS
// One publish per IOC; a batched share may be preferable for IOC-heavy
// events — TODO confirm NatsPublisher semantics.
for ioc in &event.iocs {
nats.share_ioc(/* ... */).await?;
}

Json(IngestResponse { id: event.id, status: "accepted", severity: event.severity, category: event.category })
}

Batch Ingestion

/// HTTP handler for batch ingestion: normalize every event, pipeline the
/// batch into Redis, then federate high-severity events individually.
///
/// NOTE(review): in this excerpt `rejected` stays 0 and `errors` is never
/// populated — presumably the elided normalization path fills them in;
/// verify against the full source. As with `ingest_event`, `?` inside an
/// `impl IntoResponse` body implies elided error conversion.
pub async fn ingest_batch(
Json(req): Json<BatchIngestRequest>,
redis: RedisPublisher,
nats: NatsPublisher,
) -> impl IntoResponse {
let mut accepted = 0;
let mut rejected = 0;
let mut errors = Vec::new();

// Normalize all events
let events: Vec<NormalizedEvent> = req.events.iter()
.map(|e| normalizer::normalize(&/* ... */))
.collect();

// Batch publish to Redis (pipeline)
redis.publish_event_batch(&events).await?;
accepted = events.len();

// Federate high-severity events
// Sequential awaits: NATS publishes happen one at a time per batch.
for event in &events {
if event.severity == "high" || event.severity == "critical" {
nats.federate_alert(event).await?;
}
}

Json(BatchIngestResponse { accepted, rejected, errors })
}

Performance Characteristics

| Metric | Value |
|---|---|
| Single event normalization | ~5μs |
| IOC extraction (regex) | ~2μs |
| Category classification | ~1μs |
| Total per event | ~10μs |
| Batch throughput | 100K+ events/sec |
| Memory per event | ~200 bytes stack |