# Redis Streams
Redis Streams is the primary internal event bus for AuroraSOC. Implemented in aurorasoc/events/redis_streams.py, it provides reliable event delivery with consumer group support.
## Stream Architecture
### Stream Definitions
| Stream | Purpose | Producers | Consumers |
|---|---|---|---|
| `aurora:alerts` | Security alerts | Rust normalizer, API | Alert processor, Dashboard WebSocket |
| `aurora:cps:alerts` | CPS/IoT alerts | MQTT bridge, Rust | CPS alert handler |
| `aurora:agent:tasks` | Tasks for AI agents | API, Scheduler | Orchestrator |
| `aurora:agent:results` | Agent analysis results | Agents | API (DB persistence) |
| `aurora:audit` | Audit trail | All components | Audit logger |
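For a quick health check of these streams, backlog depth can be sampled with `XLEN`, which is O(1) and reports 0 for a stream that doesn't exist yet. The helper below is an illustrative sketch, not part of AuroraSOC:

```python
async def stream_depths(redis_client, streams: list[str]) -> dict[str, int]:
    """Return the current entry count for each named stream."""
    # XLEN is O(1); a missing stream simply reports length 0.
    return {name: await redis_client.xlen(name) for name in streams}
```

Running this against the five streams above gives a one-shot view of which consumers are keeping up.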
## Publisher Implementation
```python
import json
from datetime import datetime, timezone
from uuid import uuid4


class RedisStreamPublisher:
    def __init__(self, redis_client):
        self.redis = redis_client

    async def publish(self, stream: str, data: dict):
        """Publish a message to a Redis Stream."""
        message = {
            "id": str(uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data": json.dumps(data),
        }
        # XADD with an auto-generated stream ID
        msg_id = await self.redis.xadd(stream, message)
        # Trim the stream to prevent unbounded growth
        await self.redis.xtrim(stream, maxlen=10000, approximate=True)
        return msg_id
```
**Why `approximate=True` for trimming?** With an exact `MAXLEN`, Redis must delete entries one at a time to hit the bound precisely. With `approximate=True` (the `~` modifier), Redis only trims when it can drop an entire node of the underlying radix tree, which amortizes to roughly O(1) per call. For a high-throughput stream this avoids latency spikes; the trade-off is that the stream may briefly hold slightly more than 10,000 entries.
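In redis-py the trim bound can also be passed directly to `XADD`, collapsing the publish and trim steps into a single command. A sketch of that variant (the 10,000 cap mirrors the publisher above; `publish_trimmed` is an illustrative name, not AuroraSOC code):

```python
async def publish_trimmed(redis_client, stream: str, fields: dict):
    """XADD with an inline approximate MAXLEN, avoiding a separate XTRIM call."""
    # Renders as: XADD stream MAXLEN ~ 10000 * field value ...
    return await redis_client.xadd(stream, fields, maxlen=10000, approximate=True)
```

One round trip instead of two also removes the window in which the stream is over its cap between the `XADD` and the `XTRIM`.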
## Consumer Implementation
```python
import json
import logging
from typing import Callable

import redis

logger = logging.getLogger(__name__)


class RedisStreamConsumer:
    def __init__(self, redis_client, stream: str, group: str, consumer: str):
        # Assumes a client created with decode_responses=True, so fields are str
        self.redis = redis_client
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def setup(self):
        """Create the consumer group if it doesn't exist."""
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, id="0", mkstream=True
            )
        except redis.ResponseError:
            pass  # Group already exists

    async def consume(self, handler: Callable, batch_size: int = 10):
        """Read and process messages from the stream."""
        while True:
            # Read new messages assigned to this consumer
            messages = await self.redis.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream: ">"},
                count=batch_size,
                block=5000,  # Block for up to 5s if no messages arrive
            )
            for stream_name, entries in messages:
                for msg_id, data in entries:
                    try:
                        await handler(json.loads(data["data"]))
                        # Acknowledge successful processing
                        await self.redis.xack(self.stream, self.group, msg_id)
                    except Exception as e:
                        logger.error(f"Failed to process {msg_id}: {e}")
                        # Unacked message stays in the PEL for redelivery
```
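Messages left in the PEL by a crashed consumer are only redelivered once something claims them. One way to do that is `XAUTOCLAIM` (Redis 6.2+); the sketch below is an assumption about how such recovery could look, not code from the repository:

```python
async def reclaim_stale(redis_client, stream: str, group: str,
                        consumer: str, min_idle_ms: int = 60_000):
    """Claim PEL entries idle longer than min_idle_ms for this consumer."""
    # On Redis 7+, redis-py returns (next_start_id, claimed_entries, deleted_ids)
    _, claimed, _ = await redis_client.xautoclaim(
        stream, group, consumer, min_idle_time=min_idle_ms, start_id="0-0"
    )
    return claimed
```

A periodic task running this per consumer group would let surviving instances pick up work abandoned by a dead peer.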
## Consumer Groups
**Why consumer groups?** When multiple API instances run for high availability, each message should be handled by only one of them. A consumer group delivers each message to a single consumer in the group rather than to all of them, distributing the load. Note that the resulting semantics are at-least-once, not exactly-once: a message that is delivered but never acknowledged is redelivered, so handlers should be idempotent.
## WebSocket Relay
The FastAPI app relays Redis Stream messages to WebSocket clients:
```python
async def stream_to_websocket():
    """Relay Redis Stream events to WebSocket clients."""
    consumer = RedisStreamConsumer(
        redis, stream="aurora:alerts",
        group="websocket-relay", consumer="ws-1"
    )
    await consumer.setup()

    async def broadcast(message: dict):
        # Fan the event out to all connected WebSocket clients
        await connection_manager.broadcast(json.dumps(message))

    await consumer.consume(broadcast)
```
This provides real-time dashboard updates without polling.
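The `connection_manager` referenced above is assumed to follow the common FastAPI pattern of tracking live sockets and fanning messages out to all of them. A sketch of that shape (not AuroraSOC's actual class):

```python
class ConnectionManager:
    """Track live WebSocket connections and broadcast to all of them."""

    def __init__(self):
        self.active: list = []  # Currently connected WebSocket objects

    async def connect(self, websocket):
        await websocket.accept()
        self.active.append(websocket)

    def disconnect(self, websocket):
        self.active.remove(websocket)

    async def broadcast(self, message: str):
        # Iterate over a copy so removals during the loop are safe
        for ws in list(self.active):
            try:
                await ws.send_text(message)
            except Exception:
                self.disconnect(ws)  # Drop clients that failed to receive
```

Dropping a failed socket inside `broadcast` keeps one disconnected dashboard from breaking delivery to everyone else.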
## Why Not Pub/Sub?
Redis Pub/Sub was considered but rejected because:
- Pub/Sub messages are fire-and-forget — if no subscriber is listening, the message is lost
- No consumer groups — all subscribers receive all messages (no load distribution)
- No persistence — cannot replay missed messages
Redis Streams solve all three problems while maintaining sub-millisecond latency.