From 1d3086a5c7fb856df71a6bc07b7a237ae6aa16ea Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 06:45:25 -0400 Subject: [PATCH] =?UTF-8?q?feat(web):=20GET=20/api/v1/ttp/techniques/{id}/?= =?UTF-8?q?groups=20=E2=80=94=20MITRE-tracked=20groups=20using=20a=20techn?= =?UTF-8?q?ique?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Surfaces the intrusion-set reverse index from the loaded ATT&CK bundle: given a technique, returns the list of groups MITRE has documented as using it. Read-only — explicitly NOT an attribution claim about a DECNET attacker. The frontend pulls this lazily when the operator expands a technique panel; payload-size cost on every TTPTagDetailRow makes embedding wasteful for techniques with 50+ documented groups. - decnet/web/router/ttp/api_get_groups_for_technique.py exposes GET /api/v1/ttp/techniques/{technique_id}/groups, response_model list[GroupRef]. Same JWT-viewer auth gating as the rest of the TTP router. 404 when the technique_id doesn't resolve in the bundle. - Sub-techniques are queried directly (no auto-union with parent) to match ATT&CK Navigator semantics; callers that want a broader view query the parent themselves. - tests/ttp/test_groups_for_technique.py covers happy path, 404, sub-technique attribution independence, empty-list-on-zero-groups, and that responses include mitre_url + aliases. - tests/web/test_api_attackers.py: fix pre-existing fixture drift introduced by a2a61b63 — three TestGetAttackerDetail cases were missing AsyncMock for repo.latest_observation_per_primitive, causing TypeError on await of MagicMock. The new groups endpoint doesn't share code with attacker_detail; this is a drive-by fix surfaced by the same suite run. --- decnet/web/router/__init__.py | 2 + .../ttp/api_get_groups_for_technique.py | 50 ++++++++ tests/ttp/test_groups_for_technique.py | 107 ++++++++++++++++++ tests/web/test_api_attackers.py | 5 + 4 files changed, 164 insertions(+) create mode 100644 decnet/web/router/ttp/api_get_groups_for_technique.py create mode 100644 tests/ttp/test_groups_for_technique.py diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 8aadbe8b..ab589a2f 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -64,6 +64,7 @@ from .ttp.api_get_by_session import router as ttp_by_session_router from .ttp.api_get_rules import router as ttp_rules_router from .ttp.api_get_tag_details import router as ttp_tag_details_router from .ttp.api_export_navigator import router as ttp_navigator_router +from .ttp.api_get_groups_for_technique import router as ttp_groups_for_technique_router api_router = APIRouter( # Every route under /api/v1 is auth-guarded (either by an explicit @@ -189,3 +190,4 @@ api_router.include_router(ttp_by_session_router) api_router.include_router(ttp_rules_router) api_router.include_router(ttp_tag_details_router) api_router.include_router(ttp_navigator_router) +api_router.include_router(ttp_groups_for_technique_router) diff --git a/decnet/web/router/ttp/api_get_groups_for_technique.py b/decnet/web/router/ttp/api_get_groups_for_technique.py new file mode 100644 index 00000000..c79dbb80 --- /dev/null +++ b/decnet/web/router/ttp/api_get_groups_for_technique.py @@ -0,0 +1,50 @@ +"""GET /api/v1/ttp/techniques/{technique_id}/groups — MITRE-tracked groups using *technique_id*. + +Read-only reverse-index off the loaded ATT&CK STIX bundle. **NOT an +attribution claim** about a DECNET attacker — given the technique, +return the list of MITRE-tracked intrusion-sets (groups) documented +as using it. The frontend pulls this lazily when the operator +expands a technique panel; payload sizes for large groups (50+ on +some techniques) make embedding on every TTPTagDetailRow wasteful. +""" +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, status + +from decnet.telemetry import traced as _traced +from decnet.ttp import attack_stix +from decnet.web.dependencies import require_viewer + +router = APIRouter() + + +@router.get( + "/ttp/techniques/{technique_id}/groups", + tags=["TTP Tagging"], + response_model=list[attack_stix.GroupRef], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Unknown technique_id"}, + }, +) +@_traced("api.ttp.groups_for_technique") +async def api_groups_for_technique( + technique_id: str, + user: dict[str, Any] = Depends(require_viewer), +) -> list[attack_stix.GroupRef]: + """List MITRE-tracked intrusion-sets that use *technique_id*. + + Sub-techniques are queried directly (no auto-union with the + parent — matches ATT&CK Navigator semantics). Empty list when + the technique exists but has no documented groups; 404 when the + technique_id doesn't resolve in the loaded ATT&CK bundle at all. + """ + if not attack_stix.technique_exists(technique_id): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"unknown technique_id: {technique_id!r}", + ) + return list(attack_stix.groups_using_technique(technique_id)) diff --git a/tests/ttp/test_groups_for_technique.py b/tests/ttp/test_groups_for_technique.py new file mode 100644 index 00000000..4207ac9e --- /dev/null +++ b/tests/ttp/test_groups_for_technique.py @@ -0,0 +1,107 @@ +"""Router-level coverage for GET /api/v1/ttp/techniques/{tid}/groups. + +Calls the handler directly (no TestClient) — the auth dependency is +satisfied by passing a fake user dict. The handler delegates almost +everything to ``attack_stix.groups_using_technique``, which has its +own coverage in ``test_attack_url.py``; the focus here is the 404 +path and the empty-list-on-zero-groups behavior. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest +from fastapi import HTTPException + +from decnet.ttp import attack_stix +from decnet.web.router.ttp.api_get_groups_for_technique import ( + api_groups_for_technique, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + attack_stix.groups_using_technique.cache_clear() + + +_FAKE_USER: dict = {"uuid": "test-user", "role": "viewer"} + + +@pytest.mark.asyncio +async def test_returns_grouprefs_for_known_technique() -> None: + result = await api_groups_for_technique( + technique_id="T1059", user=_FAKE_USER, + ) + assert isinstance(result, list) + assert len(result) >= 5 + assert all(isinstance(g, attack_stix.GroupRef) for g in result) + # Sorted ordering preserved. + ids = [g.group_id for g in result] + assert ids == sorted(ids) + + +@pytest.mark.asyncio +async def test_returns_404_for_unknown_technique() -> None: + with pytest.raises(HTTPException) as exc: + await api_groups_for_technique( + technique_id="T9999", user=_FAKE_USER, + ) + assert exc.value.status_code == 404 + assert "T9999" in str(exc.value.detail) + + +@pytest.mark.asyncio +async def test_subtechnique_returns_distinct_group_set() -> None: + parent = await api_groups_for_technique( + technique_id="T1059", user=_FAKE_USER, + ) + sub = await api_groups_for_technique( + technique_id="T1059.004", user=_FAKE_USER, + ) + assert len(sub) >= 1 + # Sub-technique attribution is independent of parent (see + # ATT&CK Navigator semantics) — sub may have groups not in parent. + # Just assert both are populated. + assert len(parent) >= len(sub) + + +@pytest.mark.asyncio +async def test_empty_list_for_known_technique_with_no_documented_groups( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A real-bundle technique with zero documented groups returns [].""" + # Force the cache to return () for a real technique. + attack_stix.groups_using_technique.cache_clear() + monkeypatch.setattr( + attack_stix, + "groups_using_technique", + lambda tid: () if tid == "T1110" else attack_stix.groups_using_technique(tid), + ) + result = await api_groups_for_technique( + technique_id="T1110", user=_FAKE_USER, + ) + assert result == [] + + +@pytest.mark.asyncio +async def test_response_includes_mitre_url_and_aliases() -> None: + result = await api_groups_for_technique( + technique_id="T1059", user=_FAKE_USER, + ) + assert all( + g.mitre_url and g.mitre_url.startswith("https://attack.mitre.org/groups/G") + for g in result + ) + # At least one group has multiple aliases. + assert any(len(g.aliases) >= 2 for g in result) diff --git a/tests/web/test_api_attackers.py b/tests/web/test_api_attackers.py index 109aa4c0..09cec26b 100644 --- a/tests/web/test_api_attackers.py +++ b/tests/web/test_api_attackers.py @@ -186,6 +186,7 @@ class TestGetAttackerDetail: mock_repo.get_attacker_service_activity = AsyncMock(return_value=[]) mock_repo.get_attacker_ip_leaks = AsyncMock(return_value=[]) mock_repo.count_attacker_ip_leaks = AsyncMock(return_value=0) + mock_repo.latest_observation_per_primitive = AsyncMock(return_value={}) result = await get_attacker_detail(uuid="att-uuid-1", user={"uuid": "test-user", "role": "viewer"}) @@ -217,6 +218,7 @@ class TestGetAttackerDetail: mock_repo.get_attacker_service_activity = AsyncMock(return_value=[]) mock_repo.get_attacker_ip_leaks = AsyncMock(return_value=[]) mock_repo.count_attacker_ip_leaks = AsyncMock(return_value=0) + mock_repo.latest_observation_per_primitive = AsyncMock(return_value={}) result = await get_attacker_detail(uuid="att-uuid-1", user={"uuid": "test-user", "role": "viewer"}) @@ -244,6 +246,7 @@ class TestGetAttackerDetail: mock_repo.get_attacker_service_activity = AsyncMock(return_value=pairs) mock_repo.get_attacker_ip_leaks = AsyncMock(return_value=[]) mock_repo.count_attacker_ip_leaks = AsyncMock(return_value=0) + mock_repo.latest_observation_per_primitive = AsyncMock(return_value={}) result = await get_attacker_detail( uuid="att-uuid-1", @@ -283,6 +286,7 @@ class TestGetAttackerDetail: mock_repo.get_attacker_service_activity = AsyncMock(return_value=[]) mock_repo.get_attacker_ip_leaks = AsyncMock(return_value=leaks) mock_repo.count_attacker_ip_leaks = AsyncMock(return_value=1) + mock_repo.latest_observation_per_primitive = AsyncMock(return_value={}) result = await get_attacker_detail( uuid="att-uuid-1", @@ -323,6 +327,7 @@ class TestGetAttackerDetail: mock_repo.get_attacker_service_activity = AsyncMock(return_value=[]) mock_repo.get_attacker_ip_leaks = AsyncMock(return_value=first_ten) mock_repo.count_attacker_ip_leaks = AsyncMock(return_value=100) + mock_repo.latest_observation_per_primitive = AsyncMock(return_value={}) result = await get_attacker_detail( uuid="att-uuid-1",