feat(ttp): E.1.9 API contract — seven router endpoints, admin-gated state mutations, response models

Mounts /api/v1/ttp/* with empty-list / empty-Navigator responses.
GET endpoints viewer-gated; POST/DELETE /rules/{rule_id}/state
admin-gated server-side. POST parses JSON manually so a malformed
body returns the documented 400 (per feedback_schemathesis_400).

Drops xfail-strict markers from E.2.8 tests now that the router is
mounted; 26 tests pass against the contract handlers.
This commit is contained in:
2026-05-01 07:20:13 -04:00
parent cfbfaabfcd
commit b7f206c8c5
15 changed files with 515 additions and 56 deletions

View File

@@ -186,13 +186,21 @@ from .tarpit import (
TarpitStatusResponse,
)
from .ttp import (
CampaignTechniqueRow,
CanaryFingerprintEvidence,
CommandEvidence,
EmailEvidence,
IdentityTechniqueRow,
IntelEvidence,
NavigatorLayer,
NavigatorTechnique,
RuleCatalogueRow,
RuleStateRequest,
RuleStateResponse,
TTPRule,
TTPRuleState,
TTPTag,
TechniqueRollupRow,
compute_tag_uuid,
)
@@ -356,12 +364,20 @@ __all__ = [
"TarpitRuleResponse",
"TarpitStatusResponse",
# ttp
"CampaignTechniqueRow",
"CanaryFingerprintEvidence",
"CommandEvidence",
"EmailEvidence",
"IdentityTechniqueRow",
"IntelEvidence",
"NavigatorLayer",
"NavigatorTechnique",
"RuleCatalogueRow",
"RuleStateRequest",
"RuleStateResponse",
"TTPRule",
"TTPRuleState",
"TTPTag",
"TechniqueRollupRow",
"compute_tag_uuid",
]

View File

@@ -10,6 +10,7 @@ import uuid as _uuid
from datetime import datetime, timezone
from typing import Any, Literal, Optional, TypedDict
from pydantic import BaseModel
from sqlalchemy import JSON, CheckConstraint, Column, Index
from sqlmodel import Field, SQLModel
@@ -235,3 +236,117 @@ class TTPRuleState(SQLModel, table=True):
set_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc),
)
# ── API response models (Pydantic) ──────────────────────────────────
# Routed by `decnet/web/router/ttp/`. Per the project's "all models in
# models.py" rule these live here alongside the SQLModel tables, not
# in a sibling schemas.py. Empty-list returns at contract phase are
# typed against these models so the OpenAPI shape is stable from day
# one. See TTP_TAGGING.md §E.1.9.
class TechniqueRollupRow(BaseModel):
"""One row of /api/v1/ttp/techniques — distinct technique observed
across the fleet with a count and a most-recent-seen timestamp."""
technique_id: str
sub_technique_id: Optional[str] = None
tactic: str
count: int
last_seen: datetime
class IdentityTechniqueRow(BaseModel):
"""One row of the by-identity / by-attacker / by-session endpoints —
a distinct (technique, sub_technique) tuple within the requested
scope, with an aggregate count and first/last-seen timestamps."""
technique_id: str
sub_technique_id: Optional[str] = None
tactic: str
count: int
first_seen: datetime
last_seen: datetime
confidence_max: float
class CampaignTechniqueRow(BaseModel):
"""One row of /api/v1/ttp/by-campaign/{uuid} — a technique observed
across at least one Identity rolled up into the campaign."""
technique_id: str
sub_technique_id: Optional[str] = None
tactic: str
count: int
identity_count: int
last_seen: datetime
class RuleCatalogueRow(BaseModel):
"""One row of /api/v1/ttp/rules — a rule definition + its current
operational state. The operator-facing rule list."""
rule_id: str
rule_version: int
name: str
description: str
state: Literal["enabled", "disabled", "clipped"]
confidence_max: Optional[float] = None
expires_at: Optional[datetime] = None
reason: Optional[str] = None
set_by: Optional[str] = None
set_at: Optional[datetime] = None
class RuleStateRequest(BaseModel):
"""POST /api/v1/ttp/rules/{rule_id}/state body — admin operator
sets disable / clip / TTL on a rule. Pre-v1: schema is the public
contract; downward changes require an OpenAPI version bump."""
state: Literal["enabled", "disabled", "clipped"]
confidence_max: Optional[float] = None
expires_at: Optional[datetime] = None
reason: Optional[str] = None
class RuleStateResponse(BaseModel):
"""Response for POST/DELETE /api/v1/ttp/rules/{rule_id}/state and
the per-rule entry of GET /rules. Mirrors :class:`TTPRuleState`."""
rule_id: str
state: Literal["enabled", "disabled", "clipped"]
confidence_max: Optional[float] = None
expires_at: Optional[datetime] = None
reason: Optional[str] = None
set_by: Optional[str] = None
set_at: Optional[datetime] = None
class NavigatorTechnique(BaseModel):
"""Per-technique entry of the MITRE ATT&CK Navigator JSON layer."""
techniqueID: str
score: int
color: str = ""
comment: str = ""
enabled: bool = True
class NavigatorLayer(BaseModel):
"""MITRE ATT&CK Navigator JSON layer envelope. Empty-but-valid at
contract phase: a SOC analyst pasting this JSON into the official
Navigator sees the file load cleanly with no highlighted
techniques. See TTP_TAGGING.md §"UI surface — Empty state".
"""
name: str = "DECNET TTP coverage"
versions: dict[str, str] = Field(
default_factory=lambda: {
"attack": "15",
"navigator": "5.1.0",
"layer": "4.5",
}
)
domain: str = "enterprise-attack"
description: str = ""
techniques: list[NavigatorTechnique] = Field(default_factory=list)

View File

@@ -53,6 +53,13 @@ from .topology import topology_router
from .canary import canary_router
from .deckies import deckies_router
from .webhooks import webhooks_router
from .ttp.api_get_techniques import router as ttp_techniques_router
from .ttp.api_get_by_identity import router as ttp_by_identity_router
from .ttp.api_get_by_attacker import router as ttp_by_attacker_router
from .ttp.api_get_by_campaign import router as ttp_by_campaign_router
from .ttp.api_get_by_session import router as ttp_by_session_router
from .ttp.api_get_rules import router as ttp_rules_router
from .ttp.api_export_navigator import router as ttp_navigator_router
api_router = APIRouter(
# Every route under /api/v1 is auth-guarded (either by an explicit
@@ -163,3 +170,14 @@ api_router.include_router(deckies_router)
# External webhook subscriptions (SIEM/SOAR egress)
api_router.include_router(webhooks_router)
# TTP Tagging — see development/TTP_TAGGING.md. Contract phase: every
# handler returns the typed empty value; impl phase wires the repo
# and rule engine.
api_router.include_router(ttp_techniques_router)
api_router.include_router(ttp_by_identity_router)
api_router.include_router(ttp_by_attacker_router)
api_router.include_router(ttp_by_campaign_router)
api_router.include_router(ttp_by_session_router)
api_router.include_router(ttp_rules_router)
api_router.include_router(ttp_navigator_router)

View File

@@ -0,0 +1,6 @@
"""TTP-tagging API router package — see development/TTP_TAGGING.md.
Contract phase E.1.9: handlers return typed empty values. The repo
methods (E.1.10) and engine (E.3) land separately; the router shape +
auth gating + OpenAPI surface are stable from this commit forward.
"""

View File

@@ -0,0 +1,56 @@
"""GET /api/v1/ttp/export/navigator{,/identity/{uuid}} — Navigator JSON layer.
Empty-but-valid Navigator layer at contract phase per TTP_TAGGING.md
§"UI surface — Empty state": a SOC analyst pasting the JSON into the
official MITRE ATT&CK Navigator sees the file load with no
highlighted techniques — correct, not broken.
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import NavigatorLayer
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/export/navigator",
tags=["TTP Tagging"],
response_model=NavigatorLayer,
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
},
)
@_traced("api.ttp.export_navigator_fleet")
async def api_export_navigator_fleet(
user: dict[str, Any] = Depends(require_viewer),
) -> NavigatorLayer:
"""Fleet-wide Navigator layer. Empty-but-valid at contract phase."""
return NavigatorLayer(name="DECNET TTP coverage — fleet")
@router.get(
"/ttp/export/navigator/identity/{identity_uuid}",
tags=["TTP Tagging"],
response_model=NavigatorLayer,
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Identity not found"},
},
)
@_traced("api.ttp.export_navigator_identity")
async def api_export_navigator_identity(
identity_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> NavigatorLayer:
"""Per-Identity Navigator layer (the SOC demo)."""
return NavigatorLayer(
name=f"DECNET TTP coverage — identity {identity_uuid}",
)

View File

@@ -0,0 +1,35 @@
"""GET /api/v1/ttp/by-attacker/{attacker_uuid} — per-IP TTP slice.
Backs the AttackerDetail page's TTP section. See TTP_TAGGING.md
§"UI surface" + project_attacker_detail_keep memory.
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/by-attacker/{attacker_uuid}",
tags=["TTP Tagging"],
response_model=list[IdentityTechniqueRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Attacker not found"},
},
)
@_traced("api.ttp.by_attacker")
async def api_ttp_by_attacker(
attacker_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-Attacker (per-IP) TTP rows. Empty at contract phase."""
return []

View File

@@ -0,0 +1,31 @@
"""GET /api/v1/ttp/by-campaign/{campaign_uuid} — campaign-wide TTP rollup."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import CampaignTechniqueRow
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/by-campaign/{campaign_uuid}",
tags=["TTP Tagging"],
response_model=list[CampaignTechniqueRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Campaign not found"},
},
)
@_traced("api.ttp.by_campaign")
async def api_ttp_by_campaign(
campaign_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[CampaignTechniqueRow]:
"""Campaign-rollup TTP rows. Empty at contract phase."""
return []

View File

@@ -0,0 +1,35 @@
"""GET /api/v1/ttp/by-identity/{identity_uuid} — Identity-scoped TTP rollup.
Primary endpoint for the IdentityDetail "TTPs Observed" section. See
TTP_TAGGING.md §"UI surface". Empty at contract phase (E.1.9).
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/by-identity/{identity_uuid}",
tags=["TTP Tagging"],
response_model=list[IdentityTechniqueRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Identity not found"},
},
)
@_traced("api.ttp.by_identity")
async def api_ttp_by_identity(
identity_uuid: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-Identity TTP heatmap rows. Empty at contract phase."""
return []

View File

@@ -0,0 +1,31 @@
"""GET /api/v1/ttp/by-session/{session_id} — session timeline of TTP tags."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import IdentityTechniqueRow
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/by-session/{session_id}",
tags=["TTP Tagging"],
response_model=list[IdentityTechniqueRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Session not found"},
},
)
@_traced("api.ttp.by_session")
async def api_ttp_by_session(
session_id: str,
user: dict[str, Any] = Depends(require_viewer),
) -> list[IdentityTechniqueRow]:
"""Per-session TTP tag timeline. Empty at contract phase."""
return []

View File

@@ -0,0 +1,128 @@
"""TTP rule catalogue + admin-only state mutations.
Three endpoints in one router:
* ``GET /api/v1/ttp/rules`` — viewer-readable rule list
* ``POST /api/v1/ttp/rules/{rule_id}/state`` — admin: set state
* ``DELETE /api/v1/ttp/rules/{rule_id}/state`` — admin: revert to default
Per the project's "no client-side role checks" rule, the admin guard
is server-side via :func:`require_admin`. Per
``feedback_schemathesis_400.md``, the POST handler parses the body
manually and returns ``400`` on a malformed JSON body so the
documented status code matches reality.
"""
from __future__ import annotations
from typing import Any
import json
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import ValidationError
from decnet.telemetry import traced as _traced
from decnet.web.db.models import (
RuleCatalogueRow,
RuleStateRequest,
RuleStateResponse,
)
from decnet.web.dependencies import require_admin, require_viewer
router = APIRouter()
@router.get(
"/ttp/rules",
tags=["TTP Tagging"],
response_model=list[RuleCatalogueRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
},
)
@_traced("api.ttp.list_rules")
async def api_list_rules(
user: dict[str, Any] = Depends(require_viewer),
) -> list[RuleCatalogueRow]:
"""Operator-facing rule catalogue. Empty at contract phase."""
return []
@router.post(
"/ttp/rules/{rule_id}/state",
tags=["TTP Tagging"],
response_model=RuleStateResponse,
responses={
400: {"description": "Bad Request (malformed JSON or invalid body)"},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Rule not found"},
},
)
@_traced("api.ttp.set_rule_state")
async def api_set_rule_state(
rule_id: str,
request: Request,
admin: dict[str, Any] = Depends(require_admin),
) -> RuleStateResponse:
"""Set operational state (disable / clip / TTL) on a rule.
Body parse is manual so a malformed JSON body surfaces as the
documented ``400`` rather than the framework default of ``422``
(per ``feedback_schemathesis_400.md``).
"""
try:
raw = await request.json()
except json.JSONDecodeError as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body must be valid JSON",
) from exc
try:
body = RuleStateRequest.model_validate(raw)
except ValidationError as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid rule-state body: {exc.errors()}",
) from exc
# Contract phase: no persistence yet (E.1.10 / E.3 lands the repo
# write). Echo the requested state back so the response shape is
# exercisable and OpenAPI-stable.
return RuleStateResponse(
rule_id=rule_id,
state=body.state,
confidence_max=body.confidence_max,
expires_at=body.expires_at,
reason=body.reason,
set_by=str(admin.get("sub", "")),
set_at=None,
)
@router.delete(
"/ttp/rules/{rule_id}/state",
tags=["TTP Tagging"],
response_model=RuleStateResponse,
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Rule not found"},
},
)
@_traced("api.ttp.revert_rule_state")
async def api_revert_rule_state(
rule_id: str,
admin: dict[str, Any] = Depends(require_admin),
) -> RuleStateResponse:
"""Revert a rule to the default ``enabled`` state."""
return RuleStateResponse(
rule_id=rule_id,
state="enabled",
confidence_max=None,
expires_at=None,
reason=None,
set_by=str(admin.get("sub", "")),
set_at=None,
)

View File

@@ -0,0 +1,34 @@
"""GET /api/v1/ttp/techniques — distinct techniques observed fleet-wide.
Returns an empty list at contract phase (E.1.9). Repo wiring lands in
E.1.10 / E.3 implementation; the response shape is stable from here.
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends
from decnet.telemetry import traced as _traced
from decnet.web.db.models import TechniqueRollupRow
from decnet.web.dependencies import require_viewer
router = APIRouter()
@router.get(
"/ttp/techniques",
tags=["TTP Tagging"],
response_model=list[TechniqueRollupRow],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
},
)
@_traced("api.ttp.list_techniques")
async def api_list_techniques(
user: dict[str, Any] = Depends(require_viewer),
) -> list[TechniqueRollupRow]:
"""Distinct techniques observed across the fleet, with counts and
last-seen timestamps. Empty list at contract phase."""
return []