# Memory System

AuroraSOC implements a three-tier memory architecture that gives AI agents contextual recall of past investigations, threat intelligence, and recent conversation history. This is implemented in `aurorasoc/memory/`.

## Three-Tier Design

### Why Three Tiers?
| Without Tiered Memory | With Tiered Memory |
|---|---|
| Agent sees only current conversation | Agent recalls similar past cases |
| Every investigation starts from scratch | Learns from organizational history |
| IOCs checked against external feeds only | Local IOC knowledge base with similarity |
| No context about past false positives | "This pattern was a false positive last month" |
## Tier 1: Sliding Memory

```python
from collections import deque
from typing import List


class SlidingMemory:
    """Recent conversation history with a configurable window."""

    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages
        # deque with maxlen evicts the oldest entry automatically
        self._messages: deque = deque(maxlen=max_messages)

    def add(self, message: BaseMessage):  # BaseMessage: the agent framework's message type
        self._messages.append(message)

    def get_all(self) -> List[BaseMessage]:
        return list(self._messages)
```
- Storage: In-process Python `deque`
- Latency: ~0ms (memory access)
- Persistence: None (lost on restart)
- Purpose: Maintain recent conversation context for multi-turn reasoning
- Window sizes: 20 (default), 30 (hunter), 40 (responder), 50 (analyst), 100 (orchestrator)
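A quick illustration of the eviction behavior (plain strings stand in for `BaseMessage` objects here):

```python
memory = SlidingMemory(max_messages=3)
for i in range(5):
    memory.add(f"message {i}")  # strings used for brevity

print(memory.get_all())
# ['message 2', 'message 3', 'message 4'] -- the two oldest were evicted
```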
## Tier 2: Episodic Memory

```python
from typing import List
from uuid import uuid4

from qdrant_client.models import PointStruct


class EpisodicMemoryStore:
    """Long-term case memory using vector similarity search."""

    def __init__(self, qdrant_client, collection: str = "aurora_cases"):
        self.client = qdrant_client
        self.collection = collection
        self.embedder = TextEmbedder()  # all-MiniLM-L6-v2 (see below)

    async def store(self, case_summary: str, metadata: dict):
        """Store a completed investigation as a memory."""
        embedding = self.embedder.embed(case_summary)
        await self.client.upsert(
            collection_name=self.collection,
            points=[PointStruct(
                id=str(uuid4()),
                vector=embedding,
                payload={"summary": case_summary, **metadata},
            )],
        )

    async def recall(self, query: str, limit: int = 10) -> List[dict]:
        """Find similar past cases by semantic search."""
        query_embedding = self.embedder.embed(query)
        results = await self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            limit=limit,
        )
        return [hit.payload for hit in results]
```
- Storage: Qdrant vector database (`aurora_cases` collection)
- Latency: ~50ms (network + ANN search)
- Persistence: Full (disk-backed in Qdrant)
- Purpose: Recall similar past investigations when analyzing new alerts
- Embedding model: `all-MiniLM-L6-v2` (384-dimensional vectors)
### How Episodic Memory Works
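The flow is write-on-close, read-on-alert: when an investigation completes, its summary is embedded and stored; when a new alert arrives, the alert description is embedded and the nearest past cases come back. A sketch of that round-trip (the case text, metadata fields, and wiring are illustrative, not AuroraSOC's actual call sites):

```python
async def close_and_recall(store: EpisodicMemoryStore):
    # 1. At case closure: persist the investigation summary.
    await store.store(
        case_summary="Phishing email with macro dropper; contained via EDR host isolation.",
        metadata={"case_id": "CASE-1042", "verdict": "true_positive", "severity": "high"},
    )

    # 2. On the next similar alert: recall semantically related history.
    similar = await store.recall("suspicious macro execution from Outlook attachment", limit=5)
    for case in similar:
        print(case["summary"], case.get("verdict"))
```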
## Tier 3: Threat Intel Memory

```python
import hashlib
import json
from typing import List


class ThreatIntelMemory:
    """IOC knowledge base with vector similarity + Redis cache."""

    def __init__(self, qdrant_client, redis_client):
        self.qdrant = qdrant_client
        self.redis = redis_client
        self.collection = "aurora_threat_intel"
        self.embedder = TextEmbedder()

    async def search(self, query: str, limit: int = 20) -> List[dict]:
        """Search for related threat intelligence."""
        # Check the Redis cache first
        cache_key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: vector search in Qdrant
        embedding = self.embedder.embed(query)
        results = await self.qdrant.search(
            collection_name=self.collection,
            query_vector=embedding,
            limit=limit,
        )

        # Cache the payloads for 1 hour
        payload = [hit.payload for hit in results]
        await self.redis.setex(cache_key, 3600, json.dumps(payload))
        return payload
```
- Storage: Qdrant (`aurora_threat_intel` collection) + Redis cache
- Latency: ~1ms (cache hit) or ~100ms (cache miss)
- Persistence: Full (Qdrant) + 1-hour cache (Redis)
- Purpose: Find IOCs and campaigns similar to the current analysis context
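Two design notes: the MD5 digest serves only as a compact, deterministic cache key (no cryptographic strength is needed there), and the one-hour TTL absorbs the repeated queries typical of an active investigation while bounding staleness. Usage is a single call (the query string and wiring are illustrative):

```python
async def enrich(intel: ThreatIntelMemory):
    # First call misses the cache (~100ms); repeats within the hour hit Redis (~1ms).
    for hit in await intel.search("HTTPS beaconing to a rare external host", limit=10):
        print(hit)
```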
## TieredAgentMemory: The Unified Interface

```python
class TieredAgentMemory(BaseMemory):  # BaseMemory: BeeAI's memory interface
    """BeeAI-compatible memory combining all three tiers."""

    def __init__(self, sliding, episodic=None, threat_intel=None):
        self.sliding = sliding            # Tier 1: recent conversation
        self.episodic = episodic          # Tier 2: past cases (optional)
        self.threat_intel = threat_intel  # Tier 3: IOC knowledge (optional)

    async def add(self, message: BaseMessage):
        """Add to the sliding window."""
        self.sliding.add(message)

    async def get_context(self, query: str) -> str:
        """Combine all tiers for LLM context."""
        context_parts = []

        # Tier 1: recent conversation (get_formatted renders messages as
        # prompt text; omitted from the SlidingMemory excerpt above)
        context_parts.append(self.sliding.get_formatted())

        # Tier 2: similar past cases
        if self.episodic:
            cases = await self.episodic.recall(query)
            if cases:
                context_parts.append(format_cases(cases))

        # Tier 3: related threat intel
        if self.threat_intel:
            intel = await self.threat_intel.search(query)
            if intel:
                context_parts.append(format_intel(intel))

        # format_cases / format_intel are prompt-formatting helpers (not shown)
        return "\n\n".join(context_parts)
```
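With one object per agent, a caller wires the tiers once and asks for context per query. A sketch, assuming `qdrant_client` and `redis_client` are already-connected clients:

```python
memory = TieredAgentMemory(
    sliding=SlidingMemory(max_messages=50),
    episodic=EpisodicMemoryStore(qdrant_client),
    threat_intel=ThreatIntelMemory(qdrant_client, redis_client),
)

async def build_prompt_context() -> str:
    # Fans out to all three tiers and concatenates the non-empty parts.
    return await memory.get_context("lateral movement via SMB from host WS-0231")
```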
## Memory Presets

```python
PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "hunter":       {"sliding": 30,  "episodic": 30,   "intel": True},
    "responder":    {"sliding": 40,  "episodic": 10,   "intel": None},
    "intel":        {"sliding": 20,  "episodic": None, "intel": 50},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
    "cps":          {"sliding": 30,  "episodic": 15,   "intel": True},
}
```
### Design Rationale
- Orchestrator needs long sliding context (100 messages) because it coordinates multi-step investigations with many back-and-forth handoffs
- Threat Intel agent needs large intel recall (50 results) but no episodic memory — it works with IOCs, not past cases
- The default preset enables neither episodic nor intel memory, reserving Qdrant capacity for agents that genuinely benefit from recall
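A preset is essentially a recipe for which tiers to construct. A minimal factory sketch (the name `memory_from_preset` is hypothetical, and a full implementation would also apply the per-preset recall limits):

```python
def memory_from_preset(name: str, qdrant_client=None, redis_client=None) -> TieredAgentMemory:
    """Build a TieredAgentMemory from a named preset (illustrative only)."""
    preset = PRESETS[name]
    return TieredAgentMemory(
        sliding=SlidingMemory(max_messages=preset["sliding"]),
        # In this sketch the episodic/intel values only gate whether the tier exists.
        episodic=EpisodicMemoryStore(qdrant_client) if preset["episodic"] else None,
        threat_intel=ThreatIntelMemory(qdrant_client, redis_client) if preset["intel"] else None,
    )

analyst_memory = memory_from_preset("analyst", qdrant_client, redis_client)
```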
## TextEmbedder

```python
from typing import List

from sentence_transformers import SentenceTransformer


class TextEmbedder:
    """Generate text embeddings using sentence-transformers."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed(self, text: str) -> List[float]:
        # encode() returns a NumPy array; Qdrant expects a plain list of floats
        return self.model.encode(text).tolist()
```
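Embeddings from the same model are directly comparable, so cosine similarity between two case descriptions is a quick smoke test (a sketch using NumPy):

```python
import numpy as np

embedder = TextEmbedder()
a = np.array(embedder.embed("PowerShell download cradle from an unknown domain"))
b = np.array(embedder.embed("powershell.exe fetching a payload via Invoke-WebRequest"))

# Cosine similarity: semantically close descriptions score near 1.0
print(float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b))))
```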
### Why `all-MiniLM-L6-v2`?
- Small model (80MB) that runs on CPU
- 384-dimensional vectors (low storage cost)
- Excellent quality for semantic similarity tasks
- High throughput when batched: thousands of sentences per second on a modern CPU
- No GPU required — suitable for edge and container deployments
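Actual throughput depends heavily on batch size and hardware. `SentenceTransformer.encode` accepts a list of sentences, so a quick local measurement looks like this (a sketch; numbers vary by machine):

```python
import time

embedder = TextEmbedder()
sentences = ["Suspicious login from a new geolocation"] * 1_000

start = time.perf_counter()
embedder.model.encode(sentences, batch_size=64)  # list input runs batched inference
elapsed = time.perf_counter() - start
print(f"~{len(sentences) / elapsed:.0f} sentences/sec on this machine")
```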