Redis Streams

Redis Streams is the primary internal event bus for AuroraSOC. Implemented in aurorasoc/events/redis_streams.py, it provides reliable event delivery with consumer group support.

Stream Architecture

Stream Definitions

| Stream | Purpose | Producers | Consumers |
|---|---|---|---|
| `aurora:alerts` | Security alerts | Rust normalizer, API | Alert processor, Dashboard WebSocket |
| `aurora:cps:alerts` | CPS/IoT alerts | MQTT bridge, Rust | CPS alert handler |
| `aurora:agent:tasks` | Tasks for AI agents | API, Scheduler | Orchestrator |
| `aurora:agent:results` | Agent analysis results | Agents | API (DB persistence) |
| `aurora:audit` | Audit trail | All components | Audit logger |
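
A convenient way to keep these keys consistent across producers and consumers is a shared constants module (a sketch; the constant names are hypothetical, the table above is the source of truth):

```python
# Hypothetical constants module mirroring the stream table above.
ALERTS = "aurora:alerts"
CPS_ALERTS = "aurora:cps:alerts"
AGENT_TASKS = "aurora:agent:tasks"
AGENT_RESULTS = "aurora:agent:results"
AUDIT = "aurora:audit"

ALL_STREAMS = (ALERTS, CPS_ALERTS, AGENT_TASKS, AGENT_RESULTS, AUDIT)

# Every AuroraSOC stream lives under the "aurora:" namespace.
assert all(s.startswith("aurora:") for s in ALL_STREAMS)
```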

Publisher Implementation

    import json
    from datetime import datetime, timezone
    from uuid import uuid4


    class RedisStreamPublisher:
        def __init__(self, redis_client):
            self.redis = redis_client

        async def publish(self, stream: str, data: dict) -> str:
            """Publish a message to a Redis Stream."""
            message = {
                "id": str(uuid4()),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "data": json.dumps(data),
            }

            # XADD with an auto-generated stream ID
            msg_id = await self.redis.xadd(stream, message)

            # Trim the stream to prevent unbounded growth
            await self.redis.xtrim(stream, maxlen=10000, approximate=True)

            return msg_id
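
The publisher can be exercised without a live Redis by stubbing the client with `unittest.mock.AsyncMock` (a sketch; the condensed class is a variant of the publisher above, and the stubbed stream ID is made up):

```python
import asyncio
import json
from datetime import datetime, timezone
from unittest.mock import AsyncMock
from uuid import uuid4


class RedisStreamPublisher:
    # Condensed variant of the publisher above, for a self-contained demo.
    def __init__(self, redis_client):
        self.redis = redis_client

    async def publish(self, stream: str, data: dict) -> str:
        message = {
            "id": str(uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data": json.dumps(data),
        }
        msg_id = await self.redis.xadd(stream, message)
        await self.redis.xtrim(stream, maxlen=10000, approximate=True)
        return msg_id


fake_redis = AsyncMock()
fake_redis.xadd.return_value = "1700000000000-0"  # stream IDs are "<ms>-<seq>"

publisher = RedisStreamPublisher(fake_redis)
msg_id = asyncio.run(publisher.publish("aurora:alerts", {"severity": "high"}))

# XADD received a JSON-serialized payload under the "data" field
_, fields = fake_redis.xadd.await_args.args
assert json.loads(fields["data"]) == {"severity": "high"}
```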

Why approximate=True for trimming?

With an exact maxlen, Redis must delete entries one by one and split the underlying macro nodes to hit the count precisely. With approximate=True (the ~ form of XTRIM), Redis only evicts whole macro nodes once they fall entirely outside the limit, which is O(1) amortized and may leave the stream slightly longer than maxlen. For a high-throughput stream, this avoids latency spikes.
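
The difference can be pictured with a toy model in which entries live in fixed-size nodes and approximate trimming only drops whole nodes (pure-Python illustration, not the actual radix-tree implementation):

```python
NODE_SIZE = 100  # stand-in for a stream macro node

def approximate_trim(entries: list, maxlen: int) -> list:
    """Drop whole leading nodes while doing so still leaves >= maxlen entries."""
    while len(entries) - NODE_SIZE >= maxlen:
        entries = entries[NODE_SIZE:]  # evict one full node at a time
    return entries

stream = list(range(10_250))
trimmed = approximate_trim(stream, maxlen=10_000)

# The stream never shrinks below maxlen, but may keep up to one extra node.
assert 10_000 <= len(trimmed) < 10_000 + NODE_SIZE
```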

Consumer Implementation

    import json
    import logging
    from typing import Callable

    import redis.asyncio as redis

    logger = logging.getLogger(__name__)


    class RedisStreamConsumer:
        def __init__(self, redis_client, stream: str, group: str, consumer: str):
            self.redis = redis_client
            self.stream = stream
            self.group = group
            self.consumer = consumer

        async def setup(self):
            """Create the consumer group if it doesn't exist."""
            try:
                await self.redis.xgroup_create(
                    self.stream, self.group, id="0", mkstream=True
                )
            except redis.ResponseError as e:
                # BUSYGROUP means the group already exists; anything else is a real error
                if "BUSYGROUP" not in str(e):
                    raise

        async def consume(self, handler: Callable, batch_size: int = 10):
            """Read and process messages from the stream."""
            while True:
                # Read messages not yet delivered to this group (">")
                messages = await self.redis.xreadgroup(
                    groupname=self.group,
                    consumername=self.consumer,
                    streams={self.stream: ">"},
                    count=batch_size,
                    block=5000,  # Block for up to 5s if no messages arrive
                )

                for stream_name, entries in messages:
                    for msg_id, data in entries:
                        try:
                            await handler(json.loads(data["data"]))
                            # Acknowledge successful processing
                            await self.redis.xack(self.stream, self.group, msg_id)
                        except Exception as e:
                            logger.error(f"Failed to process {msg_id}: {e}")
                            # Message remains in the PEL for redelivery
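
The PEL comment deserves unpacking: the Pending Entries List tracks every delivered-but-unacknowledged message per consumer, so a crashed consumer's messages can be claimed later. The semantics fit in a few lines (a toy model, not Redis internals; XCLAIM/XAUTOCLAIM are the real reclaim commands):

```python
class PendingEntriesList:
    """Toy model of a consumer group's Pending Entries List (PEL)."""

    def __init__(self):
        self.pending = {}  # msg_id -> consumer currently responsible

    def deliver(self, msg_id: str, consumer: str):
        self.pending[msg_id] = consumer  # tracked until acknowledged

    def ack(self, msg_id: str):
        self.pending.pop(msg_id, None)  # XACK removes the entry

    def claim(self, msg_id: str, new_consumer: str):
        # XCLAIM/XAUTOCLAIM: hand a stuck message to another consumer
        self.pending[msg_id] = new_consumer


pel = PendingEntriesList()
pel.deliver("1-0", "ws-1")
pel.deliver("1-1", "ws-1")
pel.ack("1-0")            # processed successfully
pel.claim("1-1", "ws-2")  # ws-1 crashed; ws-2 takes over
```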

Consumer Groups

Why consumer groups? When running multiple API instances for high availability, each message should be handled by one instance, not by every instance. Consumer groups deliver each message to exactly one consumer in the group, giving load distribution with at-least-once semantics: unacknowledged messages stay in the Pending Entries List (PEL) and can be redelivered.
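
The distribution rule can be illustrated with a toy model in which each read hands out only messages that no other consumer in the group has received (pure-Python illustration; real Redis also records each delivery in the PEL):

```python
from collections import defaultdict


class ToyGroup:
    """Minimal model: each message is delivered to exactly one consumer."""

    def __init__(self, messages):
        self.queue = list(messages)
        self.delivered = defaultdict(list)

    def read(self, consumer: str, count: int):
        # Hand the next `count` undelivered messages to this consumer only.
        batch, self.queue = self.queue[:count], self.queue[count:]
        self.delivered[consumer].extend(batch)
        return batch


group = ToyGroup(["m1", "m2", "m3", "m4"])
a = group.read("api-1", count=2)
b = group.read("api-2", count=2)

# No message is seen by both consumers.
assert set(a).isdisjoint(b)
```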

WebSocket Relay

The FastAPI app relays Redis Stream messages to WebSocket clients:

    async def stream_to_websocket():
        """Relay Redis Stream events to WebSocket clients."""
        consumer = RedisStreamConsumer(
            redis, stream="aurora:alerts",
            group="websocket-relay", consumer="ws-1"
        )
        await consumer.setup()

        async def relay(message: dict):
            # Broadcast to all connected WebSocket clients
            await connection_manager.broadcast(json.dumps(message))

        await consumer.consume(relay)

This provides real-time dashboard updates without polling.
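
The connection_manager used above is not shown; a minimal broadcaster might look like the following (a sketch with a fake WebSocket for the demo; FastAPI's real WebSocket objects provide an async send_text method):

```python
import asyncio


class ConnectionManager:
    """Minimal WebSocket broadcaster (sketch of the assumed connection_manager)."""

    def __init__(self):
        self.active = set()

    def connect(self, websocket):
        self.active.add(websocket)

    def disconnect(self, websocket):
        self.active.discard(websocket)

    async def broadcast(self, text: str):
        clients = list(self.active)
        # Send to all clients concurrently; collect failures instead of raising
        results = await asyncio.gather(
            *(ws.send_text(text) for ws in clients), return_exceptions=True
        )
        for ws, result in zip(clients, results):
            if isinstance(result, Exception):
                self.disconnect(ws)  # prune clients whose socket has gone away


class FakeWS:
    """Stand-in for a WebSocket connection, for the demo below."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent = []

    async def send_text(self, text: str):
        if self.fail:
            raise RuntimeError("connection closed")
        self.sent.append(text)


good, dead = FakeWS(), FakeWS(fail=True)
manager = ConnectionManager()
manager.connect(good)
manager.connect(dead)
asyncio.run(manager.broadcast("hello"))
```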

Stream vs. Pub/Sub

Redis Pub/Sub was considered but rejected because:

  1. Pub/Sub messages are fire-and-forget — if no subscriber is listening, the message is lost
  2. No consumer groups — all subscribers receive all messages (no load distribution)
  3. No persistence — cannot replay missed messages

Redis Streams solve all three problems while maintaining sub-millisecond latency.
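
Points 1 and 3 are easy to demonstrate with a toy model: publishing to a channel with no subscriber is a no-op, while a stream retains the entry for later readers (pure-Python illustration of the semantics only):

```python
class ToyPubSub:
    def __init__(self):
        self.subscribers = []

    def publish(self, message):
        # Fire-and-forget: with no subscriber, the message simply vanishes.
        for callback in self.subscribers:
            callback(message)


class ToyStream:
    def __init__(self):
        self.entries = []

    def add(self, message):
        self.entries.append(message)  # persisted until trimmed

    def read_from(self, index):
        return self.entries[index:]   # late readers can replay


pubsub, stream = ToyPubSub(), ToyStream()
pubsub.publish("alert-1")  # nobody listening: lost forever
stream.add("alert-1")      # retained

# A consumer attaching afterwards can still replay the stream entry.
assert stream.read_from(0) == ["alert-1"]
```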