انتقل إلى المحتوى الرئيسي

Test Strategy

AuroraSOC employs a layered testing strategy combining fast unit tests with SQLite-backed integration tests. This document explains the testing philosophy, infrastructure, and patterns used throughout the project.

Testing Philosophy

Why This Approach?

PrincipleReasoning
SQLite for unit testsNo external dependencies — tests run anywhere without Postgres, Redis, or NATS. The type-compilation shims make SQLAlchemy models work identically on SQLite
AsyncMock over real servicesRedis, MQTT, and NATS are mocked to isolate logic from I/O. This makes tests deterministic and fast
Transactional rollbackEach test gets a fresh session that rolls back automatically, preventing cross-test contamination without dropping/recreating tables
No Docker requiredThe entire test suite runs with make test — no containers needed

Test Infrastructure

pytest Configuration

# pytest.ini
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
markers =
integration: marks tests as integration tests
slow: marks tests as slow

asyncio_mode = auto — All async def test_* functions are automatically treated as async tests without needing @pytest.mark.asyncio decorators.

Dependencies

# pyproject.toml [test] extras
pytest = ">=8.0"
pytest-asyncio = ">=0.23"
pytest-cov = ">=4.0"
aiosqlite = ">=0.20" # SQLite async driver for tests

PostgreSQL → SQLite Type Compatibility

The biggest challenge in testing AuroraSOC's database layer is that PostgreSQL-specific column types (UUID, JSONB, ARRAY) don't exist in SQLite. The conftest solves this with SQLAlchemy type compilation overrides:

from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB, ARRAY
from sqlalchemy.ext.compiler import compiles

@compiles(PG_UUID, "sqlite")
def _compile_uuid_sqlite(element, compiler, **kw):
return "CHAR(36)"

@compiles(JSONB, "sqlite")
def _compile_jsonb_sqlite(element, compiler, **kw):
return "JSON"

@compiles(ARRAY, "sqlite")
def _compile_array_sqlite(element, compiler, **kw):
return "JSON"

Why This Works

SQLite's JSON type stores JSON as text, which behaves identically to JSONB for insert/query operations in tests. CHAR(36) stores UUID strings, which is how Python's uuid.UUID serializes anyway. Arrays are stored as JSON arrays — SQLAlchemy's JSON type handles serialization automatically.

Production Differences

While this shim works well for CRUD testing, some PostgreSQL-specific features like JSONB path operators (->, ->>) or array functions (unnest, ANY) will not work in SQLite tests. If you write a query using these features, mark the test with @pytest.mark.integration and run it against a real Postgres container.

Shared Fixtures

All fixtures live in tests/conftest.py:

Event Loop

@pytest.fixture(scope="session")
def event_loop():
"""Create a session-scoped event loop."""
loop = asyncio.new_event_loop()
yield loop
loop.close()

Why session-scoped? A single event loop shared across all tests avoids the overhead of creating/destroying loops per test while ensuring all async operations use the same loop.

Database Engine

