
Memory System

AuroraSOC implements a three-tier memory architecture that gives AI agents contextual recall of past investigations, threat intelligence, and recent conversation history. This is implemented in aurorasoc/memory/.

Three-Tier Design

Why Three Tiers?

| Without Tiered Memory | With Tiered Memory |
| --- | --- |
| Agent sees only the current conversation | Agent recalls similar past cases |
| Every investigation starts from scratch | Learns from organizational history |
| IOCs checked against external feeds only | Local IOC knowledge base with similarity search |
| No context about past false positives | "This pattern was a false positive last month" |

Tier 1: Sliding Memory

from collections import deque
from typing import List

class SlidingMemory:
    """Recent conversation history with configurable window."""

    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages
        self._messages: deque = deque(maxlen=max_messages)

    def add(self, message: BaseMessage):  # BaseMessage comes from the agent framework
        self._messages.append(message)

    def get_all(self) -> List[BaseMessage]:
        return list(self._messages)
  • Storage: In-process Python deque
  • Latency: ~0ms (memory access)
  • Persistence: None (lost on restart)
  • Purpose: Maintain recent conversation context for multi-turn reasoning
  • Window sizes: 20 (default), 30 (hunter), 40 (responder), 50 (analyst), 100 (orchestrator)
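The eviction behavior of the sliding window can be demonstrated with the `deque` directly; plain strings stand in for `BaseMessage` objects in this minimal sketch:

```python
from collections import deque

# A maxlen deque silently evicts the oldest entry once full,
# which is exactly the sliding-window behavior SlidingMemory relies on.
window = deque(maxlen=3)
for turn in ["alert received", "enrich IOC", "query SIEM", "draft verdict"]:
    window.append(turn)

# Only the three most recent turns survive; "alert received" was evicted.
print(list(window))  # ['enrich IOC', 'query SIEM', 'draft verdict']
```

Because eviction is handled by the deque itself, `add` stays O(1) and no pruning pass is ever needed.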

Tier 2: Episodic Memory

from uuid import uuid4
from typing import List

from qdrant_client.models import PointStruct

class EpisodicMemoryStore:
    """Long-term case memory using vector similarity search."""

    def __init__(self, qdrant_client, collection="aurora_cases"):
        self.client = qdrant_client
        self.collection = collection
        self.embedder = TextEmbedder()  # all-MiniLM-L6-v2

    async def store(self, case_summary: str, metadata: dict):
        """Store a completed investigation as a memory."""
        embedding = self.embedder.embed(case_summary)
        await self.client.upsert(
            collection_name=self.collection,
            points=[PointStruct(
                id=str(uuid4()),
                vector=embedding,
                payload={"summary": case_summary, **metadata},
            )],
        )

    async def recall(self, query: str, limit: int = 10) -> List[dict]:
        """Find similar past cases by semantic search."""
        query_embedding = self.embedder.embed(query)
        results = await self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            limit=limit,
        )
        return [hit.payload for hit in results]
  • Storage: Qdrant vector database (aurora_cases collection)
  • Latency: ~50ms (network + ANN search)
  • Persistence: Full (disk-backed in Qdrant)
  • Purpose: Recall similar past investigations when analyzing new alerts
  • Embedding model: all-MiniLM-L6-v2 (384-dimensional vectors)

How Episodic Memory Works
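The store-then-recall flow can be illustrated without a live Qdrant instance: summaries are embedded as vectors, and recall ranks stored cases by cosine similarity. The toy `embed` function below is a stand-in for all-MiniLM-L6-v2 (which produces 384-dimensional dense vectors), and the in-memory list stands in for the `aurora_cases` collection:

```python
import math

def embed(text: str) -> list[float]:
    """Toy stand-in for the real embedder: counts of a few indicator words."""
    vocab = ["phishing", "malware", "login", "exfiltration"]
    words = text.lower().split()
    return [float(words.count(w)) for w in vocab]

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

store: list[dict] = []  # stands in for the aurora_cases collection

def store_case(summary: str, metadata: dict):
    store.append({"vector": embed(summary), "summary": summary, **metadata})

def recall(query: str, limit: int = 10) -> list[dict]:
    ranked = sorted(store, key=lambda p: cosine(p["vector"], embed(query)), reverse=True)
    return [{k: v for k, v in p.items() if k != "vector"} for p in ranked[:limit]]

store_case("phishing login page harvesting credentials", {"verdict": "false_positive"})
store_case("malware beacon with data exfiltration", {"verdict": "true_positive"})

# A new alert about a suspicious login recalls the phishing case, not the malware one.
top = recall("suspicious login phishing email", limit=1)[0]
print(top["verdict"])  # false_positive
```

The production path is the same shape: `store` writes an embedded summary, `recall` embeds the query and returns the nearest payloads, so an agent analyzing a new alert sees that a similar pattern was previously judged a false positive.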

Tier 3: Threat Intel Memory

import hashlib
import json
from typing import List

class ThreatIntelMemory:
    """IOC knowledge base with vector similarity + Redis cache."""

    def __init__(self, qdrant_client, redis_client):
        self.qdrant = qdrant_client
        self.redis = redis_client
        self.collection = "aurora_threat_intel"
        self.embedder = TextEmbedder()

    async def search(self, query: str, limit: int = 20) -> List[dict]:
        """Search for related threat intelligence."""
        # Check Redis cache first
        cache_key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Vector search in Qdrant
        embedding = self.embedder.embed(query)
        results = await self.qdrant.search(
            collection_name=self.collection,
            query_vector=embedding,
            limit=limit,
        )

        # Cache for 1 hour
        payload = [hit.payload for hit in results]
        await self.redis.setex(cache_key, 3600, json.dumps(payload))

        return payload
  • Storage: Qdrant (aurora_threat_intel) + Redis cache
  • Latency: ~1ms (cache hit) or ~100ms (cache miss)
  • Persistence: Full (Qdrant) + 1-hour cache (Redis)
  • Purpose: Find IOCs and campaigns similar to current analysis context

TieredAgentMemory: The Unified Interface

class TieredAgentMemory(BaseMemory):
    """BeeAI-compatible memory combining all three tiers."""

    def __init__(self, sliding, episodic, threat_intel):
        self.sliding = sliding            # Tier 1
        self.episodic = episodic          # Tier 2
        self.threat_intel = threat_intel  # Tier 3

    async def add(self, message: BaseMessage):
        """Add to sliding window."""
        self.sliding.add(message)

    async def get_context(self, query: str) -> str:
        """Combine all tiers for LLM context."""
        context_parts = []

        # Tier 1: Recent conversation
        context_parts.append(self.sliding.get_formatted())

        # Tier 2: Similar past cases
        if self.episodic:
            cases = await self.episodic.recall(query)
            if cases:
                context_parts.append(format_cases(cases))

        # Tier 3: Related threat intel
        if self.threat_intel:
            intel = await self.threat_intel.search(query)
            if intel:
                context_parts.append(format_intel(intel))

        return "\n\n".join(context_parts)
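A quick usage sketch shows how the tiers merge into one prompt context. The stub classes and the pared-down `TieredContext` below are illustrative stand-ins, not the real AuroraSOC classes:

```python
import asyncio

class StubSliding:
    """Stand-in for Tier 1: keeps and formats recent turns."""
    def __init__(self):
        self.turns = []
    def add(self, message: str):
        self.turns.append(message)
    def get_formatted(self) -> str:
        return "Recent:\n" + "\n".join(self.turns)

class StubEpisodic:
    """Stand-in for Tier 2: always recalls one past case."""
    async def recall(self, query: str) -> list[str]:
        return ["similar case: credential phishing, judged false positive"]

class TieredContext:
    """Pared-down stand-in for TieredAgentMemory.get_context."""
    def __init__(self, sliding, episodic=None, threat_intel=None):
        self.sliding = sliding
        self.episodic = episodic
        self.threat_intel = threat_intel

    async def get_context(self, query: str) -> str:
        parts = [self.sliding.get_formatted()]
        if self.episodic:  # tiers are optional; None simply skips the lookup
            cases = await self.episodic.recall(query)
            if cases:
                parts.append("Past cases:\n" + "\n".join(cases))
        return "\n\n".join(parts)

mem = TieredContext(StubSliding(), StubEpisodic())
mem.sliding.add("user: triage alert 4821")
context = asyncio.run(mem.get_context("credential phishing"))
print("Past cases" in context)  # True
```

The `if self.episodic:` / `if self.threat_intel:` guards are what make the presets below work: a preset that sets a tier to `None` simply skips that tier's lookup.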

Memory Presets

PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "hunter":       {"sliding": 30,  "episodic": 30,   "intel": True},
    "responder":    {"sliding": 40,  "episodic": 10,   "intel": None},
    "intel":        {"sliding": 20,  "episodic": None, "intel": 50},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
    "cps":          {"sliding": 30,  "episodic": 15,   "intel": True},
}
Design Rationale
  • Orchestrator needs long sliding context (100 messages) because it coordinates multi-step investigations with many back-and-forth handoffs
  • Threat Intel agent needs large intel recall (50 results) but no episodic memory — it works with IOCs, not past cases
  • Default agents with no episodic/intel memory save Qdrant resources for agents that genuinely benefit from recall
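Resolving a preset into a configuration is a plain dictionary lookup; `resolve_preset` below is a hypothetical helper, not part of the documented API:

```python
PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "hunter":       {"sliding": 30,  "episodic": 30,   "intel": True},
    "responder":    {"sliding": 40,  "episodic": 10,   "intel": None},
    "intel":        {"sliding": 20,  "episodic": None, "intel": 50},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
    "cps":          {"sliding": 30,  "episodic": 15,   "intel": True},
}

def resolve_preset(role: str) -> dict:
    """Hypothetical helper: look up a role's memory configuration,
    falling back to the default preset for unknown roles."""
    return PRESETS.get(role, PRESETS["default"])

cfg = resolve_preset("orchestrator")
print(cfg["sliding"], cfg["episodic"])  # 100 5
```

Falling back to `default` for unknown roles means a new agent type degrades to sliding-only memory rather than failing, consuming no Qdrant resources until it is given an explicit preset.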

TextEmbedder

from typing import List

from sentence_transformers import SentenceTransformer

class TextEmbedder:
    """Generate text embeddings using sentence-transformers."""

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed(self, text: str) -> List[float]:
        return self.model.encode(text).tolist()

Why all-MiniLM-L6-v2?

  • Small model (80MB) that runs on CPU
  • 384-dimensional vectors (low storage cost)
  • Excellent quality for semantic similarity tasks
  • High encoding throughput on CPU — thousands of sentences per second, depending on batch size and hardware
  • No GPU required — suitable for edge and container deployments
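The "low storage cost" claim can be made concrete with simple arithmetic: at 4 bytes per float32 dimension, each vector costs about 1.5 KB before index overhead (the one-million-case figure is an illustrative projection, not a measured number):

```python
dims = 384                 # all-MiniLM-L6-v2 output dimensionality
bytes_per_float32 = 4

per_vector = dims * bytes_per_float32           # raw bytes per embedding
million_cases_gb = per_vector * 1_000_000 / 1e9 # raw vector storage for 1M cases, in GB
print(per_vector, round(million_cases_gb, 2))   # 1536 1.54
```

Even a million stored investigations need only about 1.5 GB of raw vector data, which is why a 384-dimensional model is a comfortable fit for CPU-only and containerized deployments.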