From 27f7de9886d80d92e5b70356a6acadfb06d1bcb3 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 07:46:14 -0400 Subject: [PATCH] test(clustering): fixture 5 multi_operator + c2/shift/composite refs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new reference clusterers in fixture_harness: * c2_callback_clusterer — union-find on overlapping C2 callback sets across an attacker's sessions. Pass-clusterer for fixture 5 where two operators with distinct tooling share a C2 endpoint as the campaign signal. * shift_clusterer — deliberately-bad reference that buckets attackers by majority session-start hour into night/day/swing. Adversarial reference for fixture 5; proves operational schedule is NOT a campaign signal. * composite_signals_clusterer — union-find combining (ja3, hassh) match OR overlapping C2 callback. Will serve as the pass- clusterer for fixture 6 (noise_floor) where multiple campaigns with heterogeneous signal types are scored together. Also factored a small _union_find helper for the new clusterers (existing time_window/credential_jaccard left untouched to avoid mixing refactor with feature work). Fixture 5 (multi_operator): one campaign, two operators with distinct UKC roles. Actor A (broker, night shift): Delivery → Exploitation → Persistence → C2. Actor B (post-ex, day shift): Discovery → Lateral Movement → Collection → Exfiltration. Distinct JA3/HASSH/ASN/IPs; shared C2 + payload hash. Four tests: corpus shape (distinct fingerprints, shared C2, disjoint shifts), pipeline pass via c2_callback_clusterer, explicit harness sanity that fingerprint_clusterer cannot resolve this fixture (documents which signal carries the campaign), and adversarial shift_clusterer fragmentation. Phase-handoff edges (the real load-bearing signal per the design doc) wait for the production clusterer; this fixture will prove they're needed when it ships. 
--- tests/clustering/fixture_harness.py | 161 ++++++++++++++++++ .../clustering/test_multi_operator_fixture.py | 134 +++++++++++++++ .../campaigns/multi_operator.expected.yaml | 25 +++ tests/fixtures/campaigns/multi_operator.yaml | 108 ++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 tests/clustering/test_multi_operator_fixture.py create mode 100644 tests/fixtures/campaigns/multi_operator.expected.yaml create mode 100644 tests/fixtures/campaigns/multi_operator.yaml diff --git a/tests/clustering/fixture_harness.py b/tests/clustering/fixture_harness.py index 04fd8d7e..b128d679 100644 --- a/tests/clustering/fixture_harness.py +++ b/tests/clustering/fixture_harness.py @@ -42,6 +42,23 @@ cluster on, not the quality of the result. other. Exists so fixtures like `paused_campaign` (fixture #4) can prove they fail a clusterer that treats short-window time proximity as a primary signal — operators pause, sleep, take weekends. + +* `c2_callback_clusterer` — union-find on overlapping C2 callback + sets. Pass-clusterer for fixture 5 (multi_operator), where two + operators with distinct tooling share a C2 endpoint as the + load-bearing campaign signal. Attackers with no C2 endpoints + become their own singleton. + +* `shift_clusterer` — deliberately-bad reference that buckets + attackers by majority session-start hour into night/day/swing. + Exists so fixture 5 can prove it fails a clusterer that treats + shift schedule as a primary signal — operators on different + schedules can still share a campaign. + +* `composite_signals_clusterer` — union-find that combines + ``(ja3, hassh)`` match OR shared C2 callback into the same + cluster. Approximates the planned similarity graph well enough + to score the combined-corpus fixture (fixture 6, noise_floor). 
""" from __future__ import annotations @@ -123,6 +140,150 @@ def asn_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: return {a.attacker_id: f"asn-{a.asn}" for a in corpus.attackers} +def _union_find(ids: list[str]) -> tuple[ + dict[str, str], Callable[[str], str], Callable[[str, str], None] +]: + """Return (parent, find, union) for a fresh union-find over ``ids``.""" + parent: dict[str, str] = {aid: aid for aid in ids} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + return parent, find, union + + +def c2_callback_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: + """Union attackers whose session-collected C2 callback sets overlap. + + Attackers with no C2 callbacks become their own singleton (an + un-fingerprinted opportunistic scanner has no link to anyone). + """ + callbacks: dict[str, set[str]] = {} + for att in corpus.attackers: + callbacks[att.attacker_id] = { + s.c2_callback for s in att.sessions if s.c2_callback + } + + ids = list(callbacks.keys()) + _parent, find, union = _union_find(ids) + + for i, a in enumerate(ids): + sa = callbacks[a] + if not sa: + continue + for b in ids[i + 1 :]: + sb = callbacks[b] + if not sb: + continue + if sa & sb: + union(a, b) + + pred: dict[str, str] = {} + for aid in ids: + if not callbacks[aid]: + pred[aid] = f"c2-none-{aid}" + else: + pred[aid] = f"c2-{find(aid)}" + return pred + + +def shift_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: + """Bucket attackers by majority session-start hour into night / + day / swing. Deliberately-bad — see fixture 5. + + Buckets: + * night — hours [22, 23, 0, 1, 2, 3, 4, 5] + * day — hours [6, 7, 8, 9, 10, 11, 12, 13] + * swing — hours [14, 15, 16, 17, 18, 19, 20, 21] + + Attackers with no sessions become their own singleton. 
+ """ + night = {22, 23, 0, 1, 2, 3, 4, 5} + day = {6, 7, 8, 9, 10, 11, 12, 13} + + def bucket(hour: int) -> str: + if hour in night: + return "night" + if hour in day: + return "day" + return "swing" + + pred: dict[str, str] = {} + for att in corpus.attackers: + if not att.sessions: + pred[att.attacker_id] = f"shift-none-{att.attacker_id}" + continue + counts: dict[str, int] = {} + for s in att.sessions: + b = bucket(s.started_at.hour) + counts[b] = counts.get(b, 0) + 1 + majority = max(counts, key=lambda k: counts[k]) + pred[att.attacker_id] = f"shift-{majority}" + return pred + + +def composite_signals_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: + """Union-find combining ``(ja3, hassh)`` match OR overlapping C2 + callback sets. Approximates the stable-signals + C2-overlap arms + of the planned similarity graph; used as the pass-clusterer for + fixture 6 where multiple campaigns + noise are scored together. + + Attackers with NO signals (no fingerprint, no C2) stay singleton. + """ + callbacks: dict[str, set[str]] = {} + fingerprint: dict[str, tuple[str | None, str | None] | None] = {} + for att in corpus.attackers: + callbacks[att.attacker_id] = { + s.c2_callback for s in att.sessions if s.c2_callback + } + if att.ja3 is None and att.hassh is None: + fingerprint[att.attacker_id] = None + else: + fingerprint[att.attacker_id] = (att.ja3, att.hassh) + + ids = list(callbacks.keys()) + _parent, find, union = _union_find(ids) + + # Fingerprint edges. + by_fp: dict[tuple[str | None, str | None], list[str]] = {} + for aid, fp in fingerprint.items(): + if fp is None: + continue + by_fp.setdefault(fp, []).append(aid) + for group in by_fp.values(): + anchor = group[0] + for other in group[1:]: + union(anchor, other) + + # C2 overlap edges. 
+ for i, a in enumerate(ids): + sa = callbacks[a] + if not sa: + continue + for b in ids[i + 1 :]: + sb = callbacks[b] + if not sb: + continue + if sa & sb: + union(a, b) + + pred: dict[str, str] = {} + for aid in ids: + if fingerprint[aid] is None and not callbacks[aid]: + pred[aid] = f"composite-singleton-{aid}" + else: + pred[aid] = f"composite-{find(aid)}" + return pred + + def time_window_clusterer( corpus: GeneratedCorpus, *, gap_days: float = 1.0 ) -> dict[str, str]: diff --git a/tests/clustering/test_multi_operator_fixture.py b/tests/clustering/test_multi_operator_fixture.py new file mode 100644 index 00000000..7cdd5067 --- /dev/null +++ b/tests/clustering/test_multi_operator_fixture.py @@ -0,0 +1,134 @@ +""" +End-to-end pipeline test for fixture 5 (multi_operator). + +One campaign, two operators with distinct UKC roles, distinct +tooling (different JA3 + HASSH), distinct ASNs and IPs, on +opposite shift schedules. What ties them is shared C2 callback + +shared stage-1 payload hash — the planned similarity graph's +"payload simhash + C2 endpoint match" arms are what should resolve +them as one campaign. + +Four tests cover this — the three described below, plus the harness sanity check `test_fingerprint_clusterer_cannot_resolve_this_fixture`: + +1. `test_multi_operator_corpus_shape` — sanity: two attackers, one + campaign, distinct fingerprints, shared C2 callback present in + both rows' sessions, distinct shift hours. + +2. `test_multi_operator_pipeline_passes_bounds` — runs + `c2_callback_clusterer` (the appropriate pass-clusterer for + this fixture, since fingerprint_clusterer would split the two + distinct operators). Folds both rows into one cluster via the + shared C2 endpoint. + +3. `test_shift_clusterer_fragments_campaign` — runs the deliberately + bad `shift_clusterer`. Actor A on night shift and Actor B on day + shift split into two clusters → completeness collapses → the + bound floor on completeness rejects the bad clusterer. This is + the canonical proof that operational-schedule overlap is NOT a + campaign signal. 
+""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.clustering.fixture_harness import ( + assert_fixture_bounds, + c2_callback_clusterer, + fingerprint_clusterer, + shift_clusterer, +) +from tests.clustering.metrics import score +from tests.factories.campaign_factory import generate, load_yaml + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" +FIXTURE_YAML = FIXTURE_DIR / "multi_operator.yaml" +EXPECTED_YAML = FIXTURE_DIR / "multi_operator.expected.yaml" + + +def test_multi_operator_corpus_shape() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 2 + truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} + assert truth_campaigns == {"multi-operator-001"} + # Two distinct fingerprints — the operators are different people + # using different tools. + ja3s = {a.ja3 for a in corpus.attackers} + hasshs = {a.hassh for a in corpus.attackers} + assert len(ja3s) == 2 + assert len(hasshs) == 2 + # Shared C2 callback across both rows' sessions. + by_actor = {a.truth_actor_id: a for a in corpus.attackers} + broker = by_actor["ops-broker-night"] + postex = by_actor["ops-postex-day"] + broker_c2s = {s.c2_callback for s in broker.sessions if s.c2_callback} + postex_c2s = {s.c2_callback for s in postex.sessions if s.c2_callback} + assert "c2.shared-op.example" in broker_c2s + assert "c2.shared-op.example" in postex_c2s + # Shifts are disjoint — load-bearing for the adversarial test. 
+ broker_hours = {s.started_at.hour for s in broker.sessions} + postex_hours = {s.started_at.hour for s in postex.sessions} + assert broker_hours <= {22, 23, 0, 1, 2, 3} + assert postex_hours <= {9, 10, 11, 12, 13} + + +def test_multi_operator_pipeline_passes_bounds() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + metrics = assert_fixture_bounds(corpus, c2_callback_clusterer, EXPECTED_YAML) + pred = c2_callback_clusterer(corpus) + assert len(set(pred.values())) == 1, ( + "c2_callback_clusterer should fold both operators into one cluster" + ) + assert metrics["adjusted_rand_index"] == pytest.approx(1.0) + + +def test_fingerprint_clusterer_cannot_resolve_this_fixture() -> None: + """ + Sanity for the harness, NOT a test of the clusterer: with two + distinct fingerprints and one truth campaign, + `fingerprint_clusterer` produces 2 clusters → completeness + collapses. This is *why* the fixture's pass-clusterer is + `c2_callback_clusterer` instead. Documents which signal + actually carries the campaign here. + """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = fingerprint_clusterer(corpus) + assert len(set(pred.values())) == 2 + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["completeness"] == pytest.approx(0.0) + + +def test_shift_clusterer_fragments_campaign() -> None: + """ + The fixture's reason for being. Bucket attackers by shift and + the two operators land in 'night' and 'day' clusters → 2 + predicted clusters. Truth = 1 campaign → completeness collapses. + + If this test ever fails (shift_clusterer satisfies the bounds), + the fixture has lost its discrimination power. 
+ """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = shift_clusterer(corpus) + buckets = set(pred.values()) + assert buckets == {"shift-night", "shift-day"}, ( + f"expected one night cluster + one day cluster, got {buckets}" + ) + + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["completeness"] == pytest.approx(0.0) + + bounds = { + "adjusted_rand_index": 0.85, + "homogeneity": 0.90, + "completeness": 0.80, + "singleton_recall": 0.95, + } + breaches = [k for k, floor in bounds.items() if metrics[k] < floor] + assert "completeness" in breaches, ( + f"fixture failed to catch the bad clusterer; observed metrics: {metrics}" + ) diff --git a/tests/fixtures/campaigns/multi_operator.expected.yaml b/tests/fixtures/campaigns/multi_operator.expected.yaml new file mode 100644 index 00000000..f59e8d32 --- /dev/null +++ b/tests/fixtures/campaigns/multi_operator.expected.yaml @@ -0,0 +1,25 @@ +# Bounds for fixture 5 (multi_operator). +# +# Ground truth at campaign-level: 1 campaign of 2 observation rows +# (one per DSL actor). A correct algorithm scores 1.0 across every +# metric on this fixture. +# +# Completeness is the load-bearing metric: a clusterer that splits +# the two operators by shift / by tooling / by ASN tanks +# completeness (the one true class is split across two predicted +# clusters). The adversarial shift_clusterer demonstrates this and +# the bound below rejects it. +# +# Campaign-level fixture only — the two DSL actors model two +# distinct identities (different tooling, different operators) by +# design. See the YAML header for the modeling note. +# +# Bounds are loose at v1; tighten as the algorithm matures. 
+adjusted_rand_index: + min: 0.85 +homogeneity: + min: 0.90 +completeness: + min: 0.80 +singleton_recall: + min: 0.95 diff --git a/tests/fixtures/campaigns/multi_operator.yaml b/tests/fixtures/campaigns/multi_operator.yaml new file mode 100644 index 00000000..f778b752 --- /dev/null +++ b/tests/fixtures/campaigns/multi_operator.yaml @@ -0,0 +1,108 @@ +# Fixture 5 (multi_operator) — see development/CAMPAIGN_CLUSTERING.md §2. +# +# One campaign, two operators with distinct UKC roles. Phase-handoff is +# the load-bearing signal; this fixture is what proves the algorithm +# needs it. +# +# Actor A (night shift, hours 22-03 UTC): +# Delivery → Exploitation → Persistence → Command-and-Control +# +# Actor B (day shift, hours 09-13 UTC): +# Discovery → Lateral Movement → Collection → Exfiltration +# +# Different IPs, different ASNs, different JA3+HASSH (different +# tools — A is the access broker, B is the post-exploitation +# operator). What ties them is shared C2 callback and shared +# stage-1 payload hash. +# +# Pass condition: a clusterer that resolves on shared C2 callback +# (or, more generally, the planned similarity graph's payload + +# C2 + phase-handoff signals) folds the two actors into one +# campaign cluster. Demonstrated by `c2_callback_clusterer`. +# +# Adversarial condition: `shift_clusterer` (group attackers by +# majority shift bucket — night/day/swing) puts A in "night" and B +# in "day", fragmenting the campaign. Completeness collapses; the +# bound floor on completeness rejects the bad clusterer. This is +# the canonical demonstration that operational-schedule overlap is +# NOT a campaign signal — different operators on different shifts +# can still be one campaign. +# +# Like fixture 4, this is a CAMPAIGN-LEVEL fixture only. The two +# DSL actors mint two distinct truth_identity_id rows by design +# (different operators, different tools — they are different +# identities even though they're one campaign). Identity-level +# scoring is fixture 2's job. 
+campaign: + id: multi-operator-001 + duration_days: 3 + actors: + - id: ops-broker-night + asn: 64530 + ip_pool: sticky + # Tool A's TLS stack — older OpenSSL signature. + ja3: "771,49195-49199-49196-49200-156-157-47-53,0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0" + hassh: "ops-broker-eeeeeeee-eeeeeeee-eeeeeeee" + hours_active_utc: [22, 23, 0, 1, 2, 3] + jitter_seconds: 60 + - id: ops-postex-day + asn: 64531 + ip_pool: sticky + # Tool B's TLS stack — distinctly different from A. + ja3: "769,49162-49161-49171-49172-51-50-47,0-10-11-13-23-65281,29-23-24-25,0" + hassh: "ops-postex-ffffffff-ffffffff-ffffffff" + hours_active_utc: [9, 10, 11, 12, 13] + jitter_seconds: 60 + phases: + # Actor A — initial access path, owns the foothold. + - name: delivery + actor: ops-broker-night + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 1 + - name: exploitation + actor: ops-broker-night + tool_signature: + payload_hash: "shared-op-stage1-payload" + c2_callback: "c2.shared-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: persistence + actor: ops-broker-night + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { decky: previous_success, count: 1 } + dwell_seconds: 5 + - name: command_and_control + actor: ops-broker-night + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { decky: previous_success, count: 1 } + dwell_seconds: 5 + # Actor B — picks up after A's foothold; shares C2 + payload. 
+ - name: discovery + actor: ops-postex-day + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { decky: previous_success, count: 2 } + dwell_seconds: 5 + - name: lateral_movement + actor: ops-postex-day + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: collection + actor: ops-postex-day + tool_signature: + payload_hash: "shared-op-stage1-payload" + c2_callback: "c2.shared-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: exfiltration + actor: ops-postex-day + tool_signature: + c2_callback: "c2.shared-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5