@pytest_asyncio.fixture(scope="function")
async def db_engine():
"""Create a test database engine with fresh schema."""
engine = create_async_engine("sqlite+aiosqlite://", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()

Why function-scoped? Each test gets its own engine with a fresh schema. The in-memory SQLite database is created, used, and destroyed per test — guaranteeing isolation.

Database Session

@pytest_asyncio.fixture
async def db_session(db_engine) -> AsyncGenerator[AsyncSession, None]:
"""Provide a transactional test database session."""
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
async with session_factory() as session:
yield session
await session.rollback()

Why expire_on_commit=False? After committing in a test, you often want to access the returned object's attributes (like auto-generated id). Without this flag, accessing any attribute after commit would trigger a lazy load that fails in test contexts.

Why rollback() in teardown? Even though the in-memory database is destroyed after each test, the explicit rollback ensures that if a test commits data, the session is in a clean state. This is a defensive pattern.

Mock Redis Publisher

@pytest.fixture
def mock_redis():
"""Mock Redis publisher."""
publisher = AsyncMock()
publisher.publish_audit = AsyncMock()
publisher.publish_alert = AsyncMock()
publisher.publish_agent_task = AsyncMock()
publisher.publish_agent_result = AsyncMock()
return publisher

This provides a drop-in replacement for the real RedisPublisher class. Tests can inject it and verify that the correct publish methods were called using mock_redis.publish_alert.assert_called_once_with(...).

Sample Data Fixtures

@pytest.fixture
def sample_alert_data():
return {
"id": str(uuid.uuid4()),
"title": "Test DNS tunneling alert",
"severity": "critical",
"status": "new",
"source": "network",
"description": "Suspicious DNS queries detected to evil.example.com",
"iocs": ["evil.example.com", "192.168.1.100"],
"mitre_techniques": ["T1071.004"],
}

@pytest.fixture
def sample_device_data():
return {
"device_id": f"test_device_{uuid.uuid4().hex[:8]}",
"device_type": "access_controller",
"firmware_stack": "ada_spark",
"firmware_version": "2.0.0",
"firmware_hash": "a" * 64,
"attestation_status": "verified",
"risk_score": 0.1,
"location": "Test Lab",
}

Pre-populated Database

@pytest_asyncio.fixture
async def populated_db(db_session, sample_alert_data, sample_device_data):
"""Seeds: 12 alerts (3×4 severities) + 1 CPS device + 1 IOC."""
for severity in ["critical", "high", "medium", "low"]:
for i in range(3):
alert = AlertModel(
title=f"Test {severity} alert {i}",
severity=severity, status="new", source="test",
description=f"Test alert description {i}",
iocs=["test.example.com"],
mitre_techniques=["T1071"],
created_at=datetime.now(timezone.utc),
)
db_session.add(alert)
# ... also adds 1 CPSDeviceModel and 1 IOCModel
await db_session.commit()
return db_session

Test Categories

Test FileCountCategoryDependencies
test_auth.py7JWT + RBACNone (pure logic)
test_dispatch.py5Circuit breakerNone (pure logic)
test_models.py5ORM modelsdb_session fixture
test_normalizer.py5Domain modelsNone (Pydantic only)
test_rate_limit.py2Rate limitingMagicMock Redis
test_scheduler.py4Background tasksAsyncMock + patch
test_settings.py4ConfigurationEnvironment variables
test_tiered_memory.py12Agent memoryMagicMock messages
Total44

Running Tests

Basic Commands

# Run all tests
pytest

# With verbose output
pytest -v

# With coverage report
pytest --cov=aurorasoc --cov-report=html

# Run single file
pytest tests/test_auth.py

# Run single test
pytest tests/test_auth.py::TestJWTAuth::test_create_and_decode_token

# Run by marker
pytest -m integration
pytest -m "not slow"

Using Make

make test              # Run all tests
make test-coverage # Run with HTML coverage report

Coverage Report

After running with --cov-report=html, open htmlcov/index.html to see line-by-line coverage.

Mocking Patterns

Pattern 1: AsyncMock for Async Services

For services with async methods (Redis, NATS, database operations):

from unittest.mock import AsyncMock

mock_publisher = AsyncMock()
mock_publisher.publish_alert = AsyncMock(return_value=None)

# Inject into code under test
service = AlertService(publisher=mock_publisher)
await service.create_alert(data)

# Verify
mock_publisher.publish_alert.assert_called_once()

Pattern 2: MagicMock with Pipeline

For Redis operations that use pipelines (rate limiter):

from unittest.mock import MagicMock, AsyncMock

mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(return_value=[0, 1, 5, True])
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline

limiter._client = mock_redis

Pattern 3: patch.object for Background Loops

For scheduler loops that run indefinitely:

from unittest.mock import patch, AsyncMock

with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
await scheduler.start()
assert scheduler._running is True

Pattern 4: Manual JWT Crafting

For testing token expiry without waiting:

import jwt
from datetime import datetime, timezone, timedelta

expired = jwt.encode(
{
"sub": "testuser",
"role": "analyst",
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
},
settings.jwt_secret_key,
algorithm="HS256",
)