انتقل إلى المحتوى الرئيسي

Redis Streams

يمثل Redis Streams ناقل الأحداث الداخلي الأساسي في AuroraSOC. وقد تم تنفيذه في aurorasoc/events/redis_streams.py، ويوفر تسليمًا موثوقًا للأحداث مع دعم مجموعات المستهلكين.

بنية Stream

تعريفات Streams

Streamالغرضالمنتجونالمستهلكون
aurora:alertsتنبيهات أمنيةموحد Rust، وAPIمعالج التنبيهات، وWebSocket لوحة التحكم
aurora:cps:alertsتنبيهات CPS/IoTجسر MQTT، وRustمعالج تنبيهات CPS
aurora:agent:tasksمهام وكلاء الذكاء الاصطناعيAPI، المجدولالمنسق
aurora:agent:resultsنتائج تحليل الوكلاءالوكلاءAPI (الاستمرارية في قاعدة البيانات)
aurora:auditمسار التدقيقجميع المكوناتمسجل التدقيق

تنفيذ الناشر

class RedisStreamPublisher:
def __init__(self, redis_client):
self.redis = redis_client

async def publish(self, stream: str, data: dict):
"""Publish a message to a Redis Stream."""
message = {
"id": str(uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"data": json.dumps(data)
}

# XADD with auto-generated stream ID
msg_id = await self.redis.xadd(stream, message)

# Trim stream to prevent unbounded growth
await self.redis.xtrim(stream, maxlen=10000, approximate=True)

return msg_id

لماذا approximate=True في التقليم؟

يقوم Redis XTRIM مع maxlen الدقيق بفحص كل إدخال. ومع approximate=True ينفّذ Redis التقليم على دفعات، أي بكلفة وسطية O(1). وبالنسبة إلى Stream عالي الإنتاجية، يتجنب ذلك القفزات المفاجئة في زمن الاستجابة.

تنفيذ المستهلك

class RedisStreamConsumer:
def __init__(self, redis_client, stream: str, group: str, consumer: str):
self.redis = redis_client
self.stream = stream
self.group = group
self.consumer = consumer

async def setup(self):
"""Create consumer group if it doesn't exist."""
try:
await self.redis.xgroup_create(
self.stream, self.group, id="0", mkstream=True
)
except redis.ResponseError:
pass # Group already exists

async def consume(self, handler: Callable, batch_size: int = 10):
"""Read and process messages from the stream."""
while True:
# Read new messages for this consumer
messages = await self.redis.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream: ">"},
count=batch_size,
block=5000 # Block for 5s if no messages
)

for stream_name, entries in messages:
for msg_id, data in entries:
try:
await handler(json.loads(data["data"]))
# Acknowledge successful processing
await self.redis.xack(self.stream, self.group, msg_id)
except Exception as e:
logger.error(f"Failed to process {msg_id}: {e}")
# Message remains in PEL for redelivery

مجموعات المستهلكين

لماذا مجموعات المستهلكين؟ عند تشغيل عدة نسخ من API لتحقيق التوفر العالي، يجب معالجة كل رسالة مرة واحدة فقط. تضمن مجموعات المستهلكين أن تذهب كل رسالة إلى مستهلك واحد داخل المجموعة، وليس إلى الجميع.

ترحيل WebSocket

The FastAPI app relays Redis Stream messages to WebSocket clients:

async def stream_to_websocket():
"""Relay Redis Stream events to WebSocket clients."""
consumer = RedisStreamConsumer(
redis, stream="aurora:alerts",
group="websocket-relay", consumer="ws-1"
)

async for message in consumer:
# Broadcast to all connected WebSocket clients
await connection_manager.broadcast(json.dumps(message))

يوفّر ذلك تحديثات فورية للوحة التحكم من دون polling.

Stream vs. Pub/Sub

تمت دراسة Redis Pub/Sub لكن رُفضت للأسباب التالية:

  1. رسائل Pub/Sub من نوع fire-and-forget؛ إذا لم يكن هناك مشترك يستمع، تضيع الرسالة.
  2. لا توجد مجموعات مستهلكين؛ جميع المشتركين يستقبلون كل الرسائل (من دون توزيع حمل).
  3. لا توجد استمرارية؛ لا يمكن إعادة تشغيل الرسائل الفائتة.

تعالج Redis Streams المشكلات الثلاث كلها مع الحفاظ على زمن استجابة أقل من ميلي ثانية.