"""Similarity-graph primitives for the campaign clusterer.

Part of ``decnet.clustering.campaign`` — the campaign clusterer groups
resolved identities into operations and is the layer above identity
resolution. See ``development/CAMPAIGN_CLUSTERING.md`` for the signal
taxonomy.

The campaign clusterer reads ``AttackerIdentity`` rows (the layer below)
and groups them into operations. The graph it builds is **not** the
identity-level graph: identity-level signals don't translate 1:1, and
some that get vetoed at identity level (shared infra) are the *primary
positive signal* at campaign level.

This module is the campaign-layer counterpart of
``decnet.clustering.impl.similarity`` (the identity layer); see that
module for the four-tier identity taxonomy.

**Time-agnostic.** Same F7 invariant as the identity layer — edges
MUST depend only on *pairwise relative* offsets, never on absolute
clocks. Shift two identities' session windows by the same Δ and the
edge weights MUST be identical. The temporal-overlap edge below uses
this invariant explicitly.

**Edge families** (from ``development/CAMPAIGN_CLUSTERING.md``):

* **Phase-handoff** — A ends in ``COMMAND_AND_CONTROL`` / ``PERSISTENCE``
  on decky D, B begins ``DISCOVERY`` / ``LATERAL_MOVEMENT`` on D
  within window W. Load-bearing for fixture F5 (multi_operator) — the
  signal the identity-side fingerprint-disagreement veto deliberately
  doesn't try to be.
* **Shared-infra** — Jaccard over aggregated payload-hashes /
  C2-endpoints / decky-set across the identities' member observations.
  Vetoed at identity level (``ed32358``); primary positive signal here.
* **Temporal overlap** — sessions inside a bounded *relative* window.
  Campaigns are operations and operations have bounded duration;
  overlap of distinct identities on shared infra is the canonical
  co-op pattern.
* **Cohort** — ASN-cohort + tooling-cohort weak signals. Defeated alone
  (per F2); useful as supporting weight only.

The functions are pure (no DB, no I/O); the worker maps identities into
:class:`IdentityFeatures` once per tick and feeds these into the graph
builder in a sibling module.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterable, Mapping, Optional


# ─── Identity-level projection ──────────────────────────────────────────────


@dataclass(frozen=True)
class IdentityFeatures:
    """Minimal projection of an :class:`AttackerIdentity` row.

    Built once per identity by the worker (or per fixture identity in
    tests via :func:`from_synthetic_identity`). Keeping the projection
    tight isolates the campaign-graph code from schema drift on the
    identity layer.

    Every field except ``identity_uuid`` defaults to an empty container,
    so callers only populate the signals they actually have.
    """

    #: Stable ID — production: ``AttackerIdentity.uuid``.
    identity_uuid: str

    #: All ASNs observed across the identity's member observations.
    #: A single rotating actor (F2) appears in many ASNs; the *set*
    #: overlap is the cohort signal.
    asn_cohort: frozenset[int] = field(default_factory=frozenset)

    #: Tooling labels (e.g. ``"hydra"``, ``"hping"``) inferred from
    #: fingerprints / commands. Empty until tooling-attribution lands.
    tooling_cohort: frozenset[str] = field(default_factory=frozenset)

    #: Aggregated payload hashes across member observations.
    payload_hashes: frozenset[str] = field(default_factory=frozenset)

    #: Aggregated C2 endpoints across member observations.
    c2_endpoints: frozenset[str] = field(default_factory=frozenset)

    #: Aggregated decky IDs the identity touched.
    decky_set: frozenset[str] = field(default_factory=frozenset)

    #: ``(decky_id, UKCPhase.value)`` → ordered command sequence
    #: observed on that decky in that phase. Required for the
    #: phase-handoff edge — same decky is the join key. Empty when
    #: ``commands_by_phase`` is unavailable on the production-row
    #: adapter (deferred per TODO.md until log-mining lands).
    commands_by_phase_on_decky: Mapping[
        tuple[str, str], tuple[str, ...]
    ] = field(default_factory=dict)

    #: Per-session ``(start_ts, end_ts)`` tuples in seconds since
    #: epoch. Used ONLY for pairwise relative deltas — never compared
    #: to an absolute clock. F7 (slow_burn) invariance check verifies
    #: that adding Δ to every entry on both sides yields the same edge
    #: weight.
    session_windows: tuple[tuple[float, float], ...] = ()

    #: ``decky_id`` → last UKC phase observed on that decky. The
    #: "from" side of a phase handoff.
    last_phase_per_decky: Mapping[str, str] = field(default_factory=dict)

    #: ``decky_id`` → first UKC phase observed on that decky. The
    #: "to" side of a phase handoff.
    first_phase_per_decky: Mapping[str, str] = field(default_factory=dict)

    #: ``decky_id`` → last activity timestamp on that decky. Pairs
    #: with :attr:`first_seen_per_decky` to compute pairwise handoff
    #: gap relative to the two identities (no absolute clock).
    last_seen_per_decky: Mapping[str, float] = field(default_factory=dict)

    #: ``decky_id`` → first activity timestamp on that decky.
    first_seen_per_decky: Mapping[str, float] = field(default_factory=dict)


# ─── Shared private helpers ─────────────────────────────────────────────────


def _jaccard(left: frozenset, right: frozenset) -> float:
    """Return ``|left ∩ right| / |left ∪ right|``; ``0.0`` if both are empty."""
    union = left | right
    if not union:
        return 0.0
    return len(left & right) / len(union)


def _tagged(tag: str, values: Iterable[object]) -> set[tuple[str, object]]:
    """Prefix every value with ``tag`` so distinct families never alias."""
    return {(tag, value) for value in values}


def _merge_windows(
    windows: Iterable[tuple[float, float]],
) -> list[tuple[float, float]]:
    """Collapse windows into sorted, pairwise-disjoint intervals.

    Windows with ``end <= start`` (zero-length, inverted, or NaN-bearing)
    carry no duration and are dropped. Touching or overlapping windows
    are fused so no instant is ever counted twice.
    """
    merged: list[tuple[float, float]] = []
    for start, end in sorted((s, e) for s, e in windows if e > s):
        if merged and start <= merged[-1][1]:
            if end > merged[-1][1]:
                merged[-1] = (merged[-1][0], end)
        else:
            merged.append((start, end))
    return merged


def _intersection_length(
    xs: list[tuple[float, float]], ys: list[tuple[float, float]],
) -> float:
    """Total length shared by two sorted, disjoint interval lists."""
    total = 0.0
    i = j = 0
    while i < len(xs) and j < len(ys):
        lo = max(xs[i][0], ys[j][0])
        hi = min(xs[i][1], ys[j][1])
        if hi > lo:
            total += hi - lo
        # Advance whichever interval finishes first; the other may still
        # intersect the next interval on the opposite side.
        if xs[i][1] <= ys[j][1]:
            i += 1
        else:
            j += 1
    return total


# ─── Phase-handoff edge ─────────────────────────────────────────────────────


#: Phases that mark a *handoff-out* — operator A is finished setting
#: up a foothold and the next operator can step in. Drawn from the
#: STAGE_IN tail (PERSISTENCE / COMMAND_AND_CONTROL) per the UKC
#: vocabulary; expanding this set is a tunable knob.
HANDOFF_OUT_PHASES: frozenset[str] = frozenset({
    "command_and_control",
    "persistence",
})

#: Phases that mark a *handoff-in* — operator B picks up a prepared
#: foothold and starts operating through the network. STAGE_THROUGH
#: head (DISCOVERY / LATERAL_MOVEMENT).
HANDOFF_IN_PHASES: frozenset[str] = frozenset({
    "discovery",
    "lateral_movement",
})

#: Default handoff-window in seconds. The "B starts within W of A's
#: end" guard. Bounded relative to the pair — fixture F7 invariant
#: still holds because shifting both timestamps preserves the gap.
DEFAULT_HANDOFF_WINDOW_S: float = 24 * 3600.0  # 24h


def phase_handoff_weight(
    a: IdentityFeatures,
    b: IdentityFeatures,
    window_s: float = DEFAULT_HANDOFF_WINDOW_S,
) -> float:
    """Phase-handoff edge — the load-bearing F5 signal.

    Returns ``1.0`` if there exists a decky D such that EITHER:

    * A's last phase on D is in :data:`HANDOFF_OUT_PHASES`, B's first
      phase on D is in :data:`HANDOFF_IN_PHASES`, and B's first
      activity on D is within ``window_s`` AFTER A's last activity
      on D, OR
    * the symmetric case with A and B swapped.

    Returns ``0.0`` when no shared decky has a matching out→in pair
    within window. Window comparison is on the *gap* (a single
    subtraction) — pairwise-relative, so F7 invariance holds.
    """
    return max(
        _directed_handoff(a, b, window_s),
        _directed_handoff(b, a, window_s),
    )


def _directed_handoff(
    out: IdentityFeatures, in_: IdentityFeatures, window_s: float,
) -> float:
    """One direction of the handoff test: ``out`` leaves, ``in_`` arrives.

    Returns ``1.0`` on the first decky where ``out``'s last phase is a
    handoff-out phase, ``in_``'s first phase is a handoff-in phase, both
    timestamps are known, and ``0 <= gap <= window_s``; ``0.0`` otherwise.
    """
    for decky, out_phase in out.last_phase_per_decky.items():
        if out_phase not in HANDOFF_OUT_PHASES:
            continue
        # A decky that ``in_`` never touched yields None, which is not a
        # handoff-in phase, so unshared deckies fall out here.
        if in_.first_phase_per_decky.get(decky) not in HANDOFF_IN_PHASES:
            continue
        out_t = out.last_seen_per_decky.get(decky)
        in_t = in_.first_seen_per_decky.get(decky)
        if out_t is None or in_t is None:
            continue
        # Negative gap means ``in_`` started before ``out`` finished:
        # that is overlap, not a handoff.
        if 0.0 <= in_t - out_t <= window_s:
            return 1.0
    return 0.0


# ─── Shared-infra edge ──────────────────────────────────────────────────────


def shared_infra_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
    """Jaccard over payload-hashes ∪ C2-endpoints ∪ decky-set.

    At identity level this gets vetoed by the fingerprint-disagreement
    rule (``ed32358``); at campaign level it's the *primary* positive
    signal — distinct identities sharing infra is the canonical co-op
    pattern. We treat all three sets as one combined alphabet so a
    single shared payload + C2 + decky add together rather than
    averaging away a strong signal in one set with weak overlap in
    another.

    Each element is tagged with its family before the sets are combined
    (as :func:`cohort_weight` does), so a decky ID that happens to
    equal a payload-hash or endpoint string is not counted as overlap.

    Returns Jaccard across the union of the three set families,
    ``0.0`` when both sides are empty.
    """
    a_set = frozenset(
        _tagged("payload", a.payload_hashes)
        | _tagged("c2", a.c2_endpoints)
        | _tagged("decky", a.decky_set)
    )
    b_set = frozenset(
        _tagged("payload", b.payload_hashes)
        | _tagged("c2", b.c2_endpoints)
        | _tagged("decky", b.decky_set)
    )
    return _jaccard(a_set, b_set)


# ─── Temporal-overlap edge ──────────────────────────────────────────────────


def temporal_overlap_weight(
    a: IdentityFeatures, b: IdentityFeatures,
) -> float:
    """Pairwise-relative temporal overlap fraction.

    Returns the fraction of A's total session time that overlaps any
    B session, capped at ``1.0``. Pairwise-relative: the value is
    invariant under a uniform Δ-shift of every timestamp on both
    sides (F7 fixture's invariant). Returns ``0.0`` when either side
    has no session windows (or only zero-length / inverted ones).

    Both sides' windows are merged into disjoint intervals first, so
    overlapping or duplicated sessions on either side are counted once
    instead of inflating the fraction.

    Note the denominator is A's time only, so the value is *not*
    symmetric in ``(a, b)``.

    Two non-cooperating actors with bounded operations rarely overlap
    by chance; co-op campaigns overlap heavily. Defeated alone (one
    overlapping minute means little) — combined with shared-infra
    or handoff it pulls a pair over threshold.
    """
    if not a.session_windows or not b.session_windows:
        return 0.0
    a_windows = _merge_windows(a.session_windows)
    b_windows = _merge_windows(b.session_windows)
    a_total = sum(end - start for start, end in a_windows)
    if a_total <= 0:
        return 0.0
    overlap = _intersection_length(a_windows, b_windows)
    # The cap only guards against float rounding; merged intervals
    # cannot overlap A by more than A's own length.
    return min(1.0, overlap / a_total)


# ─── Cohort edges ───────────────────────────────────────────────────────────


def cohort_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
    """ASN-cohort + tooling-cohort weak signal.

    Jaccard over the union of ASN cohort and tooling cohort. F2's
    failure mode (one identity rotating across many ASNs) doesn't
    apply at *campaign* level — but multiple identities cooperating
    out of the same hosting cohort is plausible co-op evidence.

    Weak by design: the combined-weight tier multiplier keeps this
    from crossing threshold alone. Returns ``0.0`` when both sides
    have no cohort data.
    """
    a_set = frozenset(
        {("asn", str(x)) for x in a.asn_cohort}
        | {("tool", x) for x in a.tooling_cohort}
    )
    b_set = frozenset(
        {("asn", str(x)) for x in b.asn_cohort}
        | {("tool", x) for x in b.tooling_cohort}
    )
    return _jaccard(a_set, b_set)


# ─── Combined campaign-level weight ─────────────────────────────────────────


#: Tier multipliers for the campaign graph. Tuned so:
#:
#: * Phase-handoff alone (1.0 → 1.0) crosses threshold — a clean
#:   F5-style handoff is sufficient evidence on its own.
#: * Shared-infra alone (max 1.0) yields 0.7 — strong but not enough
#:   without supporting evidence (F1 burns the same wordlist /
#:   different campaigns shouldn't fuse on infra alone).
#: * Temporal overlap alone (max 1.0) yields 0.4 — supporting weight.
#: * Cohort alone (max 1.0) yields 0.1 — defeats F2-style failures.
#:
#: Shared-infra + temporal overlap together (1.1) cross threshold —
#: the canonical co-op pattern. Shared-infra + cohort (0.8) does
#: NOT — F1's wordlist-overlap-only failure mode is preserved.
CAMPAIGN_TIER_WEIGHTS: dict[str, float] = {
    "phase_handoff": 1.0,
    "shared_infra": 0.7,
    "temporal_overlap": 0.4,
    "cohort": 0.1,
}

#: Threshold a combined campaign-edge weight must meet to survive
#: into the similarity graph.
CAMPAIGN_EDGE_THRESHOLD: float = 1.0


def combined_campaign_weight(
    a: IdentityFeatures,
    b: IdentityFeatures,
    *,
    handoff_window_s: float = DEFAULT_HANDOFF_WINDOW_S,
) -> float:
    """Sum of all four tier scores, weighted by
    :data:`CAMPAIGN_TIER_WEIGHTS`.

    The campaign-clusterer worker compares this against
    :data:`CAMPAIGN_EDGE_THRESHOLD` to decide whether to draw an
    edge. Pure / time-agnostic — F7 invariant preserved.
    """
    return (
        CAMPAIGN_TIER_WEIGHTS["phase_handoff"]
        * phase_handoff_weight(a, b, handoff_window_s)
        + CAMPAIGN_TIER_WEIGHTS["shared_infra"] * shared_infra_weight(a, b)
        + CAMPAIGN_TIER_WEIGHTS["temporal_overlap"]
        * temporal_overlap_weight(a, b)
        + CAMPAIGN_TIER_WEIGHTS["cohort"] * cohort_weight(a, b)
    )


# ─── Adapter for synthetic-fixture tests ────────────────────────────────────


def from_synthetic_identity(att, identity_uuid: Optional[str] = None) -> IdentityFeatures:  # type: ignore[no-untyped-def]
    """Build an :class:`IdentityFeatures` from a ``SyntheticAttacker``.

    Treats one ``SyntheticAttacker`` as one identity — adequate for
    fixture validation where the campaign-clusterer reads identities
    not raw observations. The worker's production-row adapter
    (commit 3) builds the same shape from real ``AttackerIdentity``
    rows + their member observations.

    Lives here so test code doesn't import the factory shape into the
    production module — the adapter is a documented integration point.

    Reads ``att.asn``, ``att.sessions`` and ``att.attacker_id``; from each
    session reads ``payload_hash``, ``c2_callback``, ``commands`` and,
    when present, ``decky`` / ``decky_id``, ``start_ts``, ``end_ts`` and
    ``phase``. ``identity_uuid`` overrides ``att.attacker_id`` when
    given. ``tooling_cohort`` is always left empty.
    """
    payload_hashes: set[str] = set()
    c2_endpoints: set[str] = set()
    decky_set: set[str] = set()
    asn_cohort: set[int] = set()
    if att.asn is not None:
        asn_cohort.add(att.asn)

    commands_by_phase_on_decky: dict[tuple[str, str], list[str]] = {}
    last_phase_per_decky: dict[str, str] = {}
    first_phase_per_decky: dict[str, str] = {}
    last_seen_per_decky: dict[str, float] = {}
    first_seen_per_decky: dict[str, float] = {}
    session_windows: list[tuple[float, float]] = []

    # SyntheticSession order is the campaign DSL's emission order, which
    # is monotonically time-ordered by construction. We rely on that to
    # extract first/last phase per decky.
    for s in att.sessions:
        if s.payload_hash:
            payload_hashes.add(s.payload_hash)
        if s.c2_callback:
            c2_endpoints.add(s.c2_callback)
        decky = getattr(s, "decky", None) or getattr(s, "decky_id", None)
        if decky:
            decky_set.add(decky)
        ts_start = getattr(s, "start_ts", None)
        ts_end = getattr(s, "end_ts", None)
        if ts_start is not None and ts_end is not None:
            session_windows.append((float(ts_start), float(ts_end)))

        # ``phase`` may be missing, None, an enum-like object with
        # ``.value``, or already a plain value; never dereference None.
        phase = getattr(s, "phase", None)
        phase_value = getattr(phase, "value", phase)
        if not (decky and phase_value):
            continue

        if s.commands:
            commands_by_phase_on_decky.setdefault(
                (decky, phase_value), [],
            ).extend(s.commands)
        if decky not in first_phase_per_decky:
            first_phase_per_decky[decky] = phase_value
            if ts_start is not None:
                first_seen_per_decky[decky] = float(ts_start)
        last_phase_per_decky[decky] = phase_value
        if ts_end is not None:
            last_seen_per_decky[decky] = float(ts_end)
        else:
            # Keep phase and timestamp describing the SAME session: an
            # end time left over from an earlier session would understate
            # when this identity was last active on the decky.
            last_seen_per_decky.pop(decky, None)

    return IdentityFeatures(
        identity_uuid=identity_uuid or att.attacker_id,
        asn_cohort=frozenset(asn_cohort),
        tooling_cohort=frozenset(),
        payload_hashes=frozenset(payload_hashes),
        c2_endpoints=frozenset(c2_endpoints),
        decky_set=frozenset(decky_set),
        commands_by_phase_on_decky={
            k: tuple(v) for k, v in commands_by_phase_on_decky.items()
        },
        session_windows=tuple(session_windows),
        last_phase_per_decky=dict(last_phase_per_decky),
        first_phase_per_decky=dict(first_phase_per_decky),
        last_seen_per_decky=dict(last_seen_per_decky),
        first_seen_per_decky=dict(first_seen_per_decky),
    )


__all__ = [
    "IdentityFeatures",
    "phase_handoff_weight",
    "shared_infra_weight",
    "temporal_overlap_weight",
    "cohort_weight",
    "combined_campaign_weight",
    "from_synthetic_identity",
    "HANDOFF_OUT_PHASES",
    "HANDOFF_IN_PHASES",
    "DEFAULT_HANDOFF_WINDOW_S",
    "CAMPAIGN_TIER_WEIGHTS",
    "CAMPAIGN_EDGE_THRESHOLD",
]
"""Tests for campaign-level similarity primitives.

Covers, in order:

* Each edge family in isolation — phase-handoff, shared-infra,
  temporal-overlap, cohort.
* The F7 (slow_burn) time-agnostic invariant — shifting every
  timestamp on both sides by the same Δ preserves every edge weight.
* The F1 (shared_wordlist) failure mode — shared cohort alone must
  NOT push a pair over threshold.
* The F5 (multi_operator) target — phase-handoff alone (the
  load-bearing campaign-level signal) DOES cross threshold.
* Tier-combination arithmetic — shared-infra + temporal overlap
  (the canonical co-op pattern) crosses threshold; shared-infra +
  cohort does not.
"""
from __future__ import annotations

import pytest

from decnet.clustering.campaign.impl.similarity import (
    CAMPAIGN_EDGE_THRESHOLD,
    DEFAULT_HANDOFF_WINDOW_S,
    IdentityFeatures,
    cohort_weight,
    combined_campaign_weight,
    phase_handoff_weight,
    shared_infra_weight,
    temporal_overlap_weight,
)

#: Uniform shift used by every F7 invariance test (90 days, in seconds).
_SHIFT_S = 90 * 24 * 3600.0


def _features(uuid: str, **kwargs) -> IdentityFeatures:
    return IdentityFeatures(identity_uuid=uuid, **kwargs)


def _leaving(uuid: str, decky: str, phase: str, last_seen: float, **extra) -> IdentityFeatures:
    """Identity whose LAST phase on ``decky`` is ``phase`` at ``last_seen``."""
    return _features(
        uuid,
        last_phase_per_decky={decky: phase},
        last_seen_per_decky={decky: last_seen},
        **extra,
    )


def _arriving(uuid: str, decky: str, phase: str, first_seen: float, **extra) -> IdentityFeatures:
    """Identity whose FIRST phase on ``decky`` is ``phase`` at ``first_seen``."""
    return _features(
        uuid,
        first_phase_per_decky={decky: phase},
        first_seen_per_decky={decky: first_seen},
        **extra,
    )


def _windowed(uuid: str, *windows: tuple[float, float]) -> IdentityFeatures:
    """Identity carrying only the given session windows."""
    return _features(uuid, session_windows=tuple(windows))


def _shifted(f: IdentityFeatures, delta: float) -> IdentityFeatures:
    """Copy of ``f`` with every timestamp moved by ``delta`` seconds."""
    return IdentityFeatures(
        identity_uuid=f.identity_uuid,
        asn_cohort=f.asn_cohort,
        tooling_cohort=f.tooling_cohort,
        payload_hashes=f.payload_hashes,
        c2_endpoints=f.c2_endpoints,
        decky_set=f.decky_set,
        commands_by_phase_on_decky=dict(f.commands_by_phase_on_decky),
        session_windows=tuple((s + delta, e + delta) for s, e in f.session_windows),
        last_phase_per_decky=dict(f.last_phase_per_decky),
        first_phase_per_decky=dict(f.first_phase_per_decky),
        last_seen_per_decky={k: v + delta for k, v in f.last_seen_per_decky.items()},
        first_seen_per_decky={k: v + delta for k, v in f.first_seen_per_decky.items()},
    )


# ─── phase_handoff_weight ────────────────────────────────────────────────────


def test_phase_handoff_clean_out_to_in_within_window():
    a = _leaving("a", "d1", "command_and_control", 1000.0)
    b = _arriving("b", "d1", "discovery", 1000.0 + 600.0)  # 10 min later
    assert phase_handoff_weight(a, b) == 1.0


def test_phase_handoff_symmetric():
    # B finishes, A picks up. The argument order shouldn't matter.
    b = _leaving("b", "d1", "persistence", 5000.0)
    a = _arriving("a", "d1", "lateral_movement", 5000.0 + 60.0)
    assert phase_handoff_weight(a, b) == 1.0
    assert phase_handoff_weight(b, a) == 1.0


def test_phase_handoff_no_decky_overlap():
    a = _leaving("a", "d1", "command_and_control", 1000.0)
    b = _arriving("b", "d2", "discovery", 1100.0)
    assert phase_handoff_weight(a, b) == 0.0


def test_phase_handoff_phase_mismatch():
    # A ends mid-pivoting (not a handoff-out phase) → no signal.
    a = _leaving("a", "d1", "exploitation", 1000.0)
    b = _arriving("b", "d1", "discovery", 1100.0)
    assert phase_handoff_weight(a, b) == 0.0


def test_phase_handoff_outside_window():
    a = _leaving("a", "d1", "command_and_control", 0.0)
    # Way past the 24h default window.
    b = _arriving("b", "d1", "discovery", DEFAULT_HANDOFF_WINDOW_S + 3600.0)
    assert phase_handoff_weight(a, b) == 0.0


def test_phase_handoff_negative_gap_rejected():
    # B starts BEFORE A ends — that's overlap, not a handoff.
    a = _leaving("a", "d1", "persistence", 2000.0)
    b = _arriving("b", "d1", "lateral_movement", 1000.0)
    assert phase_handoff_weight(a, b) == 0.0


# ─── shared_infra_weight ─────────────────────────────────────────────────────


def test_shared_infra_full_overlap():
    infra = {
        "payload_hashes": frozenset({"hash-1"}),
        "c2_endpoints": frozenset({"1.2.3.4:443"}),
        "decky_set": frozenset({"d1"}),
    }
    assert shared_infra_weight(_features("a", **infra), _features("b", **infra)) == 1.0


def test_shared_infra_no_overlap():
    a = _features("a", payload_hashes=frozenset({"hash-a"}))
    b = _features("b", payload_hashes=frozenset({"hash-b"}))
    assert shared_infra_weight(a, b) == 0.0


def test_shared_infra_empty_returns_zero():
    assert shared_infra_weight(_features("a"), _features("b")) == 0.0


# ─── temporal_overlap_weight ─────────────────────────────────────────────────


def test_temporal_overlap_full():
    a = _windowed("a", (0.0, 100.0))
    b = _windowed("b", (0.0, 100.0))
    assert temporal_overlap_weight(a, b) == 1.0


def test_temporal_overlap_partial():
    a = _windowed("a", (0.0, 100.0))
    b = _windowed("b", (50.0, 150.0))
    # 50 of 100 of A's time overlaps B.
    assert temporal_overlap_weight(a, b) == pytest.approx(0.5)


def test_temporal_overlap_disjoint():
    a = _windowed("a", (0.0, 100.0))
    b = _windowed("b", (200.0, 300.0))
    assert temporal_overlap_weight(a, b) == 0.0


def test_temporal_overlap_empty():
    a = _features("a")
    b = _windowed("b", (0.0, 100.0))
    assert temporal_overlap_weight(a, b) == 0.0


# ─── cohort_weight ───────────────────────────────────────────────────────────


def test_cohort_asn_overlap():
    a = _features("a", asn_cohort=frozenset({64512}))
    b = _features("b", asn_cohort=frozenset({64512}))
    assert cohort_weight(a, b) == 1.0


def test_cohort_disjoint():
    a = _features("a", asn_cohort=frozenset({64512}))
    b = _features("b", asn_cohort=frozenset({64513}))
    assert cohort_weight(a, b) == 0.0


# ─── F7 time-agnostic invariant ──────────────────────────────────────────────


def test_f7_invariant_temporal_overlap_unchanged_under_shift():
    # The fixture-7 (slow_burn) invariant: shifting every timestamp on
    # BOTH sides by the same Δ must yield the same edge weight. The
    # campaign clusterer's edges are pairwise-relative; an absolute
    # 90-day shift must not change anything.
    a = _windowed("a", (0.0, 100.0), (300.0, 400.0))
    b = _windowed("b", (50.0, 150.0), (350.0, 450.0))
    base = temporal_overlap_weight(a, b)
    moved = temporal_overlap_weight(_shifted(a, _SHIFT_S), _shifted(b, _SHIFT_S))
    assert moved == pytest.approx(base)


def test_f7_invariant_phase_handoff_unchanged_under_shift():
    a = _leaving("a", "d1", "command_and_control", 1000.0)
    b = _arriving("b", "d1", "discovery", 1600.0)
    base = phase_handoff_weight(a, b)
    moved = phase_handoff_weight(_shifted(a, _SHIFT_S), _shifted(b, _SHIFT_S))
    assert moved == base == 1.0


# ─── Combined-weight + threshold semantics ──────────────────────────────────


def test_phase_handoff_alone_crosses_threshold():
    """F5 multi_operator's load-bearing signal: handoff alone is enough."""
    a = _leaving("a", "d1", "persistence", 1000.0)
    b = _arriving("b", "d1", "lateral_movement", 1100.0)
    assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD


def test_cohort_alone_below_threshold():
    """F2 vpn_hopping at campaign level: cohort alone is not co-op."""
    a = _features("a", asn_cohort=frozenset({64512}))
    b = _features("b", asn_cohort=frozenset({64512}))
    assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD


def test_shared_infra_plus_temporal_overlap_crosses_threshold():
    """The canonical co-op pattern: shared infra during the same window."""
    shared = {
        "payload_hashes": frozenset({"h"}),
        "c2_endpoints": frozenset({"c"}),
        "decky_set": frozenset({"d1"}),
        "session_windows": ((0.0, 100.0),),
    }
    a = _features("a", **shared)
    b = _features("b", **shared)
    assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD


def test_shared_infra_plus_cohort_below_threshold():
    """F1 shared_wordlist: shared signals minus operational overlap is NOT co-op."""
    shared = {
        "payload_hashes": frozenset({"h"}),
        "asn_cohort": frozenset({64512}),
    }
    a = _features("a", **shared)
    b = _features("b", **shared)
    assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD


def test_combined_invariant_under_shift():
    """End-to-end F7 invariant on the combined weight."""
    a = _leaving(
        "a", "d1", "persistence", 1000.0,
        session_windows=((0.0, 1500.0),),
        payload_hashes=frozenset({"h"}),
    )
    b = _arriving(
        "b", "d1", "discovery", 1100.0,
        session_windows=((1100.0, 2000.0),),
        payload_hashes=frozenset({"h"}),
    )
    base = combined_campaign_weight(a, b)
    moved = combined_campaign_weight(_shifted(a, _SHIFT_S), _shifted(b, _SHIFT_S))
    assert moved == pytest.approx(base)