Writing Tests
This guide walks you through writing new tests for AuroraSOC, covering each component type with concrete examples and best practices.
Test File Structure
Every test file follows this template:
"""Tests for <module_name>."""
from __future__ import annotations
import pytest
# Import the module under test
from aurorasoc.module import ClassUnderTest
class TestFeatureName:
    """Collect the tests that exercise one feature."""

    async def test_does_expected_thing(self):
        """Name each test after the behavior it expects."""
        # expected_value is a template placeholder: substitute the real value
        subject = ClassUnderTest()
        assert subject.method() == expected_value

    async def test_handles_edge_case(self):
        """Give every edge case a test of its own."""
        # bad_input is a template placeholder for the offending argument
        with pytest.raises(ValueError):
            ClassUnderTest().method(bad_input)
Naming Conventions
| Convention | Example | Why |
|---|---|---|
| File: `test_<module>.py` | `test_auth.py` | pytest autodiscovery |
| Class: `Test<Feature>` | `TestJWTAuth` | Group related tests |
| Function: `test_<behavior>` | `test_expired_token_raises` | Describe what's tested |
Writing Database Tests
Database tests use the db_session fixture for a transactional SQLite session.
Creating and Querying Models
import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from aurorasoc.core.models import AlertModel
async def test_create_alert(db_session):
    """An AlertModel written to the session can be read back unchanged."""
    # Arrange: the row to store
    title = "Suspicious SSH brute force"
    new_alert = AlertModel(
        title=title,
        severity="high",
        status="new",
        source="network",
        description="Multiple failed SSH attempts from 10.0.0.5",
        iocs=["10.0.0.5"],
        mitre_techniques=["T1110.001"],
        created_at=datetime.now(timezone.utc),
    )

    # Act: persist it
    db_session.add(new_alert)
    await db_session.commit()

    # Assert: look the row up by title and compare field by field
    by_title = select(AlertModel).where(AlertModel.title == title)
    rows = await db_session.execute(by_title)
    stored = rows.scalar_one()
    assert stored.id is not None  # UUID was auto-generated
    assert stored.severity == "high"
    assert "10.0.0.5" in stored.iocs
    assert stored.mitre_techniques == ["T1110.001"]
Testing Relationships
from aurorasoc.core.models import CaseModel, TimelineEntryModel
async def test_case_with_timeline(db_session):
    """A timeline entry stays linked to the case it was created for."""
    # Parent row first
    parent = CaseModel(
        title="Investigation: SSH brute force",
        status="open",
        severity="high",
        assigned_to="analyst-1",
        created_at=datetime.now(timezone.utc),
    )
    db_session.add(parent)
    # flush() instead of commit(): parent.id is needed below before committing
    await db_session.flush()

    # Child row that points back at the parent
    child = TimelineEntryModel(
        case_id=parent.id,
        action="case_created",
        actor="system",
        details="Auto-created from alert correlation",
        timestamp=datetime.now(timezone.utc),
    )
    db_session.add(child)
    await db_session.commit()

    # Fetch the entry through its case_id and check the link
    by_case = select(TimelineEntryModel).where(
        TimelineEntryModel.case_id == parent.id
    )
    rows = await db_session.execute(by_case)
    stored = rows.scalar_one()
    assert stored.action == "case_created"
    assert stored.case_id == parent.id
Using Pre-populated Data
async def test_query_critical_alerts(populated_db):
    """Of the 12 alerts in populated_db (3 per severity), 3 are critical."""
    critical_only = select(AlertModel).where(AlertModel.severity == "critical")
    rows = await populated_db.execute(critical_only)
    matches = rows.scalars().all()
    assert len(matches) == 3
Writing Authentication Tests
Auth tests are typically pure logic — no fixtures needed.
JWT Round-Trip
from aurorasoc.core.auth import create_token, decode_token
from aurorasoc.models.domain import Role
class TestJWTAuth:
    """Token creation and decoding."""

    async def test_create_and_decode_token(self):
        """Claims put into a token come back out of decode_token."""
        encoded = create_token(sub="analyst-1", role=Role.ANALYST)
        claims = decode_token(encoded)
        assert claims.sub == "analyst-1"
        assert claims.role == Role.ANALYST

    async def test_invalid_token_raises_401(self):
        """Decoding a garbage token raises an HTTP 401 error."""
        # NOTE(review): HTTPException is not imported in this snippet;
        # presumably it comes from the web framework - confirm and import it.
        with pytest.raises(HTTPException) as caught:
            decode_token("not.a.real.token")
        # Checked after the with block, once the exception has been captured
        assert caught.value.status_code == 401
