# Memory System

AuroraSOC implements a three-tier memory architecture that gives AI agents contextual recall of past investigations, threat intelligence, and recent conversation history. This is implemented in `aurorasoc/memory/`.
## Three-Tier Design

### Why Three Tiers?
| Without Tiered Memory | With Tiered Memory |
|---|---|
| Agent sees only current conversation | Agent recalls similar past cases |
| Every investigation starts from scratch | Learns from organizational history |
| IOCs checked against external feeds only | Local IOC knowledge base with similarity |
| No context about past false positives | "This pattern was a false positive last month" |
### Tier 1: Sliding Memory
```python
from collections import deque
from typing import List

class SlidingMemory:
    """Recent conversation history with configurable window."""

    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages
        self._messages: deque = deque(maxlen=max_messages)

    def add(self, message: BaseMessage):
        self._messages.append(message)

    def get_all(self) -> List[BaseMessage]:
        return list(self._messages)
```
- Storage: In-process Python `deque`
- Latency: ~0ms (memory access)
- Persistence: None (lost on restart)
- Purpose: Maintain recent conversation context for multi-turn reasoning
- Window sizes: 20 (default), 30 (hunter), 40 (responder), 50 (analyst), 100 (orchestrator)
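The eviction behavior is worth seeing concretely. A minimal sketch, using plain strings in place of `BaseMessage` (which comes from the agent framework):

```python
from collections import deque

# Minimal stand-in for SlidingMemory; strings replace framework messages.
class SlidingMemory:
    def __init__(self, max_messages: int = 3):
        self._messages = deque(maxlen=max_messages)

    def add(self, message):
        self._messages.append(message)

    def get_all(self):
        return list(self._messages)

memory = SlidingMemory(max_messages=3)
for turn in ["alert received", "enrich IOC", "query SIEM", "draft verdict"]:
    memory.add(turn)

# The oldest message fell off the window once the fourth arrived.
print(memory.get_all())  # ['enrich IOC', 'query SIEM', 'draft verdict']
```

Because `deque(maxlen=...)` drops the oldest entry automatically, `add()` never needs explicit trimming logic.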
### Tier 2: Episodic Memory
Episodic memory uses pgvector — a PostgreSQL extension for vector similarity search — running inside the same PostgreSQL instance as all relational data. This eliminates the need for a separate vector database service.
```python
class EpisodicMemoryStore:
    """pgvector-backed episodic memory for past investigation outcomes.

    Stores closed cases as vectors in the ``vector_embeddings`` table
    (collection = ``aurora_cases``), enabling semantic retrieval of similar
    past investigations.
    """

    COLLECTION = "aurora_cases"

    def __init__(self, embedder: TextEmbedder | None = None) -> None:
        self._embedder = embedder or TextEmbedder()

    async def store_case(self, case: ClosedCase) -> None:
        """Store a closed case in episodic memory."""
        embedding = await self._embedder.embed(case.summary)
        payload = {
            "summary": case.summary,
            "techniques": case.mitre,
            "iocs": case.iocs,
            "cps_devices_involved": case.cps_devices,
            "outcome": case.outcome,
            "severity": case.severity,
            "confidence": case.confidence,
            "stored_at": datetime.now(UTC).isoformat(),
            **case.metadata,
        }
        async with get_session() as session:
            await session.execute(
                _UPSERT_SQL,
                {
                    "collection": self.COLLECTION,
                    "source_id": case.id,
                    "embedding": _vec_literal(embedding),
                    "payload": json.dumps(payload),
                },
            )

    async def recall_similar(
        self, query: str, top_k: int = 5, severity_filter: str | None = None
    ) -> list[dict]:
        """Find similar past cases by semantic search."""
        embedding = await self._embedder.embed(query)
        async with get_session() as session:
            if severity_filter:
                rows = await session.execute(
                    _SEARCH_FILTERED_SQL,
                    {
                        "collection": self.COLLECTION,
                        "query_vec": _vec_literal(embedding),
                        "filter_json": json.dumps({"severity": severity_filter}),
                        "top_k": top_k,
                    },
                )
            else:
                rows = await session.execute(
                    _SEARCH_SQL,
                    {
                        "collection": self.COLLECTION,
                        "query_vec": _vec_literal(embedding),
                        "top_k": top_k,
                    },
                )
        return [
            {"id": row.source_id, "score": float(row.score), **row.payload}
            for row in rows
        ]
```
- Storage: pgvector in PostgreSQL (`vector_embeddings` table, collection = `aurora_cases`)
- Latency: ~50ms (shared connection pool + HNSW index)
- Persistence: Full (PostgreSQL WAL + disk)
- Purpose: Recall similar past investigations when analyzing new alerts
- Embedding model: `all-MiniLM-L6-v2` (384-dimensional vectors)
#### How Episodic Memory Works
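When a case closes, its summary is embedded and persisted; when a new alert arrives, the alert text is embedded and compared against stored cases by vector similarity. The following self-contained sketch illustrates that flow with a toy bag-of-words embedder and plain cosine similarity standing in for `all-MiniLM-L6-v2` and pgvector's indexed search:

```python
import math

# Toy embedder: hashes words into a fixed-size bag-of-words vector.
# It stands in for all-MiniLM-L6-v2 purely to illustrate the flow.
def embed(text: str, dims: int = 32) -> list[float]:
    vec = [0.0] * dims
    for word in text.lower().split():
        vec[sum(ord(c) for c in word) % dims] += 1.0
    return vec

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

store: list[tuple[str, list[float]]] = []

# 1. On case close: embed the summary and persist it.
for summary in [
    "phishing email with credential harvesting link",
    "ransomware encrypted file shares overnight",
    "benign vulnerability scan from internal it",
]:
    store.append((summary, embed(summary)))

# 2. On a new alert: embed the query, rank stored cases by similarity.
query = embed("suspicious phishing link in email")
ranked = sorted(store, key=lambda item: cosine(query, item[1]), reverse=True)
print(ranked[0][0])  # phishing email with credential harvesting link
```

The production path is the same shape: `store_case()` is step 1 (embed + upsert into `vector_embeddings`), and `recall_similar()` is step 2, with PostgreSQL's HNSW index replacing the linear scan.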
### Tier 3: Threat Intel Memory
```python
class ThreatIntelMemory:
    """Combined pgvector + Redis threat intelligence memory.

    Redis provides fast TTL-cached lookups; pgvector provides semantic
    similarity search across the full IOC knowledge base.
    """

    COLLECTION = "aurora_threat_intel"
    REDIS_IOC_PREFIX = "aurora:ioc:"

    def __init__(self, embedder: TextEmbedder | None = None) -> None:
        self._embedder = embedder or TextEmbedder()

    async def search_similar_iocs(self, query: str, limit: int = 20) -> list[dict]:
        """Search for related threat intelligence."""
        # Check Redis cache first
        r = await get_redis()
        cache_key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await r.get(cache_key)
        if cached:
            return json.loads(cached)

        # Vector search in PostgreSQL via pgvector
        embedding = await self._embedder.embed(query)
        async with get_session() as session:
            rows = await session.execute(
                _SEARCH_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "top_k": limit,
                },
            )
        results = [{"score": float(row.score), **row.payload} for row in rows]

        # Cache for the configured TTL (one hour by default)
        settings = get_settings()
        await r.setex(cache_key, settings.redis.ioc_ttl_seconds, json.dumps(results))
        return results
```
- Storage: pgvector in PostgreSQL (`aurora_threat_intel` collection) + Redis cache
- Latency: ~1ms (cache hit) or ~100ms (cache miss)
- Persistence: Full (PostgreSQL) + configurable TTL cache (Redis)
- Purpose: Find IOCs and campaigns similar to current analysis context
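The cache-aside pattern in `search_similar_iocs()` can be sketched without Redis or PostgreSQL: a dict with expiry timestamps stands in for Redis, and a stub function stands in for the pgvector query (the IOC value and TTL below are illustrative):

```python
import hashlib
import json
import time

cache: dict[str, tuple[float, str]] = {}  # key -> (expires_at, payload)
TTL_SECONDS = 3600  # stands in for settings.redis.ioc_ttl_seconds
backend_calls = 0

def vector_search(query: str) -> list[dict]:
    """Stub for the pgvector similarity query."""
    global backend_calls
    backend_calls += 1
    return [{"ioc": "198.51.100.7", "score": 0.91}]

def search_similar_iocs(query: str) -> list[dict]:
    key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
    entry = cache.get(key)
    if entry and entry[0] > time.monotonic():    # cache hit, still fresh
        return json.loads(entry[1])
    results = vector_search(query)               # cache miss: go to pgvector
    cache[key] = (time.monotonic() + TTL_SECONDS, json.dumps(results))
    return results

search_similar_iocs("beaconing to rare ASN")  # miss: hits the backend
search_similar_iocs("beaconing to rare ASN")  # hit: served from cache
print(backend_calls)  # 1
```

Hashing the query keeps cache keys fixed-length regardless of query size, which is why the real code keys on an MD5 digest rather than the raw query string.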
## TieredAgentMemory: The Unified Interface
```python
class TieredAgentMemory(BaseMemory):
    """BeeAI-compatible memory combining all three tiers."""

    def __init__(self, sliding, episodic, threat_intel):
        self.sliding = sliding            # Tier 1
        self.episodic = episodic          # Tier 2
        self.threat_intel = threat_intel  # Tier 3

    async def add(self, message: BaseMessage):
        """Add to sliding window."""
        self.sliding.add(message)

    async def get_context(self, query: str) -> str:
        """Combine all tiers for LLM context."""
        context_parts = []

        # Tier 1: Recent conversation
        context_parts.append(self.sliding.get_formatted())

        # Tier 2: Similar past cases
        if self.episodic:
            cases = await self.episodic.recall(query)
            if cases:
                context_parts.append(format_cases(cases))

        # Tier 3: Related threat intel
        if self.threat_intel:
            intel = await self.threat_intel.search(query)
            if intel:
                context_parts.append(format_intel(intel))

        return "\n\n".join(context_parts)
```
## Memory Presets
```python
PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "hunter":       {"sliding": 30,  "episodic": 30,   "intel": True},
    "responder":    {"sliding": 40,  "episodic": 10,   "intel": None},
    "intel":        {"sliding": 20,  "episodic": None, "intel": 50},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
    "cps":          {"sliding": 30,  "episodic": 15,   "intel": True},
}
```
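A factory that turns a preset into wired-up tiers might look like the following. The `build_memory` helper is hypothetical (the source does not show the real factory), and the minimal tier classes stand in for `SlidingMemory` and `EpisodicMemoryStore`:

```python
from collections import deque

PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
}

# Minimal stand-ins for the real tier classes.
class SlidingMemory:
    def __init__(self, max_messages):
        self._messages = deque(maxlen=max_messages)

class EpisodicMemory:
    def __init__(self, top_k):
        self.top_k = top_k

def build_memory(preset_name: str):
    """Hypothetical factory: resolve a preset to concrete tier instances."""
    preset = PRESETS.get(preset_name, PRESETS["default"])
    sliding = SlidingMemory(preset["sliding"])
    episodic = EpisodicMemory(preset["episodic"]) if preset["episodic"] else None
    return sliding, episodic

sliding, episodic = build_memory("orchestrator")
print(sliding._messages.maxlen, episodic.top_k)  # 100 5
```

Falling back to `"default"` for unknown preset names keeps every agent functional even if a preset is misspelled in configuration.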
### Design Rationale
- Orchestrator needs long sliding context (100 messages) because it coordinates multi-step investigations with many back-and-forth handoffs
- Threat Intel agent needs large intel recall (50 results) but no episodic memory — it works with IOCs, not past cases
- Default agents with no episodic/intel memory save pgvector query load for agents that genuinely benefit from recall
## TextEmbedder
```python
import asyncio

from sentence_transformers import SentenceTransformer

class TextEmbedder:
    """Generate text embeddings using sentence-transformers."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        self.model = SentenceTransformer(model_name)

    async def embed(self, text: str) -> list[float]:
        # encode() is CPU-bound; run it off the event loop so the memory
        # stores can await it without blocking other agent work.
        loop = asyncio.get_running_loop()
        vector = await loop.run_in_executor(None, self.model.encode, text)
        return vector.tolist()
```
### Why `all-MiniLM-L6-v2`?
- Small model (80MB) that runs on CPU
- 384-dimensional vectors (low storage cost)
- Excellent quality for semantic similarity tasks
- Processes ~10,000 sentences/second on modern CPU
- No GPU required — suitable for edge and container deployments