feat(ttp): STIX 2.1 bundle export for individual attackers
GET /api/v1/attackers/{uuid}/export/stix returns a self-contained STIX
2.1 bundle: ip observation, threat-actor, ATT&CK attack-patterns with
canonical MITRE IDs, uses relationships, per-tag sightings, file SCOs
for artifacts, domain-name SCOs for SMTP targets, and a provider intel
note. Attack-pattern SDOs carry the MITRE bundle IDs so consumers
deduplicating against the public ATT&CK bundle get exact matches.
This commit is contained in:
@@ -1485,6 +1485,13 @@ class BaseRepository(ABC):
|
||||
"""Fleet-wide distinct-technique rollup."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def list_ttp_tags_by_attacker(
|
||||
self, uuid: str, limit: int = 2000,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Raw ``ttp_tag`` rows for one attacker (for STIX export + similar)."""
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_ttp_decky_phases(
|
||||
self, identity_uuid: str,
|
||||
) -> list[dict[str, Any]]:
|
||||
|
||||
@@ -342,6 +342,25 @@ class TTPMixin(_MixinBase):
|
||||
res = await session.execute(stmt)
|
||||
return [r.model_dump(mode="json") for r in res.scalars().all()]
|
||||
|
||||
async def list_ttp_tags_by_attacker(
|
||||
self, uuid: str, limit: int = 2000,
|
||||
) -> list[dict]:
|
||||
"""Raw ``ttp_tag`` rows for one attacker UUID. Newest-first.
|
||||
|
||||
Used by the STIX exporter (and similar full-row consumers) that
|
||||
need per-tag granularity — distinct from the rollup returned by
|
||||
:meth:`list_techniques_by_attacker`.
|
||||
"""
|
||||
async with self._session() as session:
|
||||
stmt: Any = (
|
||||
select(TTPTag)
|
||||
.where(TTPTag.attacker_uuid == uuid)
|
||||
.order_by(col(TTPTag.created_at).desc())
|
||||
.limit(limit)
|
||||
)
|
||||
res = await session.execute(stmt)
|
||||
return [r.model_dump(mode="json") for r in res.scalars().all()]
|
||||
|
||||
# ── Backfill iterators (E.4) ────────────────────────────────────
|
||||
#
|
||||
# Read-only iterators consumed by ``decnet ttp backfill`` to replay
|
||||
|
||||
@@ -15,6 +15,7 @@ from .fleet.api_deploy_deckies import router as deploy_deckies_router
|
||||
from .stream.api_stream_events import router as stream_router
|
||||
from .attackers.api_get_attackers import router as attackers_router
|
||||
from .attackers.api_export_attackers import router as attackers_export_router
|
||||
from .attackers.api_export_attacker_stix import router as attacker_export_stix_router
|
||||
from .attackers.api_events import router as attacker_events_router
|
||||
from .attackers.api_get_attacker_detail import router as attacker_detail_router
|
||||
from .attackers.api_get_attacker_commands import router as attacker_commands_router
|
||||
@@ -105,6 +106,7 @@ api_router.include_router(deploy_deckies_router)
|
||||
# Attacker Profiles
|
||||
api_router.include_router(attackers_router)
|
||||
api_router.include_router(attackers_export_router)
|
||||
api_router.include_router(attacker_export_stix_router)
|
||||
api_router.include_router(attacker_detail_router)
|
||||
api_router.include_router(attacker_events_router)
|
||||
api_router.include_router(attacker_commands_router)
|
||||
|
||||
101
decnet/web/router/attackers/api_export_attacker_stix.py
Normal file
101
decnet/web/router/attackers/api_export_attacker_stix.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""GET /api/v1/attackers/{uuid}/export/stix — STIX 2.1 bundle for one attacker.
|
||||
|
||||
Returns a self-contained STIX 2.1 Bundle with the attacker's IP
|
||||
observation, ATT&CK technique usage (attack-patterns + uses
|
||||
relationships + per-tag sightings), captured artifacts (files),
|
||||
SMTP targets, and provider intel summary (note). All SDOs are signed
|
||||
under a stable DECNET org Identity.
|
||||
|
||||
Attack-pattern SDOs carry the canonical MITRE STIX IDs so the bundle
|
||||
is deduplicated by consumers who already have the public ATT&CK bundle.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any, cast
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.responses import Response
|
||||
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.ttp.stix_export import build_attacker_bundle
|
||||
from decnet.web.dependencies import require_viewer, repo
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
async def _none() -> None:
|
||||
return None
|
||||
|
||||
|
||||
@router.get(
|
||||
"/attackers/{uuid}/export/stix",
|
||||
tags=["Attacker Profiles"],
|
||||
response_class=Response,
|
||||
responses={
|
||||
200: {
|
||||
"content": {"application/json": {}},
|
||||
"description": "STIX 2.1 bundle for the attacker",
|
||||
},
|
||||
401: {"description": "Could not validate credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
404: {"description": "Attacker not found"},
|
||||
},
|
||||
)
|
||||
@_traced("api.attackers.export_stix")
|
||||
async def api_export_attacker_stix(
|
||||
uuid: str,
|
||||
user: dict[str, Any] = Depends(require_viewer),
|
||||
) -> Response:
|
||||
"""Download a STIX 2.1 bundle for one attacker."""
|
||||
attacker = await repo.get_attacker_by_uuid(uuid)
|
||||
if not attacker:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"unknown attacker uuid: {uuid!r}",
|
||||
)
|
||||
|
||||
identity_coro = (
|
||||
repo.get_identity_by_uuid(attacker["identity_id"])
|
||||
if attacker.get("identity_id")
|
||||
else _none()
|
||||
)
|
||||
results = await asyncio.gather(
|
||||
repo.get_attacker_behavior(uuid),
|
||||
identity_coro,
|
||||
repo.get_attacker_intel_by_uuid(uuid),
|
||||
repo.list_techniques_by_attacker(uuid),
|
||||
repo.list_ttp_tags_by_attacker(uuid),
|
||||
repo.get_attacker_artifacts(uuid),
|
||||
repo.list_smtp_targets(uuid),
|
||||
)
|
||||
behavior = cast(dict[str, Any] | None, results[0])
|
||||
identity = cast(dict[str, Any] | None, results[1])
|
||||
intel = cast(dict[str, Any] | None, results[2])
|
||||
technique_rollup = cast(list[Any], results[3])
|
||||
raw_tags = cast(list[dict[str, Any]], results[4])
|
||||
artifacts = cast(list[dict[str, Any]], results[5])
|
||||
smtp_targets = cast(list[dict[str, Any]], results[6])
|
||||
|
||||
bundle = build_attacker_bundle(
|
||||
attacker=attacker,
|
||||
behavior=behavior,
|
||||
identity=identity,
|
||||
intel=intel,
|
||||
technique_rollup=[
|
||||
r.model_dump() if hasattr(r, "model_dump") else dict(r)
|
||||
for r in technique_rollup
|
||||
],
|
||||
raw_tags=raw_tags,
|
||||
artifacts=artifacts,
|
||||
smtp_targets=smtp_targets,
|
||||
)
|
||||
return Response(
|
||||
content=bundle.serialize(pretty=True, indent=2),
|
||||
media_type="application/json",
|
||||
headers={
|
||||
"Content-Disposition": (
|
||||
f'attachment; filename="decnet-attacker-{uuid[:8]}.stix.json"'
|
||||
),
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user