Redis Streams
يمثل Redis Streams ناقل الأحداث الداخلي الأساسي في AuroraSOC. وقد تم تنفيذه في aurorasoc/events/redis_streams.py، ويوفر تسليمًا موثوقًا للأحداث مع دعم مجموعات المستهلكين.
بنية Stream
تعريفات Streams
| Stream | الغرض | المنتجون | المستهلكون |
|---|---|---|---|
aurora:alerts | تنبيهات أمنية | موحد Rust، وAPI | معالج التنبيهات، وWebSocket لوحة التحكم |
aurora:cps:alerts | تنبيهات CPS/IoT | جسر MQTT، وRust | معالج تنبيهات CPS |
aurora:agent:tasks | مهام وكلاء الذكاء الاصطناعي | API، المجدول | المنسق |
aurora:agent:results | نتائج تحليل الوكلاء | الوكلاء | API (الاستمرارية في قاعدة البيانات) |
aurora:audit | مسار التدقيق | جميع المكونات | مسجل التدقيق |
تنفيذ الناشر
class RedisStreamPublisher:
def __init__(self, redis_client):
self.redis = redis_client
async def publish(self, stream: str, data: dict):
"""Publish a message to a Redis Stream."""
message = {
"id": str(uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"data": json.dumps(data)
}
# XADD with auto-generated stream ID
msg_id = await self.redis.xadd(stream, message)
# Trim stream to prevent unbounded growth
await self.redis.xtrim(stream, maxlen=10000, approximate=True)
return msg_id
لماذا approximate=True في التقليم؟
يقوم Redis XTRIM مع maxlen الدقيق بفحص كل إدخال. ومع approximate=True ينفّذ Redis التقليم على دفعات، أي بكلفة وسطية O(1). وبالنسبة إلى Stream عالي الإنتاجية، يتجنب ذلك القفزات المفاجئة في زمن الاستجابة.
تنفيذ المستهلك
class RedisStreamConsumer:
def __init__(self, redis_client, stream: str, group: str, consumer: str):
self.redis = redis_client
self.stream = stream
self.group = group
self.consumer = consumer
async def setup(self):
"""Create consumer group if it doesn't exist."""
try:
await self.redis.xgroup_create(
self.stream, self.group, id="0", mkstream=True
)
except redis.ResponseError:
pass # Group already exists
async def consume(self, handler: Callable, batch_size: int = 10):
"""Read and process messages from the stream."""
while True:
# Read new messages for this consumer
messages = await self.redis.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream: ">"},
count=batch_size,
block=5000 # Block for 5s if no messages
)
for stream_name, entries in messages:
for msg_id, data in entries:
try:
await handler(json.loads(data["data"]))
# Acknowledge successful processing
await self.redis.xack(self.stream, self.group, msg_id)
except Exception as e:
logger.error(f"Failed to process {msg_id}: {e}")
# Message remains in PEL for redelivery
مجموعات المستهلكين
لماذا مجموعات المستهلكين؟ عند تشغيل عدة نسخ من API لتحقيق التوفر العالي، يجب معالجة كل رسالة مرة واحدة فقط. تضمن مجموعات المستهلكين أن تذهب كل رسالة إلى مستهلك واحد داخل المجموعة، وليس إلى الجميع.
ترحيل WebSocket
The FastAPI app relays Redis Stream messages to WebSocket clients:
async def stream_to_websocket():
"""Relay Redis Stream events to WebSocket clients."""
consumer = RedisStreamConsumer(
redis, stream="aurora:alerts",
group="websocket-relay", consumer="ws-1"
)
async for message in consumer:
# Broadcast to all connected WebSocket clients
await connection_manager.broadcast(json.dumps(message))
يوفّر ذلك تحديثات فورية للوحة التحكم من دون polling.
تمت دراسة Redis Pub/Sub لكن رُفضت للأسباب التالية:
- رسائل Pub/Sub من نوع fire-and-forget؛ إذا لم يكن هناك مشترك يستمع، تضيع الرسالة.
- لا توجد مجموعات مستهلكين؛ جميع المشتركين يستقبلون كل الرسائل (من دون توزيع حمل).
- لا توجد استمرارية؛ لا يمكن إعادة تشغيل الرسائل الفائتة.
تعالج Redis Streams المشكلات الثلاث كلها مع الحفاظ على زمن استجابة أقل من ميلي ثانية.