انتقل إلى المحتوى الرئيسي

اختبارات الكتابة

يرشدك هذا الدليل خلال عملية كتابة اختبارات جديدة لـ AuroraSOC، ويغطي كل نوع من المكونات بأمثلة ملموسة وأفضل الممارسات.

هيكل ملف الاختبار

يتبع كل ملف اختبار هذا القالب:

"""Tests for <module_name>."""

from __future__ import annotations

import pytest
# Import the module under test
from aurorasoc.module import ClassUnderTest


class TestFeatureName:
"""Group related tests."""

async def test_does_expected_thing(self):
"""Test names describe the expected behavior."""
result = ClassUnderTest().method()
assert result == expected_value

async def test_handles_edge_case(self):
"""Separate test for each edge case."""
with pytest.raises(ValueError):
ClassUnderTest().method(bad_input)

اتفاقيات التسمية

مؤتمرمثاللماذا
الملف: test_<module>.pytest_auth.pypytest الاكتشاف التلقائي
الفئة: Test<Feature>TestJWTAuthالاختبارات المتعلقة بالمجموعة
الوظيفة: test_<behavior>test_expired_token_raisesوصف ما تم اختباره

كتابة اختبارات قاعدة البيانات

تستخدم اختبارات قاعدة البيانات تجهيزات db_session لجلسة SQLite للمعاملات.

إنشاء النماذج والاستعلام عنها

import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from aurorasoc.core.models import AlertModel


async def test_create_alert(db_session):
"""AlertModel persists and retrieves correctly."""
# Arrange
alert = AlertModel(
title="Suspicious SSH brute force",
severity="high",
status="new",
source="network",
description="Multiple failed SSH attempts from 10.0.0.5",
iocs=["10.0.0.5"],
mitre_techniques=["T1110.001"],
created_at=datetime.now(timezone.utc),
)

# Act
db_session.add(alert)
await db_session.commit()

# Assert
result = await db_session.execute(
select(AlertModel).where(AlertModel.title == "Suspicious SSH brute force")
)
fetched = result.scalar_one()

assert fetched.id is not None # UUID was auto-generated
assert fetched.severity == "high"
assert "10.0.0.5" in fetched.iocs
assert fetched.mitre_techniques == ["T1110.001"]

اختبار العلاقات

from aurorasoc.core.models import CaseModel, TimelineEntryModel


async def test_case_with_timeline(db_session):
"""Case timeline entries maintain parent relationship."""
# Create parent
case = CaseModel(
title="Investigation: SSH brute force",
status="open",
severity="high",
assigned_to="analyst-1",
created_at=datetime.now(timezone.utc),
)
db_session.add(case)
await db_session.flush() # Get case.id without committing

# Create child
entry = TimelineEntryModel(
case_id=case.id,
action="case_created",
actor="system",
details="Auto-created from alert correlation",
timestamp=datetime.now(timezone.utc),
)
db_session.add(entry)
await db_session.commit()

# Verify relationship
result = await db_session.execute(
select(TimelineEntryModel).where(TimelineEntryModel.case_id == case.id)
)
fetched = result.scalar_one()
assert fetched.action == "case_created"
assert fetched.case_id == case.id

استخدام البيانات المعبأة مسبقًا

async def test_query_critical_alerts(populated_db):
"""populated_db provides 12 alerts (3 per severity)."""
result = await populated_db.execute(
select(AlertModel).where(AlertModel.severity == "critical")
)
alerts = result.scalars().all()
assert len(alerts) == 3

كتابة اختبارات المصادقة

عادةً ما تكون اختبارات المصادقة منطقية تمامًا، ولا حاجة إلى أي تركيبات.

JWT ذهابًا وإيابًا

from aurorasoc.core.auth import create_token, decode_token
from aurorasoc.models.domain import Role


class TestJWTAuth:
async def test_create_and_decode_token(self):
"""Token round-trip preserves claims."""
token = create_token(sub="analyst-1", role=Role.ANALYST)
payload = decode_token(token)

assert payload.sub == "analyst-1"
assert payload.role == Role.ANALYST

async def test_invalid_token_raises_401(self):
"""Garbage tokens produce HTTP 401."""
with pytest.raises(HTTPException) as exc_info:
decode_token("not.a.real.token")
assert exc_info.value.status_code == 401

أذونات RBAC

from aurorasoc.core.auth import ROLE_PERMISSIONS, Role


class TestRBAC:
def test_admin_has_all_permissions(self):
"""Admin role includes read, write, and revoke."""
perms = ROLE_PERMISSIONS[Role.ADMIN]
assert "alerts:read" in perms
assert "cases:write" in perms
assert "cps:revoke" in perms

def test_viewer_has_limited_permissions(self):
"""Viewer role is read-only."""
perms = ROLE_PERMISSIONS[Role.VIEWER]
assert "alerts:read" in perms
assert "cases:write" not in perms

كتابة اختبارات نموذج المجال

نماذج المجال هي فئات Pydantic — اختبرها بدون أي قاعدة بيانات:

from aurorasoc.models.domain import Alert, Severity, CPSDevice, FirmwareStack


class TestAlertModel:
def test_alert_creation(self):
"""Alert domain model accepts all fields."""
alert = Alert(
id="alert-001",
title="Test alert",
severity=Severity.HIGH,
source="network",
)
assert alert.id == "alert-001"
assert alert.severity == Severity.HIGH

def test_severity_enum_values(self):
"""All severity variants are string-valued."""
for sev in Severity:
assert isinstance(sev.value, str)

كتابة اختبارات الخدمة باستخدام المحاكاة

عند اختبار الخدمات التي تعتمد على أنظمة خارجية، قم بالسخرية من التبعيات:

