Skip to main content

Event Publishers (Redis + NATS)

The Rust Core Engine publishes normalized events to two message systems: Redis Streams for internal consumption and NATS JetStream for cross-site federation.

Dual-Publish Architecture

Redis Publisher

Implementation

/// Publisher that writes normalized events to Redis Streams.
///
/// Wraps a bb8 connection pool; deriving `Clone` lets the publisher be
/// handed to multiple tasks (the pool itself is `Clone`).
#[derive(Clone)]
pub struct RedisPublisher {
    pool: Pool<RedisConnectionManager>,
}

impl RedisPublisher {
    /// Maximum number of pooled Redis connections.
    const POOL_SIZE: u32 = 16;

    /// Connects to Redis at `url` and builds the connection pool.
    ///
    /// # Errors
    /// Fails if `url` is not a valid Redis URL or the pool cannot be built.
    pub async fn new(url: &str) -> Result<Self> {
        let manager = RedisConnectionManager::new(url)?;
        let pool = Pool::builder()
            .max_size(Self::POOL_SIZE)
            .build(manager)
            .await?;
        Ok(Self { pool })
    }
}

Single Event Publish

/// Publishes a single normalized event onto the appropriate Redis Stream.
///
/// CPS-category events go to `aurora:cps:alerts`; everything else goes to
/// `aurora:alerts`. Streams are capped via approximate trimming
/// (`MAXLEN ~ 50000`).
///
/// # Errors
/// Fails if the event cannot be serialized, no pooled connection is
/// available, or Redis rejects the `XADD`.
pub async fn publish_event(&self, event: &NormalizedEvent) -> Result<()> {
    // Serialize first so a serde failure never checks out a connection.
    let payload = serde_json::to_string(event)?;

    // CPS events are routed to their own dedicated stream.
    let target_stream = match event.category == "cps" {
        true => "aurora:cps:alerts",
        false => "aurora:alerts",
    };

    let mut connection = self.pool.get().await?;
    redis::cmd("XADD")
        .arg(target_stream)
        .arg("MAXLEN").arg("~").arg("50000")
        .arg("*") // let Redis assign the entry ID
        .arg("data").arg(&payload)
        .arg("severity").arg(&event.severity)
        .arg("source").arg(&event.source)
        .arg("category").arg(&event.category)
        .query_async::<_, String>(&mut connection)
        .await?;

    EVENTS_PUBLISHED.inc();
    Ok(())
}

Batch Publish (Pipeline)

/// Publishes a batch of events in a single Redis pipeline round-trip.
///
/// Each event is routed exactly like `publish_event` (CPS events to
/// `aurora:cps:alerts`, everything else to `aurora:alerts`) and carries the
/// same field set (`data`, `severity`, `source`, `category`) so stream
/// consumers see a consistent entry schema regardless of which publish path
/// produced an entry.
///
/// # Errors
/// Fails if any event cannot be serialized, no pooled connection is
/// available, or Redis rejects the pipeline.
pub async fn publish_event_batch(&self, events: &[NormalizedEvent]) -> Result<()> {
    // Nothing to send — avoid checking out a connection for an empty batch.
    if events.is_empty() {
        return Ok(());
    }

    let mut conn = self.pool.get().await?;
    let mut pipe = redis::pipe();

    for event in events {
        let stream = if event.category == "cps" {
            "aurora:cps:alerts"
        } else {
            "aurora:alerts"
        };
        let data = serde_json::to_string(event)?;

        // Mirror publish_event: same trimming policy and the full field
        // set, not just `data` + `severity`.
        pipe.cmd("XADD")
            .arg(stream)
            .arg("MAXLEN").arg("~").arg("50000")
            .arg("*")
            .arg("data").arg(&data)
            .arg("severity").arg(&event.severity)
            .arg("source").arg(&event.source)
            .arg("category").arg(&event.category);
    }

    pipe.query_async::<_, Vec<String>>(&mut conn).await?;
    // IntCounter::inc_by takes a u64, not an f64.
    EVENTS_PUBLISHED.inc_by(events.len() as u64);
    Ok(())
}

