Event Publishers (Redis + NATS)
The Rust Core Engine publishes normalized events to two message systems: Redis Streams for internal consumption and NATS JetStream for cross-site federation.
Dual-Publish Architecture
Redis Publisher
Implementation
/// Handle for publishing to Redis through a connection pool.
///
/// The type derives `Clone`, so it can be duplicated and passed by value.
#[derive(Clone)]
pub struct RedisPublisher {
    /// Pool from which each publish call checks out a connection.
    pool: Pool<RedisConnectionManager>,
}

impl RedisPublisher {
    /// Builds a publisher whose pool holds at most 16 connections to `url`.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection manager cannot be created from
    /// `url` or if building the pool fails.
    pub async fn new(url: &str) -> Result<Self> {
        // Upper bound on pooled connections.
        let max_connections = 16;
        let builder = Pool::builder().max_size(max_connections);
        let pool = builder.build(RedisConnectionManager::new(url)?).await?;
        Ok(Self { pool })
    }
}
Single Event Publish
/// Publishes a single normalized event to its Redis stream.
///
/// Events whose category equals `"cps"` are written to `aurora:cps:alerts`;
/// every other event is written to `aurora:alerts`. The stream entry holds
/// the JSON-serialized event under `data`, plus `severity`, `source` and
/// `category` as separate fields. `EVENTS_PUBLISHED` is incremented once the
/// `XADD` has succeeded.
///
/// # Errors
///
/// Returns an error if no pooled connection can be obtained, if the event
/// cannot be serialized to JSON, or if the `XADD` command fails.
pub async fn publish_event(&self, event: &NormalizedEvent) -> Result<()> {
    let mut conn = self.pool.get().await?;
    let payload = serde_json::to_string(event)?;

    // CPS events have a dedicated stream; everything else shares the general one.
    let target = if event.category != "cps" {
        "aurora:alerts"
    } else {
        "aurora:cps:alerts"
    };

    let mut xadd = redis::cmd("XADD");
    xadd.arg(target)
        // Approximate trimming keeps the stream at roughly 50K entries.
        .arg("MAXLEN").arg("~").arg("50000")
        // "*" asks Redis to generate the entry ID.
        .arg("*")
        .arg("data").arg(&payload)
        .arg("severity").arg(&event.severity)
        .arg("source").arg(&event.source)
        .arg("category").arg(&event.category);
    xadd.query_async::<_, String>(&mut conn).await?;

    EVENTS_PUBLISHED.inc();
    Ok(())
}
Batch Publish (Pipeline)
/// Publishes a batch of normalized events to Redis in one pipelined request.
///
/// Each event becomes one `XADD` with the same routing and fields as
/// `publish_event`: events whose category equals `"cps"` go to
/// `aurora:cps:alerts`, all others to `aurora:alerts`, and every entry carries
/// `data`, `severity`, `source` and `category`. `EVENTS_PUBLISHED` is increased
/// by the batch size after the pipeline succeeds.
///
/// An empty `events` slice is a no-op and returns `Ok(())` without touching
/// the connection pool.
///
/// # Errors
///
/// Returns an error if any event cannot be serialized to JSON (nothing is
/// sent in that case), if no pooled connection can be obtained, or if the
/// pipeline fails.
pub async fn publish_event_batch(&self, events: &[NormalizedEvent]) -> Result<()> {
    // Nothing to send: skip the pool checkout and the round-trip entirely.
    if events.is_empty() {
        return Ok(());
    }

    // Build the whole pipeline first so a serialization failure aborts before
    // a pooled connection is checked out.
    let mut pipe = redis::pipe();
    for event in events {
        let stream = if event.category == "cps" {
            "aurora:cps:alerts"
        } else {
            "aurora:alerts"
        };
        let data = serde_json::to_string(event)?;
        pipe.cmd("XADD")
            .arg(stream)
            // Approximate trimming keeps the stream at roughly 50K entries.
            .arg("MAXLEN").arg("~").arg("50000")
            // "*" asks Redis to generate the entry ID.
            .arg("*")
            .arg("data").arg(&data)
            .arg("severity").arg(&event.severity)
            // Keep batch entries field-compatible with single publishes.
            .arg("source").arg(&event.source)
            .arg("category").arg(&event.category);
    }

    let mut conn = self.pool.get().await?;
    pipe.query_async::<_, Vec<String>>(&mut conn).await?;

    // `EVENTS_PUBLISHED` is an integer counter, so the increment must be an
    // integer; `usize` to `u64` is lossless on supported targets.
    EVENTS_PUBLISHED.inc_by(events.len() as u64);
    Ok(())
}
Pipelining sends every command in the batch in a single round-trip, so the number of network round-trips drops from O(n) to O(1).
Audit Publishing
/// Appends an audit record to the `aurora:audit` Redis stream.
///
/// The entry stores `agent`, `action` and `details` as given, together with
/// a `timestamp` field holding the current UTC time in RFC 3339 format.
///
/// # Errors
///
/// Returns an error if no pooled connection can be obtained or if the `XADD`
/// command fails.
pub async fn publish_audit(
    &self, agent: &str, action: &str, details: &str
) -> Result<()> {
    let mut conn = self.pool.get().await?;

    let recorded_at = chrono::Utc::now().to_rfc3339();

    let mut entry = redis::cmd("XADD");
    entry
        .arg("aurora:audit")
        // Approximate trimming keeps the stream at roughly 50K entries.
        .arg("MAXLEN").arg("~").arg("50000")
        // "*" asks Redis to generate the entry ID.
        .arg("*")
        .arg("agent").arg(agent)
        .arg("action").arg(action)
        .arg("details").arg(details)
        .arg("timestamp").arg(recorded_at);
    entry.query_async::<_, String>(&mut conn).await?;

    Ok(())
}
NATS JetStream Publisher
Implementation
/// Handle for publishing to NATS JetStream.
///
/// Derives `Clone` (like `RedisPublisher`) so it can be duplicated and passed
/// by value; both fields are themselves cloneable handles.
#[derive(Clone)]
pub struct NatsPublisher {
    /// Underlying NATS client connection.
    client: async_nats::Client,
    /// JetStream context created from `client`; used for all publishes.
    jetstream: async_nats::jetstream::Context,
}

