مُطبيع الحدث (الصدأ)
يقوم مُطبيع Rust Core Engine بتحويل الأحداث الأمنية الأولية من تنسيقات متعددة إلى بنية NormalizedEvent موحدة. وهو يتعامل مع حمولات JSON وCEF وLEEF وSyslog وMQTT بمعدل 100 ألف+ حدث في الثانية.
لماذا الصدأ للتطبيع؟
| متري | بايثون | الصدأ |
|---|---|---|
| الأحداث/الثانية | ~5K | 100 ألف+ |
| الذاكرة لكل حدث | ~2 كيلو بايت (حمل GC) | ~200 بايت (المكدس) |
| الكمون ص99 | ~5 مللي ثانية | < 1 مللي ثانية |
| أمان | أخطاء وقت التشغيل | ضمانات وقت التجميع |
بالنسبة لأحداث معالجة SOC من آلاف الأجهزة، يصبح GIL الخاص بـ Python ومجمع البيانات المهملة بمثابة اختناقات. توفر تجريدات Rust ذات التكلفة الصفرية الإنتاجية المطلوبة.
بنيان
هياكل البيانات
الحدث الخام (الإدخال)
#[derive(Deserialize)]
pub struct RawEvent {
pub source: String, // Origin system (e.g., "suricata")
pub format: String, // "json" | "cef" | "leef" | "syslog" | "mqtt"
pub payload: String, // Raw event string
pub source_ip: Option<String>,
pub timestamp_ms: Option<i64>,
}
حدث عادي (الإخراج)
#[derive(Serialize, Clone)]
pub struct NormalizedEvent {
pub id: String, // UUID v4
pub source: String,
pub severity: String, // critical | high | medium | low | info
pub category: String, // cps | network | endpoint | identity | cloud
pub title: String,
pub description: String,
pub fields: HashMap<String, String>, // Arbitrary key-value pairs
pub iocs: Vec<String>, // Extracted indicators
pub mitre_techniques: Vec<String>, // MITRE ATT&CK IDs
pub timestamp_ms: i64, // Unix timestamp in milliseconds
pub device_id: Option<String>, // CPS device ID if applicable
}
موزعو التنسيق
محلل JSON
يتعامل مع حمولات JSON المنظمة من عمليات تكامل SIEM:
fn normalize_json(raw: &RawEvent, ts: i64) -> NormalizedEvent {
let parsed: Value = serde_json::from_str(&raw.payload)
.unwrap_or(Value::Object(Map::new()));
let severity = parsed["severity"]
.as_str()
.unwrap_or("medium")
.to_lowercase();
let title = parsed["title"]
.as_str()
.or(parsed["message"].as_str())
.unwrap_or("Unknown Event")
.to_string();
// Extract device_id for CPS events
let device_id = parsed["device_id"]
.as_str()
.map(|s| s.to_string());
// Build fields map from all JSON keys
let mut fields = HashMap::new();
if let Value::Object(map) = &parsed {
for (k, v) in map {
fields.insert(k.clone(), v.to_string());
}
}
NormalizedEvent { id: Uuid::new_v4().to_string(), source: raw.source.clone(), severity, title, fields, device_id, /* ... */ }
}
محلل CEF (تنسيق الحدث المشترك).
يقوم بتوزيع سجلات CEF بنمط ArcSight:
CEF:0|Vendor|Product|Version|SignatureID|Name|Severity|Extension
fn normalize_cef(raw: &RawEvent, ts: i64) -> NormalizedEvent {
let parts: Vec<&str> = raw.payload.splitn(8, '|').collect();
// parts[5] = Name (title)
// parts[6] = Severity (0-10, mapped to low/medium/high/critical)
// parts[7] = Extension key=value pairs
let severity = map_cef_severity(parts.get(6).unwrap_or(&"5"));
let title = parts.get(5).unwrap_or(&"CEF Event").to_string();
// ...
}
fn map_cef_severity(cef_sev: &str) -> String {
match cef_sev.parse::<u8>().unwrap_or(5) {
0..=3 => "low",
4..=6 => "medium",
7..=8 => "high",
9..=10 => "critical",
_ => "medium",
}.to_string()
}
محلل LEEF (التنسيق الموسع لحدث السجل).
تنسيق IBM QRadar، مشابه لـ CEF ولكن امتدادات محددة بعلامات جدولة:
LEEF:2.0|Vendor|Product|Version|EventID|key=value\tkey=value
محلل سجل النظام
يستخرج المنشأة/الخطورة من رأس RFC 5424 PRI:
fn normalize_syslog(raw: &RawEvent, ts: i64) -> NormalizedEvent {
// Parse PRI: <priority> where priority = facility * 8 + severity
// Severity 0-7 maps to: emergency..debug
let priority = raw.payload.trim_start_matches('<')
.split('>')
.next()
.and_then(|p| p.parse::<u8>().ok())
.unwrap_or(13);
let syslog_severity = priority % 8;
let severity = match syslog_severity {
0..=2 => "critical", // Emergency, Alert, Critical
3 => "high", // Error
4 => "medium", // Warning
5..=7 => "low", // Notice, Info, Debug
_ => "medium",
};
}
محلل MQTT
تنتج حمولات أجهزة CPS/IoT دائمًا category = "cps":
fn normalize_mqtt(raw: &RawEvent, ts: i64) -> NormalizedEvent {
let parsed = serde_json::from_str::<Value>(&raw.payload)
.unwrap_or_default();
NormalizedEvent {
category: "cps".to_string(),
device_id: parsed["device_id"].as_str().map(String::from),
// ...
}
}
استخراج اللجنة الأولمبية الدولية
تقوم أداة التسوية تلقائيًا باستخراج مؤشرات التسوية من نص الحدث باستخدام التعبير العادي:
lazy_static! {
static ref IP_RE: Regex = Regex::new(
r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
).unwrap();
static ref HASH_RE: Regex = Regex::new(r"\b[a-fA-F0-9]{64}\b").unwrap();
static ref CVE_RE: Regex = Regex::new(r"CVE-\d{4}-\d{4,}").unwrap();
}
fn extract_iocs(text: &str) -> Vec<String> {
let mut iocs = Vec::new();
// Extract IPs (filter RFC1918 private ranges)
for m in IP_RE.find_iter(text) {
let ip = m.as_str();
if !ip.starts_with("10.") && !ip.starts_with("192.168.") && !ip.starts_with("172.") {
iocs.push(format!("ip:{}", ip));
}
}
// Extract SHA-256 hashes
for m in HASH_RE.find_iter(text) {
iocs.push(format!("sha256:{}", m.as_str()));
}
// Extract CVE identifiers
for m in CVE_RE.find_iter(text) {
iocs.push(format!("cve:{}", m.as_str()));
}
iocs
}
تصنيف الفئة
يتم تصنيف الأحداث إلى فئات بناءً على محتواها:
fn classify_category(fields: &HashMap<String, String>) -> String {
let text = fields.values()
.map(|v| v.to_lowercase())
.collect::<Vec<_>>()
.join(" ");
if text.contains("sensor") || text.contains("plc") || text.contains("scada")
|| text.contains("firmware") || text.contains("attestation") {
"cps"
} else if text.contains("firewall") || text.contains("dns") || text.contains("proxy") {
"network"
} else if text.contains("process") || text.contains("registry") || text.contains("edr") {
"endpoint"
} else if text.contains("login") || text.contains("auth") || text.contains("user") {
"identity"
} else if text.contains("aws") || text.contains("azure") || text.contains("gcp") {
"cloud"
} else {
"unknown"
}
}
نقاط نهاية استيعاب HTTP
حدث واحد
pub async fn ingest_event(
Json(req): Json<IngestRequest>,
redis: RedisPublisher,
nats: NatsPublisher,
) -> impl IntoResponse {
let event = normalizer::normalize(&RawEvent { /* ... */ });
// Publish to Redis Streams
redis.publish_event(&event).await?;
// Federate high/critical to NATS
if event.severity == "high" || event.severity == "critical" {
nats.federate_alert(&event).await?;
}
// Share extracted IOCs via NATS
for ioc in &event.iocs {
nats.share_ioc(/* ... */).await?;
}
Json(IngestResponse { id: event.id, status: "accepted", severity: event.severity, category: event.category })
}
استيعاب الدفعة
pub async fn ingest_batch(
Json(req): Json<BatchIngestRequest>,
redis: RedisPublisher,
nats: NatsPublisher,
) -> impl IntoResponse {
let mut accepted = 0;
let mut rejected = 0;
let mut errors = Vec::new();
// Normalize all events
let events: Vec<NormalizedEvent> = req.events.iter()
.map(|e| normalizer::normalize(&/* ... */))
.collect();
// Batch publish to Redis (pipeline)
redis.publish_event_batch(&events).await?;
accepted = events.len();
// Federate high-severity events
for event in &events {
if event.severity == "high" || event.severity == "critical" {
nats.federate_alert(event).await?;
}
}
Json(BatchIngestResponse { accepted, rejected, errors })
}
خصائص الأداء
| متري | قيمة |
|---|---|
| تطبيع حدث واحد | ~5μs |
| استخراج IOC (التعبير العادي) | ~ 2μs |
| تصنيف الفئة | ~1μs |
| الإجمالي لكل حدث | ~10 ميكروثانية |
| دفعة الإنتاجية | 100 ألف+ حدث/ثانية |
| الذاكرة لكل حدث | مكدس ~ 200 بايت |