Test Strategy
AuroraSOC employs a layered testing strategy combining fast, fully mocked unit tests with SQLite-backed database tests. This document explains the testing philosophy, infrastructure, and patterns used throughout the project.
Testing Philosophy
Why This Approach?
| Principle | Reasoning |
|---|---|
| SQLite for unit tests | No external dependencies — tests run anywhere without Postgres, Redis, or NATS. The type-compilation shims make SQLAlchemy models work identically on SQLite |
| AsyncMock over real services | Redis, MQTT, and NATS are mocked to isolate logic from I/O. This makes tests deterministic and fast |
| Transactional rollback | Each test gets a fresh session that rolls back automatically, preventing cross-test contamination without dropping/recreating tables |
| No Docker required | The entire test suite runs with make test — no containers needed |
Test Infrastructure
pytest Configuration
```ini
# pytest.ini
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
markers =
    integration: marks tests as integration tests
    slow: marks tests as slow
```
asyncio_mode = auto — All async def test_* functions are automatically treated as async tests without needing @pytest.mark.asyncio decorators.
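As a minimal illustration of what auto mode buys you, the coroutine below is a plain async def test with no decorator; pytest-asyncio collects and drives it automatically. The test body is illustrative, not a real AuroraSOC test, and outside pytest the coroutine can be driven the same way with asyncio.run:

```python
import asyncio

calls = []

async def test_service_call():
    await asyncio.sleep(0)       # stands in for an awaited service call
    calls.append("published")    # side effect the test would assert on
    assert calls == ["published"]

# pytest-asyncio runs the coroutine itself under asyncio_mode = auto;
# here we drive it manually to show it is an ordinary coroutine.
asyncio.run(test_service_call())
```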
Dependencies
```toml
# pyproject.toml [test] extras
pytest = ">=8.0"
pytest-asyncio = ">=0.23"
pytest-cov = ">=4.0"
aiosqlite = ">=0.20"  # SQLite async driver for tests
```
PostgreSQL → SQLite Type Compatibility
The biggest challenge in testing AuroraSOC's database layer is that PostgreSQL-specific column types (UUID, JSONB, ARRAY) don't exist in SQLite. The conftest solves this with SQLAlchemy type compilation overrides:
```python
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB, ARRAY
from sqlalchemy.ext.compiler import compiles

@compiles(PG_UUID, "sqlite")
def _compile_uuid_sqlite(element, compiler, **kw):
    return "CHAR(36)"

@compiles(JSONB, "sqlite")
def _compile_jsonb_sqlite(element, compiler, **kw):
    return "JSON"

@compiles(ARRAY, "sqlite")
def _compile_array_sqlite(element, compiler, **kw):
    return "JSON"
```
Why This Works
SQLite's JSON type stores values as text, which behaves like JSONB for the insert and query operations exercised in tests. CHAR(36) holds the 36-character string produced by str(uuid.UUID). Arrays are stored as JSON arrays; SQLAlchemy's JSON type handles serialization automatically.
While this shim works well for CRUD testing, some PostgreSQL-specific features like JSONB path operators (->, ->>) or array functions (unnest, ANY) will not work in SQLite tests. If you write a query using these features, mark the test with @pytest.mark.integration and run it against a real Postgres container.
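The storage behavior the shims rely on can be checked with the stdlib sqlite3 module alone. This sketch stores a UUID string, a JSON array, and a JSON object the way the shimmed columns would; the table and column names are illustrative, not AuroraSOC's schema:

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
# CHAR(36) for the UUID string, JSON (stored as text) for array and object values
conn.execute("CREATE TABLE alerts (id CHAR(36), iocs JSON, payload JSON)")

row = (
    str(uuid.uuid4()),
    json.dumps(["evil.example.com"]),
    json.dumps({"severity": "high"}),
)
conn.execute("INSERT INTO alerts VALUES (?, ?, ?)", row)

stored_id, stored_iocs, stored_payload = conn.execute("SELECT * FROM alerts").fetchone()
iocs = json.loads(stored_iocs)        # list round-trips like an ARRAY column
payload = json.loads(stored_payload)  # dict round-trips like a JSONB column
```

SQLAlchemy's JSON type performs the json.dumps/json.loads steps shown here automatically.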
Shared Fixtures
All fixtures live in tests/conftest.py:
Event Loop
```python
@pytest.fixture(scope="session")
def event_loop():
    """Create a session-scoped event loop."""
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()
```
Why session-scoped? A single event loop shared across all tests avoids the overhead of creating/destroying loops per test while ensuring all async operations use the same loop.
Database Engine
```python
@pytest_asyncio.fixture(scope="function")
async def db_engine():
    """Create a test database engine with fresh schema."""
    engine = create_async_engine("sqlite+aiosqlite://", echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()
```
Why function-scoped? Each test gets its own engine with a fresh schema. The in-memory SQLite database is created, used, and destroyed per test — guaranteeing isolation.
Database Session
```python
@pytest_asyncio.fixture
async def db_session(db_engine) -> AsyncGenerator[AsyncSession, None]:
    """Provide a transactional test database session."""
    session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
    async with session_factory() as session:
        yield session
        await session.rollback()
```
Why expire_on_commit=False? After committing in a test, you often want to read attributes of the returned object (such as an auto-generated id). Without this flag, commit expires every attribute, and the next attribute access triggers an implicit refresh, which in an async session fails (SQLAlchemy raises MissingGreenlet) when it happens outside an awaited call.
Why rollback() in teardown? Even though the in-memory database is destroyed after each test, the explicit rollback ensures that if a test commits data, the session is in a clean state. This is a defensive pattern.
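The rollback-for-isolation idea can be illustrated with stdlib sqlite3 standing in for the async session (table and data here are illustrative): anything a test writes but never commits disappears in teardown.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alerts (title TEXT)")
conn.commit()

# a "test" inserts a row but never commits...
conn.execute("INSERT INTO alerts VALUES ('uncommitted alert')")

# ...teardown rolls back whatever the test left open
conn.rollback()

remaining = conn.execute("SELECT COUNT(*) FROM alerts").fetchone()[0]
```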
Mock Redis Publisher
```python
@pytest.fixture
def mock_redis():
    """Mock Redis publisher."""
    publisher = AsyncMock()
    publisher.publish_audit = AsyncMock()
    publisher.publish_alert = AsyncMock()
    publisher.publish_agent_task = AsyncMock()
    publisher.publish_agent_result = AsyncMock()
    return publisher
```
This provides a drop-in replacement for the real RedisPublisher class. Tests can inject it and verify that the correct publish methods were called using mock_redis.publish_alert.assert_called_once_with(...).
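A minimal, self-contained sketch of that flow; AlertService here is a hypothetical stand-in (the real service lives in AuroraSOC), but the AsyncMock wiring and the assertion are the actual pattern:

```python
import asyncio
from unittest.mock import AsyncMock

class AlertService:
    """Illustrative stand-in for the real service under test."""
    def __init__(self, publisher):
        self.publisher = publisher

    async def create_alert(self, data):
        await self.publisher.publish_alert(data)

mock_redis = AsyncMock()  # the mock_redis fixture produces an object like this
service = AlertService(publisher=mock_redis)
asyncio.run(service.create_alert({"title": "Test alert"}))

# verify the correct publish method was called with the expected payload
mock_redis.publish_alert.assert_called_once_with({"title": "Test alert"})
```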
Sample Data Fixtures
```python
@pytest.fixture
def sample_alert_data():
    return {
        "id": str(uuid.uuid4()),
        "title": "Test DNS tunneling alert",
        "severity": "critical",
        "status": "new",
        "source": "network",
        "description": "Suspicious DNS queries detected to evil.example.com",
        "iocs": ["evil.example.com", "192.168.1.100"],
        "mitre_techniques": ["T1071.004"],
    }

@pytest.fixture
def sample_device_data():
    return {
        "device_id": f"test_device_{uuid.uuid4().hex[:8]}",
        "device_type": "access_controller",
        "firmware_stack": "ada_spark",
        "firmware_version": "2.0.0",
        "firmware_hash": "a" * 64,
        "attestation_status": "verified",
        "risk_score": 0.1,
        "location": "Test Lab",
    }
```
Pre-populated Database
```python
@pytest_asyncio.fixture
async def populated_db(db_session, sample_alert_data, sample_device_data):
    """Seeds: 12 alerts (3×4 severities) + 1 CPS device + 1 IOC."""
    for severity in ["critical", "high", "medium", "low"]:
        for i in range(3):
            alert = AlertModel(
                title=f"Test {severity} alert {i}",
                severity=severity,
                status="new",
                source="test",
                description=f"Test alert description {i}",
                iocs=["test.example.com"],
                mitre_techniques=["T1071"],
                created_at=datetime.now(timezone.utc),
            )
            db_session.add(alert)
    # ... also adds 1 CPSDeviceModel and 1 IOCModel
    await db_session.commit()
    return db_session
```
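Tests can rely on the seeded counts: three alerts per severity, twelve in total. The sketch below reproduces the seed loop with stdlib sqlite3 standing in for AlertModel and the async session, to show the invariants a test against populated_db would assert:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alerts (title TEXT, severity TEXT, status TEXT)")

# same 3 × 4 seed loop as the fixture
for severity in ["critical", "high", "medium", "low"]:
    for i in range(3):
        conn.execute(
            "INSERT INTO alerts VALUES (?, ?, 'new')",
            (f"Test {severity} alert {i}", severity),
        )

total = conn.execute("SELECT COUNT(*) FROM alerts").fetchone()[0]
critical = conn.execute(
    "SELECT COUNT(*) FROM alerts WHERE severity = 'critical'"
).fetchone()[0]
```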
Test Categories
| Test File | Count | Category | Dependencies |
|---|---|---|---|
| test_auth.py | 7 | JWT + RBAC | None (pure logic) |
| test_dispatch.py | 5 | Circuit breaker | None (pure logic) |
| test_models.py | 5 | ORM models | db_session fixture |
| test_normalizer.py | 5 | Domain models | None (Pydantic only) |
| test_rate_limit.py | 2 | Rate limiting | MagicMock Redis |
| test_scheduler.py | 4 | Background tasks | AsyncMock + patch |
| test_settings.py | 4 | Configuration | Environment variables |
| test_tiered_memory.py | 12 | Agent memory | MagicMock messages |
| **Total** | **44** | | |
Running Tests
Basic Commands
```bash
# Run all tests
pytest

# With verbose output
pytest -v

# With coverage report
pytest --cov=aurorasoc --cov-report=html

# Run single file
pytest tests/test_auth.py

# Run single test
pytest tests/test_auth.py::TestJWTAuth::test_create_and_decode_token

# Run by marker
pytest -m integration
pytest -m "not slow"
```
Using Make
```bash
make test           # Run all tests
make test-coverage  # Run with HTML coverage report
```
Coverage Report
After running with --cov-report=html, open htmlcov/index.html to see line-by-line coverage.
Mocking Patterns
Pattern 1: AsyncMock for Async Services
For services with async methods (Redis, NATS, database operations):
```python
from unittest.mock import AsyncMock

mock_publisher = AsyncMock()
mock_publisher.publish_alert = AsyncMock(return_value=None)

# Inject into code under test
service = AlertService(publisher=mock_publisher)
await service.create_alert(data)

# Verify
mock_publisher.publish_alert.assert_called_once()
```
Pattern 2: MagicMock with Pipeline
For Redis operations that use pipelines (rate limiter):
```python
from unittest.mock import MagicMock, AsyncMock

mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(return_value=[0, 1, 5, True])
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline
limiter._client = mock_redis
```
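To see the mock in action, here is a self-contained sketch that drives it the way a sliding-window limiter might. The check_rate function and the meaning of the [0, 1, 5, True] reply (e.g. a window count in the third slot) are assumptions for illustration, not the real limiter's API:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock

mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(return_value=[0, 1, 5, True])
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline

async def check_rate(client, limit=10):
    """Illustrative limiter logic: read the window count from the pipeline."""
    async with client.pipeline() as pipe:
        results = await pipe.execute()
    return results[2] < limit  # hypothetical: third result is the count

allowed = asyncio.run(check_rate(mock_redis))
```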
Pattern 3: patch.object for Background Loops
For scheduler loops that run indefinitely:
```python
from unittest.mock import patch, AsyncMock

with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
    with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
        await scheduler.start()
        assert scheduler._running is True
```
Pattern 4: Manual JWT Crafting
For testing token expiry without waiting:
```python
import jwt
from datetime import datetime, timezone, timedelta

expired = jwt.encode(
    {
        "sub": "testuser",
        "role": "analyst",
        "exp": datetime.now(timezone.utc) - timedelta(hours=1),
        "iat": datetime.now(timezone.utc) - timedelta(hours=2),
    },
    settings.jwt_secret_key,
    algorithm="HS256",
)
```
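On the decode side, PyJWT validates the exp claim and raises jwt.ExpiredSignatureError for a token like the one above, which a test would typically assert with pytest.raises(jwt.ExpiredSignatureError). A stdlib-only sketch of the underlying check, mirroring the claim values used in the encode call:

```python
from datetime import datetime, timezone, timedelta

# same backdated expiry as the crafted token
exp = datetime.now(timezone.utc) - timedelta(hours=1)

# the decoder's core comparison: a token whose exp is in the past is rejected
token_expired = exp <= datetime.now(timezone.utc)
```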