انتقل إلى المحتوى الرئيسي

ناشرو الأحداث (Redis + NATS)

ينشر Rust Core Engine الأحداث المقيسة على نظامين للرسائل: Redis Streams للاستهلاك الداخلي وNATS JetStream للاتحاد عبر المواقع.

بنية النشر المزدوج

ريديس الناشر

تطبيق

#[derive(Clone)]
pub struct RedisPublisher {
pool: Pool<RedisConnectionManager>,
}

impl RedisPublisher {
pub async fn new(url: &str) -> Result<Self> {
let manager = RedisConnectionManager::new(url)?;
let pool = Pool::builder()
.max_size(16) // 16 connections in the pool
.build(manager)
.await?;
Ok(Self { pool })
}
}

نشر حدث واحد

pub async fn publish_event(&self, event: &NormalizedEvent) -> Result<()> {
let mut conn = self.pool.get().await?;
let data = serde_json::to_string(event)?;

// Route to appropriate stream based on category
let stream = if event.category == "cps" {
"aurora:cps:alerts"
} else {
"aurora:alerts"
};

// XADD with approximate trimming at 50K messages
redis::cmd("XADD")
.arg(stream)
.arg("MAXLEN").arg("~").arg("50000")
.arg("*") // Auto-generate message ID
.arg("data").arg(&data)
.arg("severity").arg(&event.severity)
.arg("source").arg(&event.source)
.arg("category").arg(&event.category)
.query_async::<_, String>(&mut conn)
.await?;

EVENTS_PUBLISHED.inc();
Ok(())
}

نشر دفعة (خط الأنابيب)

pub async fn publish_event_batch(&self, events: &[NormalizedEvent]) -> Result<()> {
let mut conn = self.pool.get().await?;
let mut pipe = redis::pipe();

for event in events {
let stream = if event.category == "cps" {
"aurora:cps:alerts"
} else {
"aurora:alerts"
};
let data = serde_json::to_string(event)?;

pipe.cmd("XADD")
.arg(stream)
.arg("MAXLEN").arg("~").arg("50000")
.arg("*")
.arg("data").arg(&data)
.arg("severity").arg(&event.severity);
}

pipe.query_async::<_, Vec<String>>(&mut conn).await?;
EVENTS_PUBLISHED.inc_by(events.len() as f64);
Ok(())
}

يرسل تجميع خطوط الأنابيب كافة الأوامر في رحلة واحدة ذهابًا وإيابًا، مما يقلل حمل الشبكة من O(n) إلى O(1).

نشر التدقيق

pub async fn publish_audit(
&self, agent: &str, action: &str, details: &str
) -> Result<()> {
let mut conn = self.pool.get().await?;
redis::cmd("XADD")
.arg("aurora:audit")
.arg("MAXLEN").arg("~").arg("50000")
.arg("*")
.arg("agent").arg(agent)
.arg("action").arg(action)
.arg("details").arg(details)
.arg("timestamp").arg(chrono::Utc::now().to_rfc3339())
.query_async::<_, String>(&mut conn)
.await?;
Ok(())
}

ناتس جيت ستريم الناشر

تطبيق

pub struct NatsPublisher {
client: async_nats::Client,
jetstream: async_nats::jetstream::Context,
}

impl NatsPublisher {
pub async fn new(url: &str) -> Result<Self> {
let client = async_nats::connect(url).await?;
let jetstream = async_nats::jetstream::new(client.clone());

// Create AURORA stream if not exists
jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "AURORA".to_string(),
subjects: vec![
"aurora.alerts.>".to_string(),
"aurora.intel.>".to_string(),
"aurora.cps.>".to_string(),
"aurora.soar.>".to_string(),
],
retention: async_nats::jetstream::stream::RetentionPolicy::Limits,
max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
..Default::default()
}).await?;

Ok(Self { client, jetstream })
}
}

تنبيه الاتحاد

يتم فقط توحيد الأحداث ذات الخطورة العالية والحرجة مع مواقع SOC الأخرى:

pub async fn federate_alert(&self, event: &NormalizedEvent) -> Result<()> {
// Only federate high-severity events
if event.severity != "high" && event.severity != "critical" {
return Ok(());
}

let subject = format!("aurora.alerts.{}", event.category);
let data = serde_json::to_vec(event)?;

self.jetstream
.publish(subject, data.into())
.await?
.await?; // Wait for ack from JetStream

EVENTS_FEDERATED.inc();
Ok(())
}

