NATS JetStream وMQTT
يستخدم AuroraSOC نظام NATS JetStream للاتحاد عبر المواقع، ويستخدم MQTT للتواصل مع أجهزة IoT. ويكمل هذان النظامان Redis Streams (الناقل الداخلي) لتشكيل طبقة نقل أحداث متكاملة.
NATS JetStream
الغرض: الاتحاد عبر المواقع
تنفيذ الناشر
class NATSJetStreamPublisher:
def __init__(self, nats_url: str, stream_name: str = "AURORA"):
self.url = nats_url
self.stream_name = stream_name
self.nc = None
self.js = None
async def connect(self):
self.nc = await nats.connect(self.url)
self.js = self.nc.jetstream()
# Create stream if not exists
try:
await self.js.add_stream(
name=self.stream_name,
subjects=[
"AURORA.alerts",
"AURORA.ioc_sharing",
"AURORA.agent_results",
"AURORA.audit"
],
retention="limits",
max_msgs=100000,
max_age=7 * 24 * 3600 * 1_000_000_000, # 7 days in nanoseconds
)
except Exception:
pass # Stream already exists
async def publish(self, subject: str, data: dict):
ack = await self.js.publish(
subject,
json.dumps(data).encode(),
headers={"Nats-Msg-Id": str(uuid4())} # Dedup
)
return ack
تنفيذ المستهلك
class NATSJetStreamConsumer:
async def subscribe(self, subject: str, handler: Callable):
# Durable consumer: remembers position across restarts
sub = await self.js.subscribe(
subject,
durable="aurora-site-consumer",
deliver_policy="all" # Replay from beginning
)
async for msg in sub.messages:
try:
data = json.loads(msg.data.decode())
await handler(data)
await msg.ack()
except Exception as e:
await msg.nak(delay=5) # Retry after 5s
هيكلية Subjects
| Subject | الغرض | تنسيق البيانات |
|---|---|---|
AURORA.alerts | اتحاد التنبيهات | Alert JSON |
AURORA.ioc_sharing | نشر مؤشرات الاختراق | IOC JSON شبيه بـ STIX |
AURORA.agent_results | مشاركة نتائج التحقيق | Result JSON |
AURORA.audit | تدقيق عبر المواقع | Audit event JSON |
لماذا NATS JetStream بدل Kafka؟
| الميزة | NATS JetStream | Kafka |
|---|---|---|
| Binary size | ~20MB | ~300MB + JVM |
| Memory | ~50MB | ~1GB minimum |
| Setup complexity | Single binary, zero config | ZooKeeper/KRaft + brokers |
| Ops overhead | Minimal | Significant |
| Throughput | Millions/sec | Millions/sec |
| Persistence | Built-in | Built-in |
في حالة استخدام AuroraSOC الخاصة بالاتحاد (من مئات إلى آلاف الرسائل في الثانية)، لا يبرَّر تعقيد Kafka.
MQTT Edge Communication
الغرض: جسر أجهزة IoT
مستهلك MQTT (الجسر)
class MQTTEdgeConsumer:
def __init__(self, mqtt_settings, redis_publisher):
self.mqtt = mqtt_settings
self.publisher = redis_publisher
async def start(self):
client = aiomqtt.Client(
hostname=self.mqtt.host,
port=self.mqtt.port,
username=self.mqtt.username,
password=self.mqtt.password,
)
async with client:
# Subscribe to all device topics
await client.subscribe("aurora/sensors/+/telemetry")
await client.subscribe("aurora/sensors/+/alerts")
await client.subscribe("aurora/sensors/+/attestation")
async for message in client.messages:
await self._handle_message(message)
async def _handle_message(self, message):
topic_parts = message.topic.value.split("/")
device_id = topic_parts[2]
message_type = topic_parts[3]
payload = json.loads(message.payload)
payload["device_id"] = device_id
payload["message_type"] = message_type
# Auto-register device if new
if not await self._device_exists(device_id):
await self._auto_register_device(device_id, payload)
# Route to appropriate Redis stream
if message_type == "alerts":
await self.publisher.publish("aurora:cps:alerts", payload)
elif message_type == "attestation":
await self._handle_attestation(device_id, payload)
else:
await self.publisher.publish("aurora:cps:telemetry", payload)
بنية Topic
aurora/sensors/{device_id}/telemetry - Sensor readings
aurora/sensors/{device_id}/alerts - Device-generated alerts
aurora/sensors/{device_id}/attestation - Firmware attestation data
aurora/command/{device_id}/action - Commands TO devices
يمثل المحرف العام + في الاشتراكات مطابقة لأي معرّف جهاز، لذلك يتم التقاط الأجهزة الجديدة تلقائيًا.
التسجيل التلقائي للأجهزة
عند ظهور معرّف جهاز جديد:
- Extract metadata from first telemetry message
- Create
CPSDevicerecord in PostgreSQL - Generate unique device UUID
- Start firmware verification flow
لماذا MQTT بدل HTTP في IoT؟
| الميزة | MQTT | HTTP |
|---|---|---|
| Header overhead | 2 bytes | ~500 bytes |
| Connection | Persistent (keep-alive) | Per-request |
| Battery impact | Minimal (no TLS renegotiation) | High |
| QoS guarantee | Built-in (0, 1, 2) | Application-level |
| Bidirectional | Native (subscribe + publish) | Requires polling/WebSocket |
| Offline queuing | QoS 1/2 with broker persistence | Not supported |
بالنسبة لجهاز nRF52840 مع ذاكرة RAM بحجم 256KB وبطارية صغيرة، فإن فارق 2 بايت في MQTT مقابل أكثر من 500 بايت في HTTP يساوي الفرق بين عمر بطارية بالأشهر أو بالأيام.