From 403d83fabacf5047fccc0b8c8fa51a64054f9e56 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 21:01:58 -0400 Subject: [PATCH] =?UTF-8?q?feat(ttp):=20E.3.15=20UKC=20bridge=20=E2=80=94?= =?UTF-8?q?=20production=20phase-handoff=20edge=20fires?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add BaseRepository.list_ttp_decky_phases(identity_uuid) returning per-decky tag observations as (decky_id, tactic, created_at_ts) rows ordered by creation time. Rewrite from_identity_row() to project tactic → UKCPhase via tactic_to_ukc_phase and populate the four phase-handoff maps (first/last_phase_per_decky, first/last_seen_per_decky) so combined_campaign_weight finally lights up on real DB rows — not just synthetic fixtures. ConnectedComponentsCampaignClusterer.tick() pulls each active identity's per-decky phase observations before projecting features. Repo failures are non-fatal: a partial repo falls back to the empty phase-handoff signal (legacy behavior) so the worker stays up. tests/clustering/test_ttp_phase_handoff.py pins the production-row pair clearing CAMPAIGN_EDGE_THRESHOLD on a C2 → DISCOVERY hand-off — the trip-wire that says the whole project paid off. commands_by_phase_on_decky itself stays empty on the production path: it is consumed only by the synthetic-fixture similarity surface, and the phase-handoff edge does not use it. Synthetic fixtures still populate it directly via from_synthetic_identity. 
--- .../campaign/impl/connected_components.py | 76 ++++++++++-- decnet/web/db/repository.py | 22 ++++ decnet/web/db/sqlmodel_repo/ttp.py | 44 +++++++ development/TTP_TAGGING.md | 16 ++- tests/clustering/test_ttp_phase_handoff.py | 110 ++++++++++++++++++ 5 files changed, 259 insertions(+), 9 deletions(-) create mode 100644 tests/clustering/test_ttp_phase_handoff.py diff --git a/decnet/clustering/campaign/impl/connected_components.py b/decnet/clustering/campaign/impl/connected_components.py index b78b8e30..09534a4c 100644 --- a/decnet/clustering/campaign/impl/connected_components.py +++ b/decnet/clustering/campaign/impl/connected_components.py @@ -66,7 +66,10 @@ def cluster_identities( return {f.identity_uuid: f"cmp-{find(f.identity_uuid)}" for f in feat_list} -def from_identity_row(row: dict[str, Any]) -> IdentityFeatures: +def from_identity_row( + row: dict[str, Any], + ttp_decky_phases: list[dict[str, Any]] | None = None, +) -> IdentityFeatures: """Project an ``AttackerIdentity`` projection row dict into an :class:`IdentityFeatures`. @@ -75,20 +78,59 @@ def from_identity_row(row: dict[str, Any]) -> IdentityFeatures: ja3_hashes / hassh_hashes / payload_simhashes / c2_endpoints (JSON list[str] or null). - Phase-handoff fields stay empty until the production-row adapter - learns to mine logs for per-decky phase sequences (TODO.md - "production-side payload + C2 + commands joins"). Without those, - the campaign clusterer falls back to shared-infra + temporal - overlap + cohort signals on production data; the fixture path - exercises the full feature set via :func:`from_synthetic_identity`. + *ttp_decky_phases* is the optional per-identity payload from + :meth:`BaseRepository.list_ttp_decky_phases` — one row per + ``ttp_tag`` carrying ``(decky_id, tactic, created_at_ts)``. 
When + provided, the adapter projects ``tactic`` → :class:`UKCPhase` and + populates :attr:`IdentityFeatures.first_phase_per_decky` / + ``last_phase_per_decky`` / ``first_seen_per_decky`` / + ``last_seen_per_decky`` so the production phase-handoff edge + finally fires. The synthetic fixture path + (:func:`from_synthetic_identity`) is unchanged — fixtures keep + emitting UKC directly. """ + from decnet.clustering.ukc import tactic_to_ukc_phase # noqa: PLC0415 + payload_hashes = _parse_json_list(row.get("payload_simhashes")) c2_endpoints = _parse_json_list(row.get("c2_endpoints")) + first_phase_per_decky: dict[str, str] = {} + last_phase_per_decky: dict[str, str] = {} + first_seen_per_decky: dict[str, float] = {} + last_seen_per_decky: dict[str, float] = {} + decky_set: set[str] = set() + + # Rows arrive ordered by ``created_at``; ``setdefault`` preserves + # the FIRST observation per decky, plain assignment captures the + # LAST. Tags whose tactic is outside the ATT&CK→UKC map (or whose + # phase is pre-target / unobservable) are dropped — they should + # not be assigned by any rule per TTP_TAGGING.md §UKC bridge. 
+ for entry in ttp_decky_phases or []: + decky = entry.get("decky_id") + tactic = entry.get("tactic") + created_at_ts = entry.get("created_at_ts") + if not isinstance(decky, str) or not isinstance(tactic, str): + continue + phase = tactic_to_ukc_phase(tactic) + if phase is None: + continue + ts = float(created_at_ts) if isinstance( + created_at_ts, (int, float)) else 0.0 + decky_set.add(decky) + first_phase_per_decky.setdefault(decky, phase.value) + last_phase_per_decky[decky] = phase.value + first_seen_per_decky.setdefault(decky, ts) + last_seen_per_decky[decky] = ts + return IdentityFeatures( identity_uuid=row["uuid"], payload_hashes=frozenset(payload_hashes), c2_endpoints=frozenset(c2_endpoints), + decky_set=frozenset(decky_set), + first_phase_per_decky=first_phase_per_decky, + last_phase_per_decky=last_phase_per_decky, + first_seen_per_decky=first_seen_per_decky, + last_seen_per_decky=last_seen_per_decky, ) @@ -132,8 +174,26 @@ class ConnectedComponentsCampaignClusterer(CampaignClusterer): # merged out — their winner is the active row and gets clustered # on its own. This keeps the campaign graph from double-counting. active_rows = [r for r in rows if not r.get("merged_into_uuid")] + # Pull TTP-derived per-decky phase observations per identity + # (E.3.15). Failures here are non-fatal — the clusterer falls + # back to the empty phase-handoff signal, same as the legacy + # behavior, so a partial repo doesn't take the worker down. 
+ decky_phases_by_identity: dict[str, list[dict[str, Any]]] = {} + for r in active_rows: + try: + decky_phases_by_identity[r["uuid"]] = ( + await repo.list_ttp_decky_phases(r["uuid"]) + ) + except Exception: # noqa: BLE001 + log.warning( + "campaign clusterer: list_ttp_decky_phases failed " + "for identity %s; phase-handoff edge inert", + r["uuid"], + ) + decky_phases_by_identity[r["uuid"]] = [] feature_list: list[IdentityFeatures] = [ - from_identity_row(r) for r in active_rows + from_identity_row(r, decky_phases_by_identity.get(r["uuid"])) + for r in active_rows ] row_by_uuid: dict[str, dict[str, Any]] = { r["uuid"]: r for r in active_rows diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index cad71318..59aa751b 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1352,3 +1352,25 @@ class BaseRepository(ABC): async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: """Fleet-wide distinct-technique rollup.""" raise NotImplementedError + + async def list_ttp_decky_phases( + self, identity_uuid: str, + ) -> list[dict[str, Any]]: + """Per-decky tag observations for the campaign-clusterer's UKC + bridge (E.3.15). + + Returns every ``ttp_tag`` row for *identity_uuid* (and the IPs + rolling up to it) carrying a non-NULL ``decky_id`` and + ``tactic``, projected to ``{decky_id, tactic, created_at_ts}``. + Callers project ``tactic`` → :class:`UKCPhase` via + :func:`decnet.clustering.ukc.tactic_to_ukc_phase` to populate + :class:`IdentityFeatures.first_phase_per_decky` / + ``last_phase_per_decky`` / ``first_seen_per_decky`` / + ``last_seen_per_decky`` so the production phase-handoff edge + can finally fire. + + Default body returns ``[]`` so legacy mocks / non-SQLModel + repos remain valid; the real implementation lives on the + SQLModel TTP mixin. 
+ """ + return [] diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index 4243895e..db03962f 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -231,6 +231,50 @@ class TTPMixin(_MixinBase): for r in res.all() ] + async def list_ttp_decky_phases( + self, identity_uuid: str, + ) -> list[dict[str, Any]]: + """Per-decky tag observations for the UKC bridge (E.3.15). + + Includes (a) tags directly anchored on this identity and + (b) tags anchored on Attackers whose ``identity_id`` projects + up to this identity — same scope as + :meth:`list_techniques_by_identity`. + """ + async with self._session() as session: + attacker_uuids_subq = ( + select(col(Attacker.uuid)) + .where(col(Attacker.identity_id) == identity_uuid) + .scalar_subquery() + ) + stmt: Any = ( + select( + col(TTPTag.decky_id), + col(TTPTag.tactic), + col(TTPTag.created_at), + ) + .where( + ( + (col(TTPTag.identity_uuid) == identity_uuid) + | (col(TTPTag.attacker_uuid).in_(attacker_uuids_subq)) + ) + & (col(TTPTag.decky_id).is_not(None)) + ) + .order_by(col(TTPTag.created_at)) + ) + res = await session.execute(stmt) + return [ + { + "decky_id": r.decky_id, + "tactic": r.tactic, + "created_at_ts": ( + r.created_at.timestamp() + if r.created_at is not None else 0.0 + ), + } + for r in res.all() + ] + async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: """Fleet-wide distinct-technique rollup with counts + most-recent-seen timestamps. diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index ba65803f..03be3f30 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -3056,7 +3056,21 @@ Order: from `ttp_tag`. Validate that production phase-handoff edge weights now fire (previously dormant — the phase-handoff test's `xfail` flips to `xpass`, which is the moment we know - this whole project paid off). + this whole project paid off). ✅ done. 
+ `tactic_to_ukc_phase` + `OBSERVABLE_PHASES` were already + shipped in earlier work — this step adds + `BaseRepository.list_ttp_decky_phases(identity_uuid)` and + rewrites `from_identity_row()` to populate the four + phase-handoff maps (`first_phase_per_decky`, + `last_phase_per_decky`, `first_seen_per_decky`, + `last_seen_per_decky`) from real `ttp_tag` rows. + `commands_by_phase_on_decky` itself stays empty on the + production path — the phase-handoff edge does not consume + it; the four phase-maps drive the F5 signal. Synthetic + fixtures continue to populate the commands map directly. + `tests/clustering/test_ttp_phase_handoff.py` pins the + production-row pair clearing `CAMPAIGN_EDGE_THRESHOLD` — + the trip-wire that says the whole project paid off. 16. **Frontend** — `IdentityDetail` "TTPs Observed" section, `AttackerDetail` per-IP slice, Navigator export buttons, rule-state controls (disable / clip / TTL) backed by the diff --git a/tests/clustering/test_ttp_phase_handoff.py b/tests/clustering/test_ttp_phase_handoff.py new file mode 100644 index 00000000..8ec16903 --- /dev/null +++ b/tests/clustering/test_ttp_phase_handoff.py @@ -0,0 +1,110 @@ +"""E.3.15 — production phase-handoff edge fires from ttp_tag rows. + +The UKC bridge (``tactic_to_ukc_phase`` + ``OBSERVABLE_PHASES``) was +already unit-tested in :mod:`tests.clustering.test_ukc_bridge`. The +load-bearing payoff lands here: the production-row adapter +:func:`from_identity_row` now consumes per-identity tag observations +and populates the phase-handoff maps so +:func:`combined_campaign_weight` lights up on real DB rows — not just +the synthetic-fixture path. 
+""" +from __future__ import annotations + +from typing import Any + +from decnet.clustering.campaign.impl.connected_components import ( + from_identity_row, +) +from decnet.clustering.campaign.impl.similarity import ( + CAMPAIGN_EDGE_THRESHOLD, + combined_campaign_weight, + phase_handoff_weight, +) +from decnet.clustering.ukc import UKCPhase + + +# A → C2 (handoff-out) on decky D at t=100; B → DISCOVERY (handoff-in) +# on the same decky at t=200. Within the 24h window → edge weight 1.0. +def _row(uuid: str) -> dict[str, Any]: + return { + "uuid": uuid, + "ja3_hashes": None, + "hassh_hashes": None, + "payload_simhashes": None, + "c2_endpoints": None, + } + + +def _phases(decky: str, tactic: str, ts: float) -> dict[str, Any]: + return {"decky_id": decky, "tactic": tactic, "created_at_ts": ts} + + +def test_from_identity_row_populates_phase_maps_from_tags() -> None: + feat = from_identity_row( + _row("id-A"), + ttp_decky_phases=[ + _phases("d1", "TA0007", 100.0), # DISCOVERY + _phases("d1", "TA0011", 200.0), # COMMAND_AND_CONTROL + ], + ) + assert feat.first_phase_per_decky == {"d1": UKCPhase.DISCOVERY.value} + assert feat.last_phase_per_decky == {"d1": UKCPhase.COMMAND_AND_CONTROL.value} + assert feat.first_seen_per_decky == {"d1": 100.0} + assert feat.last_seen_per_decky == {"d1": 200.0} + assert "d1" in feat.decky_set + + +def test_from_identity_row_skips_unmappable_tactic() -> None: + feat = from_identity_row( + _row("id-X"), + ttp_decky_phases=[ + _phases("d1", "TA9999", 100.0), # unknown tactic + ], + ) + assert feat.first_phase_per_decky == {} + assert feat.last_phase_per_decky == {} + + +def test_phase_handoff_fires_on_production_rows() -> None: + """Two identities sharing a decky with C2 → DISCOVERY in window.""" + a = from_identity_row( + _row("id-A"), + ttp_decky_phases=[ + _phases("d1", "TA0011", 100.0), # last on A: C2 (handoff-out) + ], + ) + b = from_identity_row( + _row("id-B"), + ttp_decky_phases=[ + _phases("d1", "TA0007", 200.0), # first on B: 
DISCOVERY (handoff-in) + ], + ) + assert phase_handoff_weight(a, b) == 1.0 + # The combined weight bundles phase-handoff with shared-decky and + # other signals — pin that the production-row pair clears the + # campaign-edge threshold (the moment the doc says we know this + # whole project paid off). + assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD + + +def test_phase_handoff_zero_when_no_decky_overlap() -> None: + a = from_identity_row( + _row("id-A"), + ttp_decky_phases=[_phases("d1", "TA0011", 100.0)], + ) + b = from_identity_row( + _row("id-B"), + ttp_decky_phases=[_phases("d2", "TA0007", 200.0)], + ) + assert phase_handoff_weight(a, b) == 0.0 + + +def test_from_identity_row_empty_tags_keeps_legacy_behavior() -> None: + """No ttp_decky_phases → phase maps stay empty (the pre-E.3.15 + production behaviour). Tests that depend on the empty path keep + passing without modification. + """ + feat = from_identity_row(_row("id-A")) + assert feat.first_phase_per_decky == {} + assert feat.last_phase_per_decky == {} + assert feat.decky_set == frozenset()