مشاركة اللجنة الأولمبية الدولية

تتم مشاركة شهادات النفط العالمية المستخرجة عبر المواقع للدفاع الجماعي:

pub async fn share_ioc(&self, ioc_type: &str, value: &str) -> Result<()> {
let subject = format!("aurora.intel.ioc.{}", ioc_type);
let payload = serde_json::json!({
"type": ioc_type,
"value": value,
"source": "aurora-rust-core",
"timestamp": chrono::Utc::now().to_rfc3339(),
});

self.jetstream
.publish(subject, serde_json::to_vec(&payload)?.into())
.await?
.await?;

Ok(())
}

نشر نتيجة الشهادة

pub async fn publish_attestation_result(
&self, device_id: &str, status: &str
) -> Result<()> {
let subject = format!("aurora.cps.attestation.{}", device_id);
let payload = serde_json::json!({
"device_id": device_id,
"status": status,
"timestamp": chrono::Utc::now().to_rfc3339(),
});

self.jetstream
.publish(subject, serde_json::to_vec(&payload)?.into())
.await?
.await?;

Ok(())
}

تكوين الدفق

تدفقات ريديس

تدفقأقصى طولبياناتالمستهلكين
aurora:alerts~ 50.000تنبيهات أمنية عامةFastAPI، وكيل خط الأنابيب
aurora:cps:alerts~ 50.000تنبيهات جهاز CPS/IoTوكيل CPS، FastAPI
aurora:audit~ 50.000مسار تدقيق الوكيلFastAPI WebSocket
aurora:cps:attestation~10,000نتائج التصديقFastAPI، وكيل CPS

NATS JetStream (تيار AURORA)

نمط الموضوعغايةحفظ
aurora.alerts.>التنبيهات الاتحادية10 جيجا كحد أقصى
aurora.intel.ioc.>مشاركة اللجنة الأولمبية الدولية10 جيجا كحد أقصى
aurora.cps.>أحداث CPS10 جيجا كحد أقصى
aurora.soar.>إجراءات SOAR10 جيجا كحد أقصى

مقاييس بروميثيوس

lazy_static! {
static ref EVENTS_PUBLISHED: IntCounter = register_int_counter!(
"aurora_events_published_total", "Total events published to Redis"
).unwrap();

static ref EVENTS_FEDERATED: IntCounter = register_int_counter!(
"aurora_events_federated_total", "Total events federated via NATS"
).unwrap();
}

تكامل المستهلك MQTT

يستهلك محرك Rust أيضًا أحداث MQTT من الأجهزة الطرفية:

pub async fn consume_mqtt_events(cfg: &AppConfig, redis: RedisPublisher) -> Result<()> {
let mut mqttoptions = MqttOptions::new("aurora-rust-core", &cfg.mqtt_host, cfg.mqtt_port);

// mTLS when certificates are configured
if let (Some(ca), Some(cert), Some(key)) = (&cfg.mqtt_ca_path, &cfg.mqtt_cert_path, &cfg.mqtt_key_path) {
let ca = std::fs::read(ca)?;
let cert = std::fs::read(cert)?;
let key = std::fs::read(key)?;
mqttoptions.set_transport(Transport::tls_with_config(/* ... */));
}

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);

// Subscribe to all edge device topics
client.subscribe("aurora/sensors/+/alerts", QoS::ExactlyOnce).await?;
client.subscribe("aurora/sensors/+/status", QoS::AtLeastOnce).await?;
client.subscribe("aurora/sensors/+/telemetry", QoS::AtLeastOnce).await?;
client.subscribe("aurora/attestation/+/response", QoS::ExactlyOnce).await?;

loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(msg))) => {
let raw = RawEvent {
source: "mqtt".into(),
format: "mqtt".into(),
payload: String::from_utf8_lossy(&msg.payload).into(),
source_ip: None,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
};
let normalized = normalizer::normalize(&raw);
redis.publish_event(&normalized).await?;
}
Err(e) => {
tracing::error!("MQTT error: {e}");
tokio::time::sleep(Duration::from_secs(5)).await; // Reconnect backoff
}
_ => {}
}
}
}

يعمل عميل MQTT كمهمة Tokio ويعيد الاتصال تلقائيًا مع تراجع لمدة 5 ثوانٍ عن الأخطاء.