Memory System

AuroraSOC implements a three-tier memory architecture that gives AI agents contextual recall of past investigations, threat intelligence, and recent conversation history. This is implemented in aurorasoc/memory/.

Three-Tier Design

Why Three Tiers?

| Without Tiered Memory | With Tiered Memory |
| --- | --- |
| Agent sees only current conversation | Agent recalls similar past cases |
| Every investigation starts from scratch | Learns from organizational history |
| IOCs checked against external feeds only | Local IOC knowledge base with similarity search |
| No context about past false positives | "This pattern was a false positive last month" |

Tier 1: Sliding Memory

class SlidingMemory:
    """Recent conversation history with a configurable window."""

    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages
        self._messages: deque = deque(maxlen=max_messages)

    def add(self, message: BaseMessage):
        self._messages.append(message)

    def get_all(self) -> List[BaseMessage]:
        return list(self._messages)

  • Storage: In-process Python deque
  • Latency: ~0ms (memory access)
  • Persistence: None (lost on restart)
  • Purpose: Maintain recent conversation context for multi-turn reasoning
  • Window sizes: 20 (default), 30 (hunter), 40 (responder), 50 (analyst), 100 (orchestrator)
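The windowing behavior comes directly from `deque(maxlen=...)`: once the window is full, appending silently evicts the oldest entry. A minimal sketch, using plain strings in place of `BaseMessage` and a hypothetical window of 3:

```python
from collections import deque

# Stand-in for SlidingMemory's window: a bounded deque evicts the oldest
# entry automatically once maxlen is reached.
window: deque = deque(maxlen=3)
for msg in ["alert received", "enriched IOCs", "queried SIEM", "drafted verdict"]:
    window.append(msg)

print(list(window))  # → ['enriched IOCs', 'queried SIEM', 'drafted verdict']
```

With the default window of 20, the first message is only evicted on the 21st `add` — old turns fall out of context gradually rather than being truncated mid-investigation.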

Tier 2: Episodic Memory

Episodic memory uses pgvector — a PostgreSQL extension for vector similarity search — running inside the same PostgreSQL instance as all relational data. This eliminates the need for a separate vector database service.

class EpisodicMemoryStore:
    """pgvector-backed episodic memory for past investigation outcomes.

    Stores closed cases as vectors in the ``vector_embeddings`` table
    (collection = ``aurora_cases``), enabling semantic retrieval of similar
    past investigations.
    """

    COLLECTION = "aurora_cases"

    def __init__(self, embedder: TextEmbedder | None = None) -> None:
        self._embedder = embedder or TextEmbedder()

    async def store_case(self, case: ClosedCase) -> None:
        """Store a closed case in episodic memory."""
        embedding = await self._embedder.embed(case.summary)
        payload = {
            "summary": case.summary,
            "techniques": case.mitre,
            "iocs": case.iocs,
            "cps_devices_involved": case.cps_devices,
            "outcome": case.outcome,
            "severity": case.severity,
            "confidence": case.confidence,
            "stored_at": datetime.now(UTC).isoformat(),
            **case.metadata,
        }
        async with get_session() as session:
            await session.execute(
                _UPSERT_SQL,
                {
                    "collection": self.COLLECTION,
                    "source_id": case.id,
                    "embedding": _vec_literal(embedding),
                    "payload": json.dumps(payload),
                },
            )

    async def recall_similar(
        self, query: str, top_k: int = 5, severity_filter: str | None = None
    ) -> list[dict]:
        """Find similar past cases by semantic search."""
        embedding = await self._embedder.embed(query)
        async with get_session() as session:
            if severity_filter:
                rows = await session.execute(
                    _SEARCH_FILTERED_SQL,
                    {
                        "collection": self.COLLECTION,
                        "query_vec": _vec_literal(embedding),
                        "filter_json": json.dumps({"severity": severity_filter}),
                        "top_k": top_k,
                    },
                )
            else:
                rows = await session.execute(
                    _SEARCH_SQL,
                    {
                        "collection": self.COLLECTION,
                        "query_vec": _vec_literal(embedding),
                        "top_k": top_k,
                    },
                )
        return [
            {"id": row.source_id, "score": float(row.score), **row.payload}
            for row in rows
        ]

  • Storage: pgvector in PostgreSQL (vector_embeddings table, collection = aurora_cases)
  • Latency: ~50ms (shared connection pool + HNSW index)
  • Persistence: Full (PostgreSQL WAL + disk)
  • Purpose: Recall similar past investigations when analyzing new alerts
  • Embedding model: all-MiniLM-L6-v2 (384-dimensional vectors)

How Episodic Memory Works
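At query time, recall reduces to nearest-neighbor search over embeddings: the new alert's summary is embedded, and stored cases are ranked by cosine similarity (pgvector's HNSW index accelerates this; the semantics are the same). A self-contained sketch with toy 3-dimensional vectors standing in for the 384-dimensional MiniLM embeddings (the case IDs and vectors are invented for illustration):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    # Cosine similarity: dot product over the product of vector norms.
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

# Toy "stored cases": id -> embedding (pgvector holds the real vectors).
cases = {
    "case-101": [0.9, 0.1, 0.0],  # phishing with credential harvesting
    "case-205": [0.1, 0.9, 0.1],  # PLC firmware tampering
    "case-312": [0.5, 0.5, 0.0],  # phishing plus lateral movement
}
query = [0.85, 0.15, 0.05]  # embedding of the new alert's summary

ranked = sorted(cases, key=lambda cid: cosine(query, cases[cid]), reverse=True)
print(ranked[0])  # → case-101
```

`recall_similar` does exactly this ranking inside PostgreSQL and returns the `top_k` payloads, so the agent sees outcomes ("false positive", severity, IOCs) from the closest prior investigations.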

Tier 3: Threat Intel Memory

class ThreatIntelMemory:
    """Combined pgvector + Redis threat intelligence memory.

    Redis provides fast TTL-cached lookups; pgvector provides semantic
    similarity search across the full IOC knowledge base.
    """

    COLLECTION = "aurora_threat_intel"
    REDIS_IOC_PREFIX = "aurora:ioc:"

    def __init__(self, embedder: TextEmbedder | None = None) -> None:
        self._embedder = embedder or TextEmbedder()

    async def search_similar_iocs(self, query: str, limit: int = 20) -> list[dict]:
        """Search for related threat intelligence."""
        # Check the Redis cache first.
        r = await get_redis()
        cache_key = f"threat_intel:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await r.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: vector search in PostgreSQL via pgvector.
        embedding = await self._embedder.embed(query)
        async with get_session() as session:
            rows = await session.execute(
                _SEARCH_SQL,
                {
                    "collection": self.COLLECTION,
                    "query_vec": _vec_literal(embedding),
                    "top_k": limit,
                },
            )
        results = [{"score": float(row.score), **row.payload} for row in rows]

        # Cache results for the configured TTL (1 hour by default).
        settings = get_settings()
        await r.setex(cache_key, settings.redis.ioc_ttl_seconds, json.dumps(results))
        return results

  • Storage: pgvector in PostgreSQL (aurora_threat_intel) + Redis cache
  • Latency: ~1ms (cache hit) or ~100ms (cache miss)
  • Persistence: Full (PostgreSQL) + configurable TTL cache (Redis)
  • Purpose: Find IOCs and campaigns similar to current analysis context

TieredAgentMemory: The Unified Interface

class TieredAgentMemory(BaseMemory):
    """BeeAI-compatible memory combining all three tiers."""

    def __init__(self, sliding, episodic, threat_intel):
        self.sliding = sliding            # Tier 1
        self.episodic = episodic          # Tier 2
        self.threat_intel = threat_intel  # Tier 3

    async def add(self, message: BaseMessage):
        """Add to the sliding window."""
        self.sliding.add(message)

    async def get_context(self, query: str) -> str:
        """Combine all tiers into a single LLM context string."""
        context_parts = []

        # Tier 1: recent conversation
        context_parts.append(self.sliding.get_formatted())

        # Tier 2: similar past cases
        if self.episodic:
            cases = await self.episodic.recall(query)
            if cases:
                context_parts.append(format_cases(cases))

        # Tier 3: related threat intel
        if self.threat_intel:
            intel = await self.threat_intel.search(query)
            if intel:
                context_parts.append(format_intel(intel))

        return "\n\n".join(context_parts)
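The composition logic can be exercised without any backing stores by substituting stub tiers — `StubSliding` and `StubEpisodic` below are illustrative stand-ins, not project classes:

```python
import asyncio

class StubSliding:
    """Stand-in for Tier 1: returns formatted recent conversation."""
    def get_formatted(self) -> str:
        return "USER: triage alert 4711\nAGENT: gathering context"

class StubEpisodic:
    """Stand-in for Tier 2: returns summaries of similar past cases."""
    async def recall(self, query: str) -> list[str]:
        return ["case-312: similar phishing wave, closed as false positive"]

async def get_context(query: str) -> str:
    # Same shape as TieredAgentMemory.get_context: collect each tier's
    # contribution, skip empty tiers, join with blank lines.
    parts = [StubSliding().get_formatted()]
    cases = await StubEpisodic().recall(query)
    if cases:
        parts.append("Similar past cases:\n" + "\n".join(cases))
    return "\n\n".join(parts)

context = asyncio.run(get_context("phishing alert 4711"))
print(context)
```

Because absent tiers simply contribute nothing, the same `get_context` works unchanged whether an agent was built with one tier or all three.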

Memory Presets

PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "hunter":       {"sliding": 30,  "episodic": 30,   "intel": True},
    "responder":    {"sliding": 40,  "episodic": 10,   "intel": None},
    "intel":        {"sliding": 20,  "episodic": None, "intel": 50},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
    "cps":          {"sliding": 30,  "episodic": 15,   "intel": True},
}

Design Rationale
  • Orchestrator needs long sliding context (100 messages) because it coordinates multi-step investigations with many back-and-forth handoffs
  • Threat Intel agent needs large intel recall (50 results) but no episodic memory — it works with IOCs, not past cases
  • Default agents with no episodic/intel memory save pgvector query load for agents that genuinely benefit from recall
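A factory that resolves a role to its tier configuration might look like this sketch — `build_memory` and its return shape are illustrative assumptions, not the project's actual API:

```python
# Subset of the documented presets, for illustration.
PRESETS = {
    "default":      {"sliding": 20,  "episodic": None, "intel": None},
    "analyst":      {"sliding": 50,  "episodic": 20,   "intel": True},
    "orchestrator": {"sliding": 100, "episodic": 5,    "intel": None},
}

def build_memory(role: str) -> dict:
    # Unknown roles fall back to the conservative default preset.
    preset = PRESETS.get(role, PRESETS["default"])
    return {
        "sliding_window": preset["sliding"],
        "episodic_top_k": preset["episodic"],  # None disables the tier
        "intel_enabled": bool(preset["intel"]),
    }

print(build_memory("orchestrator"))
# → {'sliding_window': 100, 'episodic_top_k': 5, 'intel_enabled': False}
```

Falling back to `default` keeps agent construction total: a typo'd or newly added role still gets a working (if minimal) memory rather than a crash.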

TextEmbedder

class TextEmbedder:
    """Generate text embeddings using sentence-transformers."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    async def embed(self, text: str) -> List[float]:
        # encode() is CPU-bound; run it in an executor so callers can
        # await it without blocking the event loop.
        loop = asyncio.get_running_loop()
        vector = await loop.run_in_executor(None, self.model.encode, text)
        return vector.tolist()

Why all-MiniLM-L6-v2?

  • Small model (80MB) that runs on CPU
  • 384-dimensional vectors (low storage cost)
  • Excellent quality for semantic similarity tasks
  • Processes ~10,000 sentences/second on modern CPU
  • No GPU required — suitable for edge and container deployments