انتقل إلى المحتوى الرئيسي

NATS JetStream وMQTT

يستخدم AuroraSOC نظام NATS JetStream للاتحاد عبر المواقع، ويستخدم MQTT للتواصل مع أجهزة IoT. ويكمل هذان النظامان Redis Streams (الناقل الداخلي) لتشكيل طبقة نقل أحداث متكاملة.

NATS JetStream

الغرض: الاتحاد عبر المواقع

تنفيذ الناشر

class NATSJetStreamPublisher:
def __init__(self, nats_url: str, stream_name: str = "AURORA"):
self.url = nats_url
self.stream_name = stream_name
self.nc = None
self.js = None

async def connect(self):
self.nc = await nats.connect(self.url)
self.js = self.nc.jetstream()

# Create stream if not exists
try:
await self.js.add_stream(
name=self.stream_name,
subjects=[
"AURORA.alerts",
"AURORA.ioc_sharing",
"AURORA.agent_results",
"AURORA.audit"
],
retention="limits",
max_msgs=100000,
max_age=7 * 24 * 3600 * 1_000_000_000, # 7 days in nanoseconds
)
except Exception:
pass # Stream already exists

async def publish(self, subject: str, data: dict):
ack = await self.js.publish(
subject,
json.dumps(data).encode(),
headers={"Nats-Msg-Id": str(uuid4())} # Dedup
)
return ack

تنفيذ المستهلك

class NATSJetStreamConsumer:
async def subscribe(self, subject: str, handler: Callable):
# Durable consumer: remembers position across restarts
sub = await self.js.subscribe(
subject,
durable="aurora-site-consumer",
deliver_policy="all" # Replay from beginning
)

async for msg in sub.messages:
try:
data = json.loads(msg.data.decode())
await handler(data)
await msg.ack()
except Exception as e:
await msg.nak(delay=5) # Retry after 5s

هيكلية Subjects

Subjectالغرضتنسيق البيانات
AURORA.alertsاتحاد التنبيهاتAlert JSON
AURORA.ioc_sharingنشر مؤشرات الاختراقIOC JSON شبيه بـ STIX
AURORA.agent_resultsمشاركة نتائج التحقيقResult JSON
AURORA.auditتدقيق عبر المواقعAudit event JSON

لماذا NATS JetStream بدل Kafka؟

الميزةNATS JetStreamKafka
Binary size~20MB~300MB + JVM
Memory~50MB~1GB minimum
Setup complexitySingle binary, zero configZooKeeper/KRaft + brokers
Ops overheadMinimalSignificant
ThroughputMillions/secMillions/sec
PersistenceBuilt-inBuilt-in

في حالة استخدام AuroraSOC الخاصة بالاتحاد (من مئات إلى آلاف الرسائل في الثانية)، لا يبرَّر تعقيد Kafka.


MQTT Edge Communication

الغرض: جسر أجهزة IoT

مستهلك MQTT (الجسر)

class MQTTEdgeConsumer:
def __init__(self, mqtt_settings, redis_publisher):
self.mqtt = mqtt_settings
self.publisher = redis_publisher

async def start(self):
client = aiomqtt.Client(
hostname=self.mqtt.host,
port=self.mqtt.port,
username=self.mqtt.username,
password=self.mqtt.password,
)

async with client:
# Subscribe to all device topics
await client.subscribe("aurora/sensors/+/telemetry")
await client.subscribe("aurora/sensors/+/alerts")
await client.subscribe("aurora/sensors/+/attestation")

async for message in client.messages:
await self._handle_message(message)

async def _handle_message(self, message):
topic_parts = message.topic.value.split("/")
device_id = topic_parts[2]
message_type = topic_parts[3]

payload = json.loads(message.payload)
payload["device_id"] = device_id
payload["message_type"] = message_type

# Auto-register device if new
if not await self._device_exists(device_id):
await self._auto_register_device(device_id, payload)

# Route to appropriate Redis stream
if message_type == "alerts":
await self.publisher.publish("aurora:cps:alerts", payload)
elif message_type == "attestation":
await self._handle_attestation(device_id, payload)
else:
await self.publisher.publish("aurora:cps:telemetry", payload)

بنية Topic

aurora/sensors/{device_id}/telemetry - Sensor readings
aurora/sensors/{device_id}/alerts - Device-generated alerts
aurora/sensors/{device_id}/attestation - Firmware attestation data
aurora/command/{device_id}/action - Commands TO devices

يمثل المحرف العام + في الاشتراكات مطابقة لأي معرّف جهاز، لذلك يتم التقاط الأجهزة الجديدة تلقائيًا.

التسجيل التلقائي للأجهزة

عند ظهور معرّف جهاز جديد:

  1. Extract metadata from first telemetry message
  2. Create CPSDevice record in PostgreSQL
  3. Generate unique device UUID
  4. Start firmware verification flow

لماذا MQTT بدل HTTP في IoT؟

الميزةMQTTHTTP
Header overhead2 bytes~500 bytes
ConnectionPersistent (keep-alive)Per-request
Battery impactMinimal (no TLS renegotiation)High
QoS guaranteeBuilt-in (0, 1, 2)Application-level
BidirectionalNative (subscribe + publish)Requires polling/WebSocket
Offline queuingQoS 1/2 with broker persistenceNot supported

بالنسبة لجهاز nRF52840 مع ذاكرة RAM بحجم 256KB وبطارية صغيرة، فإن فارق 2 بايت في MQTT مقابل أكثر من 500 بايت في HTTP يساوي الفرق بين عمر بطارية بالأشهر أو بالأيام.