Event Normalizer (Rust)
The Rust Core Engine's normalizer converts raw security events from multiple formats into a unified NormalizedEvent structure. It handles JSON, CEF, LEEF, Syslog, and MQTT payloads at 100K+ events per second.
Why Rust for Normalization?
| Metric | Python | Rust |
|---|---|---|
| Events/second | ~5K | 100K+ |
| Memory per event | ~2KB (GC overhead) | ~200 bytes (stack) |
| Latency p99 | ~5ms | <1ms |
| Safety | Runtime errors | Compile-time guarantees |
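For a SOC ingesting events from thousands of devices, Python becomes the bottleneck: the GIL confines CPU-bound parsing to one core per process, and its memory management adds overhead to every event object. Rust has neither a GIL nor a garbage collector, and its zero-cost abstractions compile the parsers down to tight native code, which provides the throughput needed.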
Architecture
Data Structures
RawEvent (Input)
#[derive(Deserialize)]
pub struct RawEvent {
    pub source: String,            // Origin system (e.g., "suricata")
    pub format: String,            // "json" | "cef" | "leef" | "syslog" | "mqtt"
    pub payload: String,           // Raw event string
    pub source_ip: Option<String>,
    pub timestamp_ms: Option<i64>,
}
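Because `format` is a free-form string, a typo such as "ceff" has to be handled somewhere before a parser is chosen. A minimal sketch of one way to do that, assuming a hypothetical `EventFormat` enum (not part of the original code) that mirrors the five documented values:

```rust
use std::str::FromStr;

/// Hypothetical enum mirroring the values documented for `RawEvent::format`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventFormat {
    Json,
    Cef,
    Leef,
    Syslog,
    Mqtt,
}

impl FromStr for EventFormat {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Accept any casing and ignore surrounding whitespace.
        match s.trim().to_ascii_lowercase().as_str() {
            "json" => Ok(Self::Json),
            "cef" => Ok(Self::Cef),
            "leef" => Ok(Self::Leef),
            "syslog" => Ok(Self::Syslog),
            "mqtt" => Ok(Self::Mqtt),
            other => Err(format!("unsupported event format: {other}")),
        }
    }
}
```

A dispatcher could then call `raw.format.parse::<EventFormat>()` and `match` on the result, so the compiler flags any format that lacks a parser.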
NormalizedEvent (Output)
use serde::Serialize;
use std::collections::HashMap;

#[derive(Serialize, Clone)]
pub struct NormalizedEvent {
    pub id: String,                      // UUID v4
    pub source: String,
    pub severity: String,                // critical | high | medium | low | info
    pub category: String,                // cps | network | endpoint | identity | cloud
    pub title: String,
    pub description: String,
    pub fields: HashMap<String, String>, // Arbitrary key-value pairs
    pub iocs: Vec<String>,               // Extracted indicators
    pub mitre_techniques: Vec<String>,   // MITRE ATT&CK IDs
    pub timestamp_ms: i64,               // Unix timestamp in milliseconds
    pub device_id: Option<String>,       // CPS device ID if applicable
}
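The `severity` field is a plain string, so nothing prevents a value outside the five documented labels. One possible tightening, sketched here with a hypothetical `Severity` enum that is not in the original code, is to parse the label once and compare by rank afterwards:

```rust
/// Hypothetical typed counterpart to the `severity: String` field.
/// Variants are declared in ascending order so the derived `Ord` ranks them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    Info,
    Low,
    Medium,
    High,
    Critical,
}

impl Severity {
    /// Parses one of the documented labels (case-insensitive).
    /// Unknown labels return `None` instead of being passed through.
    pub fn from_label(label: &str) -> Option<Self> {
        match label.trim().to_ascii_lowercase().as_str() {
            "info" => Some(Self::Info),
            "low" => Some(Self::Low),
            "medium" => Some(Self::Medium),
            "high" => Some(Self::High),
            "critical" => Some(Self::Critical),
            _ => None,
        }
    }

    /// Returns the lowercase label used in `NormalizedEvent::severity`.
    pub fn as_label(self) -> &'static str {
        match self {
            Self::Info => "info",
            Self::Low => "low",
            Self::Medium => "medium",
            Self::High => "high",
            Self::Critical => "critical",
        }
    }
}
```

With this in place, a check like "high or critical" can be written as `severity >= Severity::High` rather than as two string comparisons.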
Format Parsers
JSON Parser
Handles structured JSON payloads from SIEM integrations:
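use serde_json::{Map, Value};
use std::collections::HashMap;
use uuid::Uuid;

fn normalize_json(raw: &RawEvent, ts: i64) -> NormalizedEvent {
    // Fall back to an empty object when the payload is not valid JSON
    let parsed: Value = serde_json::from_str(&raw.payload)
        .unwrap_or(Value::Object(Map::new()));
    let severity = parsed["severity"]
        .as_str()
        .unwrap_or("medium")
        .to_lowercase();
    // Prefer "title", then "message", then a fixed placeholder
    let title = parsed["title"]
        .as_str()
        .or(parsed["message"].as_str())
        .unwrap_or("Unknown Event")
        .to_string();
    // Extract device_id for CPS events
    let device_id = parsed["device_id"]
        .as_str()
        .map(|s| s.to_string());
    // Build fields map from all top-level JSON keys
    let mut fields = HashMap::new();
    if let Value::Object(map) = &parsed {
        for (k, v) in map {
            // Value::to_string() wraps string values in JSON quotes,
            // so take the inner &str when the value is a string
            let value = v.as_str().map(str::to_owned).unwrap_or_else(|| v.to_string());
            fields.insert(k.clone(), value);
        }
    }
    NormalizedEvent {
        id: Uuid::new_v4().to_string(),
        source: raw.source.clone(),
        severity,
        title,
        fields,
        device_id,
        /* ... */
    }
}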
CEF (Common Event Format) Parser
Parses ArcSight-style CEF logs:
CEF:0|Vendor|Product|Version|SignatureID|Name|Severity|Extension
fn normalize_cef(raw: &RawEvent, ts: i64) -> NormalizedEvent {
    let parts: Vec<&str> = raw.payload.splitn(8, '|').collect();
    // parts[5] = Name (title)
    // parts[6] = Severity (0-10, mapped to low/medium/high/critical)
    // parts[7] = Extension key=value pairs
    let severity = map_cef_severity(parts.get(6).unwrap_or(&"5"));
    let title = parts.get(5).unwrap_or(&"CEF Event").to_string();
    // ...
}

fn map_cef_severity(cef_sev: &str) -> String {
    match cef_sev.parse::<u8>().unwrap_or(5) {
        0..=3 => "low",
        4..=6 => "medium",
        7..=8 => "high",
        9..=10 => "critical",
        _ => "medium",
    }.to_string()
}
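The extension in `parts[7]` is a run of space-separated `key=value` pairs whose values may themselves contain spaces, so a plain split on spaces is not enough. The sketch below shows one way to handle that. It assumes a hypothetical helper name `parse_cef_extension`, treats a token as the start of a new pair only when the text before its first `=` is purely alphanumeric, collapses runs of whitespace, and leaves escape sequences such as `\=` undecoded.

```rust
use std::collections::HashMap;

/// Hypothetical helper: splits a CEF extension string into key/value pairs.
/// Tokens that do not look like `key=...` are appended to the previous value.
pub fn parse_cef_extension(ext: &str) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    let mut current: Option<(String, String)> = None;

    for token in ext.split_whitespace() {
        // A token starts a new pair when it has a non-empty alphanumeric key
        // before its first '='. An escaped "\=" fails this test because the
        // would-be key ends in a backslash.
        let starts_pair = token.split_once('=').filter(|(key, _)| {
            !key.is_empty() && key.chars().all(|c| c.is_ascii_alphanumeric())
        });

        if let Some((key, value)) = starts_pair {
            // Flush the pair collected so far, then begin the new one.
            if let Some((k, v)) = current.take() {
                fields.insert(k, v);
            }
            current = Some((key.to_string(), value.to_string()));
        } else if let Some((_, value)) = current.as_mut() {
            // Continuation of a value that contains spaces.
            value.push(' ');
            value.push_str(token);
        }
    }

    if let Some((k, v)) = current {
        fields.insert(k, v);
    }
    fields
}
```

For the input `src=203.0.113.7 msg=Port scan detected spt=4444`, the `msg` value keeps its spaces and `spt` is still recognized as a separate key.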
LEEF (Log Event Extended Format) Parser
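IBM QRadar's format. It resembles CEF, but its extension attributes are tab-delimited by default (LEEF 2.0 also allows an optional custom delimiter field after the event ID):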
LEEF:2.0|Vendor|Product|Version|EventID|key=value\tkey=value
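No LEEF parser is shown in this section, so the following is only a sketch of how the layout above could be split. The name `parse_leef` is hypothetical, the attributes are assumed to be tab-delimited, and a LEEF 2.0 header that carries a custom delimiter field is not handled.

```rust
use std::collections::HashMap;

/// Hypothetical helper: parses a LEEF line laid out as
/// `LEEF:version|vendor|product|product version|event ID|attributes`.
/// Returns the event ID and the attributes, or `None` if the `LEEF:` prefix
/// is missing or the header has fewer than six pipe-separated parts.
pub fn parse_leef(line: &str) -> Option<(String, HashMap<String, String>)> {
    let rest = line.strip_prefix("LEEF:")?;
    // splitn(6) keeps any '|' inside the attribute section intact.
    let parts: Vec<&str> = rest.splitn(6, '|').collect();
    if parts.len() < 6 {
        return None;
    }
    // Attributes are tab-separated key=value pairs; malformed pairs are skipped.
    let attributes = parts[5]
        .split('\t')
        .filter_map(|pair| pair.split_once('='))
        .map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
        .collect();
    Some((parts[4].to_string(), attributes))
}
```

The event ID in `parts[4]` is a natural candidate for the normalized title, though the source does not say which field the real parser uses.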
Syslog Parser
Decodes the RFC 5424 PRI header, whose value is facility * 8 + severity, and maps the syslog severity to the normalized scale:
fn normalize_syslog(raw: &RawEvent, ts: i64) -> NormalizedEvent {
    // Parse PRI: <priority> where priority = facility * 8 + severity
    // Severity 0-7 maps to: emergency..debug
    // A missing or unparsable PRI defaults to 13 (facility 1 "user", severity 5 "notice")
    let priority = raw.payload.trim_start_matches('<')
        .split('>')
        .next()
        .and_then(|p| p.parse::<u8>().ok())
        .unwrap_or(13);
    let syslog_severity = priority % 8;
    let severity = match syslog_severity {
        0..=2 => "critical", // Emergency, Alert, Critical
        3 => "high",         // Error
        4 => "medium",       // Warning
        5..=7 => "low",      // Notice, Info, Debug
        _ => "medium",       // Unreachable after % 8; required for exhaustiveness
    };
    // ...
}
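The PRI arithmetic can be checked in isolation. The sketch below assumes a hypothetical helper named `parse_pri` that returns both facility and severity and rejects values RFC 5424 does not allow, rather than falling back to a default.

```rust
/// Hypothetical helper: splits a syslog PRI value into (facility, severity).
/// Returns `None` when the line does not start with `<N>`, when N is not
/// one to three decimal digits, or when N exceeds 191.
pub fn parse_pri(line: &str) -> Option<(u8, u8)> {
    let rest = line.strip_prefix('<')?;
    let (digits, _) = rest.split_once('>')?;
    if digits.is_empty() || digits.len() > 3 || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    let pri: u8 = digits.parse().ok()?;
    if pri > 191 {
        // 23 is the highest facility, so 23 * 8 + 7 = 191 is the largest valid PRI.
        return None;
    }
    Some((pri / 8, pri % 8))
}
```

For example, `<34>` decodes to facility 4 (auth) and severity 2 (critical), since 34 = 4 * 8 + 2.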
MQTT Parser
CPS/IoT device payloads always produce category = "cps":
fn normalize_mqtt(raw: &RawEvent, ts: i64) -> NormalizedEvent {
    let parsed = serde_json::from_str::<Value>(&raw.payload)
        .unwrap_or_default();
    NormalizedEvent {
        category: "cps".to_string(),
        device_id: parsed["device_id"].as_str().map(String::from),
        // ...
    }
}
IOC Extraction
The normalizer automatically extracts indicators of compromise from event text using regex:
use lazy_static::lazy_static;
use regex::Regex;
use std::net::Ipv4Addr;

lazy_static! {
    static ref IP_RE: Regex = Regex::new(
        r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
    ).unwrap();
    static ref HASH_RE: Regex = Regex::new(r"\b[a-fA-F0-9]{64}\b").unwrap();
    static ref CVE_RE: Regex = Regex::new(r"CVE-\d{4}-\d{4,}").unwrap();
}

fn extract_iocs(text: &str) -> Vec<String> {
    let mut iocs = Vec::new();
    // Extract IPs, skipping RFC 1918 private ranges
    // (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)
    for m in IP_RE.find_iter(text) {
        let ip = m.as_str();
        // Ipv4Addr::is_private() matches 172.16.0.0/12 exactly; a plain "172."
        // prefix test would also drop public addresses such as 172.32.0.1
        let is_private = ip.parse::<Ipv4Addr>().map(|a| a.is_private()).unwrap_or(false);
        if !is_private {
            iocs.push(format!("ip:{}", ip));
        }
    }
    // Extract SHA-256 hashes (64 hex characters)
    for m in HASH_RE.find_iter(text) {
        iocs.push(format!("sha256:{}", m.as_str()));
    }
    // Extract CVE identifiers
    for m in CVE_RE.find_iter(text) {
        iocs.push(format!("cve:{}", m.as_str()));
    }
    iocs
}
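`extract_iocs` pushes every regex match, so an indicator that appears more than once in the text is returned more than once. If duplicates are unwanted downstream (an assumption; the source does not say), an order-preserving pass such as the hypothetical `dedup_iocs` below could be applied to the result:

```rust
use std::collections::HashSet;

/// Hypothetical post-processing step: removes repeated indicators while
/// keeping the first occurrence of each in its original position.
pub fn dedup_iocs(iocs: Vec<String>) -> Vec<String> {
    let mut seen = HashSet::new();
    iocs.into_iter()
        // HashSet::insert returns false when the value was already present.
        .filter(|ioc| seen.insert(ioc.clone()))
        .collect()
}
```

Comparison is exact, so the same hash in upper and lower case would still count as two indicators unless it is lowercased first.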
Category Classification
Events are classified by keyword matching over the lowercased field values. Branches are checked in order and the first match wins; events that match no keyword are labeled "unknown":
fn classify_category(fields: &HashMap<String, String>) -> String {
    let text = fields.values()
        .map(|v| v.to_lowercase())
        .collect::<Vec<_>>()
        .join(" ");
    let category = if text.contains("sensor") || text.contains("plc") || text.contains("scada")
        || text.contains("firmware") || text.contains("attestation") {
        "cps"
    } else if text.contains("firewall") || text.contains("dns") || text.contains("proxy") {
        "network"
    } else if text.contains("process") || text.contains("registry") || text.contains("edr") {
        "endpoint"
    } else if text.contains("login") || text.contains("auth") || text.contains("user") {
        "identity"
    } else if text.contains("aws") || text.contains("azure") || text.contains("gcp") {
        "cloud"
    } else {
        "unknown"
    };
    // The branches yield &'static str; convert to the declared String return type
    category.to_string()
}
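Because the first matching branch wins, the order of the checks is part of the behavior: an event mentioning both "login" and "aws" is classified as identity, not cloud. A table-driven variant makes that priority explicit. This is a sketch only; the names `CATEGORY_RULES` and `classify_by_rules` are hypothetical, and the keywords are copied from the function above.

```rust
use std::collections::HashMap;

/// Keyword rules in priority order; the first rule with a matching keyword wins.
const CATEGORY_RULES: &[(&str, &[&str])] = &[
    ("cps", &["sensor", "plc", "scada", "firmware", "attestation"]),
    ("network", &["firewall", "dns", "proxy"]),
    ("endpoint", &["process", "registry", "edr"]),
    ("identity", &["login", "auth", "user"]),
    ("cloud", &["aws", "azure", "gcp"]),
];

/// Hypothetical table-driven equivalent of `classify_category`.
pub fn classify_by_rules(fields: &HashMap<String, String>) -> &'static str {
    // Same preprocessing as the original: lowercase all values and join them.
    let text = fields
        .values()
        .map(|v| v.to_lowercase())
        .collect::<Vec<_>>()
        .join(" ");
    CATEGORY_RULES
        .iter()
        .find(|(_, keywords)| keywords.iter().any(|kw| text.contains(*kw)))
        .map(|(category, _)| *category)
        .unwrap_or("unknown")
}
```

Both versions match substrings rather than whole words, so a keyword can also match inside a longer word (for example "auth" inside "author").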
HTTP Ingestion Endpoints
Single Event
pub async fn ingest_event(
    Json(req): Json<IngestRequest>,
    redis: RedisPublisher,
    nats: NatsPublisher,
) -> Result<impl IntoResponse, IngestError> {
    // The ? operator needs a Result return type. IngestError is a placeholder
    // name for an error type that implements IntoResponse.
    let event = normalizer::normalize(&RawEvent { /* ... */ });
    // Publish to Redis Streams
    redis.publish_event(&event).await?;
    // Federate high/critical to NATS
    if event.severity == "high" || event.severity == "critical" {
        nats.federate_alert(&event).await?;
    }
    // Share extracted IOCs via NATS
    for ioc in &event.iocs {
        nats.share_ioc(/* ... */).await?;
    }
    Ok(Json(IngestResponse {
        id: event.id,
        status: "accepted",
        severity: event.severity,
        category: event.category,
    }))
}
Batch Ingestion
pub async fn ingest_batch(
    Json(req): Json<BatchIngestRequest>,
    redis: RedisPublisher,
    nats: NatsPublisher,
) -> Result<impl IntoResponse, IngestError> {
    // The ? operator needs a Result return type. IngestError is a placeholder
    // name for an error type that implements IntoResponse.
    // Normalize all events
    let events: Vec<NormalizedEvent> = req.events.iter()
        .map(|e| normalizer::normalize(&/* ... */))
        .collect();
    // Batch publish to Redis (pipeline)
    redis.publish_event_batch(&events).await?;
    // Every event counts as accepted once the batch publish succeeds;
    // no per-event validation is shown, so nothing is rejected
    let accepted = events.len();
    let rejected = 0;
    let errors = Vec::new();
    // Federate high-severity events
    for event in &events {
        if event.severity == "high" || event.severity == "critical" {
            nats.federate_alert(event).await?;
        }
    }
    Ok(Json(BatchIngestResponse { accepted, rejected, errors }))
}
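The batch response carries `rejected` and `errors`, but the handler as shown never records a per-event failure. If per-event validation were added (an assumption, since the source shows none), the counting could look like the sketch below. `BatchOutcome` and `tally` are hypothetical names, and each event's result is modeled as a `Result<T, String>`.

```rust
/// Hypothetical summary mirroring the `accepted` / `rejected` / `errors`
/// fields of the batch response.
#[derive(Debug, Default, PartialEq)]
pub struct BatchOutcome {
    pub accepted: usize,
    pub rejected: usize,
    pub errors: Vec<String>,
}

/// Separates successful items from failures and counts both.
/// Each error message is prefixed with the event's zero-based batch index.
pub fn tally<T>(
    results: impl IntoIterator<Item = Result<T, String>>,
) -> (Vec<T>, BatchOutcome) {
    let mut ok = Vec::new();
    let mut outcome = BatchOutcome::default();
    for (index, result) in results.into_iter().enumerate() {
        match result {
            Ok(item) => {
                ok.push(item);
                outcome.accepted += 1;
            }
            Err(reason) => {
                outcome.rejected += 1;
                outcome.errors.push(format!("event {index}: {reason}"));
            }
        }
    }
    (ok, outcome)
}
```

Only the `ok` items would then be passed to the batch publish, so one malformed event would not fail the whole request.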
Performance Characteristics
| Metric | Value |
|---|---|
| Single event normalization | ~5μs |
| IOC extraction (regex) | ~2μs |
| Category classification | ~1μs |
| Total per event | ~10μs |
| Batch throughput | 100K+ events/sec |
| Memory per event | ~200 bytes stack |
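As a rough consistency check on the table, assuming the three stage timings are additive and that processing is sequential on one core (the source states neither):

```latex
5\,\mu\text{s} + 2\,\mu\text{s} + 1\,\mu\text{s} = 8\,\mu\text{s} \lesssim 10\,\mu\text{s},
\qquad
\frac{1\ \text{event}}{10\,\mu\text{s}} = 10^{5}\ \text{events/s per core}
```

Under those assumptions the ~10μs total corresponds to about 100K events per second on a single core, which matches the batch throughput row. Going beyond that figure would require a lower per-event cost or more than one core.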