impl NatsPublisher {
    /// Connects to the NATS server at `url` and makes sure the `AURORA`
    /// stream exists.
    ///
    /// The stream is requested with limits-based retention, a 10 GB size cap,
    /// and the subject filters `aurora.alerts.>`, `aurora.intel.>`,
    /// `aurora.cps.>` and `aurora.soar.>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established or the
    /// get-or-create stream request fails.
    pub async fn new(url: &str) -> Result<Self> {
        use async_nats::jetstream::stream::{Config, RetentionPolicy};

        let client = async_nats::connect(url).await?;
        let jetstream = async_nats::jetstream::new(client.clone());

        let subjects = [
            "aurora.alerts.>",
            "aurora.intel.>",
            "aurora.cps.>",
            "aurora.soar.>",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();

        // Create the AURORA stream if it does not exist yet.
        // NOTE(review): presumably an existing stream is returned as-is and
        // not reconfigured — confirm before relying on changed limits here.
        let stream_config = Config {
            name: "AURORA".to_string(),
            subjects,
            retention: RetentionPolicy::Limits,
            max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
            ..Default::default()
        };
        jetstream.get_or_create_stream(stream_config).await?;

        Ok(Self { client, jetstream })
    }
}
Alert Federation
Only high and critical severity events are federated to other SOC sites:
/// Federates an event over JetStream when its severity is `"high"` or
/// `"critical"`.
///
/// Events of any other severity are skipped and the call returns `Ok(())`
/// without publishing. Federated events are serialized to JSON and published
/// on `aurora.alerts.<category>`; `EVENTS_FEDERATED` is incremented after the
/// JetStream acknowledgement arrives.
///
/// # Errors
///
/// Returns an error if the event cannot be serialized, if the publish fails,
/// or if waiting for the acknowledgement fails.
pub async fn federate_alert(&self, event: &NormalizedEvent) -> Result<()> {
    let should_federate = event.severity == "high" || event.severity == "critical";
    if !should_federate {
        return Ok(());
    }

    let subject = format!("aurora.alerts.{}", event.category);
    let body = serde_json::to_vec(event)?;

    // First await sends the message; second await waits for the JetStream ack.
    let pending_ack = self.jetstream.publish(subject, body.into()).await?;
    pending_ack.await?;

    EVENTS_FEDERATED.inc();
    Ok(())
}
IOC Sharing
Extracted IOCs are shared across sites for collective defense:
/// Publishes an indicator of compromise on `aurora.intel.ioc.<ioc_type>`.
///
/// The JSON payload contains `type`, `value`, a fixed `source` of
/// `"aurora-rust-core"`, and a `timestamp` with the current UTC time in
/// RFC 3339 format. The call completes once JetStream acknowledges the message.
///
/// # Errors
///
/// Returns an error if the payload cannot be serialized, if the publish
/// fails, or if waiting for the acknowledgement fails.
pub async fn share_ioc(&self, ioc_type: &str, value: &str) -> Result<()> {
    let subject = format!("aurora.intel.ioc.{}", ioc_type);

    let message = serde_json::json!({
        "type": ioc_type,
        "value": value,
        "source": "aurora-rust-core",
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });
    let body = serde_json::to_vec(&message)?;

    // First await sends the message; second await waits for the JetStream ack.
    let pending_ack = self.jetstream.publish(subject, body.into()).await?;
    pending_ack.await?;

    Ok(())
}
Attestation Result Publication
/// Publishes a device attestation result on
/// `aurora.cps.attestation.<device_id>`.
///
/// The JSON payload contains `device_id`, `status`, and a `timestamp` with
/// the current UTC time in RFC 3339 format. The call completes once JetStream
/// acknowledges the message.
///
/// # Errors
///
/// Returns an error if the payload cannot be serialized, if the publish
/// fails, or if waiting for the acknowledgement fails.
pub async fn publish_attestation_result(
    &self, device_id: &str, status: &str
) -> Result<()> {
    let subject = format!("aurora.cps.attestation.{}", device_id);

    let message = serde_json::json!({
        "device_id": device_id,
        "status": status,
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });
    let body = serde_json::to_vec(&message)?;

    // First await sends the message; second await waits for the JetStream ack.
    let pending_ack = self.jetstream.publish(subject, body.into()).await?;
    pending_ack.await?;

    Ok(())
}
Stream Configuration
Redis Streams
| Stream | Max Length | Data | Consumers |
|---|---|---|---|
| aurora:alerts | ~50,000 | General security alerts | FastAPI, Agent Pipeline |
| aurora:cps:alerts | ~50,000 | CPS/IoT device alerts | CPS Agent, FastAPI |
| aurora:audit | ~50,000 | Agent audit trail | FastAPI WebSocket |
| aurora:cps:attestation | ~10,000 | Attestation results | FastAPI, CPS Agent |
NATS JetStream (AURORA Stream)
| Subject Pattern | Purpose | Retention |
|---|---|---|
| aurora.alerts.> | Federated alerts | 10 GB max |
| aurora.intel.ioc.> | IOC sharing | 10 GB max |
| aurora.cps.> | CPS events | 10 GB max |
| aurora.soar.> | SOAR actions | 10 GB max |
Prometheus Metrics
// Process-wide Prometheus counters used by the publishers.
// Each counter is registered when its static is first initialized; the
// `unwrap()` panics if that registration returns an error.
lazy_static! {
// Incremented by the Redis publish paths (`aurora_events_published_total`).
static ref EVENTS_PUBLISHED: IntCounter = register_int_counter!(
"aurora_events_published_total", "Total events published to Redis"
).unwrap();
// Incremented when an alert is federated via NATS (`aurora_events_federated_total`).
static ref EVENTS_FEDERATED: IntCounter = register_int_counter!(
"aurora_events_federated_total", "Total events federated via NATS"
).unwrap();
}
MQTT Consumer Integration
The Rust engine also consumes MQTT events from edge devices:
pub async fn consume_mqtt_events(cfg: &AppConfig, redis: RedisPublisher) -> Result<()> {
let mut mqttoptions = MqttOptions::new("aurora-rust-core", &cfg.mqtt_host, cfg.mqtt_port);
// mTLS when certificates are configured
if let (Some(ca), Some(cert), Some(key)) = (&cfg.mqtt_ca_path, &cfg.mqtt_cert_path, &cfg.mqtt_key_path) {
let ca = std::fs::read(ca)?;
let cert = std::fs::read(cert)?;
let key = std::fs::read(key)?;
mqttoptions.set_transport(Transport::tls_with_config(/* ... */));
}
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
// Subscribe to all edge device topics
client.subscribe("aurora/sensors/+/alerts", QoS::ExactlyOnce).await?;
client.subscribe("aurora/sensors/+/status", QoS::AtLeastOnce).await?;
client.subscribe("aurora/sensors/+/telemetry", QoS::AtLeastOnce).await?;
client.subscribe("aurora/attestation/+/response", QoS::ExactlyOnce).await?;
loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(msg))) => {
let raw = RawEvent {
source: "mqtt".into(),
format: "mqtt".into(),
payload: String::from_utf8_lossy(&msg.payload).into(),
source_ip: None,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
};
let normalized = normalizer::normalize(&raw);
redis.publish_event(&normalized).await?;
}
Err(e) => {
tracing::error!("MQTT error: {e}");
tokio::time::sleep(Duration::from_secs(5)).await; // Reconnect backoff
}
_ => {}
}
}
}
The MQTT consumer runs as a spawned Tokio task. When the MQTT event loop reports an error, it logs the error and waits 5 seconds before polling again, which serves as the reconnect backoff.