انتقل إلى المحتوى الرئيسي

فئة قاعدة الأداة

تعمل جميع أدوات MCP البالغ عددها 31 أداة في AuroraSOC على توسيع الفئة الأساسية AuroraTool المحددة في aurorasoc/tools/base.py. تعمل هذه الفئة على ربط واجهة أداة إطار عمل BeeAI بنمط تنفيذ أبسط.

التسلسل الهرمي للفئة

تنفيذ أداة أورورا

from beeai import Tool, ToolOutput

class AuroraTool(Tool):
"""Base class for all AuroraSOC tools.

Bridges BeeAI's _run(input, options, context) interface
to a simpler _execute(**kwargs) pattern.
"""

async def _run(self, input: dict, options: dict, context: dict) -> ToolOutput:
"""BeeAI calls this method. We parse input and delegate."""
try:
# Parse input JSON to kwargs
kwargs = self._parse_input(input)

# Call subclass implementation
result = await self._execute(**kwargs)

# Wrap in ToolOutput
return ToolOutput(
result=json.dumps(result),
metadata={"tool": self.name, "success": True}
)
except Exception as e:
return ToolOutput(
result=json.dumps({"error": str(e)}),
metadata={"tool": self.name, "success": False}
)

async def _execute(self, **kwargs) -> dict:
"""Subclasses implement this."""
raise NotImplementedError

لماذا هذا التجريد؟

بدون AuroraTool:

class SearchLogs(Tool):
async def _run(self, input, options, context):
# Every tool repeats: parse input, handle errors, format output
try:
data = json.loads(input) if isinstance(input, str) else input
query = data.get("query", "")
# ... actual logic ...
return ToolOutput(result=json.dumps(result), metadata={...})
except Exception as e:
return ToolOutput(result=json.dumps({"error": str(e)}), metadata={...})

مع AuroraTool:

class SearchLogs(AuroraTool):
async def _execute(self, query: str, time_range: str = "15m", source: str = None) -> dict:
# Just the business logic
return {"events": [...], "count": 47}

فوائد:

  1. جاف — معالجة الأخطاء، وتحليل الإدخال، وتنسيق الإخراج مرة واحدة
  2. قابل للاختبار — اختبر _execute() مباشرة بدون إعداد إطار عمل BeeAI
  3. متسق — تقوم كل أداة بإرجاع نفس التنسيق
  4. مكتوب — توفر Kwargs توقيعات معلمات واضحة

تعريف مخطط الإدخال

تحدد كل أداة مخطط الإدخال الخاص بها لاستدعاء أداة LLM:

class SearchLogs(AuroraTool):
name = "search_logs"
description = "Search SIEM logs by query string, time range, and source"
input_schema = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (e.g., 'src_ip:192.168.1.100')"
},
"time_range": {
"type": "string",
"description": "Time window (e.g., '15m', '1h', '24h')",
"default": "15m"
},
"source": {
"type": "string",
"description": "Filter by source system",
"enum": ["wazuh", "suricata", "zeek", "velociraptor"]
}
},
"required": ["query"]
}

يرى LLM هذا المخطط ويقوم بإنشاء استدعاءات للأدوات مثل:

{"tool": "search_logs", "input": {"query": "src_ip:203.0.113.50", "time_range": "1h"}}

إنشاء أداة جديدة

1. قم بإنشاء فئة الأداة

# aurorasoc/tools/my_domain/my_tool.py
from aurorasoc.tools.base import AuroraTool

class MyNewTool(AuroraTool):
name = "my_new_tool"
description = "Describe what this tool does for the LLM"
input_schema = {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "What this parameter means"
}
},
"required": ["param1"]
}

async def _execute(self, param1: str) -> dict:
# Your tool logic here
result = await some_external_api(param1)
return {"status": "success", "data": result}

2. سجل في الوحدة __init__.py

# aurorasoc/tools/my_domain/__init__.py
from .my_tool import MyNewTool

__all__ = ["MyNewTool"]

3. أضف إلى مصنع الوكيل

# In the appropriate factory method
tools = [
MyNewTool(),
# ... other tools
]

4. أضف إلى سجل MCP (اختياري)

# aurorasoc/tools/registry/server.py
from aurorasoc.tools.my_domain import MyNewTool
registry.register(MyNewTool())

استراتيجية التعامل مع الأخطاء

يجب أن تعرض الأدوات أخطاء منظمة، وليس رفع الاستثناءات:

async def _execute(self, hostname: str) -> dict:
try:
result = await isolate_host(hostname)
return {"status": "isolated", "hostname": hostname}
except HostNotFoundError:
return {"error": "host_not_found", "hostname": hostname}
except PermissionError:
return {"error": "insufficient_permissions", "hostname": hostname}

يمكن لـ LLM بعد ذلك التفكير في الأخطاء: "لم يتم العثور على المضيف. اسمح لي بالتحقق من اسم المضيف..."

اتفاقية إرجاع الأداة

قم دائمًا بإرجاع dict من _execute(). قم بتضمين حقل "status" أو "error" حتى يتمكن LLM من التمييز بين النجاح والفشل دون تحليل اللغة الطبيعية.