اختبارات الكتابة
يرشدك هذا الدليل خلال عملية كتابة اختبارات جديدة لـ AuroraSOC، ويغطي كل نوع من المكونات بأمثلة ملموسة وأفضل الممارسات.
هيكل ملف الاختبار
يتبع كل ملف اختبار هذا القالب:
"""Tests for <module_name>."""
from __future__ import annotations
import pytest
# Import the module under test
from aurorasoc.module import ClassUnderTest
class TestFeatureName:
"""Group related tests."""
async def test_does_expected_thing(self):
"""Test names describe the expected behavior."""
result = ClassUnderTest().method()
assert result == expected_value
async def test_handles_edge_case(self):
"""Separate test for each edge case."""
with pytest.raises(ValueError):
ClassUnderTest().method(bad_input)
اتفاقيات التسمية
| مؤتمر | مثال | لماذا |
|---|---|---|
الملف: test_<module>.py | test_auth.py | pytest الاكتشاف التلقائي |
الفئة: Test<Feature> | TestJWTAuth | الاختبارات المتعلقة بالمجموعة |
الوظيفة: test_<behavior> | test_expired_token_raises | وصف ما تم اختباره |
كتابة اختبارات قاعدة البيانات
تستخدم اختبارات قاعدة البيانات تجهيزات db_session لجلسة SQLite للمعاملات.
إنشاء النماذج والاستعلام عنها
import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from aurorasoc.core.models import AlertModel
async def test_create_alert(db_session):
"""AlertModel persists and retrieves correctly."""
# Arrange
alert = AlertModel(
title="Suspicious SSH brute force",
severity="high",
status="new",
source="network",
description="Multiple failed SSH attempts from 10.0.0.5",
iocs=["10.0.0.5"],
mitre_techniques=["T1110.001"],
created_at=datetime.now(timezone.utc),
)
# Act
db_session.add(alert)
await db_session.commit()
# Assert
result = await db_session.execute(
select(AlertModel).where(AlertModel.title == "Suspicious SSH brute force")
)
fetched = result.scalar_one()
assert fetched.id is not None # UUID was auto-generated
assert fetched.severity == "high"
assert "10.0.0.5" in fetched.iocs
assert fetched.mitre_techniques == ["T1110.001"]
اختبار العلاقات
from aurorasoc.core.models import CaseModel, TimelineEntryModel
async def test_case_with_timeline(db_session):
"""Case timeline entries maintain parent relationship."""
# Create parent
case = CaseModel(
title="Investigation: SSH brute force",
status="open",
severity="high",
assigned_to="analyst-1",
created_at=datetime.now(timezone.utc),
)
db_session.add(case)
await db_session.flush() # Get case.id without committing
# Create child
entry = TimelineEntryModel(
case_id=case.id,
action="case_created",
actor="system",
details="Auto-created from alert correlation",
timestamp=datetime.now(timezone.utc),
)
db_session.add(entry)
await db_session.commit()
# Verify relationship
result = await db_session.execute(
select(TimelineEntryModel).where(TimelineEntryModel.case_id == case.id)
)
fetched = result.scalar_one()
assert fetched.action == "case_created"
assert fetched.case_id == case.id
استخدام البيانات المعبأة مسبقًا
async def test_query_critical_alerts(populated_db):
"""populated_db provides 12 alerts (3 per severity)."""
result = await populated_db.execute(
select(AlertModel).where(AlertModel.severity == "critical")
)
alerts = result.scalars().all()
assert len(alerts) == 3
كتابة اختبارات المصادقة
عادةً ما تكون اختبارات المصادقة منطقية تمامًا، ولا حاجة إلى أي تركيبات.
JWT ذهابًا وإيابًا
from aurorasoc.core.auth import create_token, decode_token
from aurorasoc.models.domain import Role
class TestJWTAuth:
async def test_create_and_decode_token(self):
"""Token round-trip preserves claims."""
token = create_token(sub="analyst-1", role=Role.ANALYST)
payload = decode_token(token)
assert payload.sub == "analyst-1"
assert payload.role == Role.ANALYST
async def test_invalid_token_raises_401(self):
"""Garbage tokens produce HTTP 401."""
with pytest.raises(HTTPException) as exc_info:
decode_token("not.a.real.token")
assert exc_info.value.status_code == 401
أذونات RBAC
from aurorasoc.core.auth import ROLE_PERMISSIONS, Role
class TestRBAC:
def test_admin_has_all_permissions(self):
"""Admin role includes read, write, and revoke."""
perms = ROLE_PERMISSIONS[Role.ADMIN]
assert "alerts:read" in perms
assert "cases:write" in perms
assert "cps:revoke" in perms
def test_viewer_has_limited_permissions(self):
"""Viewer role is read-only."""
perms = ROLE_PERMISSIONS[Role.VIEWER]
assert "alerts:read" in perms
assert "cases:write" not in perms
كتابة اختبارات نموذج المجال
نماذج المجال هي فئات Pydantic — اختبرها بدون أي قاعدة بيانات:
from aurorasoc.models.domain import Alert, Severity, CPSDevice, FirmwareStack
class TestAlertModel:
def test_alert_creation(self):
"""Alert domain model accepts all fields."""
alert = Alert(
id="alert-001",
title="Test alert",
severity=Severity.HIGH,
source="network",
)
assert alert.id == "alert-001"
assert alert.severity == Severity.HIGH
def test_severity_enum_values(self):
"""All severity variants are string-valued."""
for sev in Severity:
assert isinstance(sev.value, str)
كتابة اختبارات الخدمة باستخدام المحاكاة
عند اختبار الخدمات التي تعتمد على أنظمة خارجية، قم بالسخرية من التبعيات:
السخرية من ريديس
from unittest.mock import AsyncMock, MagicMock
async def test_rate_limiter_allows_within_limit():
"""Requests below threshold are allowed."""
# Setup mock Redis client
mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(
return_value=[0, 1, 5, True] # zremrangebyscore, zadd, zcard=5, expire
)
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline
# Inject mock
limiter = RateLimiter(max_requests=100, window_seconds=60)
limiter._client = mock_redis
# Test
allowed, remaining = await limiter.check("user-1")
assert allowed is True
assert remaining == 95 # 100 - 5
السخرية من المهام الخلفية
from unittest.mock import patch, AsyncMock
async def test_scheduler_creates_tasks():
"""start() spawns all 4 background loops."""
scheduler = BackgroundScheduler()
with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
with patch.object(scheduler, "_expired_approval_loop", new=AsyncMock()):
with patch.object(scheduler, "_metrics_collector_loop", new=AsyncMock()):
await scheduler.start()
assert scheduler._running is True
assert len(scheduler._tasks) == 4
await scheduler.stop()
كتابة اختبارات نظام الذاكرة
تتحقق اختبارات الذاكرة من صحة تكوين الذاكرة المتدرجة وعملياتها:
from aurorasoc.memory.tiered import (
create_tiered_memory,
ANALYST_MEMORY,
HUNTER_MEMORY,
)
class TestTieredMemory:
def test_config_presets(self):
"""Each preset has expected sliding window size."""
assert ANALYST_MEMORY.sliding_window == 30
assert HUNTER_MEMORY.sliding_window == 40
def test_factory_sets_agent_name(self):
"""create_tiered_memory stamps the agent name."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
assert mem._config.agent_name == "TestAgent"
assert mem._config.sliding_window == 30
async def test_add_and_retrieve(self):
"""Messages round-trip through add/messages."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
msg = MagicMock()
msg.text = "Suspicious activity detected"
mem.add(msg)
assert len(mem.messages) == 1
async def test_disabled_features_return_empty(self):
"""Disabled features return empty rather than error."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
# Analyst preset has episodic/threat-intel disabled
assert await mem.recall_similar("query") == []
assert await mem.search_threat_intel("ioc") == []
assert await mem.enrich_ioc("1.2.3.4") == {}
كتابة اختبارات قواطع الدائرة
قاطع الدائرة عبارة عن آلة ذات حالة نقية - مثالية للاختبار الشامل:
from aurorasoc.agents.factory import CircuitBreaker, CircuitState
class TestCircuitBreaker:
def test_initial_state_is_closed(self):
"""New circuit breakers start CLOSED (allowing requests)."""
cb = CircuitBreaker(threshold=3, timeout=30)
assert cb.state == CircuitState.CLOSED
def test_opens_after_threshold_failures(self):
"""Circuit opens after N consecutive failures."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_failure()
assert cb.state == CircuitState.OPEN
def test_success_resets_failures(self):
"""A success clears the failure counter."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_success() # Resets!
assert cb.state == CircuitState.CLOSED
assert cb.allow_request() is True
اختبار أفضل الممارسات
يفعل
- مفهوم تأكيد واحد لكل اختبار — يمكن أن يحتوي الاختبار على عبارات
assertمتعددة، ولكن يجب عليها جميعًا التحقق من صحة السلوك نفسه - استخدم أسماء وصفية —
test_expired_token_raises_401وليسtest_token_3 - اختبر حالات الحافة بشكل منفصل — تحصل كل مدخلات فارغة، ولا قيم، وشروط الحدود على اختبار خاص بها
- استخدم التركيبات للإعداد المشترك — لا تكرر تهيئة قاعدة البيانات في كل اختبار
- حافظ على سرعة الاختبارات — إذا استغرق الاختبار أكثر من ثانية واحدة، فهذا يعني أن هناك خطأ ما
لا
- لا تختبر المكتبات الخارجية — لا تختبر قدرة SQLAlchemy على إجراء عملية SELECT؛ اختبار منطق الاستعلام الخاص بك
- لا تستخدم
time.sleep()— استخدمAsyncMockوتحكم في التدفق بشكل صريح - لا تشارك الحالة بين الاختبارات — يجب أن يعمل كل اختبار بشكل منفصل
- لا تلتقط الاستثناءات في الاختبارات — دعها تنتشر؛ استخدم
pytest.raises()للأخطاء المتوقعة
قائمة التحقق من اختبار المزامنة
# ✅ Correct - pytest-asyncio handles it (asyncio_mode = auto)
async def test_something():
result = await async_function()
assert result is not None
# ❌ Wrong - don't manually run the loop
def test_something():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_function()) # Don't do this
إضافة ملف اختبار جديد
- إنشاء
tests/test_<module>.py - قم باستيراد الوحدة قيد الاختبار
- أضف تركيبات إلى
conftest.pyإذا لزم الأمر (مشتركة عبر الاختبارات) - اكتب الاختبارات باتباع الأنماط المذكورة أعلاه
- تشغيل:
pytest tests/test_<module>.py -v - التحقق من التغطية:
pytest tests/test_<module>.py --cov=aurorasoc.<module>