From fe0ed4a251c24ba318830884dce9594dd1234adb Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 07:21:22 -0400 Subject: [PATCH] feat(ttp): STIX 2.1 bundle export for individual attackers GET /api/v1/attackers/{uuid}/export/stix returns a self-contained STIX 2.1 bundle: ip observation, threat-actor, ATT&CK attack-patterns with canonical MITRE IDs, uses relationships, per-tag sightings, file SCOs for artifacts, domain-name SCOs for SMTP targets, and a provider intel note. Attack-pattern SDOs carry the MITRE bundle IDs so consumers deduplicating against the public ATT&CK bundle get exact matches. --- decnet/ttp/stix_export.py | 281 ++++++++++++++++++ decnet/web/db/repository.py | 7 + decnet/web/db/sqlmodel_repo/ttp.py | 19 ++ decnet/web/router/__init__.py | 2 + .../attackers/api_export_attacker_stix.py | 101 +++++++ tests/db/test_base_repo.py | 2 + tests/web/test_api_export_attacker_stix.py | 241 +++++++++++++++ 7 files changed, 653 insertions(+) create mode 100644 decnet/ttp/stix_export.py create mode 100644 decnet/web/router/attackers/api_export_attacker_stix.py create mode 100644 tests/web/test_api_export_attacker_stix.py diff --git a/decnet/ttp/stix_export.py b/decnet/ttp/stix_export.py new file mode 100644 index 00000000..0d96b032 --- /dev/null +++ b/decnet/ttp/stix_export.py @@ -0,0 +1,281 @@ +"""STIX 2.1 bundle builder for a DECNET attacker observation. + +Pure function — no I/O. The caller (router) does all DB reads and +passes dicts; this module assembles the STIX bundle. + +SDO/SRO mapping +--------------- +DECNET data → STIX type +----------- --------- +Producer (DECNET) → identity (org, deterministic ID) +attacker.ip → ipv4-addr SCO +first/last seen + count → observed-data SDO +AttackerIdentity or IP → threat-actor SDO +Per-technique rollup → attack-pattern SDO + relationship(uses) SRO +Per ttp_tag row → sighting SRO +ObservedAttachment / Log → file SCO + observed-data SDO +SmtpTarget → domain-name SCO + observed-data SDO +AttackerIntel verdict → note SDO + +Attack-pattern SDOs carry the canonical MITRE STIX IDs pulled from the +loaded enterprise bundle so the objects are deduplicated against the +public ATT&CK bundle by any consumer that already has it. +""" +from __future__ import annotations + +import json +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any + +import stix2 + +from decnet.ttp import attack_stix + +# Deterministic DECNET org identity ID — stable across all bundles this +# instance produces. Consumers can correlate across exports. +_NS = _uuid.UUID("b5d2c3a1-8f4e-4d1b-9a6c-0e7f5b3d2c1a") +_DECNET_ORG_STIX_ID = f"identity--{_uuid.uuid5(_NS, 'decnet-honeypot')}" + + +def _aware(dt: datetime | None) -> datetime | None: + if dt is None: + return None + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt + + +def _decnet_org() -> stix2.Identity: + return stix2.Identity( + id=_DECNET_ORG_STIX_ID, + name="DECNET", + identity_class="organization", + description="DECNET honeypot platform — automated threat observation", + ) + + +def _threat_actor( + attacker: dict[str, Any], + identity: dict[str, Any] | None, + created_by: str, +) -> stix2.ThreatActor: + if identity: + name = f"DECNET-identity-{identity['uuid'][:8]}" + else: + name = f"DECNET-attacker-{attacker['uuid'][:8]}" + kwargs: dict[str, Any] = dict( + id=f"threat-actor--{_uuid.uuid5(_NS, attacker['uuid'])}", + name=name, + threat_actor_types=["unknown"], + created_by_ref=created_by, + allow_custom=True, + ) + if attacker.get("country_code"): + kwargs["x_decnet_country_code"] = attacker["country_code"] + if attacker.get("asn"): + kwargs["x_decnet_asn"] = attacker["asn"] + if attacker.get("as_name"): + kwargs["x_decnet_as_name"] = attacker["as_name"] + if identity: + if identity.get("ja3_hashes"): + kwargs["x_decnet_ja3_hashes"] = identity["ja3_hashes"] + if identity.get("hassh_hashes"): + kwargs["x_decnet_hassh_hashes"] = identity["hassh_hashes"] + if identity.get("c2_endpoints"): + kwargs["x_decnet_c2_endpoints"] = identity["c2_endpoints"] + return stix2.ThreatActor(**kwargs) + + +def _attack_pattern_sdo(technique_id: str, created_by: str) -> stix2.AttackPattern | None: + obj = attack_stix._attack_pattern_by_id(technique_id) + if obj is None: + return None + ext_refs = obj.get("external_references", []) + mitre_ref = next( + (r for r in ext_refs if r.get("source_name") == "mitre-attack"), None, + ) + er_args = [ + stix2.ExternalReference( + source_name="mitre-attack", + external_id=mitre_ref["external_id"], + url=mitre_ref.get("url", ""), + ) + ] if mitre_ref else [] + return stix2.AttackPattern( + id=obj["id"], + name=obj.get("name", technique_id), + external_references=er_args, + created_by_ref=created_by, + ) + + +def _intel_note( + intel: dict[str, Any], + ta_id: str, + created_by: str, +) -> stix2.Note | None: + verdict = intel.get("aggregate_verdict") or "unknown" + lines: list[str] = [f"aggregate_verdict: {verdict}"] + if intel.get("abuseipdb_score") is not None: + lines.append(f"abuseipdb_score: {intel['abuseipdb_score']}") + if intel.get("greynoise_classification"): + tags = intel.get("greynoise_tags") or [] + if isinstance(tags, str): + try: + tags = json.loads(tags) + except Exception: + tags = [] + lines.append(f"greynoise: {intel['greynoise_classification']} ({', '.join(tags)})") + if intel.get("feodo_listed"): + lines.append(f"feodo: {intel.get('feodo_malware_family', 'listed')}") + if intel.get("threatfox_listed"): + tt = intel.get("threatfox_threat_types") or [] + if isinstance(tt, str): + try: + tt = json.loads(tt) + except Exception: + tt = [] + lines.append(f"threatfox: {', '.join(tt) if tt else 'listed'}") + return stix2.Note( + abstract="DECNET threat-intel verdict", + content="\n".join(lines), + object_refs=[ta_id], + created_by_ref=created_by, + ) + + +def build_attacker_bundle( + attacker: dict[str, Any], + behavior: dict[str, Any] | None, + identity: dict[str, Any] | None, + intel: dict[str, Any] | None, + technique_rollup: list[dict[str, Any]], + raw_tags: list[dict[str, Any]], + artifacts: list[dict[str, Any]], + smtp_targets: list[dict[str, Any]], +) -> stix2.Bundle: + """Assemble a STIX 2.1 Bundle for *attacker*. + + All arguments are plain dicts (the shape returned by the DECNET + repo). Never raises — unknown/missing data is silently omitted from + the bundle. + """ + objs: list[Any] = [] + + org = _decnet_org() + objs.append(org) + + # ── IP observation ────────────────────────────────────────────── + ipv4 = stix2.IPv4Address(value=attacker["ip"]) + objs.append(ipv4) + + fs = _aware(attacker.get("first_seen")) + ls = _aware(attacker.get("last_seen")) + now = datetime.now(timezone.utc) + ip_obs = stix2.ObservedData( + first_observed=fs or now, + last_observed=ls or now, + number_observed=max(1, attacker.get("event_count") or 1), + object_refs=[ipv4.id], + created_by_ref=org.id, + ) + objs.append(ip_obs) + + # ── Threat actor ───────────────────────────────────────────────── + ta = _threat_actor(attacker, identity, org.id) + objs.append(ta) + + # ── ATT&CK — attack-patterns + uses relationships + sightings ─── + # Build per-technique once; sightings reference the same AP STIX ID. + ap_stix_ids: dict[str, str] = {} # technique_id → attack-pattern STIX id + for row in technique_rollup: + tid = row.get("sub_technique_id") or row.get("technique_id") + if not tid or tid in ap_stix_ids: + continue + ap = _attack_pattern_sdo(tid, org.id) + if ap is None: + continue + ap_stix_ids[tid] = ap.id + objs.append(ap) + objs.append( + stix2.Relationship( + source_ref=ta.id, + target_ref=ap.id, + relationship_type="uses", + created_by_ref=org.id, + ) + ) + + for tag in raw_tags: + tid = tag.get("sub_technique_id") or tag.get("technique_id") + if not tid or tid not in ap_stix_ids: + continue + ts = _aware(tag.get("created_at")) + if ts is None: + ts = now + objs.append( + stix2.Sighting( + sighting_of_ref=ap_stix_ids[tid], + first_seen=ts, + last_seen=ts, + count=1, + where_sighted_refs=[org.id], + observed_data_refs=[ip_obs.id], + created_by_ref=org.id, + ) + ) + + # ── Artifacts (file_captured log rows) ────────────────────────── + for art in artifacts: + fields = art.get("fields") or {} + if isinstance(fields, str): + try: + fields = json.loads(fields) + except Exception: + fields = {} + sha = fields.get("sha256") or fields.get("hash") + if not sha: + continue + file_kwargs: dict[str, Any] = {"hashes": {"SHA-256": sha.lower()}} + name = fields.get("filename") or fields.get("stored_as") + if name: + file_kwargs["name"] = name + f = stix2.File(**file_kwargs) + objs.append(f) + fts = _aware(art.get("timestamp")) + objs.append( + stix2.ObservedData( + first_observed=fts or now, + last_observed=fts or now, + number_observed=1, + object_refs=[f.id], + created_by_ref=org.id, + ) + ) + + # ── SMTP targets ──────────────────────────────────────────────── + for tgt in smtp_targets: + domain = tgt.get("domain") + if not domain: + continue + dn = stix2.DomainName(value=domain) + objs.append(dn) + s_fs = _aware(tgt.get("first_seen")) + s_ls = _aware(tgt.get("last_seen")) + objs.append( + stix2.ObservedData( + first_observed=s_fs or now, + last_observed=s_ls or now, + number_observed=max(1, tgt.get("count") or 1), + object_refs=[dn.id], + created_by_ref=org.id, + ) + ) + + # ── Intel note ─────────────────────────────────────────────────── + if intel: + note = _intel_note(intel, ta.id, org.id) + objs.append(note) + + return stix2.Bundle(objects=objs, allow_custom=True) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index b051c3fd..6b7536e1 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1485,6 +1485,13 @@ class BaseRepository(ABC): """Fleet-wide distinct-technique rollup.""" raise NotImplementedError + @abstractmethod + async def list_ttp_tags_by_attacker( + self, uuid: str, limit: int = 2000, + ) -> list[dict[str, Any]]: + """Raw ``ttp_tag`` rows for one attacker (for STIX export + similar).""" + raise NotImplementedError + async def list_ttp_decky_phases( self, identity_uuid: str, ) -> list[dict[str, Any]]: diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index 30adf077..9282ccac 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -342,6 +342,25 @@ class TTPMixin(_MixinBase): res = await session.execute(stmt) return [r.model_dump(mode="json") for r in res.scalars().all()] + async def list_ttp_tags_by_attacker( + self, uuid: str, limit: int = 2000, + ) -> list[dict]: + """Raw ``ttp_tag`` rows for one attacker UUID. Newest-first. + + Used by the STIX exporter (and similar full-row consumers) that + need per-tag granularity — distinct from the rollup returned by + :meth:`list_techniques_by_attacker`. + """ + async with self._session() as session: + stmt: Any = ( + select(TTPTag) + .where(TTPTag.attacker_uuid == uuid) + .order_by(col(TTPTag.created_at).desc()) + .limit(limit) + ) + res = await session.execute(stmt) + return [r.model_dump(mode="json") for r in res.scalars().all()] + # ── Backfill iterators (E.4) ──────────────────────────────────── # # Read-only iterators consumed by ``decnet ttp backfill`` to replay diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index ab589a2f..22874a23 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -15,6 +15,7 @@ from .fleet.api_deploy_deckies import router as deploy_deckies_router from .stream.api_stream_events import router as stream_router from .attackers.api_get_attackers import router as attackers_router from .attackers.api_export_attackers import router as attackers_export_router +from .attackers.api_export_attacker_stix import router as attacker_export_stix_router from .attackers.api_events import router as attacker_events_router from .attackers.api_get_attacker_detail import router as attacker_detail_router from .attackers.api_get_attacker_commands import router as attacker_commands_router @@ -105,6 +106,7 @@ api_router.include_router(deploy_deckies_router) # Attacker Profiles api_router.include_router(attackers_router) api_router.include_router(attackers_export_router) +api_router.include_router(attacker_export_stix_router) api_router.include_router(attacker_detail_router) api_router.include_router(attacker_events_router) api_router.include_router(attacker_commands_router) diff --git a/decnet/web/router/attackers/api_export_attacker_stix.py b/decnet/web/router/attackers/api_export_attacker_stix.py new file mode 100644 index 00000000..ee6021ce --- /dev/null +++ b/decnet/web/router/attackers/api_export_attacker_stix.py @@ -0,0 +1,101 @@ +"""GET /api/v1/attackers/{uuid}/export/stix — STIX 2.1 bundle for one attacker. + +Returns a self-contained STIX 2.1 Bundle with the attacker's IP +observation, ATT&CK technique usage (attack-patterns + uses +relationships + per-tag sightings), captured artifacts (files), +SMTP targets, and provider intel summary (note). All SDOs are signed +under a stable DECNET org Identity. + +Attack-pattern SDOs carry the canonical MITRE STIX IDs so the bundle +is deduplicated by consumers who already have the public ATT&CK bundle. +""" +from __future__ import annotations + +import asyncio +from typing import Any, cast + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.responses import Response + +from decnet.telemetry import traced as _traced +from decnet.ttp.stix_export import build_attacker_bundle +from decnet.web.dependencies import require_viewer, repo + +router = APIRouter() + + +async def _none() -> None: + return None + + +@router.get( + "/attackers/{uuid}/export/stix", + tags=["Attacker Profiles"], + response_class=Response, + responses={ + 200: { + "content": {"application/json": {}}, + "description": "STIX 2.1 bundle for the attacker", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Attacker not found"}, + }, +) +@_traced("api.attackers.export_stix") +async def api_export_attacker_stix( + uuid: str, + user: dict[str, Any] = Depends(require_viewer), +) -> Response: + """Download a STIX 2.1 bundle for one attacker.""" + attacker = await repo.get_attacker_by_uuid(uuid) + if not attacker: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"unknown attacker uuid: {uuid!r}", + ) + + identity_coro = ( + repo.get_identity_by_uuid(attacker["identity_id"]) + if attacker.get("identity_id") + else _none() + ) + results = await asyncio.gather( + repo.get_attacker_behavior(uuid), + identity_coro, + repo.get_attacker_intel_by_uuid(uuid), + repo.list_techniques_by_attacker(uuid), + repo.list_ttp_tags_by_attacker(uuid), + repo.get_attacker_artifacts(uuid), + repo.list_smtp_targets(uuid), + ) + behavior = cast(dict[str, Any] | None, results[0]) + identity = cast(dict[str, Any] | None, results[1]) + intel = cast(dict[str, Any] | None, results[2]) + technique_rollup = cast(list[Any], results[3]) + raw_tags = cast(list[dict[str, Any]], results[4]) + artifacts = cast(list[dict[str, Any]], results[5]) + smtp_targets = cast(list[dict[str, Any]], results[6]) + + bundle = build_attacker_bundle( + attacker=attacker, + behavior=behavior, + identity=identity, + intel=intel, + technique_rollup=[ + r.model_dump() if hasattr(r, "model_dump") else dict(r) + for r in technique_rollup + ], + raw_tags=raw_tags, + artifacts=artifacts, + smtp_targets=smtp_targets, + ) + return Response( + content=bundle.serialize(pretty=True, indent=2), + media_type="application/json", + headers={ + "Content-Disposition": ( + f'attachment; filename="decnet-attacker-{uuid[:8]}.stix.json"' + ), + }, + ) diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 72156e5f..27d3bd35 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -127,6 +127,8 @@ class DummyRepo(BaseRepository): await super().list_tags_by_scope_and_technique(**kw); return [] async def list_distinct_techniques(self): await super().list_distinct_techniques(); return [] + async def list_ttp_tags_by_attacker(self, uuid, limit=2000): + await super().list_ttp_tags_by_attacker(uuid, limit); return [] # Iter helpers — async generators, can't `await super()` on them # because the base raises in the body before any yield. Just yield # nothing so the consumer's ``async for`` exits cleanly. diff --git a/tests/web/test_api_export_attacker_stix.py b/tests/web/test_api_export_attacker_stix.py new file mode 100644 index 00000000..ed52a5fc --- /dev/null +++ b/tests/web/test_api_export_attacker_stix.py @@ -0,0 +1,241 @@ +"""Tests for GET /api/v1/attackers/{uuid}/export/stix. + +Tests call the handler directly (no TestClient). The attack_stix bundle +is pinned to the repo's enterprise-attack-19.0.json so Sighting and +Relationship target_refs are real MITRE STIX IDs. +""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest +import stix2 + +from decnet.ttp import attack_stix +from decnet.web.router.attackers.api_export_attacker_stix import ( + api_export_attacker_stix, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" +_FAKE_USER: dict = {"uuid": "test-user", "role": "viewer"} + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + attack_stix.groups_using_technique.cache_clear() + + +def _attacker(uuid: str = "att-aaaabbbbccccdddd") -> dict: + return { + "uuid": uuid, + "ip": "1.2.3.4", + "first_seen": datetime(2026, 1, 1, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 31, tzinfo=timezone.utc), + "event_count": 100, + "identity_id": None, + "country_code": "US", + "asn": 15169, + "as_name": "GOOGLE", + } + + +def _tag(technique_id: str, ts: datetime | None = None) -> dict: + return { + "uuid": f"tag-{technique_id}", + "attacker_uuid": "att-aaaabbbbccccdddd", + "technique_id": technique_id, + "sub_technique_id": None, + "tactic": "TA0006", + "confidence": 0.85, + "rule_id": "R0001", + "rule_version": 1, + "evidence": {}, + "created_at": ts or datetime(2026, 1, 15, tzinfo=timezone.utc), + } + + +def _technique_row(technique_id: str) -> dict: + return { + "technique_id": technique_id, + "sub_technique_id": None, + "tactic": "TA0006", + "count": 3, + "confidence_max": 0.85, + } + + +def _artifact(sha256: str = "a" * 64) -> dict: + return { + "timestamp": datetime(2026, 1, 10, tzinfo=timezone.utc), + "fields": json.dumps({"sha256": sha256, "filename": "payload.sh"}), + } + + +def _smtp_target() -> dict: + return { + "domain": "victim.example.com", + "count": 5, + "first_seen": datetime(2026, 1, 5, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 20, tzinfo=timezone.utc), + } + + +def _intel() -> dict: + return { + "aggregate_verdict": "malicious", + "abuseipdb_score": 95, + "greynoise_classification": "malicious", + "greynoise_tags": json.dumps(["ssh_bruteforcer"]), + "feodo_listed": False, + "threatfox_listed": False, + } + + +def _mock_repo(*, attacker=None, identity=None, intel=None, + rollup=None, tags=None, artifacts=None, smtp=None): + m = type("M", (), {})() + m.get_attacker_by_uuid = AsyncMock(return_value=attacker or _attacker()) + m.get_attacker_behavior = AsyncMock(return_value={}) + m.get_identity_by_uuid = AsyncMock(return_value=identity) + m.get_attacker_intel_by_uuid = AsyncMock(return_value=intel) + from decnet.web.db.models.ttp import IdentityTechniqueRow + m.list_techniques_by_attacker = AsyncMock( + return_value=[ + IdentityTechniqueRow( + technique_id=t, technique_name=None, + sub_technique_id=None, sub_technique_name=None, + tactic="TA0006", count=1, confidence_max=0.8, + first_seen=datetime(2026, 1, 1, tzinfo=timezone.utc), + last_seen=datetime(2026, 1, 31, tzinfo=timezone.utc), + mitre_url=None, + ) + for t in (rollup or []) + ] + ) + m.list_ttp_tags_by_attacker = AsyncMock(return_value=tags or []) + m.get_attacker_artifacts = AsyncMock(return_value=artifacts or []) + m.list_smtp_targets = AsyncMock(return_value=smtp or []) + return m + + +@pytest.mark.asyncio +async def test_404_on_unknown_attacker(): + m = _mock_repo() + m.get_attacker_by_uuid = AsyncMock(return_value=None) + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + from fastapi import HTTPException + with pytest.raises(HTTPException) as exc: + await api_export_attacker_stix("no-such-uuid", user=_FAKE_USER) + assert exc.value.status_code == 404 + + +@pytest.mark.asyncio +async def test_skinny_attacker_returns_4_baseline_objects(): + """No TTPs, no artifacts, no intel — bundle has 4 objects.""" + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + bundle = json.loads(resp.body) + assert bundle["type"] == "bundle" + # spec_version lives on each contained SDO, not the bundle envelope + assert all(o.get("spec_version") == "2.1" for o in bundle["objects"] if "created" in o) + types = [o["type"] for o in bundle["objects"]] + assert types.count("identity") == 1 + assert types.count("ipv4-addr") == 1 + assert types.count("observed-data") == 1 + assert types.count("threat-actor") == 1 + assert len(bundle["objects"]) == 4 + + +@pytest.mark.asyncio +async def test_full_bundle_object_count(): + """Seeded attacker: 2 techniques, 3 tags, 1 artifact, 1 smtp, intel.""" + tags = [_tag("T1110"), _tag("T1110"), _tag("T1059")] + m = _mock_repo( + rollup=["T1110", "T1059"], + tags=tags, + artifacts=[_artifact()], + smtp=[_smtp_target()], + intel=_intel(), + ) + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + objs = json.loads(resp.body)["objects"] + types = [o["type"] for o in objs] + # 1 identity + 1 ipv4-addr + 1 observed-data(ip) + 1 threat-actor + # + 2 attack-patterns + 2 relationships + # + 3 sightings + # + 1 file + 1 observed-data(file) + # + 1 domain-name + 1 observed-data(smtp) + # + 1 note + assert types.count("identity") == 1 + assert types.count("ipv4-addr") == 1 + assert types.count("threat-actor") == 1 + assert types.count("attack-pattern") == 2 + assert types.count("relationship") == 2 + assert types.count("sighting") == 3 + assert types.count("file") == 1 + assert types.count("domain-name") == 1 + assert types.count("note") == 1 + assert len([t for t in types if t == "observed-data"]) == 3 + + +@pytest.mark.asyncio +async def test_stix2_round_trip_validation(): + """Bundle must parse cleanly with stix2 (strict=True).""" + tags = [_tag("T1059")] + m = _mock_repo(rollup=["T1059"], tags=tags, intel=_intel()) + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + parsed = stix2.parse(resp.body, allow_custom=True) + assert parsed.type == "bundle" + + +@pytest.mark.asyncio +async def test_attack_pattern_target_refs_match_mitre_bundle(): + """Every relationship.target_ref is a real MITRE attack-pattern ID.""" + tags = [_tag("T1059"), _tag("T1110")] + m = _mock_repo(rollup=["T1059", "T1110"], tags=tags) + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + objs = json.loads(resp.body)["objects"] + rels = [o for o in objs if o["type"] == "relationship" and o["relationship_type"] == "uses"] + target_refs = {r["target_ref"] for r in rels} + t1059_id = attack_stix._attack_pattern_by_id("T1059")["id"] + t1110_id = attack_stix._attack_pattern_by_id("T1110")["id"] + assert t1059_id in target_refs + assert t1110_id in target_refs + + +@pytest.mark.asyncio +async def test_sighting_count_equals_tag_count(): + """One Sighting per raw ttp_tag row; each carries count=1.""" + tags = [_tag("T1059"), _tag("T1059"), _tag("T1059")] + m = _mock_repo(rollup=["T1059"], tags=tags) + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + objs = json.loads(resp.body)["objects"] + sightings = [o for o in objs if o["type"] == "sighting"] + assert len(sightings) == 3 + assert all(s["count"] == 1 for s in sightings) + + +@pytest.mark.asyncio +async def test_response_headers(): + m = _mock_repo() + with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m): + resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER) + assert resp.media_type == "application/json" + assert ".stix.json" in resp.headers["content-disposition"]