NATS JetStream & MQTT
AuroraSOC uses NATS JetStream for cross-site federation and MQTT for IoT device communication. These complement Redis Streams (internal bus) to form a complete event transport layer.
NATS JetStream
Purpose: Cross-Site Federation
Publisher Implementation
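```python
import json
from typing import Optional
from uuid import uuid4

import nats
from nats.js.api import RetentionPolicy
from nats.js.errors import NotFoundError


class NATSJetStreamPublisher:
    def __init__(self, nats_url: str, stream_name: str = "AURORA"):
        self.url = nats_url
        self.stream_name = stream_name
        self.nc = None
        self.js = None

    async def connect(self):
        self.nc = await nats.connect(self.url)
        self.js = self.nc.jetstream()

        # Create the stream only if it does not exist yet, instead of
        # silently swallowing every error raised by add_stream.
        try:
            await self.js.stream_info(self.stream_name)
        except NotFoundError:
            await self.js.add_stream(
                name=self.stream_name,
                subjects=[
                    "AURORA.alerts",
                    "AURORA.ioc_sharing",
                    "AURORA.agent_results",
                    "AURORA.audit",
                ],
                retention=RetentionPolicy.LIMITS,
                max_msgs=100_000,
                max_age=7 * 24 * 3600,  # 7 days; nats-py takes seconds and converts to ns
            )

    async def publish(self, subject: str, data: dict, msg_id: Optional[str] = None):
        # JetStream drops a message whose Nats-Msg-Id it has already seen within
        # the stream's duplicate window, so a retry must pass the same msg_id.
        ack = await self.js.publish(
            subject,
            json.dumps(data).encode(),
            headers={"Nats-Msg-Id": msg_id or str(uuid4())},
        )
        return ack
```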
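Deduplication only helps if a retried publish carries the same `Nats-Msg-Id` as the first attempt; generating a fresh `uuid4()` on every call gives each retry a new ID, so nothing is ever deduplicated. The toy model below illustrates that server-side behavior under stated assumptions: it is not JetStream's implementation, `DedupWindow` is a hypothetical name, and its 120-second default only mirrors JetStream's default two-minute `duplicate_window`.

```python
import time
from typing import Callable, Dict, List
from uuid import uuid4


class DedupWindow:
    """Toy model of JetStream-style publish deduplication (not the server's code).

    A message whose ID was already accepted within `window_s` seconds is
    treated as a duplicate and not stored again.
    """

    def __init__(self, window_s: float = 120.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_s = window_s  # mirrors JetStream's default 2-minute duplicate_window
        self.clock = clock
        self._seen: Dict[str, float] = {}  # msg_id -> time it was first accepted
        self.stored: List[bytes] = []

    def publish(self, msg_id: str, payload: bytes) -> bool:
        """Return True if the message is stored, False if dropped as a duplicate."""
        now = self.clock()
        # Forget IDs whose window has expired.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.window_s}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now
        self.stored.append(payload)
        return True


window = DedupWindow()
msg_id = str(uuid4())
first = window.publish(msg_id, b'{"alert": 1}')        # True: stored
retry = window.publish(msg_id, b'{"alert": 1}')        # False: same ID, dropped
fresh = window.publish(str(uuid4()), b'{"alert": 1}')  # True: new ID, stored again
```

On a real server, a deduplicated publish is still acknowledged, and the acknowledgement flags it as a duplicate rather than storing a second copy.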
Consumer Implementation
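```python
import json
from typing import Awaitable, Callable

from nats.js.api import DeliverPolicy


class NATSJetStreamConsumer:
    def __init__(self, js):
        self.js = js  # JetStream context, e.g. from nc.jetstream()

    async def subscribe(self, subject: str, handler: Callable[[dict], Awaitable[None]],
                        durable: str = "aurora-site-consumer"):
        # Durable consumer: the server remembers its position across restarts.
        # Use a distinct durable name for each subject subscribed to.
        sub = await self.js.subscribe(
            subject,
            durable=durable,
            deliver_policy=DeliverPolicy.ALL,  # replay from the start when the durable is first created
            manual_ack=True,
        )
        async for msg in sub.messages:
            try:
                data = json.loads(msg.data.decode())
            except (UnicodeDecodeError, json.JSONDecodeError):
                await msg.term()  # malformed payload: stop redelivery instead of retrying forever
                continue
            try:
                await handler(data)
                await msg.ack()
            except Exception:
                await msg.nak(delay=5)  # ask the server to redeliver after ~5 s
```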
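Because JetStream redelivers after `nak()`, and also when an acknowledgement is lost or the ack wait expires, the same event can reach `handler` more than once. Handlers therefore need to tolerate duplicates. A minimal in-process sketch, assuming each event carries a unique `event_id` field (a hypothetical name, not from the source); a multi-instance deployment would need shared, bounded storage instead of a Python set.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Set

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


def make_idempotent(handler: Handler, key: str = "event_id") -> Handler:
    """Wrap an async handler so an event with an already-processed key is skipped.

    `event_id` is a hypothetical field name. The in-memory set is unbounded and
    per-process, which is only acceptable for illustration.
    """
    processed: Set[str] = set()

    async def wrapper(data: Dict[str, Any]) -> None:
        event_id = data.get(key)
        if event_id is not None and event_id in processed:
            return  # handled on an earlier delivery; acking again is harmless
        await handler(data)
        # Record the key only after success, so a failure followed by nak()
        # still lets the redelivered message be processed.
        if event_id is not None:
            processed.add(event_id)

    return wrapper


calls = []


async def record(data: Dict[str, Any]) -> None:
    calls.append(data["event_id"])


async def main() -> None:
    handler = make_idempotent(record)
    await handler({"event_id": "a"})
    await handler({"event_id": "a"})  # simulated redelivery of the same event
    await handler({"event_id": "b"})


asyncio.run(main())
print(calls)  # ['a', 'b']
```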
Subject Hierarchy
| Subject | Purpose | Data Format |
|---|---|---|
| `AURORA.alerts` | Alert federation | Alert JSON |
| `AURORA.ioc_sharing` | IOC dissemination | STIX-like IOC JSON |
| `AURORA.agent_results` | Investigation sharing | Result JSON |
| `AURORA.audit` | Cross-site audit | Audit event JSON |
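Consumers are not limited to the exact subjects in the table: NATS subscription filters can use `*` (exactly one token) and `>` (one or more trailing tokens, last position only). The function below is an illustrative re-implementation of those matching rules for reasoning about filters such as `AURORA.*`; `subject_matches` is a hypothetical helper, not part of nats-py.

```python
from typing import List


def subject_matches(filter_subject: str, subject: str) -> bool:
    """Return True if a concrete NATS subject matches a subscription filter.

    `*` matches exactly one token; `>` matches one or more trailing tokens
    and is only valid as the final token of the filter.
    """
    f_tokens = filter_subject.split(".")
    s_tokens = subject.split(".")
    for i, tok in enumerate(f_tokens):
        if tok == ">":
            # '>' must be last and needs at least one remaining token to match.
            return i == len(f_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if tok != "*" and tok != s_tokens[i]:
            return False
    return len(f_tokens) == len(s_tokens)


AURORA_SUBJECTS: List[str] = [
    "AURORA.alerts", "AURORA.ioc_sharing", "AURORA.agent_results", "AURORA.audit",
]

print([s for s in AURORA_SUBJECTS if subject_matches("AURORA.*", s)])  # all four
```

`AURORA.*` covers the four subjects above but would not match a deeper subject such as `AURORA.alerts.site1`; `AURORA.>` would.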
Why NATS JetStream over Kafka?
| Feature | NATS JetStream | Kafka |
|---|---|---|
| Binary size | ~20MB | ~300MB + JVM |
| Memory | ~50MB | ~1GB minimum |
| Setup complexity | Single binary; JetStream enabled with `nats-server -js` | ZooKeeper/KRaft + brokers |
| Ops overhead | Minimal | Significant |
| Throughput | Millions/sec | Millions/sec |
| Persistence | Built-in | Built-in |
For AuroraSOC's federation use case (hundreds to thousands of messages/second), Kafka's complexity isn't justified.
MQTT Edge Communication
Purpose: IoT Device Bridge
MQTT Consumer (Bridge)
```python
import json

import aiomqtt


class MQTTEdgeConsumer:
    def __init__(self, mqtt_settings, redis_publisher):
        self.mqtt = mqtt_settings
        self.publisher = redis_publisher

    async def start(self):
        client = aiomqtt.Client(
            hostname=self.mqtt.host,
            port=self.mqtt.port,
            username=self.mqtt.username,
            password=self.mqtt.password,
        )
        async with client:
            # Subscribe to all device topics ('+' matches one level: the device ID)
            await client.subscribe("aurora/sensors/+/telemetry")
            await client.subscribe("aurora/sensors/+/alerts")
            await client.subscribe("aurora/sensors/+/attestation")
            async for message in client.messages:
                await self._handle_message(message)

    async def _handle_message(self, message):
        # Topic layout: aurora/sensors/{device_id}/{message_type}
        topic_parts = message.topic.value.split("/")
        device_id = topic_parts[2]
        message_type = topic_parts[3]

        try:
            payload = json.loads(message.payload)
        except (TypeError, ValueError):
            return  # skip malformed payloads instead of crashing the bridge loop
        if not isinstance(payload, dict):
            return
        payload["device_id"] = device_id
        payload["message_type"] = message_type

        # Auto-register device if new
        # (_device_exists, _auto_register_device and _handle_attestation are not shown here)
        if not await self._device_exists(device_id):
            await self._auto_register_device(device_id, payload)

        # Route to the appropriate Redis stream
        if message_type == "alerts":
            await self.publisher.publish("aurora:cps:alerts", payload)
        elif message_type == "attestation":
            await self._handle_attestation(device_id, payload)
        else:
            await self.publisher.publish("aurora:cps:telemetry", payload)
```
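The topic parsing and routing in `_handle_message` can be pulled out into a pure function so it can be unit-tested without a broker. A minimal sketch: `route_mqtt_message` is a hypothetical helper name, it additionally rejects topics outside the `aurora/sensors/{device_id}/{type}` layout and non-object payloads, and it returns the marker `"attestation"` where the bridge would call its attestation handler.

```python
import json
from typing import Any, Dict, Optional, Tuple

# Redis stream names taken from the bridge above.
ALERTS_STREAM = "aurora:cps:alerts"
TELEMETRY_STREAM = "aurora:cps:telemetry"


def route_mqtt_message(topic: str, raw_payload: bytes) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Map an `aurora/sensors/{device_id}/{type}` message to (destination, payload).

    Returns None for topics outside that layout or payloads that are not a JSON object.
    """
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != "aurora" or parts[1] != "sensors":
        return None
    device_id, message_type = parts[2], parts[3]
    try:
        payload = json.loads(raw_payload)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    payload["device_id"] = device_id
    payload["message_type"] = message_type
    if message_type == "alerts":
        return ALERTS_STREAM, payload
    if message_type == "attestation":
        return "attestation", payload  # handled outside the Redis streams
    return TELEMETRY_STREAM, payload


print(route_mqtt_message("aurora/sensors/dev-01/alerts", b'{"severity": "high"}'))
```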
Topic Structure
```
aurora/sensors/{device_id}/telemetry    - Sensor readings
aurora/sensors/{device_id}/alerts       - Device-generated alerts
aurora/sensors/{device_id}/attestation  - Firmware attestation data
aurora/command/{device_id}/action       - Commands TO devices
```
The single-level `+` wildcard in the subscriptions matches exactly one topic level (here, the device ID), so messages from newly deployed devices are picked up without changing the subscriptions.
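MQTT has two wildcards: `+` for exactly one level and `#` for the remaining levels. The sketch below re-implements the MQTT 3.1.1 topic-filter matching rules to show why the bridge uses `+` rather than `#`; `topic_matches` is a hypothetical helper, since in practice the broker performs this matching.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter (MQTT 3.1.1 rules).

    `+` matches exactly one level (possibly empty); `#` matches the parent
    level and any number of child levels and must be the last level.
    """
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Wildcards in the first level must not match topics starting with '$' (e.g. $SYS).
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


print(topic_matches("aurora/sensors/+/telemetry", "aurora/sensors/dev-42/telemetry"))  # True
print(topic_matches("aurora/sensors/#", "aurora/sensors/dev-42/debug/raw"))            # True
```

A single `aurora/sensors/#` subscription would also deliver `aurora/sensors` itself and arbitrarily deep topics, whereas the three `+` filters keep the bridge limited to the four-level layout it knows how to parse.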
Device Auto-Registration
When a new device ID appears:
- Extract metadata from the first telemetry message
- Create a `CPSDevice` record in PostgreSQL
- Generate a unique device UUID
- Start the firmware verification flow
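The four registration steps can be sketched against an in-memory registry. Everything here is illustrative: the real flow persists to PostgreSQL, and the `CPSDevice` fields, the metadata keys, and the `InMemoryDeviceRegistry` class are assumptions rather than AuroraSOC's actual schema.

```python
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class CPSDevice:
    """Hypothetical stand-in for the CPSDevice record; field names are illustrative."""
    device_uuid: uuid.UUID
    device_id: str
    metadata: Dict[str, Any]
    firmware_status: str = "pending_verification"


class InMemoryDeviceRegistry:
    """Illustrative registry; the real flow writes to PostgreSQL instead."""

    # Payload keys treated as device metadata (assumed, not from the source).
    METADATA_KEYS = ("model", "firmware_version", "location")

    def __init__(self) -> None:
        self.devices: Dict[str, CPSDevice] = {}
        self.verification_queue: List[uuid.UUID] = []

    def exists(self, device_id: str) -> bool:
        return device_id in self.devices

    def auto_register(self, device_id: str, first_payload: Dict[str, Any]) -> CPSDevice:
        if device_id in self.devices:
            return self.devices[device_id]  # already registered: do nothing
        # 1. Extract metadata from the first message.
        metadata = {k: first_payload[k] for k in self.METADATA_KEYS if k in first_payload}
        # 2-3. Create the record with a freshly generated UUID.
        device = CPSDevice(device_uuid=uuid.uuid4(), device_id=device_id, metadata=metadata)
        self.devices[device_id] = device
        # 4. Queue the device for firmware verification.
        self.verification_queue.append(device.device_uuid)
        return device


registry = InMemoryDeviceRegistry()
if not registry.exists("dev-01"):
    registry.auto_register("dev-01", {"model": "nRF52840", "firmware_version": "1.2.0", "temp": 21.5})
```

Returning the existing record for a known ID keeps registration safe in this single-threaded sketch; with PostgreSQL, a unique constraint on the device ID would play the same role when two first messages arrive close together.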
Why MQTT over HTTP for IoT?
| Feature | MQTT | HTTP |
|---|---|---|
| Header overhead | 2-byte minimum fixed header | ~500 bytes of headers per request |
| Connection | Persistent (keep-alive) | Per-request |
| Battery impact | Low (one long-lived connection, no repeated TCP/TLS handshakes) | High |
| QoS guarantee | Built-in (0, 1, 2) | Application-level |
| Bidirectional | Native (subscribe + publish) | Requires polling/WebSocket |
| Offline queuing | Broker queues QoS 1/2 messages for persistent sessions | Not supported |
For an nRF52840 (256 KB RAM) running on a coin-cell battery, MQTT's 2-byte minimum header versus HTTP's 500+ bytes per request, together with avoiding a new connection per message, can be the difference between months and days of battery life.
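The 2-byte figure is the minimum MQTT fixed header; a complete PUBLISH packet also carries the topic name and, for QoS 1 or 2, a 2-byte packet identifier. The arithmetic below follows the MQTT 3.1.1 packet layout; the topic and payload are made-up examples, and TCP/TLS framing is excluded.

```python
def encode_remaining_length(n: int) -> bytes:
    """MQTT variable-length integer: 7 bits per byte, high bit set if more bytes follow."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte = n % 128
        n //= 128
        if n > 0:
            byte |= 0x80  # continuation bit
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_packet_size(topic: str, payload: bytes, qos: int = 0) -> int:
    """Total bytes of an MQTT 3.1.1 PUBLISH packet (excluding TCP/TLS framing)."""
    variable_header = 2 + len(topic.encode("utf-8"))  # 2-byte length prefix + topic name
    if qos > 0:
        variable_header += 2  # packet identifier
    remaining = variable_header + len(payload)
    # 1 control byte + remaining-length field + everything after it
    return 1 + len(encode_remaining_length(remaining)) + remaining


topic = "aurora/sensors/dev-01/telemetry"
payload = b'{"temp":21.5}'
size_qos0 = publish_packet_size(topic, payload, qos=0)
size_qos1 = publish_packet_size(topic, payload, qos=1)
print(size_qos0, size_qos1)  # 48 50
```

For this 13-byte payload, 33 of the 35 overhead bytes are the topic name and its length prefix, which is why short topic names matter on constrained links.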