السخرية من ريديس

from unittest.mock import AsyncMock, MagicMock


async def test_rate_limiter_allows_within_limit():
"""Requests below threshold are allowed."""
# Setup mock Redis client
mock_redis = MagicMock()
mock_pipeline = MagicMock()
mock_pipeline.execute = AsyncMock(
return_value=[0, 1, 5, True] # zremrangebyscore, zadd, zcard=5, expire
)
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=False)
mock_redis.pipeline.return_value = mock_pipeline

# Inject mock
limiter = RateLimiter(max_requests=100, window_seconds=60)
limiter._client = mock_redis

# Test
allowed, remaining = await limiter.check("user-1")
assert allowed is True
assert remaining == 95 # 100 - 5

السخرية من المهام الخلفية

from unittest.mock import patch, AsyncMock


async def test_scheduler_creates_tasks():
"""start() spawns all 4 background loops."""
scheduler = BackgroundScheduler()

with patch.object(scheduler, "_alert_dedup_loop", new=AsyncMock()):
with patch.object(scheduler, "_scheduled_hunt_loop", new=AsyncMock()):
with patch.object(scheduler, "_expired_approval_loop", new=AsyncMock()):
with patch.object(scheduler, "_metrics_collector_loop", new=AsyncMock()):
await scheduler.start()

assert scheduler._running is True
assert len(scheduler._tasks) == 4
await scheduler.stop()

كتابة اختبارات نظام الذاكرة

تتحقق اختبارات الذاكرة من صحة تكوين الذاكرة المتدرجة وعملياتها:

from aurorasoc.memory.tiered import (
create_tiered_memory,
ANALYST_MEMORY,
HUNTER_MEMORY,
)


class TestTieredMemory:
def test_config_presets(self):
"""Each preset has expected sliding window size."""
assert ANALYST_MEMORY.sliding_window == 30
assert HUNTER_MEMORY.sliding_window == 40

def test_factory_sets_agent_name(self):
"""create_tiered_memory stamps the agent name."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
assert mem._config.agent_name == "TestAgent"
assert mem._config.sliding_window == 30

async def test_add_and_retrieve(self):
"""Messages round-trip through add/messages."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
msg = MagicMock()
msg.text = "Suspicious activity detected"
mem.add(msg)
assert len(mem.messages) == 1

async def test_disabled_features_return_empty(self):
"""Disabled features return empty rather than error."""
mem = create_tiered_memory("TestAgent", ANALYST_MEMORY)
# Analyst preset has episodic/threat-intel disabled
assert await mem.recall_similar("query") == []
assert await mem.search_threat_intel("ioc") == []
assert await mem.enrich_ioc("1.2.3.4") == {}

كتابة اختبارات قواطع الدائرة

قاطع الدائرة عبارة عن آلة ذات حالة نقية - مثالية للاختبار الشامل:

from aurorasoc.agents.factory import CircuitBreaker, CircuitState


class TestCircuitBreaker:
def test_initial_state_is_closed(self):
"""New circuit breakers start CLOSED (allowing requests)."""
cb = CircuitBreaker(threshold=3, timeout=30)
assert cb.state == CircuitState.CLOSED

def test_opens_after_threshold_failures(self):
"""Circuit opens after N consecutive failures."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_failure()
assert cb.state == CircuitState.OPEN

def test_success_resets_failures(self):
"""A success clears the failure counter."""
cb = CircuitBreaker(threshold=3, timeout=30)
cb.record_failure()
cb.record_failure()
cb.record_success() # Resets!
assert cb.state == CircuitState.CLOSED
assert cb.allow_request() is True

اختبار أفضل الممارسات

يفعل

  • مفهوم تأكيد واحد لكل اختبار — يمكن أن يحتوي الاختبار على عبارات assert متعددة، ولكن يجب عليها جميعًا التحقق من صحة السلوك نفسه
  • استخدم أسماء وصفيةtest_expired_token_raises_401 وليس test_token_3
  • اختبر حالات الحافة بشكل منفصل — تحصل كل مدخلات فارغة، ولا قيم، وشروط الحدود على اختبار خاص بها
  • استخدم التركيبات للإعداد المشترك — لا تكرر تهيئة قاعدة البيانات في كل اختبار
  • حافظ على سرعة الاختبارات — إذا استغرق الاختبار أكثر من ثانية واحدة، فهذا يعني أن هناك خطأ ما

لا

  • لا تختبر المكتبات الخارجية — لا تختبر قدرة SQLAlchemy على إجراء عملية SELECT؛ اختبار منطق الاستعلام الخاص بك
  • لا تستخدم time.sleep() — استخدم AsyncMock وتحكم في التدفق بشكل صريح
  • لا تشارك الحالة بين الاختبارات — يجب أن يعمل كل اختبار بشكل منفصل
  • لا تلتقط الاستثناءات في الاختبارات — دعها تنتشر؛ استخدم pytest.raises() للأخطاء المتوقعة

قائمة التحقق من اختبار المزامنة

# ✅ Correct - pytest-asyncio handles it (asyncio_mode = auto)
async def test_something():
result = await async_function()
assert result is not None

# ❌ Wrong - don't manually run the loop
def test_something():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_function()) # Don't do this

إضافة ملف اختبار جديد

  1. إنشاء tests/test_<module>.py
  2. قم باستيراد الوحدة قيد الاختبار
  3. أضف تركيبات إلى conftest.py إذا لزم الأمر (مشتركة عبر الاختبارات)
  4. اكتب الاختبارات باتباع الأنماط المذكورة أعلاه
  5. تشغيل: pytest tests/test_<module>.py -v
  6. التحقق من التغطية: pytest tests/test_<module>.py --cov=aurorasoc.<module>