From 1200ac91324bf5b60eec64e198d6ee1606944fe8 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 08:04:25 -0400 Subject: [PATCH] =?UTF-8?q?feat(stix):=20STIX=E2=86=92MISP=20download=20ex?= =?UTF-8?q?port=20(per-attacker=20+=20fleet)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds GET /api/v1/attackers/{uuid}/export/misp and GET /api/v1/attackers/export/misp backed by misp_export.py, which converts existing STIX bundles to MISP events via misp-stix ExternalSTIX2toMISPParser. Fleet endpoint emits {response:[...]} collection (one event per attacker). Frontend: STIX/MISP buttons on AttackerDetail header and Attackers list. 13 new tests green. --- decnet/ttp/misp_export.py | 109 +++++++++ decnet/web/router/__init__.py | 4 + .../attackers/api_export_attacker_misp.py | 100 ++++++++ .../attackers/api_export_attackers_misp.py | 63 +++++ .../sections/AttackerHeader.tsx | 49 ++-- decnet_web/src/components/Attackers.tsx | 21 +- pyproject.toml | 3 + tests/web/test_api_export_attacker_misp.py | 217 ++++++++++++++++++ tests/web/test_api_export_attackers_misp.py | 112 +++++++++ 9 files changed, 661 insertions(+), 17 deletions(-) create mode 100644 decnet/ttp/misp_export.py create mode 100644 decnet/web/router/attackers/api_export_attacker_misp.py create mode 100644 decnet/web/router/attackers/api_export_attackers_misp.py create mode 100644 tests/web/test_api_export_attacker_misp.py create mode 100644 tests/web/test_api_export_attackers_misp.py diff --git a/decnet/ttp/misp_export.py b/decnet/ttp/misp_export.py new file mode 100644 index 00000000..83f4d3ad --- /dev/null +++ b/decnet/ttp/misp_export.py @@ -0,0 +1,109 @@ +"""MISP event builder for DECNET attacker data. + +Converts a STIX 2.1 Bundle (built by stix_export.build_attacker_bundle / +build_fleet_bundle) into MISP event dicts using the misp-stix library's +ExternalSTIX2toMISPParser. + +Pure functions — no I/O. The caller (router) does all DB reads and passes +dicts; this module converts STIX → MISP JSON. + +Output shapes +------------- +build_attacker_misp_event → dict (single MISP event, ready for import) +build_fleet_misp_collection → dict ({"response": [event, ...]}) +""" +from __future__ import annotations + +import json +from typing import Any + +from misp_stix_converter import ExternalSTIX2toMISPParser + +from decnet.ttp.stix_export import build_attacker_bundle + + +def _parse_bundle(bundle: Any) -> dict[str, Any]: + """Run ExternalSTIX2toMISPParser on *bundle* and return the event dict. + + Returns an empty dict if the parser produces no event (e.g. the bundle + contains only SCOs the parser can't promote to MISP attributes). + """ + parser = ExternalSTIX2toMISPParser() + parser.load_stix_bundle(bundle) + parser.parse_stix_bundle() + event = parser.misp_events + if event is None: + return {} + return json.loads(event.to_json()) + + +def build_attacker_misp_event( + attacker: dict[str, Any], + behavior: dict[str, Any] | None, + identity: dict[str, Any] | None, + intel: dict[str, Any] | None, + technique_rollup: list[dict[str, Any]], + raw_tags: list[dict[str, Any]], + artifacts: list[dict[str, Any]], + smtp_targets: list[dict[str, Any]], + commands: list[str] | None = None, +) -> dict[str, Any]: + """Return a MISP event dict for *attacker*. + + All arguments match the signature of stix_export.build_attacker_bundle. + Never raises — conversion failures produce a minimal event dict. + """ + bundle = build_attacker_bundle( + attacker=attacker, + behavior=behavior, + identity=identity, + intel=intel, + technique_rollup=technique_rollup, + raw_tags=raw_tags, + artifacts=artifacts, + smtp_targets=smtp_targets, + commands=commands, + ) + return _parse_bundle(bundle) + + +def build_fleet_misp_collection( + rows: list[dict[str, Any]], + ttp_by_attacker: dict[str, list[dict[str, Any]]], +) -> dict[str, Any]: + """Return a MISP collection dict with one event per attacker in *rows*. + + Shape: ``{"response": [event_dict, ...]}``. Suitable for MISP's + "Import from MISP JSON" / REST collection endpoint. + + Attackers that produce no parseable MISP event (very unlikely — an + attacker always has at least an IP) are silently omitted. + """ + events: list[dict[str, Any]] = [] + for row in rows: + raw_cmds = row.get("commands") or [] + if isinstance(raw_cmds, str): + try: + raw_cmds = json.loads(raw_cmds) + except Exception: + raw_cmds = [] + cmds = [ + str(e.get("command_text") or e.get("command") or "").strip() + for e in raw_cmds + if isinstance(e, dict) and (e.get("command_text") or e.get("command")) + ] + bundle = build_attacker_bundle( + attacker=row, + behavior=None, + identity=None, + intel=row.get("threat_intel"), + technique_rollup=ttp_by_attacker.get(row["uuid"], []), + raw_tags=[], + artifacts=[], + smtp_targets=[], + commands=cmds, + ) + event = _parse_bundle(bundle) + if event: + events.append(event) + return {"response": events} diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 221f3002..f1e3e74a 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -17,6 +17,8 @@ from .attackers.api_get_attackers import router as attackers_router from .attackers.api_export_attackers import router as attackers_export_router from .attackers.api_export_attacker_stix import router as attacker_export_stix_router from .attackers.api_export_attackers_stix import router as attackers_export_stix_router +from .attackers.api_export_attacker_misp import router as attacker_export_misp_router +from .attackers.api_export_attackers_misp import router as attackers_export_misp_router from .attackers.api_events import router as attacker_events_router from .attackers.api_get_attacker_detail import router as attacker_detail_router from .attackers.api_get_attacker_commands import router as attacker_commands_router @@ -109,6 +111,8 @@ api_router.include_router(attackers_router) api_router.include_router(attackers_export_router) api_router.include_router(attackers_export_stix_router) api_router.include_router(attacker_export_stix_router) +api_router.include_router(attackers_export_misp_router) +api_router.include_router(attacker_export_misp_router) api_router.include_router(attacker_detail_router) api_router.include_router(attacker_events_router) api_router.include_router(attacker_commands_router) diff --git a/decnet/web/router/attackers/api_export_attacker_misp.py b/decnet/web/router/attackers/api_export_attacker_misp.py new file mode 100644 index 00000000..4b04ff41 --- /dev/null +++ b/decnet/web/router/attackers/api_export_attacker_misp.py @@ -0,0 +1,100 @@ +"""GET /api/v1/attackers/{uuid}/export/misp — MISP event for one attacker. + +Converts the attacker's STIX 2.1 bundle to a MISP event JSON file via +misp-stix (ExternalSTIX2toMISPParser). Download the result and import it +into any MISP instance via "Import from MISP JSON". +""" +from __future__ import annotations + +import asyncio +import json +from typing import Any, cast + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.responses import Response + +from decnet.telemetry import traced as _traced +from decnet.ttp.misp_export import build_attacker_misp_event +from decnet.web.dependencies import require_viewer, repo + +router = APIRouter() + + +async def _none() -> None: + return None + + +@router.get( + "/attackers/{uuid}/export/misp", + tags=["Attacker Profiles"], + response_class=Response, + responses={ + 200: { + "content": {"application/json": {}}, + "description": "MISP event for the attacker", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Attacker not found"}, + }, +) +@_traced("api.attackers.export_misp") +async def api_export_attacker_misp( + uuid: str, + user: dict[str, Any] = Depends(require_viewer), +) -> Response: + """Download a MISP event JSON for one attacker.""" + attacker = await repo.get_attacker_by_uuid(uuid) + if not attacker: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"unknown attacker uuid: {uuid!r}", + ) + + identity_coro = ( + repo.get_identity_by_uuid(attacker["identity_id"]) + if attacker.get("identity_id") + else _none() + ) + results = await asyncio.gather( + repo.get_attacker_behavior(uuid), + identity_coro, + repo.get_attacker_intel_by_uuid(uuid), + repo.list_techniques_by_attacker(uuid), + repo.list_ttp_tags_by_attacker(uuid), + repo.get_attacker_artifacts(uuid), + repo.list_smtp_targets(uuid), + repo.list_attacker_commands_deduped(uuid), + ) + behavior = cast(dict[str, Any] | None, results[0]) + identity = cast(dict[str, Any] | None, results[1]) + intel = cast(dict[str, Any] | None, results[2]) + technique_rollup = cast(list[Any], results[3]) + raw_tags = cast(list[dict[str, Any]], results[4]) + artifacts = cast(list[dict[str, Any]], results[5]) + smtp_targets = cast(list[dict[str, Any]], results[6]) + commands = cast(list[str], results[7]) + + event = build_attacker_misp_event( + attacker=attacker, + behavior=behavior, + identity=identity, + intel=intel, + technique_rollup=[ + r.model_dump() if hasattr(r, "model_dump") else dict(r) + for r in technique_rollup + ], + raw_tags=raw_tags, + artifacts=artifacts, + smtp_targets=smtp_targets, + commands=commands, + ) + return Response( + content=json.dumps(event, default=str), + media_type="application/json", + headers={ + "Content-Disposition": ( + f'attachment; filename="decnet-attacker-{uuid[:8]}.misp.json"' + ), + }, + ) diff --git a/decnet/web/router/attackers/api_export_attackers_misp.py b/decnet/web/router/attackers/api_export_attackers_misp.py new file mode 100644 index 00000000..b68560b0 --- /dev/null +++ b/decnet/web/router/attackers/api_export_attackers_misp.py @@ -0,0 +1,63 @@ +"""GET /api/v1/attackers/export/misp — fleet-wide MISP collection. + +Returns a MISP collection JSON ({"response": [event, ...]}) with one event +per observed attacker, suitable for bulk import into a MISP instance via +"Import from MISP JSON" or the MISP REST /events/import endpoint. + +Per-tag Sightings, captured Artifacts, and SMTP targets are omitted in +fleet mode. Use GET /api/v1/attackers/{uuid}/export/misp for full fidelity +on a single attacker. +""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends +from fastapi.responses import Response + +from decnet.telemetry import traced as _traced +from decnet.ttp.misp_export import build_fleet_misp_collection +from decnet.web.dependencies import require_viewer, repo + +router = APIRouter() + + +@router.get( + "/attackers/export/misp", + tags=["Attacker Profiles"], + response_class=Response, + responses={ + 200: { + "content": {"application/json": {}}, + "description": "MISP collection for all attackers", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + }, +) +@_traced("api.attackers.export_misp_fleet") +async def api_export_attackers_misp( + user: dict[str, Any] = Depends(require_viewer), +) -> Response: + """Download a MISP collection JSON covering every observed attacker.""" + rows, ttp_by_attacker = await asyncio.gather( + repo.get_all_attackers_for_export(), + repo.get_all_ttp_rollups_for_export(), + ) + collection = build_fleet_misp_collection( + rows=rows, + ttp_by_attacker=ttp_by_attacker, + ) + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + return Response( + content=json.dumps(collection, default=str), + media_type="application/json", + headers={ + "Content-Disposition": ( + f'attachment; filename="decnet-fleet-{ts}.misp.json"' + ), + }, + ) diff --git a/decnet_web/src/components/AttackerDetail/sections/AttackerHeader.tsx b/decnet_web/src/components/AttackerDetail/sections/AttackerHeader.tsx index 7d53a67f..71455b1b 100644 --- a/decnet_web/src/components/AttackerDetail/sections/AttackerHeader.tsx +++ b/decnet_web/src/components/AttackerDetail/sections/AttackerHeader.tsx @@ -14,19 +14,32 @@ interface Props { export const AttackerHeader: React.FC = ({ attacker }) => { const navigate = useNavigate(); - const handleStixDownload = async () => { + const _download = async (endpoint: string, filename: string) => { try { - const res = await api.get(`/attackers/${attacker.uuid}/export/stix`, { responseType: 'blob' }); + const res = await api.get(endpoint, { responseType: 'blob' }); const href = URL.createObjectURL(res.data); const a = document.createElement('a'); a.href = href; - a.download = `decnet-attacker-${attacker.uuid.slice(0, 8)}.stix.json`; + a.download = filename; a.click(); URL.revokeObjectURL(href); } catch { // best-effort } }; + + const handleStixDownload = () => + _download( + `/attackers/${attacker.uuid}/export/stix`, + `decnet-attacker-${attacker.uuid.slice(0, 8)}.stix.json`, + ); + + const handleMispDownload = () => + _download( + `/attackers/${attacker.uuid}/export/misp`, + `decnet-attacker-${attacker.uuid.slice(0, 8)}.misp.json`, + ); + return (
@@ -60,16 +73,26 @@ export const AttackerHeader: React.FC = ({ attacker }) => { IDENTITY · {attacker.identity_id.slice(0, 8)} )} - +
+ + +
); }; diff --git a/decnet_web/src/components/Attackers.tsx b/decnet_web/src/components/Attackers.tsx index 42c85eea..8489e4c1 100644 --- a/decnet_web/src/components/Attackers.tsx +++ b/decnet_web/src/components/Attackers.tsx @@ -117,12 +117,12 @@ const Attackers: React.FC = () => { const totalPages = Math.max(1, Math.ceil(total / limit)); - const handleExport = async () => { + const _fleetDownload = async (endpoint: string, fallback: string) => { try { - const res = await api.get('/attackers/export/stix', { responseType: 'blob' }); + const res = await api.get(endpoint, { responseType: 'blob' }); const disposition: string = res.headers['content-disposition'] || ''; const match = disposition.match(/filename="([^"]+)"/); - const filename = match ? match[1] : 'decnet-fleet.stix.json'; + const filename = match ? match[1] : fallback; const url = URL.createObjectURL(new Blob([res.data], { type: 'application/json' })); const a = document.createElement('a'); a.href = url; @@ -134,6 +134,9 @@ const Attackers: React.FC = () => { } }; + const handleExport = () => _fleetDownload('/attackers/export/stix', 'decnet-fleet.stix.json'); + const handleMispExport = () => _fleetDownload('/attackers/export/misp', 'decnet-fleet.misp.json'); + const activityCounts = attackers.reduce( (acc, a) => { acc[deriveActivity(a)]++; return acc; }, { active: 0, passive: 0, inactive: 0 } as Record, @@ -236,7 +239,17 @@ const Attackers: React.FC = () => { style={{ display: 'flex', alignItems: 'center', gap: 6 }} > - EXPORT + STIX + +
Page {page} of {totalPages} diff --git a/pyproject.toml b/pyproject.toml index c3d818d6..373082f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,9 @@ dependencies = [ # range tracks BEHAVE-INTEGRATION.md §"Versioning". "decnet-behave-core>=0.1.0,<0.2", "decnet-behave-shell>=0.1.0,<0.2", + # STIX → MISP conversion: CIRCL-maintained reference converter used by + # MISP itself. Pulls pymisp transitively (needed for MISPEvent output). + "misp-stix>=2026.4", # MITRE ATT&CK: parse the official STIX 2.1 enterprise-attack bundle # instead of hand-maintaining technique/tactic name dicts. stix2 # gives typed parsing; mitreattack-python ships MitreAttackData diff --git a/tests/web/test_api_export_attacker_misp.py b/tests/web/test_api_export_attacker_misp.py new file mode 100644 index 00000000..505c0a97 --- /dev/null +++ b/tests/web/test_api_export_attacker_misp.py @@ -0,0 +1,217 @@ +"""Tests for GET /api/v1/attackers/{uuid}/export/misp.""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from decnet.ttp import attack_stix +from decnet.web.router.attackers.api_export_attacker_misp import ( + api_export_attacker_misp, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" +_FAKE_USER: dict = {"uuid": "test-user", "role": "viewer"} + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + attack_stix.groups_using_technique.cache_clear() + + +def _attacker(uuid: str = "att-aaaabbbbccccdddd") -> dict: + return { + "uuid": uuid, + "ip": "1.2.3.4", + "first_seen": datetime(2026, 1, 1, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 31, tzinfo=timezone.utc), + "event_count": 100, + "identity_id": None, + "country_code": "US", + "asn": 15169, + "as_name": "GOOGLE", + } + + +def _tag(technique_id: str) -> dict: + return { + "uuid": f"tag-{technique_id}", + "attacker_uuid": "att-aaaabbbbccccdddd", + "technique_id": technique_id, + "sub_technique_id": None, + "tactic": "TA0006", + "confidence": 0.85, + "rule_id": "R0001", + "rule_version": 1, + "evidence": {}, + "created_at": datetime(2026, 1, 15, tzinfo=timezone.utc), + } + + +def _artifact(sha256: str = "a" * 64) -> dict: + return { + "timestamp": datetime(2026, 1, 10, tzinfo=timezone.utc), + "fields": json.dumps({"sha256": sha256, "filename": "payload.sh"}), + } + + +def _smtp_target() -> dict: + return { + "domain": "victim.example.com", + "count": 5, + "first_seen": datetime(2026, 1, 5, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 20, tzinfo=timezone.utc), + } + + +def _intel() -> dict: + return { + "aggregate_verdict": "malicious", + "abuseipdb_score": 95, + "greynoise_classification": "malicious", + "greynoise_tags": json.dumps(["ssh_bruteforcer"]), + "feodo_listed": False, + "threatfox_listed": False, + } + + +def _mock_repo(*, attacker=None, intel=None, rollup=None, tags=None, + artifacts=None, smtp=None, commands=None): + m = type("M", (), {})() + m.get_attacker_by_uuid = AsyncMock(return_value=attacker or _attacker()) + m.get_attacker_behavior = AsyncMock(return_value={}) + m.get_identity_by_uuid = AsyncMock(return_value=None) + m.get_attacker_intel_by_uuid = AsyncMock(return_value=intel) + from decnet.web.db.models.ttp import IdentityTechniqueRow + m.list_techniques_by_attacker = AsyncMock( + return_value=[ + IdentityTechniqueRow( + technique_id=t, technique_name=None, + sub_technique_id=None, sub_technique_name=None, + tactic="TA0006", count=1, confidence_max=0.8, + first_seen=datetime(2026, 1, 1, tzinfo=timezone.utc), + last_seen=datetime(2026, 1, 31, tzinfo=timezone.utc), + mitre_url=None, + ) + for t in (rollup or []) + ] + ) + m.list_ttp_tags_by_attacker = AsyncMock(return_value=tags or []) + m.get_attacker_artifacts = AsyncMock(return_value=artifacts or []) + m.list_smtp_targets = AsyncMock(return_value=smtp or []) + m.list_attacker_commands_deduped = AsyncMock(return_value=commands or []) + return m + + +@pytest.mark.asyncio +async def test_404_on_unknown_attacker(): + m = _mock_repo() + m.get_attacker_by_uuid = AsyncMock(return_value=None) + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + from fastapi import HTTPException + with pytest.raises(HTTPException) as exc: + await api_export_attacker_misp("no-such-uuid", user=_FAKE_USER) + assert exc.value.status_code == 404 + + +@pytest.mark.asyncio +async def test_skinny_attacker_returns_misp_event(): + """No TTPs, no artifacts — response is valid MISP event JSON.""" + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + event = json.loads(resp.body) + # Must be a MISP event dict with an Attribute list + assert "Attribute" in event or "info" in event + + +@pytest.mark.asyncio +async def test_ip_attribute_present(): + """Attacker IP appears as an ip-src or ip-dst attribute.""" + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + event = json.loads(resp.body) + attrs = event.get("Attribute", []) + ip_attrs = [a for a in attrs if a.get("value") == "1.2.3.4"] + assert len(ip_attrs) >= 1 + + +@pytest.mark.asyncio +async def test_file_hash_attribute_present(): + """A captured artifact's SHA-256 appears in the MISP event.""" + m = _mock_repo(artifacts=[_artifact("b" * 64)]) + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + event = json.loads(resp.body) + all_attrs = event.get("Attribute", []) + # Also check inside Objects + for obj in event.get("Object", []): + all_attrs.extend(obj.get("Attribute", [])) + hash_attrs = [a for a in all_attrs if ("b" * 64) in str(a.get("value", ""))] + assert len(hash_attrs) >= 1 + + +@pytest.mark.asyncio +async def test_domain_attribute_present(): + """An SMTP target domain appears in the MISP event.""" + m = _mock_repo(smtp=[_smtp_target()]) + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + event = json.loads(resp.body) + all_attrs = event.get("Attribute", []) + for obj in event.get("Object", []): + all_attrs.extend(obj.get("Attribute", [])) + domain_attrs = [ + a for a in all_attrs if "victim.example.com" in str(a.get("value", "")) + ] + assert len(domain_attrs) >= 1 + + +@pytest.mark.asyncio +async def test_mitre_galaxy_present_when_technique_tagged(): + """A MITRE ATT&CK galaxy cluster appears when a technique is tagged.""" + m = _mock_repo(rollup=["T1059"], tags=[_tag("T1059")]) + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + event = json.loads(resp.body) + galaxies = event.get("Galaxy", []) + # misp-stix maps attack-pattern SDOs to "STIX 2.1 Attack Pattern" galaxies + attack_pattern_galaxy = any( + "attack pattern" in str(g.get("name", "")).lower() for g in galaxies + ) + assert attack_pattern_galaxy + + +@pytest.mark.asyncio +async def test_response_headers(): + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + assert resp.media_type == "application/json" + assert ".misp.json" in resp.headers["content-disposition"] + + +@pytest.mark.asyncio +async def test_pymisp_round_trip(): + """Event round-trips through pymisp.MISPEvent.from_dict without error.""" + import pymisp + m = _mock_repo(rollup=["T1059"], tags=[_tag("T1059")], intel=_intel()) + with patch("decnet.web.router.attackers.api_export_attacker_misp.repo", m): + resp = await api_export_attacker_misp("att-aaaabbbbccccdddd", user=_FAKE_USER) + raw = json.loads(resp.body) + e = pymisp.MISPEvent() + e.from_dict(**raw) + assert e.info diff --git a/tests/web/test_api_export_attackers_misp.py b/tests/web/test_api_export_attackers_misp.py new file mode 100644 index 00000000..1061845a --- /dev/null +++ b/tests/web/test_api_export_attackers_misp.py @@ -0,0 +1,112 @@ +"""Tests for GET /api/v1/attackers/export/misp — fleet-wide MISP collection.""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from decnet.ttp import attack_stix +from decnet.web.router.attackers.api_export_attackers_misp import ( + api_export_attackers_misp, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" +_FAKE_USER: dict = {"uuid": "test-user", "role": "viewer"} + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + attack_stix.groups_using_technique.cache_clear() + + +def _attacker(uuid: str = "att-aaaa", ip: str = "1.2.3.4") -> dict: + return { + "uuid": uuid, + "ip": ip, + "first_seen": datetime(2026, 1, 1, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 31, tzinfo=timezone.utc), + "event_count": 10, + "identity_id": None, + "country_code": "US", + "asn": 15169, + "as_name": "GOOGLE", + "commands": [], + "threat_intel": None, + } + + +def _mock_repo(*, rows=None, ttp_by_attacker=None): + m = type("M", (), {})() + m.get_all_attackers_for_export = AsyncMock(return_value=rows or []) + m.get_all_ttp_rollups_for_export = AsyncMock(return_value=ttp_by_attacker or {}) + return m + + +@pytest.mark.asyncio +async def test_empty_fleet_returns_empty_collection(): + """Zero attackers → {"response": []}.""" + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attackers_misp.repo", m): + resp = await api_export_attackers_misp(user=_FAKE_USER) + body = json.loads(resp.body) + assert "response" in body + assert body["response"] == [] + + +@pytest.mark.asyncio +async def test_single_attacker_one_event(): + """One attacker → collection with one event.""" + m = _mock_repo(rows=[_attacker()]) + with patch("decnet.web.router.attackers.api_export_attackers_misp.repo", m): + resp = await api_export_attackers_misp(user=_FAKE_USER) + body = json.loads(resp.body) + assert len(body["response"]) == 1 + + +@pytest.mark.asyncio +async def test_two_attackers_two_events(): + """Two distinct attacker rows → two events in the collection.""" + rows = [_attacker("att-1111", "1.1.1.1"), _attacker("att-2222", "2.2.2.2")] + m = _mock_repo(rows=rows) + with patch("decnet.web.router.attackers.api_export_attackers_misp.repo", m): + resp = await api_export_attackers_misp(user=_FAKE_USER) + body = json.loads(resp.body) + assert len(body["response"]) == 2 + + +@pytest.mark.asyncio +async def test_commands_in_fleet_event(): + """Commands in the row's commands field end up in the event.""" + row = _attacker() + row["commands"] = [ + {"command_text": "whoami", "ts": "2026-01-10T00:00:00"}, + {"command_text": "id", "ts": "2026-01-10T00:01:00"}, + ] + m = _mock_repo(rows=[row]) + with patch("decnet.web.router.attackers.api_export_attackers_misp.repo", m): + resp = await api_export_attackers_misp(user=_FAKE_USER) + body = json.loads(resp.body) + assert len(body["response"]) == 1 + + +@pytest.mark.asyncio +async def test_response_headers(): + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attackers_misp.repo", m): + resp = await api_export_attackers_misp(user=_FAKE_USER) + assert resp.media_type == "application/json" + cd = resp.headers["content-disposition"] + assert "decnet-fleet-" in cd + assert ".misp.json" in cd