انتقل إلى المحتوى الرئيسي

NATS JetStream & MQTT

AuroraSOC uses NATS JetStream for cross-site federation and MQTT for IoT device communication. These complement Redis Streams (internal bus) to form a complete event transport layer.

NATS JetStream

Purpose: Cross-Site Federation

Publisher Implementation

class NATSJetStreamPublisher:
def __init__(self, nats_url: str, stream_name: str = "AURORA"):
self.url = nats_url
self.stream_name = stream_name
self.nc = None
self.js = None

async def connect(self):
self.nc = await nats.connect(self.url)
self.js = self.nc.jetstream()

# Create stream if not exists
try:
await self.js.add_stream(
name=self.stream_name,
subjects=[
"AURORA.alerts",
"AURORA.ioc_sharing",
"AURORA.agent_results",
"AURORA.audit"
],
retention="limits",
max_msgs=100000,
max_age=7 * 24 * 3600 * 1_000_000_000, # 7 days in nanoseconds
)
except Exception:
pass # Stream already exists

async def publish(self, subject: str, data: dict):
ack = await self.js.publish(
subject,
json.dumps(data).encode(),
headers={"Nats-Msg-Id": str(uuid4())} # Dedup
)
return ack

Consumer Implementation

class NATSJetStreamConsumer:
async def subscribe(self, subject: str, handler: Callable):
# Durable consumer: remembers position across restarts
sub = await self.js.subscribe(
subject,
durable="aurora-site-consumer",
deliver_policy="all" # Replay from beginning
)

async for msg in sub.messages:
try:
data = json.loads(msg.data.decode())
await handler(data)
await msg.ack()
except Exception as e:
await msg.nak(delay=5) # Retry after 5s

Subject Hierarchy

SubjectPurposeData Format
AURORA.alertsAlert federationAlert JSON
AURORA.ioc_sharingIOC disseminationSTIX-like IOC JSON
AURORA.agent_resultsInvestigation sharingResult JSON
AURORA.auditCross-site auditAudit event JSON

Why NATS JetStream over Kafka?

FeatureNATS JetStreamKafka
Binary size~20MB~300MB + JVM
Memory~50MB~1GB minimum
Setup complexitySingle binary, zero configZooKeeper/KRaft + brokers
Ops overheadMinimalSignificant
ThroughputMillions/secMillions/sec
PersistenceBuilt-inBuilt-in

For AuroraSOC's federation use case (hundreds to thousands of messages/second), Kafka's complexity isn't justified.


MQTT Edge Communication

Purpose: IoT Device Bridge

MQTT Consumer (Bridge)

class MQTTEdgeConsumer:
def __init__(self, mqtt_settings, redis_publisher):
self.mqtt = mqtt_settings
self.publisher = redis_publisher

async def start(self):
client = aiomqtt.Client(
hostname=self.mqtt.host,
port=self.mqtt.port,
username=self.mqtt.username,
password=self.mqtt.password,
)

async with client:
# Subscribe to all device topics
await client.subscribe("aurora/sensors/+/telemetry")
await client.subscribe("aurora/sensors/+/alerts")
await client.subscribe("aurora/sensors/+/attestation")

async for message in client.messages:
await self._handle_message(message)

async def _handle_message(self, message):
topic_parts = message.topic.value.split("/")
device_id = topic_parts[2]
message_type = topic_parts[3]

payload = json.loads(message.payload)
payload["device_id"] = device_id
payload["message_type"] = message_type

# Auto-register device if new
if not await self._device_exists(device_id):
await self._auto_register_device(device_id, payload)

# Route to appropriate Redis stream
if message_type == "alerts":
await self.publisher.publish("aurora:cps:alerts", payload)
elif message_type == "attestation":
await self._handle_attestation(device_id, payload)
else:
await self.publisher.publish("aurora:cps:telemetry", payload)

Topic Structure

aurora/sensors/{device_id}/telemetry   - Sensor readings
aurora/sensors/{device_id}/alerts - Device-generated alerts
aurora/sensors/{device_id}/attestation - Firmware attestation data
aurora/command/{device_id}/action - Commands TO devices

The + wildcard in subscriptions matches any device ID, so new devices are automatically captured.

Device Auto-Registration

When a new device ID appears:

  1. Extract metadata from first telemetry message
  2. Create CPSDevice record in PostgreSQL
  3. Generate unique device UUID
  4. Start firmware verification flow

Why MQTT over HTTP for IoT?

FeatureMQTTHTTP
Header overhead2 bytes~500 bytes
ConnectionPersistent (keep-alive)Per-request
Battery impactMinimal (no TLS renegotiation)High
QoS guaranteeBuilt-in (0, 1, 2)Application-level
BidirectionalNative (subscribe + publish)Requires polling/WebSocket
Offline queuingQoS 1/2 with broker persistenceNot supported

For an nRF52840 with 256KB RAM on a coin cell battery, MQTT's 2-byte overhead vs. HTTP's 500+ bytes is the difference between months and days of battery life.