Skip to main content

Writing Tests

This guide walks you through writing new tests for AuroraSOC, covering each component type with concrete examples and best practices.

Test File Structure

Every test file follows this template:

"""Tests for <module_name>."""

from __future__ import annotations

import pytest
# Import the module under test
from aurorasoc.module import ClassUnderTest


class TestFeatureName:
    """Collect the tests that exercise one feature of the module."""

    async def test_does_expected_thing(self):
        """Name each test after the behavior it verifies."""
        # `expected_value` is a template placeholder: substitute the real
        # expectation for the method under test.
        outcome = ClassUnderTest().method()
        assert outcome == expected_value

    async def test_handles_edge_case(self):
        """Give every edge case a test of its own."""
        # `bad_input` is a template placeholder for a value the method
        # is expected to reject with ValueError.
        with pytest.raises(ValueError):
            ClassUnderTest().method(bad_input)

Naming Conventions

| Convention | Example | Why |
| --- | --- | --- |
| File: test_<module>.py | test_auth.py | pytest autodiscovery |
| Class: Test<Feature> | TestJWTAuth | Group related tests |
| Function: test_<behavior> | test_expired_token_raises | Describe what's tested |

Writing Database Tests

Database tests use the db_session fixture for a transactional SQLite session.

Creating and Querying Models

import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from aurorasoc.core.models import AlertModel


async def test_create_alert(db_session):
    """Round-trip an AlertModel through the session and check its fields."""
    alert_title = "Suspicious SSH brute force"

    # Arrange: build the row to persist.
    new_alert = AlertModel(
        title=alert_title,
        severity="high",
        status="new",
        source="network",
        description="Multiple failed SSH attempts from 10.0.0.5",
        iocs=["10.0.0.5"],
        mitre_techniques=["T1110.001"],
        created_at=datetime.now(timezone.utc),
    )

    # Act: stage the row and commit it.
    db_session.add(new_alert)
    await db_session.commit()

    # Assert: read the row back by title.
    query = select(AlertModel).where(AlertModel.title == alert_title)
    rows = await db_session.execute(query)
    stored = rows.scalar_one()

    assert stored.id is not None  # UUID was auto-generated
    assert stored.severity == "high"
    assert "10.0.0.5" in stored.iocs
    assert stored.mitre_techniques == ["T1110.001"]

Testing Relationships

from datetime import datetime, timezone

from sqlalchemy import select

from aurorasoc.core.models import CaseModel, TimelineEntryModel


async def test_case_with_timeline(db_session):
    """Case timeline entries maintain parent relationship.

    Persists a CaseModel, attaches one TimelineEntryModel to it through
    ``case_id``, and checks that querying by that ``case_id`` returns the
    entry that was stored.
    """
    # Create parent
    case = CaseModel(
        title="Investigation: SSH brute force",
        status="open",
        severity="high",
        assigned_to="analyst-1",
        created_at=datetime.now(timezone.utc),
    )
    db_session.add(case)
    await db_session.flush()  # Get case.id without committing

    # Create child, linked to the parent through the flushed primary key
    entry = TimelineEntryModel(
        case_id=case.id,
        action="case_created",
        actor="system",
        details="Auto-created from alert correlation",
        timestamp=datetime.now(timezone.utc),
    )
    db_session.add(entry)
    await db_session.commit()

    # Verify relationship
    result = await db_session.execute(
        select(TimelineEntryModel).where(TimelineEntryModel.case_id == case.id)
    )
    fetched = result.scalar_one()
    assert fetched.action == "case_created"
    assert fetched.case_id == case.id

Using Pre-populated Data

async def test_query_critical_alerts(populated_db):
    """populated_db provides 12 alerts (3 per severity)."""
    # Filter on severity and expect exactly the three critical rows.
    critical_query = select(AlertModel).where(AlertModel.severity == "critical")
    rows = await populated_db.execute(critical_query)
    critical_alerts = rows.scalars().all()
    assert len(critical_alerts) == 3

Writing Authentication Tests

Auth tests are typically pure logic — no fixtures needed.

JWT Round-Trip

import pytest

from aurorasoc.core.auth import create_token, decode_token
from aurorasoc.models.domain import Role

# NOTE(review): HTTPException is used below but is not imported anywhere in
# this snippet. It is presumably the web framework's HTTPException (e.g.
# fastapi.HTTPException) — confirm and add the matching import.