Pipeline batching sends all commands in a single round-trip, reducing network overhead from O(n) to O(1).

Audit Publishing

/// Appends an entry to the `aurora:audit` stream recording an agent action.
///
/// The entry carries the acting agent, the action name, free-form details,
/// and an RFC 3339 UTC timestamp; the stream is capped at roughly 50K
/// entries via approximate trimming.
///
/// # Errors
/// Fails if no pooled connection is available or Redis rejects the `XADD`.
pub async fn publish_audit(
    &self, agent: &str, action: &str, details: &str
) -> Result<()> {
    let recorded_at = chrono::Utc::now().to_rfc3339();

    let mut connection = self.pool.get().await?;
    redis::cmd("XADD")
        .arg("aurora:audit")
        .arg("MAXLEN").arg("~").arg("50000")
        .arg("*")
        .arg("agent").arg(agent)
        .arg("action").arg(action)
        .arg("details").arg(details)
        .arg("timestamp").arg(recorded_at)
        .query_async::<_, String>(&mut connection)
        .await?;
    Ok(())
}

NATS JetStream Publisher

Implementation

/// Publisher that federates events to peer SOC sites over NATS JetStream.
pub struct NatsPublisher {
    client: async_nats::Client,
    jetstream: async_nats::jetstream::Context,
}