RBAC Permissions
from aurorasoc.core.auth import ROLE_PERMISSIONS, Role
class TestRBAC:
    """Role-to-permission mapping checks."""

    def test_admin_has_all_permissions(self):
        """The admin role holds read, write, and revoke permissions."""
        granted = ROLE_PERMISSIONS[Role.ADMIN]
        for permission in ("alerts:read", "cases:write", "cps:revoke"):
            assert permission in granted

    def test_viewer_has_limited_permissions(self):
        """The viewer role can read alerts but cannot write cases."""
        granted = ROLE_PERMISSIONS[Role.VIEWER]
        assert "alerts:read" in granted
        assert "cases:write" not in granted
Writing Domain Model Tests
Domain models are Pydantic classes — test them without any database:
from aurorasoc.models.domain import Alert, Severity, CPSDevice, FirmwareStack
class TestAlertModel:
    """Checks on the Alert domain model and the Severity enum."""

    def test_alert_creation(self):
        """Alert can be built from id, title, severity, and source."""
        fields = {
            "id": "alert-001",
            "title": "Test alert",
            "severity": Severity.HIGH,
            "source": "network",
        }
        built = Alert(**fields)
        assert built.id == "alert-001"
        assert built.severity == Severity.HIGH

    def test_severity_enum_values(self):
        """Every Severity member carries a string value."""
        assert all(isinstance(level.value, str) for level in Severity)
Writing Service Tests with Mocks
When testing services that depend on external systems, mock the dependencies:
Mocking Redis
from unittest.mock import AsyncMock, MagicMock
async def test_rate_limiter_allows_within_limit():
    """A caller that is still under the limit is let through."""
    # Stub pipeline: works as an async context manager and returns canned results
    pipeline_stub = MagicMock()
    pipeline_stub.__aenter__ = AsyncMock(return_value=pipeline_stub)
    pipeline_stub.__aexit__ = AsyncMock(return_value=False)
    # Results in call order: zremrangebyscore, zadd, zcard=5, expire
    canned_results = [0, 1, 5, True]
    pipeline_stub.execute = AsyncMock(return_value=canned_results)

    redis_stub = MagicMock()
    redis_stub.pipeline.return_value = pipeline_stub

    # Swap the stub in for the limiter's client.
    # NOTE(review): RateLimiter is not imported in this snippet - add its import.
    limiter = RateLimiter(max_requests=100, window_seconds=60)
    limiter._client = redis_stub

    # Exercise and verify
    allowed, remaining = await limiter.check("user-1")
    assert allowed is True
    assert remaining == 95  # 100 - 5
Mocking Background Tasks
from unittest.mock import patch, AsyncMock
async def test_scheduler_creates_tasks():
    """start() spawns all 4 background loops.

    Each loop method is replaced with an AsyncMock, so start() schedules the
    stand-ins rather than the real loops. The scheduler is always stopped,
    even when an assertion fails, so a failing run does not leave it started.
    """
    from contextlib import ExitStack

    loop_names = (
        "_alert_dedup_loop",
        "_scheduled_hunt_loop",
        "_expired_approval_loop",
        "_metrics_collector_loop",
    )
    # NOTE(review): BackgroundScheduler is not imported in this snippet -
    # add its import from the module under test.
    scheduler = BackgroundScheduler()

    # One ExitStack replaces four nested with-blocks; patches unwind on exit.
    with ExitStack() as patches:
        for name in loop_names:
            patches.enter_context(
                patch.object(scheduler, name, new=AsyncMock())
            )
        await scheduler.start()
        try:
            assert scheduler._running is True
            # One task per patched loop, i.e. 4
            assert len(scheduler._tasks) == len(loop_names)
        finally:
            # Runs on assertion failure too, so the scheduler is always stopped
            await scheduler.stop()
