ناشرو الأحداث (Redis + NATS)
ينشر Rust Core Engine الأحداث المقيسة على نظامين للرسائل: Redis Streams للاستهلاك الداخلي وNATS JetStream للاتحاد عبر المواقع.
بنية النشر المزدوج
ريديس الناشر
تطبيق
#[derive(Clone)]
pub struct RedisPublisher {
pool: Pool<RedisConnectionManager>,
}
impl RedisPublisher {
pub async fn new(url: &str) -> Result<Self> {
let manager = RedisConnectionManager::new(url)?;
let pool = Pool::builder()
.max_size(16) // 16 connections in the pool
.build(manager)
.await?;
Ok(Self { pool })
}
}
نشر حدث واحد
pub async fn publish_event(&self, event: &NormalizedEvent) -> Result<()> {
let mut conn = self.pool.get().await?;
let data = serde_json::to_string(event)?;
// Route to appropriate stream based on category
let stream = if event.category == "cps" {
"aurora:cps:alerts"
} else {
"aurora:alerts"
};
// XADD with approximate trimming at 50K messages
redis::cmd("XADD")
.arg(stream)
.arg("MAXLEN").arg("~").arg("50000")
.arg("*") // Auto-generate message ID
.arg("data").arg(&data)
.arg("severity").arg(&event.severity)
.arg("source").arg(&event.source)
.arg("category").arg(&event.category)
.query_async::<_, String>(&mut conn)
.await?;
EVENTS_PUBLISHED.inc();
Ok(())
}
نشر دفعة (خط الأنابيب)
pub async fn publish_event_batch(&self, events: &[NormalizedEvent]) -> Result<()> {
let mut conn = self.pool.get().await?;
let mut pipe = redis::pipe();
for event in events {
let stream = if event.category == "cps" {
"aurora:cps:alerts"
} else {
"aurora:alerts"
};
let data = serde_json::to_string(event)?;
pipe.cmd("XADD")
.arg(stream)
.arg("MAXLEN").arg("~").arg("50000")
.arg("*")
.arg("data").arg(&data)
.arg("severity").arg(&event.severity);
}
pipe.query_async::<_, Vec<String>>(&mut conn).await?;
EVENTS_PUBLISHED.inc_by(events.len() as f64);
Ok(())
}
يرسل تجميع خطوط الأنابيب كافة الأوامر في رحلة واحدة ذهابًا وإيابًا، مما يقلل حمل الشبكة من O(n) إلى O(1).
نشر التدقيق
pub async fn publish_audit(
&self, agent: &str, action: &str, details: &str
) -> Result<()> {
let mut conn = self.pool.get().await?;
redis::cmd("XADD")
.arg("aurora:audit")
.arg("MAXLEN").arg("~").arg("50000")
.arg("*")
.arg("agent").arg(agent)
.arg("action").arg(action)
.arg("details").arg(details)
.arg("timestamp").arg(chrono::Utc::now().to_rfc3339())
.query_async::<_, String>(&mut conn)
.await?;
Ok(())
}
ناتس جيت ستريم الناشر
تطبيق
pub struct NatsPublisher {
client: async_nats::Client,
jetstream: async_nats::jetstream::Context,
}
impl NatsPublisher {
pub async fn new(url: &str) -> Result<Self> {
let client = async_nats::connect(url).await?;
let jetstream = async_nats::jetstream::new(client.clone());
// Create AURORA stream if not exists
jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "AURORA".to_string(),
subjects: vec![
"aurora.alerts.>".to_string(),
"aurora.intel.>".to_string(),
"aurora.cps.>".to_string(),
"aurora.soar.>".to_string(),
],
retention: async_nats::jetstream::stream::RetentionPolicy::Limits,
max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
..Default::default()
}).await?;
Ok(Self { client, jetstream })
}
}
تنبيه الاتحاد
يتم فقط توحيد الأحداث ذات الخطورة العالية والحرجة مع مواقع SOC الأخرى:
pub async fn federate_alert(&self, event: &NormalizedEvent) -> Result<()> {
// Only federate high-severity events
if event.severity != "high" && event.severity != "critical" {
return Ok(());
}
let subject = format!("aurora.alerts.{}", event.category);
let data = serde_json::to_vec(event)?;
self.jetstream
.publish(subject, data.into())
.await?
.await?; // Wait for ack from JetStream
EVENTS_FEDERATED.inc();
Ok(())
}
مشاركة اللجنة الأولمبية الدولية
تتم مشاركة شهادات النفط العالمية المستخرجة عبر المواقع للدفاع الجماعي:
pub async fn share_ioc(&self, ioc_type: &str, value: &str) -> Result<()> {
let subject = format!("aurora.intel.ioc.{}", ioc_type);
let payload = serde_json::json!({
"type": ioc_type,
"value": value,
"source": "aurora-rust-core",
"timestamp": chrono::Utc::now().to_rfc3339(),
});
self.jetstream
.publish(subject, serde_json::to_vec(&payload)?.into())
.await?
.await?;
Ok(())
}
نشر نتيجة الشهادة
pub async fn publish_attestation_result(
&self, device_id: &str, status: &str
) -> Result<()> {
let subject = format!("aurora.cps.attestation.{}", device_id);
let payload = serde_json::json!({
"device_id": device_id,
"status": status,
"timestamp": chrono::Utc::now().to_rfc3339(),
});
self.jetstream
.publish(subject, serde_json::to_vec(&payload)?.into())
.await?
.await?;
Ok(())
}
تكوين الدفق
تدفقات ريديس
| تدفق | أقصى طول | بيانات | المستهلكين |
|---|---|---|---|
aurora:alerts | ~ 50.000 | تنبيهات أمنية عامة | FastAPI، وكيل خط الأنابيب |
aurora:cps:alerts | ~ 50.000 | تنبيهات جهاز CPS/IoT | وكيل CPS، FastAPI |
aurora:audit | ~ 50.000 | مسار تدقيق الوكيل | FastAPI WebSocket |
aurora:cps:attestation | ~10,000 | نتائج التصديق | FastAPI، وكيل CPS |
NATS JetStream (تيار AURORA)
| نمط الموضوع | غاية | حفظ |
|---|---|---|
aurora.alerts.> | التنبيهات الاتحادية | 10 جيجا كحد أقصى |
aurora.intel.ioc.> | مشاركة اللجنة الأولمبية الدولية | 10 جيجا كحد أقصى |
aurora.cps.> | أحداث CPS | 10 جيجا كحد أقصى |
aurora.soar.> | إجراءات SOAR | 10 جيجا كحد أقصى |
مقاييس بروميثيوس
lazy_static! {
static ref EVENTS_PUBLISHED: IntCounter = register_int_counter!(
"aurora_events_published_total", "Total events published to Redis"
).unwrap();
static ref EVENTS_FEDERATED: IntCounter = register_int_counter!(
"aurora_events_federated_total", "Total events federated via NATS"
).unwrap();
}
تكامل المستهلك MQTT
يستهلك محرك Rust أيضًا أحداث MQTT من الأجهزة الطرفية:
pub async fn consume_mqtt_events(cfg: &AppConfig, redis: RedisPublisher) -> Result<()> {
let mut mqttoptions = MqttOptions::new("aurora-rust-core", &cfg.mqtt_host, cfg.mqtt_port);
// mTLS when certificates are configured
if let (Some(ca), Some(cert), Some(key)) = (&cfg.mqtt_ca_path, &cfg.mqtt_cert_path, &cfg.mqtt_key_path) {
let ca = std::fs::read(ca)?;
let cert = std::fs::read(cert)?;
let key = std::fs::read(key)?;
mqttoptions.set_transport(Transport::tls_with_config(/* ... */));
}
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
// Subscribe to all edge device topics
client.subscribe("aurora/sensors/+/alerts", QoS::ExactlyOnce).await?;
client.subscribe("aurora/sensors/+/status", QoS::AtLeastOnce).await?;
client.subscribe("aurora/sensors/+/telemetry", QoS::AtLeastOnce).await?;
client.subscribe("aurora/attestation/+/response", QoS::ExactlyOnce).await?;
loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(msg))) => {
let raw = RawEvent {
source: "mqtt".into(),
format: "mqtt".into(),
payload: String::from_utf8_lossy(&msg.payload).into(),
source_ip: None,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
};
let normalized = normalizer::normalize(&raw);
redis.publish_event(&normalized).await?;
}
Err(e) => {
tracing::error!("MQTT error: {e}");
tokio::time::sleep(Duration::from_secs(5)).await; // Reconnect backoff
}
_ => {}
}
}
}
يعمل عميل MQTT كمهمة Tokio ويعيد الاتصال تلقائيًا مع تراجع لمدة 5 ثوانٍ عن الأخطاء.