feat(clustering): campaign-level similarity primitives

The signal taxonomy for the campaign clusterer (next commit). Mirror
of the identity-layer module but with edge families that don't
translate 1:1: phase-handoff (load-bearing for F5 multi_operator —
the signal the identity-side fingerprint-disagreement veto deliberately
isn't), shared-infra (vetoed at identity level, primary positive
signal here), temporal-overlap (pairwise-relative — F7 invariance
preserved), cohort (weak supporting weight only).

Tier weights tuned so phase-handoff alone crosses threshold (F5),
shared-infra + temporal-overlap together cross (canonical co-op
pattern), and shared-infra + cohort together do NOT (F1
shared_wordlist's failure mode). The F7 time-shift invariant is
explicitly tested on every time-bearing edge and on the combined
weight.
This commit is contained in:
2026-04-26 08:57:46 -04:00
parent 0a1cf65ddb
commit 0946bab424
4 changed files with 763 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""Campaign clusterer — groups resolved identities into operations.
The layer above identity resolution. See
``development/CAMPAIGN_CLUSTERING.md`` for the signal taxonomy.
"""

View File

@@ -0,0 +1,414 @@
"""Similarity-graph primitives for the campaign clusterer.
The campaign clusterer reads ``AttackerIdentity`` rows (the layer below)
and groups them into operations. The graph it builds is **not** the
identity-level graph: identity-level signals don't translate 1:1, and
some that get vetoed at identity level (shared infra) are the *primary
positive signal* at campaign level.
Mirror of ``decnet.clustering.impl.similarity`` for the
identity layer; see that module for the four-tier identity taxonomy.
**Time-agnostic.** Same F7 invariant as the identity layer — edges
MUST depend only on *pairwise relative* offsets, never on absolute
clocks. Shift two identities' session windows by the same Δ and the
edge weights MUST be identical. The temporal-overlap edge below uses
this invariant explicitly.
**Edge families** (from ``development/CAMPAIGN_CLUSTERING.md``):
* **Phase-handoff** — A ends in ``COMMAND_AND_CONTROL`` / ``PERSISTENCE``
on decky D, B begins ``DISCOVERY`` / ``LATERAL_MOVEMENT`` on D
within window W. Load-bearing for fixture F5 (multi_operator) — the
signal the identity-side fingerprint-disagreement veto deliberately
doesn't try to be.
* **Shared-infra** — Jaccard over aggregated payload-hashes /
C2-endpoints / decky-set across the identities' member observations.
Vetoed at identity level (``ed32358``); primary positive signal here.
* **Temporal overlap** — sessions inside a bounded *relative* window.
Campaigns are operations and operations have bounded duration;
overlap of distinct identities on shared infra is the canonical
co-op pattern.
* **Cohort** — ASN-cohort + tooling-cohort weak signals. Defeated alone
(per F2); useful as supporting weight only.
The functions are pure (no DB, no I/O); the worker maps identities into
:class:`IdentityFeatures` once per tick and feeds these into the graph
builder in a sibling module.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Mapping, Optional
# ─── Identity-level projection ──────────────────────────────────────────────
@dataclass(frozen=True)
class IdentityFeatures:
"""Minimal projection of an :class:`AttackerIdentity` row.
Built once per identity by the worker (or per fixture identity in
tests via :func:`from_synthetic_identity`). Keeping the projection
tight isolates the campaign-graph code from schema drift on the
identity layer.
"""
identity_uuid: str
"""Stable ID — production: ``AttackerIdentity.uuid``."""
asn_cohort: frozenset[int] = field(default_factory=frozenset)
"""All ASNs observed across the identity's member observations.
A single rotating actor (F2) appears in many ASNs; the *set*
overlap is the cohort signal."""
tooling_cohort: frozenset[str] = field(default_factory=frozenset)
"""Tooling labels (e.g. ``"hydra"``, ``"hping"``) inferred from
fingerprints / commands. Empty until tooling-attribution lands."""
payload_hashes: frozenset[str] = field(default_factory=frozenset)
"""Aggregated payload hashes across member observations."""
c2_endpoints: frozenset[str] = field(default_factory=frozenset)
"""Aggregated C2 endpoints across member observations."""
decky_set: frozenset[str] = field(default_factory=frozenset)
"""Aggregated decky IDs the identity touched."""
commands_by_phase_on_decky: Mapping[
tuple[str, str], tuple[str, ...]
] = field(default_factory=dict)
"""``(decky_id, UKCPhase.value)`` → ordered command sequence
observed on that decky in that phase. Required for the
phase-handoff edge — same decky is the join key. Empty when
``commands_by_phase`` is unavailable on the production-row
adapter (deferred per TODO.md until log-mining lands)."""
session_windows: tuple[tuple[float, float], ...] = ()
"""Per-session ``(start_ts, end_ts)`` tuples in seconds since
epoch. Used ONLY for pairwise relative deltas — never compared
to an absolute clock. F7 (slow_burn) invariance check verifies
that adding Δ to every entry on both sides yields the same edge
weight."""
last_phase_per_decky: Mapping[str, str] = field(default_factory=dict)
"""``decky_id`` → last UKC phase observed on that decky. The
"from" side of a phase handoff."""
first_phase_per_decky: Mapping[str, str] = field(default_factory=dict)
"""``decky_id`` → first UKC phase observed on that decky. The
"to" side of a phase handoff."""
last_seen_per_decky: Mapping[str, float] = field(default_factory=dict)
"""``decky_id`` → last activity timestamp on that decky. Pairs
with :attr:`first_seen_per_decky` to compute pairwise handoff
gap relative to the two identities (no absolute clock)."""
first_seen_per_decky: Mapping[str, float] = field(default_factory=dict)
"""``decky_id`` → first activity timestamp on that decky."""
# ─── Phase-handoff edge ─────────────────────────────────────────────────────
#: Phases that mark a *handoff-out* — operator A is finished setting
#: up a foothold and the next operator can step in. Drawn from the
#: STAGE_IN tail (PERSISTENCE / COMMAND_AND_CONTROL) per the UKC
#: vocabulary; expanding this set is a tunable knob.
HANDOFF_OUT_PHASES: frozenset[str] = frozenset({
"command_and_control",
"persistence",
})
#: Phases that mark a *handoff-in* — operator B picks up a prepared
#: foothold and starts operating through the network. STAGE_THROUGH
#: head (DISCOVERY / LATERAL_MOVEMENT).
HANDOFF_IN_PHASES: frozenset[str] = frozenset({
"discovery",
"lateral_movement",
})
#: Default handoff-window in seconds. The "B starts within W of A's
#: end" guard. Bounded relative to the pair — fixture F7 invariant
#: still holds because shifting both timestamps preserves the gap.
DEFAULT_HANDOFF_WINDOW_S: float = 24 * 3600.0 # 24h
def phase_handoff_weight(
a: IdentityFeatures,
b: IdentityFeatures,
window_s: float = DEFAULT_HANDOFF_WINDOW_S,
) -> float:
"""Phase-handoff edge — the load-bearing F5 signal.
Returns ``1.0`` if there exists a decky D such that EITHER:
* A's last phase on D is in :data:`HANDOFF_OUT_PHASES`, B's first
phase on D is in :data:`HANDOFF_IN_PHASES`, and B's first
activity on D is within ``window_s`` AFTER A's last activity
on D, OR
* the symmetric case with A and B swapped.
Returns ``0.0`` when no shared decky has a matching out→in pair
within window. Window comparison is on the *gap* (a single
subtraction) — pairwise-relative, so F7 invariance holds.
"""
return max(
_directed_handoff(a, b, window_s),
_directed_handoff(b, a, window_s),
)
def _directed_handoff(
out: IdentityFeatures, in_: IdentityFeatures, window_s: float,
) -> float:
shared = set(out.last_phase_per_decky) & set(in_.first_phase_per_decky)
for decky in shared:
out_phase = out.last_phase_per_decky.get(decky)
in_phase = in_.first_phase_per_decky.get(decky)
if out_phase not in HANDOFF_OUT_PHASES:
continue
if in_phase not in HANDOFF_IN_PHASES:
continue
out_t = out.last_seen_per_decky.get(decky)
in_t = in_.first_seen_per_decky.get(decky)
if out_t is None or in_t is None:
continue
gap = in_t - out_t
if 0.0 <= gap <= window_s:
return 1.0
return 0.0
# ─── Shared-infra edge ──────────────────────────────────────────────────────
def shared_infra_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
"""Jaccard over payload-hashes C2-endpoints decky-set.
At identity level this gets vetoed by the fingerprint-disagreement
rule (``ed32358``); at campaign level it's the *primary* positive
signal — distinct identities sharing infra is the canonical co-op
pattern. We treat all three sets as one combined alphabet so a
single shared payload + C2 + decky add together rather than
averaging away a strong signal in one set with weak overlap in
another.
Returns Jaccard across the union of the three set families,
``0.0`` when both sides are empty.
"""
a_set = a.payload_hashes | a.c2_endpoints | a.decky_set
b_set = b.payload_hashes | b.c2_endpoints | b.decky_set
if not a_set and not b_set:
return 0.0
union = a_set | b_set
if not union:
return 0.0
return len(a_set & b_set) / len(union)
# ─── Temporal-overlap edge ──────────────────────────────────────────────────
def temporal_overlap_weight(
a: IdentityFeatures, b: IdentityFeatures,
) -> float:
"""Pairwise-relative temporal overlap fraction.
Returns the fraction of A's total session time that overlaps any
B session, capped at ``1.0``. Pairwise-relative: the value is
invariant under a uniform Δ-shift of every timestamp on both
sides (F7 fixture's invariant). Returns ``0.0`` when either side
has no session windows.
Two non-cooperating actors with bounded operations rarely overlap
by chance; co-op campaigns overlap heavily. Defeated alone (one
overlapping minute means little) — combined with shared-infra
or handoff it pulls a pair over threshold.
"""
if not a.session_windows or not b.session_windows:
return 0.0
a_total = sum(end - start for start, end in a.session_windows)
if a_total <= 0:
return 0.0
overlap = 0.0
for a_start, a_end in a.session_windows:
for b_start, b_end in b.session_windows:
lo = max(a_start, b_start)
hi = min(a_end, b_end)
if hi > lo:
overlap += hi - lo
return min(1.0, overlap / a_total)
# ─── Cohort edges ───────────────────────────────────────────────────────────
def cohort_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
"""ASN-cohort + tooling-cohort weak signal.
Jaccard over the union of ASN cohort and tooling cohort. F2's
failure mode (one identity rotating across many ASNs) doesn't
apply at *campaign* level — but multiple identities cooperating
out of the same hosting cohort is plausible co-op evidence.
Weak by design: the combined-weight tier multiplier keeps this
from crossing threshold alone.
"""
a_set: frozenset = frozenset(
{("asn", str(x)) for x in a.asn_cohort}
| {("tool", x) for x in a.tooling_cohort}
)
b_set: frozenset = frozenset(
{("asn", str(x)) for x in b.asn_cohort}
| {("tool", x) for x in b.tooling_cohort}
)
if not a_set and not b_set:
return 0.0
union = a_set | b_set
if not union:
return 0.0
return len(a_set & b_set) / len(union)
# ─── Combined campaign-level weight ─────────────────────────────────────────
#: Tier multipliers for the campaign graph. Tuned so:
#:
#: * Phase-handoff alone (1.0 → 1.0) crosses threshold — a clean
#: F5-style handoff is sufficient evidence on its own.
#: * Shared-infra alone (max 1.0) yields 0.7 — strong but not enough
#: without supporting evidence (F1 burns the same wordlist /
#: different campaigns shouldn't fuse on infra alone).
#: * Temporal overlap alone (max 1.0) yields 0.4 — supporting weight.
#: * Cohort alone (max 1.0) yields 0.1 — defeats F2-style failures.
#:
#: Shared-infra + temporal overlap together (1.1) cross threshold —
#: the canonical co-op pattern. Shared-infra + cohort (0.8) does
#: NOT — F1's wordlist-overlap-only failure mode is preserved.
CAMPAIGN_TIER_WEIGHTS: dict[str, float] = {
"phase_handoff": 1.0,
"shared_infra": 0.7,
"temporal_overlap": 0.4,
"cohort": 0.1,
}
#: Threshold a combined campaign-edge weight must meet to survive
#: into the similarity graph.
CAMPAIGN_EDGE_THRESHOLD: float = 1.0
def combined_campaign_weight(
a: IdentityFeatures,
b: IdentityFeatures,
*,
handoff_window_s: float = DEFAULT_HANDOFF_WINDOW_S,
) -> float:
"""Sum of all four tier scores, weighted by
:data:`CAMPAIGN_TIER_WEIGHTS`.
The campaign-clusterer worker compares this against
:data:`CAMPAIGN_EDGE_THRESHOLD` to decide whether to draw an
edge. Pure / time-agnostic — F7 invariant preserved.
"""
return (
CAMPAIGN_TIER_WEIGHTS["phase_handoff"]
* phase_handoff_weight(a, b, handoff_window_s)
+ CAMPAIGN_TIER_WEIGHTS["shared_infra"] * shared_infra_weight(a, b)
+ CAMPAIGN_TIER_WEIGHTS["temporal_overlap"]
* temporal_overlap_weight(a, b)
+ CAMPAIGN_TIER_WEIGHTS["cohort"] * cohort_weight(a, b)
)
# ─── Adapter for synthetic-fixture tests ────────────────────────────────────
def from_synthetic_identity(att, identity_uuid: Optional[str] = None) -> IdentityFeatures: # type: ignore[no-untyped-def]
"""Build an :class:`IdentityFeatures` from a ``SyntheticAttacker``.
Treats one ``SyntheticAttacker`` as one identity — adequate for
fixture validation where the campaign-clusterer reads identities
not raw observations. The worker's production-row adapter
(commit 3) builds the same shape from real ``AttackerIdentity``
rows + their member observations.
Lives here so test code doesn't import the factory shape into the
production module — the adapter is a documented integration point.
"""
payload_hashes: set[str] = set()
c2_endpoints: set[str] = set()
decky_set: set[str] = set()
asn_cohort: set[int] = set()
if att.asn is not None:
asn_cohort.add(att.asn)
commands_by_phase_on_decky: dict[tuple[str, str], list[str]] = {}
last_phase_per_decky: dict[str, str] = {}
first_phase_per_decky: dict[str, str] = {}
last_seen_per_decky: dict[str, float] = {}
first_seen_per_decky: dict[str, float] = {}
session_windows: list[tuple[float, float]] = []
# SyntheticSession order is the campaign DSL's emission order, which
# is monotonically time-ordered by construction. We rely on that to
# extract first/last phase per decky.
for s in att.sessions:
if s.payload_hash:
payload_hashes.add(s.payload_hash)
if s.c2_callback:
c2_endpoints.add(s.c2_callback)
decky = getattr(s, "decky", None) or getattr(s, "decky_id", None)
if decky:
decky_set.add(decky)
ts_start = getattr(s, "start_ts", None)
ts_end = getattr(s, "end_ts", None)
if ts_start is not None and ts_end is not None:
session_windows.append((float(ts_start), float(ts_end)))
phase_value = s.phase.value if hasattr(s, "phase") else None
if decky and phase_value:
key = (decky, phase_value)
if s.commands:
commands_by_phase_on_decky.setdefault(key, []).extend(s.commands)
if decky not in first_phase_per_decky:
first_phase_per_decky[decky] = phase_value
if ts_start is not None:
first_seen_per_decky[decky] = float(ts_start)
last_phase_per_decky[decky] = phase_value
if ts_end is not None:
last_seen_per_decky[decky] = float(ts_end)
return IdentityFeatures(
identity_uuid=identity_uuid or att.attacker_id,
asn_cohort=frozenset(asn_cohort),
tooling_cohort=frozenset(),
payload_hashes=frozenset(payload_hashes),
c2_endpoints=frozenset(c2_endpoints),
decky_set=frozenset(decky_set),
commands_by_phase_on_decky={
k: tuple(v) for k, v in commands_by_phase_on_decky.items()
},
session_windows=tuple(session_windows),
last_phase_per_decky=dict(last_phase_per_decky),
first_phase_per_decky=dict(first_phase_per_decky),
last_seen_per_decky=dict(last_seen_per_decky),
first_seen_per_decky=dict(first_seen_per_decky),
)
__all__ = [
"IdentityFeatures",
"phase_handoff_weight",
"shared_infra_weight",
"temporal_overlap_weight",
"cohort_weight",
"combined_campaign_weight",
"from_synthetic_identity",
"HANDOFF_OUT_PHASES",
"HANDOFF_IN_PHASES",
"DEFAULT_HANDOFF_WINDOW_S",
"CAMPAIGN_TIER_WEIGHTS",
"CAMPAIGN_EDGE_THRESHOLD",
]

View File

@@ -0,0 +1,344 @@
"""Tests for campaign-level similarity primitives.
Covers, in order:
* Each edge family in isolation — phase-handoff, shared-infra,
temporal-overlap, cohort.
* The F7 (slow_burn) time-agnostic invariant — shifting every
timestamp on both sides by the same Δ preserves every edge weight.
* The F1 (shared_wordlist) failure mode — shared cohort alone must
NOT push a pair over threshold.
* The F5 (multi_operator) target — phase-handoff alone (the
load-bearing campaign-level signal) DOES cross threshold.
* Tier-combination arithmetic — shared-infra + temporal overlap
(the canonical co-op pattern) crosses threshold; shared-infra +
cohort does not.
"""
from __future__ import annotations
import pytest
from decnet.clustering.campaign.impl.similarity import (
CAMPAIGN_EDGE_THRESHOLD,
DEFAULT_HANDOFF_WINDOW_S,
IdentityFeatures,
cohort_weight,
combined_campaign_weight,
phase_handoff_weight,
shared_infra_weight,
temporal_overlap_weight,
)
def _features(uuid: str, **kwargs) -> IdentityFeatures:
return IdentityFeatures(identity_uuid=uuid, **kwargs)
# ─── phase_handoff_weight ────────────────────────────────────────────────────
def test_phase_handoff_clean_out_to_in_within_window():
a = _features(
"a",
last_phase_per_decky={"d1": "command_and_control"},
last_seen_per_decky={"d1": 1000.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "discovery"},
first_seen_per_decky={"d1": 1000.0 + 600.0}, # 10 min later
)
assert phase_handoff_weight(a, b) == 1.0
def test_phase_handoff_symmetric():
# B finishes, A picks up. The argument order shouldn't matter.
b = _features(
"b",
last_phase_per_decky={"d1": "persistence"},
last_seen_per_decky={"d1": 5000.0},
)
a = _features(
"a",
first_phase_per_decky={"d1": "lateral_movement"},
first_seen_per_decky={"d1": 5000.0 + 60.0},
)
assert phase_handoff_weight(a, b) == 1.0
assert phase_handoff_weight(b, a) == 1.0
def test_phase_handoff_no_decky_overlap():
a = _features(
"a",
last_phase_per_decky={"d1": "command_and_control"},
last_seen_per_decky={"d1": 1000.0},
)
b = _features(
"b",
first_phase_per_decky={"d2": "discovery"},
first_seen_per_decky={"d2": 1100.0},
)
assert phase_handoff_weight(a, b) == 0.0
def test_phase_handoff_phase_mismatch():
# A ends mid-pivoting (not a handoff-out phase) → no signal.
a = _features(
"a",
last_phase_per_decky={"d1": "exploitation"},
last_seen_per_decky={"d1": 1000.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "discovery"},
first_seen_per_decky={"d1": 1100.0},
)
assert phase_handoff_weight(a, b) == 0.0
def test_phase_handoff_outside_window():
a = _features(
"a",
last_phase_per_decky={"d1": "command_and_control"},
last_seen_per_decky={"d1": 0.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "discovery"},
# Way past the 24h default window.
first_seen_per_decky={"d1": DEFAULT_HANDOFF_WINDOW_S + 3600.0},
)
assert phase_handoff_weight(a, b) == 0.0
def test_phase_handoff_negative_gap_rejected():
# B starts BEFORE A ends — that's overlap, not a handoff.
a = _features(
"a",
last_phase_per_decky={"d1": "persistence"},
last_seen_per_decky={"d1": 2000.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "lateral_movement"},
first_seen_per_decky={"d1": 1000.0},
)
assert phase_handoff_weight(a, b) == 0.0
# ─── shared_infra_weight ─────────────────────────────────────────────────────
def test_shared_infra_full_overlap():
a = _features(
"a",
payload_hashes=frozenset({"hash-1"}),
c2_endpoints=frozenset({"1.2.3.4:443"}),
decky_set=frozenset({"d1"}),
)
b = _features(
"b",
payload_hashes=frozenset({"hash-1"}),
c2_endpoints=frozenset({"1.2.3.4:443"}),
decky_set=frozenset({"d1"}),
)
assert shared_infra_weight(a, b) == 1.0
def test_shared_infra_no_overlap():
a = _features("a", payload_hashes=frozenset({"hash-a"}))
b = _features("b", payload_hashes=frozenset({"hash-b"}))
assert shared_infra_weight(a, b) == 0.0
def test_shared_infra_empty_returns_zero():
a = _features("a")
b = _features("b")
assert shared_infra_weight(a, b) == 0.0
# ─── temporal_overlap_weight ─────────────────────────────────────────────────
def test_temporal_overlap_full():
a = _features("a", session_windows=((0.0, 100.0),))
b = _features("b", session_windows=((0.0, 100.0),))
assert temporal_overlap_weight(a, b) == 1.0
def test_temporal_overlap_partial():
a = _features("a", session_windows=((0.0, 100.0),))
b = _features("b", session_windows=((50.0, 150.0),))
# 50 of 100 of A's time overlaps B.
assert temporal_overlap_weight(a, b) == pytest.approx(0.5)
def test_temporal_overlap_disjoint():
a = _features("a", session_windows=((0.0, 100.0),))
b = _features("b", session_windows=((200.0, 300.0),))
assert temporal_overlap_weight(a, b) == 0.0
def test_temporal_overlap_empty():
a = _features("a")
b = _features("b", session_windows=((0.0, 100.0),))
assert temporal_overlap_weight(a, b) == 0.0
# ─── cohort_weight ───────────────────────────────────────────────────────────
def test_cohort_asn_overlap():
a = _features("a", asn_cohort=frozenset({64512}))
b = _features("b", asn_cohort=frozenset({64512}))
assert cohort_weight(a, b) == 1.0
def test_cohort_disjoint():
a = _features("a", asn_cohort=frozenset({64512}))
b = _features("b", asn_cohort=frozenset({64513}))
assert cohort_weight(a, b) == 0.0
# ─── F7 time-agnostic invariant ──────────────────────────────────────────────
def test_f7_invariant_temporal_overlap_unchanged_under_shift():
# The fixture-7 (slow_burn) invariant: shifting every timestamp on
# BOTH sides by the same Δ must yield the same edge weight. The
# campaign clusterer's edges are pairwise-relative; an absolute
# 90-day shift must not change anything.
a = _features("a", session_windows=((0.0, 100.0), (300.0, 400.0)))
b = _features("b", session_windows=((50.0, 150.0), (350.0, 450.0)))
base = temporal_overlap_weight(a, b)
shift = 90 * 24 * 3600.0
a_shifted = _features(
"a",
session_windows=tuple((s + shift, e + shift) for s, e in a.session_windows),
)
b_shifted = _features(
"b",
session_windows=tuple((s + shift, e + shift) for s, e in b.session_windows),
)
assert temporal_overlap_weight(a_shifted, b_shifted) == pytest.approx(base)
def test_f7_invariant_phase_handoff_unchanged_under_shift():
a = _features(
"a",
last_phase_per_decky={"d1": "command_and_control"},
last_seen_per_decky={"d1": 1000.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "discovery"},
first_seen_per_decky={"d1": 1600.0},
)
base = phase_handoff_weight(a, b)
shift = 90 * 24 * 3600.0
a_shifted = _features(
"a",
last_phase_per_decky=dict(a.last_phase_per_decky),
last_seen_per_decky={k: v + shift for k, v in a.last_seen_per_decky.items()},
)
b_shifted = _features(
"b",
first_phase_per_decky=dict(b.first_phase_per_decky),
first_seen_per_decky={k: v + shift for k, v in b.first_seen_per_decky.items()},
)
assert phase_handoff_weight(a_shifted, b_shifted) == base == 1.0
# ─── Combined-weight + threshold semantics ──────────────────────────────────
def test_phase_handoff_alone_crosses_threshold():
"""F5 multi_operator's load-bearing signal: handoff alone is enough."""
a = _features(
"a",
last_phase_per_decky={"d1": "persistence"},
last_seen_per_decky={"d1": 1000.0},
)
b = _features(
"b",
first_phase_per_decky={"d1": "lateral_movement"},
first_seen_per_decky={"d1": 1100.0},
)
assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD
def test_cohort_alone_below_threshold():
"""F2 vpn_hopping at campaign level: cohort alone is not co-op."""
a = _features("a", asn_cohort=frozenset({64512}))
b = _features("b", asn_cohort=frozenset({64512}))
assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD
def test_shared_infra_plus_temporal_overlap_crosses_threshold():
"""The canonical co-op pattern: shared infra during the same window."""
a = _features(
"a",
payload_hashes=frozenset({"h"}),
c2_endpoints=frozenset({"c"}),
decky_set=frozenset({"d1"}),
session_windows=((0.0, 100.0),),
)
b = _features(
"b",
payload_hashes=frozenset({"h"}),
c2_endpoints=frozenset({"c"}),
decky_set=frozenset({"d1"}),
session_windows=((0.0, 100.0),),
)
assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD
def test_shared_infra_plus_cohort_below_threshold():
"""F1 shared_wordlist: shared signals minus operational overlap is NOT co-op."""
a = _features(
"a",
payload_hashes=frozenset({"h"}),
asn_cohort=frozenset({64512}),
)
b = _features(
"b",
payload_hashes=frozenset({"h"}),
asn_cohort=frozenset({64512}),
)
assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD
def test_combined_invariant_under_shift():
"""End-to-end F7 invariant on the combined weight."""
a = _features(
"a",
last_phase_per_decky={"d1": "persistence"},
last_seen_per_decky={"d1": 1000.0},
session_windows=((0.0, 1500.0),),
payload_hashes=frozenset({"h"}),
)
b = _features(
"b",
first_phase_per_decky={"d1": "discovery"},
first_seen_per_decky={"d1": 1100.0},
session_windows=((1100.0, 2000.0),),
payload_hashes=frozenset({"h"}),
)
base = combined_campaign_weight(a, b)
shift = 90 * 24 * 3600.0
a_shifted = IdentityFeatures(
identity_uuid=a.identity_uuid,
last_phase_per_decky=dict(a.last_phase_per_decky),
last_seen_per_decky={k: v + shift for k, v in a.last_seen_per_decky.items()},
session_windows=tuple((s + shift, e + shift) for s, e in a.session_windows),
payload_hashes=a.payload_hashes,
)
b_shifted = IdentityFeatures(
identity_uuid=b.identity_uuid,
first_phase_per_decky=dict(b.first_phase_per_decky),
first_seen_per_decky={k: v + shift for k, v in b.first_seen_per_decky.items()},
session_windows=tuple((s + shift, e + shift) for s, e in b.session_windows),
payload_hashes=b.payload_hashes,
)
assert combined_campaign_weight(a_shifted, b_shifted) == pytest.approx(base)