Writing Memory System Tests
Memory tests validate the tiered memory configuration and operations:
from aurorasoc.memory.tiered import (
create_tiered_memory,
ANALYST_MEMORY,
HUNTER_MEMORY,
)
class TestTieredMemory:
    """Configuration presets and basic operations of tiered memory."""

    def test_config_presets(self):
        """Each preset exposes its expected sliding window size."""
        expected_windows = ((ANALYST_MEMORY, 30), (HUNTER_MEMORY, 40))
        for preset, window in expected_windows:
            assert preset.sliding_window == window

    def test_factory_sets_agent_name(self):
        """create_tiered_memory records the agent name on the config."""
        memory = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        config = memory._config
        assert config.agent_name == "TestAgent"
        assert config.sliding_window == 30

    async def test_add_and_retrieve(self):
        """A message passed to add() shows up in messages."""
        memory = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        # NOTE(review): MagicMock needs `from unittest.mock import MagicMock`
        # in the test file; this snippet does not show that import.
        message = MagicMock()
        message.text = "Suspicious activity detected"
        memory.add(message)
        assert len(memory.messages) == 1

    async def test_disabled_features_return_empty(self):
        """Features that are switched off yield empty results, not errors."""
        memory = create_tiered_memory("TestAgent", ANALYST_MEMORY)
        # Analyst preset has episodic/threat-intel disabled
        similar = await memory.recall_similar("query")
        assert similar == []
        intel = await memory.search_threat_intel("ioc")
        assert intel == []
        enrichment = await memory.enrich_ioc("1.2.3.4")
        assert enrichment == {}
Writing Circuit Breaker Tests
The circuit breaker is a pure state machine — ideal for exhaustive testing:
from aurorasoc.agents.factory import CircuitBreaker, CircuitState
class TestCircuitBreaker:
    """State transitions of the circuit breaker."""

    def test_initial_state_is_closed(self):
        """A freshly built breaker is CLOSED (allowing requests)."""
        breaker = CircuitBreaker(threshold=3, timeout=30)
        assert breaker.state == CircuitState.CLOSED

    def test_opens_after_threshold_failures(self):
        """Recording as many failures in a row as the threshold opens the circuit."""
        threshold = 3
        breaker = CircuitBreaker(threshold=threshold, timeout=30)
        for _ in range(threshold):
            breaker.record_failure()
        assert breaker.state == CircuitState.OPEN

    def test_success_resets_failures(self):
        """A success after two failures leaves the circuit closed."""
        breaker = CircuitBreaker(threshold=3, timeout=30)
        # Two failures: one short of the threshold
        for _ in range(2):
            breaker.record_failure()
        breaker.record_success()  # Resets!
        assert breaker.state == CircuitState.CLOSED
        assert breaker.allow_request() is True
Test Best Practices
Do
- One assertion concept per test — A test can have multiple `assert` statements, but they should all validate the same behavior
- Use descriptive names — `test_expired_token_raises_401`, not `test_token_3`
- Test edge cases separately — Empty inputs, None values, boundary conditions each get their own test
- Use fixtures for shared setup — Don't repeat database initialization in every test
- Keep tests fast — If a test takes >1 second, something is wrong
Don't
- Don't test external libraries — Don't test that SQLAlchemy can do a SELECT; test your query logic
- Don't use `time.sleep()` — Use `AsyncMock` and control flow explicitly
- Don't share state between tests — Each test must work in isolation
- Don't catch exceptions in tests — Let them propagate; use `pytest.raises()` for expected errors
Async Test Checklist
# ✅ Correct - declare the test async and await directly;
# pytest-asyncio runs it (asyncio_mode = auto)
async def test_something():
    outcome = await async_function()
    assert outcome is not None


# ❌ Wrong - a sync test that drives the event loop by hand
def test_something():
    event_loop = asyncio.get_event_loop()
    outcome = event_loop.run_until_complete(async_function())  # Don't do this
Adding a New Test File
- Create `tests/test_<module>.py`
- Import the module under test
- Add fixtures to `conftest.py` if needed (shared across tests)
- Write tests following the patterns above
- Run: `pytest tests/test_<module>.py -v`
- Check coverage: `pytest tests/test_<module>.py --cov=aurorasoc.<module>`