استراتيجية الاختبار
تستخدم AuroraSOC استراتيجية اختبار متعددة الطبقات تجمع بين اختبارات الوحدات السريعة واختبارات التكامل المدعومة بـ SQLite. يشرح هذا المستند فلسفة الاختبار والبنية التحتية والأنماط المستخدمة في المشروع.
اختبار الفلسفة
لماذا هذا النهج؟
| مبدأ | المنطق |
|---|---|
| ** SQLite لاختبارات الوحدة ** | لا توجد تبعيات خارجية - يتم تشغيل الاختبارات في أي مكان بدون Postgres أو Redis أو NATS. تعمل حشوات تجميع النوع على جعل نماذج SQLAlchemy تعمل بشكل مماثل على SQLite |
| ** AsyncMock على الخدمات الحقيقية ** | يتم الاستهزاء بـ Redis وMQTT وNATS لعزل المنطق عن الإدخال/الإخراج. وهذا يجعل الاختبارات حتمية وسريعة |
| ** التراجع عن المعاملات ** | يحصل كل اختبار على جلسة جديدة تتراجع تلقائيًا، مما يمنع تلوث الاختبار المتبادل دون إسقاط/إعادة إنشاء الجداول |
| ** لا يوجد حاجة إلى عامل ميناء ** | تعمل مجموعة الاختبار بأكملها باستخدام make test - دون الحاجة إلى حاويات |
البنية التحتية للاختبار
تكوين بيتيست
# pytest.ini
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
markers =
integration: marks tests as integration tests
slow: marks tests as slow
asyncio_mode = auto — يتم التعامل مع جميع وظائف async def test_* تلقائيًا كاختبارات غير متزامنة دون الحاجة إلى أدوات تزيين @pytest.mark.asyncio.
التبعيات
# pyproject.toml [test] extras
pytest = ">=8.0"
pytest-asyncio = ">=0.23"
pytest-cov = ">=4.0"
aiosqlite = ">=0.20" # SQLite async driver for tests
PostgreSQL → توافق نوع SQLite
التحدي الأكبر في اختبار طبقة قاعدة بيانات AuroraSOC هو أن أنواع الأعمدة الخاصة بـ PostgreSQL (UUID، JSONB، ARRAY) غير موجودة في SQLite. يحل المسابقة هذه المشكلة من خلال تجاوزات تجميع نوع SQLAlchemy:
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB, ARRAY
from sqlalchemy.ext.compiler import compiles
@compiles(PG_UUID, "sqlite")
def _compile_uuid_sqlite(element, compiler, **kw):
return "CHAR(36)"
@compiles(JSONB, "sqlite")
def _compile_jsonb_sqlite(element, compiler, **kw):
return "JSON"
@compiles(ARRAY, "sqlite")
def _compile_array_sqlite(element, compiler, **kw):
return "JSON"
لماذا يعمل هذا؟
يقوم نوع JSON الخاص بـ SQLite بتخزين JSON كنص، والذي يتصرف بشكل مماثل لـ JSONB لعمليات الإدراج/الاستعلام في الاختبارات. يقوم CHAR(36) بتخزين سلاسل UUID، وهي الطريقة التي يتم بها إجراء تسلسل uuid.UUID الخاص بـ Python على أي حال. يتم تخزين المصفوفات كمصفوفات JSON - يتعامل نوع JSON الخاص بـ SQLAlchemy مع التسلسل تلقائيًا.
بينما تعمل هذه الرقائق بشكل جيد مع اختبار CRUD، فإن بعض الميزات الخاصة بـ PostgreSQL مثل عوامل تشغيل المسار JSONB (->، ->>) أو وظائف المصفوفة (unnest، ANY) لن تعمل في اختبارات SQLite. إذا كتبت استعلامًا باستخدام هذه الميزات، فقم بوضع علامة على الاختبار باستخدام @pytest.mark.integration وقم بتشغيله مقابل حاوية Postgres حقيقية.
التجهيزات المشتركة
جميع التركيبات موجودة في tests/conftest.py:
حلقة الحدث
@pytest.fixture(scope="session")
def event_loop():
"""Create a session-scoped event loop."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
** لماذا يتم تحديد نطاق الجلسة؟ ** تعمل حلقة حدث واحدة مشتركة عبر جميع الاختبارات على تجنب الحمل الزائد لإنشاء/تدمير الحلقات لكل اختبار مع ضمان استخدام جميع العمليات غير المتزامنة لنفس الحلقة.
محرك قاعدة البيانات
@pytest_asyncio.fixture(scope="function")
async def db_engine():
"""Create a test database engine with fresh schema."""
engine = create_async_engine("sqlite+aiosqlite://", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
** لماذا يتم تحديد نطاق الوظيفة؟ ** يحصل كل اختبار على محرك خاص به بمخطط جديد. يتم إنشاء قاعدة بيانات SQLite الموجودة في الذاكرة واستخدامها وتدميرها لكل اختبار، مما يضمن العزل.
جلسة قاعدة البيانات
@pytest_asyncio.fixture
async def db_session(db_engine) -> AsyncGenerator[AsyncSession, None]:
"""Provide a transactional test database session."""
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
async with session_factory() as session:
yield session
await session.rollback()
** لماذا expire_on_commit=False؟ ** بعد إجراء الاختبار، غالبًا ما تريد الوصول إلى سمات الكائن الذي تم إرجاعه (مثل id الذي تم إنشاؤه تلقائيًا). بدون هذه العلامة، فإن الوصول إلى أي سمة بعد الالتزام سيؤدي إلى تحميل بطيء يفشل في سياقات الاختبار.
** لماذا rollback() في عملية التفكيك؟ ** على الرغم من تدمير قاعدة البيانات في الذاكرة بعد كل اختبار، فإن التراجع الصريح يضمن أنه في حالة إرسال الاختبار للبيانات، تكون الجلسة في حالة نظيفة. هذا نمط دفاعي.
الناشر وهمية Redis
@pytest.fixture
def mock_redis():
"""Mock Redis publisher."""
publisher = AsyncMock()
publisher.publish_audit = AsyncMock()
publisher.publish_alert = AsyncMock()
publisher.publish_agent_task = AsyncMock()
publisher.publish_agent_result = AsyncMock()
return publisher
يوفر هذا بديلاً مباشرًا لفئة RedisPublisher الحقيقية. يمكن للاختبارات حقنه والتحقق من استدعاء طرق النشر الصحيحة باستخدام mock_redis.publish_alert.assert_called_once_with(...).
تركيبات بيانات العينة
@pytest.fixture
def sample_alert_data():
return {
"id": str(uuid.uuid4()),
"title": "Test DNS tunneling alert",
"severity": "critical",
"status": "new",
"source": "network",
"description": "Suspicious DNS queries detected to evil.example.com",
"iocs": ["evil.example.com", "192.168.1.100"],
"mitre_techniques": ["T1071.004"],
}
@pytest.fixture
def sample_device_data():
return {
"device_id": f"test_device_{uuid.uuid4().hex[:8]}",
"device_type": "access_controller",
"firmware_stack": "ada_spark",
"firmware_version": "2.0.0",
"firmware_hash": "a" * 64,
"attestation_status": "verified",
"risk_score": 0.1,
"location": "Test Lab",
}
قاعدة بيانات مملوءة مسبقًا
@pytest_asyncio.fixture
async def populated_db(db_session, sample_alert_data, sample_device_data):
"""Seeds: 12 alerts (3×4 severities) + 1 CPS device + 1 IOC."""
for severity in ["critical", "high", "medium", "low"]:
for i in range(3):
alert = AlertModel(
title=f"Test {severity} alert {i}",
severity=severity, status="new", source="test",
description=f"Test alert description {i}",
iocs=["test.example.com"],
mitre_techniques=["T1071"],
created_at=datetime.now(timezone.utc),
)
db_session.add(alert)
# ... also adds 1 CPSDeviceModel and 1 IOCModel
await db_session.commit()
return db_session
فئات الاختبار
| ملف الاختبار | عدد | فئة | التبعيات |
|---|---|---|---|
test_auth.py | 7 | جوت + رباك | لا شيء (منطق خالص) |
test_dispatch.py | 5 | قاطع الدائرة | لا شيء (منطق خالص) |
test_models.py | 5 | نماذج أورم | تركيبات db_session |
test_normalizer.py | 5 | نماذج المجال | لا شيء (Pydantic فقط) |
test_rate_limit.py | 2 | الحد من المعدل | MagicMock ريديس |
test_scheduler.py | 4 | مهام الخلفية | AsyncMock + patch |
test_settings.py | 4 | إعدادات | متغيرات البيئة |
test_tiered_memory.py | 12 | ذاكرة الوكيل | رسائل MagicMock |
| المجموع | 44 |
تشغيل الاختبارات
الأوامر الأساسية
# Run all tests
pytest
# With verbose output
pytest -v
# With coverage report
pytest --cov=aurorasoc --cov-report=html
# Run single file
pytest tests/test_auth.py
# Run single test
pytest tests/test_auth.py::TestJWTAuth::test_create_and_decode_token
# Run by marker
pytest -m integration
pytest -m "not slow"
باستخدام جعل
make test # Run all tests
make test-cov # Run with HTML coverage report
تقرير التغطية
بعد التشغيل باستخدام --cov-report=html، افتح htmlcov/index.html لرؤية التغطية سطرًا تلو الآخر.
أنماط ساخرة
النمط 1: AsyncMock للخدمات غير المتزامنة
للخدمات ذات أساليب async (Redis، NATS، عمليات قاعدة البيانات):
from unittest.mock import AsyncMock
mock_publisher = AsyncMock()
mock_publisher.publish_alert = AsyncMock(return_value=None)
# Inject into code under test
service = AlertService(publisher=mock_publisher)
await service.create_alert(data)
# Verify
mock_publisher.publish_alert.assert_called_once()
النمط 2: MagicMock مع خط الأنابيب
بالنسبة لعمليات Redis التي تستخدم خطوط الأنابيب (محدد المعدل):
from unittest.mock import MagicMock, AsyncMock
mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(return_value=[0, 1, 5, True])
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline
limiter._client = mock_redis
النمط 3: patch.object لحلقات الخلفية
بالنسبة لحلقات الجدولة التي تعمل إلى أجل غير مسمى:
from unittest.mock import patch, AsyncMock
with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
await scheduler.start()
assert scheduler._running is True
النمط 4: صياغة JWT اليدوية
لاختبار انتهاء صلاحية الرمز المميز دون انتظار:
import jwt
from datetime import datetime, timezone, timedelta
expired = jwt.encode(
{
"sub": "testuser",
"role": "analyst",
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
},
settings.jwt_secret_key,
algorithm="HS256",
)