class TestJWTAuth:
    """JWT creation and decoding tests; pure logic, no fixtures required."""

    async def test_create_and_decode_token(self):
        """Token round-trip preserves claims."""
        token = create_token(sub="analyst-1", role=Role.ANALYST)
        payload = decode_token(token)

        assert payload.sub == "analyst-1"
        assert payload.role == Role.ANALYST

    async def test_invalid_token_raises_401(self):
        """Garbage tokens produce HTTP 401."""
        with pytest.raises(HTTPException) as exc_info:
            decode_token("not.a.real.token")
        # Must stay outside the `with` block: statements after the raising
        # call inside the block would never execute.
        assert exc_info.value.status_code == 401

RBAC Permissions

from aurorasoc.core.auth import ROLE_PERMISSIONS, Role


class TestRBAC:
    """Checks on the static role-to-permission mapping."""

    def test_admin_has_all_permissions(self):
        """Admin role includes read, write, and revoke."""
        admin_perms = ROLE_PERMISSIONS[Role.ADMIN]
        for permission in ("alerts:read", "cases:write", "cps:revoke"):
            assert permission in admin_perms

    def test_viewer_has_limited_permissions(self):
        """Viewer role can read alerts but cannot write cases."""
        viewer_perms = ROLE_PERMISSIONS[Role.VIEWER]
        assert "alerts:read" in viewer_perms
        assert "cases:write" not in viewer_perms

Writing Domain Model Tests

Domain models are Pydantic classes — test them without any database:

from aurorasoc.models.domain import Alert, Severity, CPSDevice, FirmwareStack


class TestAlertModel:
    """Tests for the Alert domain model; no database involved."""

    def test_alert_creation(self):
        """Alert keeps the id and severity it was constructed with."""
        fields = {
            "id": "alert-001",
            "title": "Test alert",
            "severity": Severity.HIGH,
            "source": "network",
        }
        alert = Alert(**fields)
        assert alert.id == "alert-001"
        assert alert.severity == Severity.HIGH

    def test_severity_enum_values(self):
        """Every Severity member carries a string value."""
        assert all(isinstance(level.value, str) for level in Severity)

Writing Service Tests with Mocks

When testing services that depend on external systems, mock the dependencies:

Mocking Redis

from unittest.mock import AsyncMock, MagicMock


async def test_rate_limiter_allows_within_limit():
    """A request is allowed while the window count is below the limit."""
    # NOTE(review): RateLimiter is not imported in this snippet; import it
    # from its module before running.

    # Fake pipeline: usable as an async context manager, and execute()
    # yields one result per queued command.
    pipeline = MagicMock()
    pipeline.__aenter__ = AsyncMock(return_value=pipeline)
    pipeline.__aexit__ = AsyncMock(return_value=False)
    # zremrangebyscore, zadd, zcard=5, expire
    pipeline.execute = AsyncMock(return_value=[0, 1, 5, True])

    # Fake Redis client that hands out the fake pipeline.
    redis_client = MagicMock()
    redis_client.pipeline.return_value = pipeline

    # Inject the fake client in place of a real connection.
    limiter = RateLimiter(max_requests=100, window_seconds=60)
    limiter._client = redis_client

    # Exercise the limiter and check the verdict and the remaining budget.
    allowed, remaining = await limiter.check("user-1")
    assert allowed is True
    assert remaining == 95  # 100 - 5

Mocking Background Tasks

from contextlib import ExitStack
from unittest.mock import patch, AsyncMock


async def test_scheduler_creates_tasks():
    """start() spawns all 4 background loops.

    Each loop coroutine is replaced with an AsyncMock, so the test only
    checks the scheduler's bookkeeping (``_running`` and ``_tasks``).
    """
    # NOTE(review): BackgroundScheduler is not imported in this snippet;
    # import it from its module before running.
    scheduler = BackgroundScheduler()
    loop_names = (
        "_alert_dedup_loop",
        "_scheduled_hunt_loop",
        "_expired_approval_loop",
        "_metrics_collector_loop",
    )

    # ExitStack applies all four patches without four levels of nesting.
    with ExitStack() as patches:
        for loop_name in loop_names:
            patches.enter_context(
                patch.object(scheduler, loop_name, new=AsyncMock())
            )

        await scheduler.start()
        try:
            assert scheduler._running is True
            assert len(scheduler._tasks) == 4
        finally:
            # Always stop, even if an assertion fails, so background tasks
            # do not outlive the test.
            await scheduler.stop()

