Writing Tests
This guide walks you through writing new tests for AuroraSOC, covering each component type with concrete examples and best practices.
Test File Structure
Every test file follows this template:
"""Tests for <module_name>."""
from __future__ import annotations
import pytest
# Import the module under test
from aurorasoc.module import ClassUnderTest
class TestFeatureName:
"""Group related tests."""
async def test_does_expected_thing(self):
"""Test names describe the expected behavior."""
result = ClassUnderTest().method()
assert result == expected_value
async def test_handles_edge_case(self):
"""Separate test for each edge case."""
with pytest.raises(ValueError):
ClassUnderTest().method(bad_input)
Naming Conventions
| Convention | Example | Why |
|---|---|---|
| File: `test_<module>.py` | `test_auth.py` | pytest autodiscovery |
| Class: `Test<Feature>` | `TestJWTAuth` | Group related tests |
| Function: `test_<behavior>` | `test_expired_token_raises` | Describe what's tested |
Writing Database Tests
Database tests use the db_session fixture for a transactional SQLite session.
Creating and Querying Models
import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from aurorasoc.core.models import AlertModel
async def test_create_alert(db_session):
"""AlertModel persists and retrieves correctly."""
# Arrange
alert = AlertModel(
title="Suspicious SSH brute force",
severity="high",
status="new",
source="network",
description="Multiple failed SSH attempts from 10.0.0.5",
iocs=["10.0.0.5"],
mitre_techniques=["T1110.001"],
created_at=datetime.now(timezone.utc),
)
# Act
db_session.add(alert)
await db_session.commit()
# Assert
result = await db_session.execute(
select(AlertModel).where(AlertModel.title == "Suspicious SSH brute force")
)
fetched = result.scalar_one()
assert fetched.id is not None # UUID was auto-generated
assert fetched.severity == "high"
assert "10.0.0.5" in fetched.iocs
assert fetched.mitre_techniques == ["T1110.001"]
Testing Relationships
from aurorasoc.core.models import CaseModel, TimelineEntryModel
async def test_case_with_timeline(db_session):
"""Case timeline entries maintain parent relationship."""
# Create parent
case = CaseModel(
title="Investigation: SSH brute force",
status="open",
severity="high",
assigned_to="analyst-1",
created_at=datetime.now(timezone.utc),
)
db_session.add(case)
await db_session.flush() # Get case.id without committing
# Create child
entry = TimelineEntryModel(
case_id=case.id,
action="case_created",
actor="system",
details="Auto-created from alert correlation",
timestamp=datetime.now(timezone.utc),
)
db_session.add(entry)
await db_session.commit()
# Verify relationship
result = await db_session.execute(
select(TimelineEntryModel).where(TimelineEntryModel.case_id == case.id)
)
fetched = result.scalar_one()
assert fetched.action == "case_created"
assert fetched.case_id == case.id
Using Pre-populated Data
async def test_query_critical_alerts(populated_db):
"""populated_db provides 12 alerts (3 per severity)."""
result = await populated_db.execute(
select(AlertModel).where(AlertModel.severity == "critical")
)
alerts = result.scalars().all()
assert len(alerts) == 3
Writing Authentication Tests
Auth tests are typically pure logic — no fixtures needed.
JWT Round-Trip
from fastapi import HTTPException
from aurorasoc.core.auth import create_token, decode_token
from aurorasoc.models.domain import Role
class TestJWTAuth:
async def test_create_and_decode_token(self):
"""Token round-trip preserves claims."""
token = create_token(sub="analyst-1", role=Role.ANALYST)
payload = decode_token(token)
assert payload.sub == "analyst-1"
assert payload.role == Role.ANALYST
async def test_invalid_token_raises_401(self):
"""Garbage tokens produce HTTP 401."""
with pytest.raises(HTTPException) as exc_info:
decode_token("not.a.real.token")
assert exc_info.value.status_code == 401
RBAC Permissions
from aurorasoc.core.auth import ROLE_PERMISSIONS, Role
class TestRBAC:
def test_admin_has_all_permissions(self):
"""Admin role includes read, write, and revoke."""
perms = ROLE_PERMISSIONS[Role.ADMIN]
assert "alerts:read" in perms
assert "cases:write" in perms
assert "cps:revoke" in perms
def test_viewer_has_limited_permissions(self):
"""Viewer role is read-only."""
perms = ROLE_PERMISSIONS[Role.VIEWER]
assert "alerts:read" in perms
assert "cases:write" not in perms
Writing Domain Model Tests
Domain models are Pydantic classes — test them without any database:
from aurorasoc.models.domain import Alert, Severity, CPSDevice, FirmwareStack
class TestAlertModel:
def test_alert_creation(self):
"""Alert domain model accepts all fields."""
alert = Alert(
id="alert-001",
title="Test alert",
severity=Severity.HIGH,
source="network",
)
assert alert.id == "alert-001"
assert alert.severity == Severity.HIGH
def test_severity_enum_values(self):
"""All severity variants are string-valued."""
for sev in Severity:
assert isinstance(sev.value, str)
Writing Service Tests with Mocks
When testing services that depend on external systems, mock the dependencies:
Mocking Redis with Lua Scripts
AuroraSOC's rate limiter uses a Lua script registered via client.register_script() for atomic sliding-window operations. Mocking this requires understanding the sync/async boundary:
- `get_redis()` is async → patch with `AsyncMock(return_value=mock_client)`
- `client.register_script()` is sync → `mock_client` must be `MagicMock()` (NOT `AsyncMock`)
- The Lua script callable is async → use `AsyncMock(return_value=count)`
from unittest.mock import AsyncMock, MagicMock, patch
from aurorasoc.core.rate_limit import RedisRateLimiter
def _mock_redis(script_return: int):
"""Create a mock Redis client with a Lua script that returns script_return."""
mock_client = MagicMock() # sync — register_script is NOT async
mock_script = AsyncMock(return_value=script_return) # script call IS async
mock_client.register_script.return_value = mock_script
return mock_client, mock_script
async def test_rate_limiter_allows_within_limit():
"""Requests below threshold are allowed."""
mock_client, _ = _mock_redis(script_return=5)
with patch("aurorasoc.core.rate_limit.get_redis",
AsyncMock(return_value=mock_client)):
limiter = RedisRateLimiter(max_requests=100, window_seconds=60)
allowed, remaining = await limiter.check("user-1")
assert allowed is True
assert remaining == 95 # 100 - 5
async def test_rate_limiter_blocks_over_limit():
"""Requests over threshold are blocked."""
mock_client, _ = _mock_redis(script_return=101)
with patch("aurorasoc.core.rate_limit.get_redis",
AsyncMock(return_value=mock_client)):
limiter = RedisRateLimiter(max_requests=100, window_seconds=60)
allowed, remaining = await limiter.check("user-1")
assert allowed is False
assert remaining == 0
If you use AsyncMock() for mock_client, then register_script() returns a coroutine instead of the mock script directly. When the rate limiter tries to call script(keys=..., args=...), it gets TypeError: 'coroutine' object is not callable. Always use MagicMock() for any Redis client mock where sync methods like register_script are used.
Mocking Background Tasks
from unittest.mock import patch, AsyncMock
async def test_scheduler_creates_tasks():
"""start() spawns all 4 background loops."""
scheduler = BackgroundScheduler()
with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
with patch.object(scheduler, "_expired_approval_loop", new=AsyncMock()):
with patch.object(scheduler, "_metrics_collector_loop", new=AsyncMock()):
await scheduler.start()
assert scheduler._running is True
assert len(scheduler._tasks) == 4
await scheduler.stop()
Writing Dashboard Browser Tests
Use Playwright for operator-facing dashboard behavior that depends on browser rendering, auth guards, disabled controls, and runtime-truth messaging.
When To Use This Lane
- The UI must distinguish an unavailable live read from an empty result set.
- You need to verify disabled controls, recovery links, or runtime banners.
- The behavior depends on client-side auth bootstrap or multiple browser fetches on page load.
Keep Browser Tests Deterministic
Dashboard browser tests should mock API traffic in the page instead of depending on a live backend. The existing unavailable-state coverage in dashboard/tests/e2e/runtime-unavailable.spec.ts is the reference pattern.
await page.route("**/api/v1/users/me", async (route) => {
await route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify({
username: "playwright.operator",
role: "operator",
permissions: ["alerts:read", "cases:read"],
auth_provider: "local",
}),
});
});
await page.route("**/api/v1/system/mode", async (route) => {
await route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify({
mode: "dry_run",
description: "Dry run mode is active.",
is_read_only: true,
is_mutations_allowed: false,
is_external_actions_allowed: false,
is_database_available: false,
showcase_reads_allowed: false,
uses_showcase_data: false,
read_data_source: "unavailable",
}),
});
});
Browser Test Checklist
- Mock `/api/v1/auth/providers` and `/api/v1/users/me` first so the auth guard settles on the protected route.
- Mock `/api/v1/system/mode` with the runtime state you want the operator to see.
- Mock only the page-specific endpoints needed for that test, such as `/api/v1/alerts` or `/api/v1/cases`.
- Assert on operator-visible outcomes: unavailable banners, queue messaging, disabled search or action controls, and recovery links.
- Prefer focused files such as `runtime-unavailable.spec.ts` over broad end-to-end flows that try to cover the whole dashboard at once.
Writing Memory System Tests
Memory tests validate the tiered memory configuration and operations:
from unittest.mock import MagicMock
from aurorasoc.memory.tiered import (
create_tiered_memory,
ANALYST_MEMORY,
HUNTER_MEMORY,
)
class TestTieredMemory:
def test_config_presets(self):
"""Each preset has expected sliding window size."""
assert ANALYST_MEMORY.sliding_window == 30
assert HUNTER_MEMORY.sliding_window == 40
def test_factory_sets_agent_name(self):
"""create_tiered_memory stamps the agent name."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
assert mem._config.agent_name == "TestAgent"
assert mem._config.sliding_window == 30
async def test_add_and_retrieve(self):
"""Messages round-trip through add/messages."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
msg = MagicMock()
msg.text = "Suspicious activity detected"
mem.add(msg)
assert len(mem.messages) == 1
async def test_disabled_features_return_empty(self):
"""Disabled features return empty rather than error."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
# Analyst preset has episodic/threat-intel disabled
assert await mem.recall_similar("query") == []
assert await mem.search_threat_intel("ioc") == []
assert await mem.enrich_ioc("1.2.3.4") == {}
Writing Circuit Breaker Tests
The circuit breaker is a pure state machine — ideal for exhaustive testing:
from aurorasoc.agents.factory import CircuitBreaker, CircuitState
class TestCircuitBreaker:
def test_initial_state_is_closed(self):
"""New circuit breakers start CLOSED (allowing requests)."""
cb = CircuitBreaker(threshold=3, timeout=30)
assert cb.state == CircuitState.CLOSED
def test_opens_after_threshold_failures(self):
"""Circuit opens after N consecutive failures."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_failure()
assert cb.state == CircuitState.OPEN
def test_success_resets_failures(self):
"""A success clears the failure counter."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_success() # Resets!
assert cb.state == CircuitState.CLOSED
assert cb.allow_request() is True
Test Best Practices
Do
- One assertion concept per test — A test can have multiple `assert` statements, but they should all validate the same behavior
- Use descriptive names — `test_expired_token_raises_401`, not `test_token_3`
- Test edge cases separately — Empty inputs, None values, boundary conditions each get their own test
- Use fixtures for shared setup — Don't repeat database initialization in every test
- Keep tests fast — If a test takes >1 second, something is wrong
Don't
- Don't test external libraries — Don't test that SQLAlchemy can do a SELECT; test your query logic
- Don't use `time.sleep()` — Use `AsyncMock` and control flow explicitly
- Don't share state between tests — Each test must work in isolation
- Don't catch exceptions in tests — Let them propagate; use `pytest.raises()` for expected errors
Async Test Checklist
# ✅ Correct - pytest-asyncio handles it (asyncio_mode = auto)
async def test_something():
result = await async_function()
assert result is not None
# ❌ Wrong - don't manually run the loop
def test_something():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_function()) # Don't do this
Adding a New Test File
1. Create `tests/test_<module>.py`
2. Import the module under test
3. Add fixtures to `conftest.py` if needed (shared across tests)
4. Write tests following the patterns above
5. Run: `pytest tests/test_<module>.py -v`
6. Check coverage: `pytest tests/test_<module>.py --cov=aurorasoc.<module>`