impl NatsPublisher {
    /// Connects to the NATS server at `url` and ensures the `AURORA`
    /// JetStream stream exists with the expected subjects and size limit.
    ///
    /// # Errors
    /// Fails if the connection cannot be established or the stream cannot
    /// be created/fetched.
    pub async fn new(url: &str) -> Result<Self> {
        let client = async_nats::connect(url).await?;
        let jetstream = async_nats::jetstream::new(client.clone());

        // Subject space covered by the federation stream.
        let subjects: Vec<String> = [
            "aurora.alerts.>",
            "aurora.intel.>",
            "aurora.cps.>",
            "aurora.soar.>",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();

        let config = async_nats::jetstream::stream::Config {
            name: "AURORA".to_string(),
            subjects,
            retention: async_nats::jetstream::stream::RetentionPolicy::Limits,
            max_bytes: 10 * 1024 * 1024 * 1024, // cap the stream at 10 GB
            ..Default::default()
        };

        // Idempotent: reuses the stream if it already exists.
        jetstream.get_or_create_stream(config).await?;

        Ok(Self { client, jetstream })
    }
}

Alert Federation

Only high and critical severity events are federated to other SOC sites:

/// Federates a single alert to peer SOC sites via JetStream.
///
/// Low-severity events are dropped silently: only `"high"` and
/// `"critical"` events cross site boundaries. Waits for the JetStream
/// acknowledgement before returning.
///
/// # Errors
/// Fails if serialization, the publish, or the ack wait fails.
pub async fn federate_alert(&self, event: &NormalizedEvent) -> Result<()> {
    let worth_federating =
        event.severity == "high" || event.severity == "critical";
    if !worth_federating {
        return Ok(());
    }

    let subject = format!("aurora.alerts.{}", event.category);
    let body = serde_json::to_vec(event)?;

    // First await sends the publish; the second waits for the server ack.
    let ack = self.jetstream.publish(subject, body.into()).await?;
    ack.await?;

    EVENTS_FEDERATED.inc();
    Ok(())
}

IOC Sharing

Extracted IOCs are shared across sites for collective defense:

/// Shares an extracted IOC with peer sites on `aurora.intel.ioc.<type>`.
///
/// The payload identifies this engine as the source and carries an
/// RFC 3339 UTC timestamp. Waits for the JetStream acknowledgement.
///
/// # Errors
/// Fails if serialization, the publish, or the ack wait fails.
pub async fn share_ioc(&self, ioc_type: &str, value: &str) -> Result<()> {
    let body = serde_json::json!({
        "type": ioc_type,
        "value": value,
        "source": "aurora-rust-core",
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });
    let subject = format!("aurora.intel.ioc.{}", ioc_type);

    let ack = self
        .jetstream
        .publish(subject, serde_json::to_vec(&body)?.into())
        .await?;
    ack.await?;

    Ok(())
}

Attestation Result Publication

/// Publishes a device attestation result on
/// `aurora.cps.attestation.<device_id>` and waits for the JetStream ack.
///
/// # Errors
/// Fails if serialization, the publish, or the ack wait fails.
pub async fn publish_attestation_result(
    &self, device_id: &str, status: &str
) -> Result<()> {
    let body = serde_json::json!({
        "device_id": device_id,
        "status": status,
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });
    let subject = format!("aurora.cps.attestation.{}", device_id);

    let ack = self
        .jetstream
        .publish(subject, serde_json::to_vec(&body)?.into())
        .await?;
    ack.await?;

    Ok(())
}

Stream Configuration

Redis Streams

| Stream | Max Length | Data | Consumers |
|---|---|---|---|
| `aurora:alerts` | ~50,000 | General security alerts | FastAPI, Agent Pipeline |
| `aurora:cps:alerts` | ~50,000 | CPS/IoT device alerts | CPS Agent, FastAPI |
| `aurora:audit` | ~50,000 | Agent audit trail | FastAPI WebSocket |
| `aurora:cps:attestation` | ~10,000 | Attestation results | FastAPI, CPS Agent |

NATS JetStream (AURORA Stream)

| Subject Pattern | Purpose | Retention |
|---|---|---|
| `aurora.alerts.>` | Federated alerts | 10 GB max |
| `aurora.intel.ioc.>` | IOC sharing | 10 GB max |
| `aurora.cps.>` | CPS events | 10 GB max |
| `aurora.soar.>` | SOAR actions | 10 GB max |

Prometheus Metrics

// Process-wide Prometheus counters, registered on first access.
// NOTE(review): std::sync::LazyLock could replace lazy_static!, but that
// changes the statics' types — left as-is to keep cross-file usage stable.
lazy_static! {
    // Incremented by RedisPublisher for every event written to a stream.
    static ref EVENTS_PUBLISHED: IntCounter = register_int_counter!(
        "aurora_events_published_total", "Total events published to Redis"
    ).unwrap();

    // Incremented by NatsPublisher for every alert federated to peer sites.
    static ref EVENTS_FEDERATED: IntCounter = register_int_counter!(
        "aurora_events_federated_total", "Total events federated via NATS"
    ).unwrap();
}

MQTT Consumer Integration

The Rust engine also consumes MQTT events from edge devices:

pub async fn consume_mqtt_events(cfg: &AppConfig, redis: RedisPublisher) -> Result<()> {
let mut mqttoptions = MqttOptions::new("aurora-rust-core", &cfg.mqtt_host, cfg.mqtt_port);

// mTLS when certificates are configured
if let (Some(ca), Some(cert), Some(key)) = (&cfg.mqtt_ca_path, &cfg.mqtt_cert_path, &cfg.mqtt_key_path) {
let ca = std::fs::read(ca)?;
let cert = std::fs::read(cert)?;
let key = std::fs::read(key)?;
mqttoptions.set_transport(Transport::tls_with_config(/* ... */));
}

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);

// Subscribe to all edge device topics
client.subscribe("aurora/sensors/+/alerts", QoS::ExactlyOnce).await?;
client.subscribe("aurora/sensors/+/status", QoS::AtLeastOnce).await?;
client.subscribe("aurora/sensors/+/telemetry", QoS::AtLeastOnce).await?;
client.subscribe("aurora/attestation/+/response", QoS::ExactlyOnce).await?;

loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(msg))) => {
let raw = RawEvent {
source: "mqtt".into(),
format: "mqtt".into(),
payload: String::from_utf8_lossy(&msg.payload).into(),
source_ip: None,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
};
let normalized = normalizer::normalize(&raw);
redis.publish_event(&normalized).await?;
}
Err(e) => {
tracing::error!("MQTT error: {e}");
tokio::time::sleep(Duration::from_secs(5)).await; // Reconnect backoff
}
_ => {}
}
}
}

The MQTT consumer runs as a spawned Tokio task and automatically reconnects with 5-second backoff on errors.