Writing Memory System Tests

Memory tests validate the tiered memory configuration and operations:

from unittest.mock import MagicMock

from aurorasoc.memory.tiered import (
    create_tiered_memory,
    ANALYST_MEMORY,
    HUNTER_MEMORY,
)


class TestTieredMemory:
    """Tests for the tiered memory presets and the factory function."""

    def test_config_presets(self):
        """Each preset has expected sliding window size."""
        assert ANALYST_MEMORY.sliding_window == 30
        assert HUNTER_MEMORY.sliding_window == 40

    def test_factory_sets_agent_name(self):
        """create_tiered_memory stamps the agent name."""
        mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        assert mem._config.agent_name == "TestAgent"
        assert mem._config.sliding_window == 30

    async def test_add_and_retrieve(self):
        """Messages round-trip through add/messages."""
        mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        # A MagicMock stands in for a message; only `.text` is set here.
        msg = MagicMock()
        msg.text = "Suspicious activity detected"
        mem.add(msg)
        assert len(mem.messages) == 1

    async def test_disabled_features_return_empty(self):
        """Disabled features return empty rather than error."""
        mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        # Analyst preset has episodic/threat-intel disabled
        assert await mem.recall_similar("query") == []
        assert await mem.search_threat_intel("ioc") == []
        assert await mem.enrich_ioc("1.2.3.4") == {}

Writing Circuit Breaker Tests

The circuit breaker is a pure state machine — ideal for exhaustive testing:

from aurorasoc.agents.factory import CircuitBreaker, CircuitState


class TestCircuitBreaker:
    """State-machine tests for CircuitBreaker; no I/O or fixtures needed."""

    def test_initial_state_is_closed(self):
        """New circuit breakers start CLOSED (allowing requests)."""
        cb = CircuitBreaker(threshold=3, timeout=30)
        assert cb.state == CircuitState.CLOSED

    def test_opens_after_threshold_failures(self):
        """Circuit opens after N consecutive failures."""
        threshold = 3
        cb = CircuitBreaker(threshold=threshold, timeout=30)
        for _ in range(threshold):
            cb.record_failure()
        assert cb.state == CircuitState.OPEN

    def test_success_resets_failures(self):
        """A success clears the failure counter."""
        cb = CircuitBreaker(threshold=3, timeout=30)
        cb.record_failure()
        cb.record_failure()
        cb.record_success()  # Resets!
        # Two further failures bring the total to four, above the threshold,
        # but only two are consecutive. Without these the test would pass
        # even if record_success() never reset the counter.
        cb.record_failure()
        cb.record_failure()
        assert cb.state == CircuitState.CLOSED
        assert cb.allow_request() is True

Test Best Practices

Do

  • One assertion concept per test — A test can have multiple assert statements, but they should all validate the same behavior
  • Use descriptive names — test_expired_token_raises_401, not test_token_3
  • Test edge cases separately — Empty inputs, None values, boundary conditions each get their own test
  • Use fixtures for shared setup — Don't repeat database initialization in every test
  • Keep tests fast — If a test takes >1 second, something is wrong

Don't

  • Don't test external libraries — Don't test that SQLAlchemy can do a SELECT; test your query logic
  • Don't use time.sleep() — Use AsyncMock and control the flow of the test explicitly
  • Don't share state between tests — Each test must work in isolation
  • Don't catch exceptions in tests — Let them propagate; use pytest.raises() for expected errors

Async Test Checklist

# ✅ Correct - pytest-asyncio handles it (asyncio_mode = auto)
async def test_something():
    """Await the coroutine directly inside an async test."""
    outcome = await async_function()
    assert outcome is not None


# ❌ Wrong - don't manually run the loop
def test_something():
    """Anti-example: drives the event loop by hand from a sync test."""
    event_loop = asyncio.get_event_loop()
    outcome = event_loop.run_until_complete(async_function())  # Don't do this

Adding a New Test File

  1. Create tests/test_<module>.py
  2. Import the module under test
  3. Add fixtures to conftest.py if needed (shared across tests)
  4. Write tests following the patterns above
  5. Run: pytest tests/test_<module>.py -v
  6. Check coverage: pytest tests/test_<module>.py --cov=aurorasoc.<module>