feat(ttp): E.3.4 API handlers wired to repo (rollups + Navigator)

Five GET rollup endpoints (techniques, by-identity, by-attacker,
by-campaign, by-session) and the Navigator export (fleet +
per-identity) now call into the TTPMixin methods. Rule catalogue
endpoint still returns [] — backed by the RuleStore which lands
at E.3.5/E.3.6.
This commit is contained in:
2026-05-01 08:06:53 -04:00
parent fee697694d
commit 89ce893792
7 changed files with 37 additions and 19 deletions

View File

@@ -12,8 +12,8 @@ from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import NavigatorLayer
from decnet.web.dependencies import require_viewer
from decnet.web.db.models import NavigatorLayer, NavigatorTechnique
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -31,8 +31,16 @@ router = APIRouter()
async def api_export_navigator_fleet(
user: dict[str, Any] = Depends(require_viewer),
) -> NavigatorLayer:
"""Fleet-wide Navigator layer. Empty-but-valid at contract phase."""
return NavigatorLayer(name="DECNET TTP coverage — fleet")
"""Fleet-wide Navigator layer."""
rows = await repo.list_distinct_techniques()
techniques = [
NavigatorTechnique(techniqueID=r.technique_id, score=r.count)
for r in rows
]
return NavigatorLayer(
name="DECNET TTP coverage — fleet",
techniques=techniques,
)
@router.get(
@@ -51,6 +59,12 @@ async def api_export_navigator_identity(
user: dict[str, Any] = Depends(require_viewer),
) -> NavigatorLayer:
"""Per-Identity Navigator layer (the SOC demo)."""
rows = await repo.list_techniques_by_identity(identity_uuid)
techniques = [
NavigatorTechnique(techniqueID=r.technique_id, score=r.count)
for r in rows
]
return NavigatorLayer(
name=f"DECNET TTP coverage — identity {identity_uuid}",
techniques=techniques,
)

View File

@@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -31,5 +31,5 @@ async def api_ttp_by_attacker(
attacker_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-Attacker (per-IP) TTP rows. Empty at contract phase."""
return []
"""Per-Attacker (per-IP) TTP rows."""
return await repo.list_techniques_by_attacker(attacker_uuid)

View File

@@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import CampaignTechniqueRow
from decnet.web.dependencies import require_viewer
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -27,5 +27,5 @@ async def api_ttp_by_campaign(
campaign_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[CampaignTechniqueRow]:
"""Campaign-rollup TTP rows. Empty at contract phase."""
return []
"""Campaign-rollup TTP rows."""
return await repo.list_techniques_by_campaign(campaign_uuid)

View File

@@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -31,5 +31,5 @@ async def api_ttp_by_identity(
identity_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-Identity TTP heatmap rows. Empty at contract phase."""
return []
"""Per-Identity TTP heatmap rows."""
return await repo.list_techniques_by_identity(identity_uuid)

View File

@@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -27,5 +27,5 @@ async def api_ttp_by_session(
session_id: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-session TTP tag timeline. Empty at contract phase."""
return []
"""Per-session TTP tag timeline."""
return await repo.list_techniques_by_session(session_id)

View File

@@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import TechniqueRollupRow
from decnet.web.dependencies import require_viewer
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@@ -30,5 +30,5 @@ async def api_list_techniques(
user: dict[str, Any] = Depends(require_viewer),
) -> list[TechniqueRollupRow]:
"""Distinct techniques observed across the fleet, with counts and
last-seen timestamps. Empty list at contract phase."""
return []
last-seen timestamps."""
return await repo.list_distinct_techniques()

View File

@@ -2925,6 +2925,10 @@ Order:
dialect hook fires.
4. **API endpoints** — fill in handlers reading from repo. Empty
store still returns empty lists; `test_*.py` shape tests green.
✅ done. Five GET rollup endpoints + Navigator (fleet + identity)
wired to the repo singleton from `decnet.web.dependencies`. Rule
catalogue (`GET /ttp/rules`) still returns `[]` — backed by the
RuleStore, lands at E.3.5/E.3.6.
5. **RuleStore — FilesystemRuleStore** — implement YAML parse,
Pydantic validation, inotify watch, in-process state cache,
`subscribe_changes()` async iterator yielding per-rule events.