From e80f3eec543e00f278f5b060064224dfc7c2b239 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 06:38:17 -0400 Subject: [PATCH] test(clustering): fixture 1 (shared_wordlist) + fixture-harness extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two campaigns sharing a credential wordlist; everything else (ASN, IPs, JA3, HASSH, active hours) divergent. Pass condition: clusterer must NOT merge. Protects against the "credential overlap is identity" failure mode that commodity wordlists invite. * tests/clustering/fixture_harness.py — shared assert_fixture_bounds helper + identity_clusterer (placeholder, trivially correct on all-singleton fixtures) + credential_jaccard_clusterer (deliberately- bad reference used to PROVE the fixture catches what it should). * tests/clustering/test_shared_wordlist_fixture.py — bounds pass with identity, bounds FAIL (homogeneity → 0) with the bad credential clusterer. The latter is the proof the fixture earns its keep. * tests/fixtures/campaigns/shared_wordlist.{yaml,expected.yaml}. * tests/clustering/test_lone_wolf_fixture.py — refactored onto the shared harness. No behavior change. 
--- tests/clustering/fixture_harness.py | 123 ++++++++++++++++++ tests/clustering/test_lone_wolf_fixture.py | 54 +++----- .../test_shared_wordlist_fixture.py | 117 +++++++++++++++++ .../campaigns/shared_wordlist.expected.yaml | 21 +++ tests/fixtures/campaigns/shared_wordlist.yaml | 84 ++++++++++++ 5 files changed, 363 insertions(+), 36 deletions(-) create mode 100644 tests/clustering/fixture_harness.py create mode 100644 tests/clustering/test_shared_wordlist_fixture.py create mode 100644 tests/fixtures/campaigns/shared_wordlist.expected.yaml create mode 100644 tests/fixtures/campaigns/shared_wordlist.yaml diff --git a/tests/clustering/fixture_harness.py b/tests/clustering/fixture_harness.py new file mode 100644 index 00000000..fe33224f --- /dev/null +++ b/tests/clustering/fixture_harness.py @@ -0,0 +1,123 @@ +""" +Shared helpers for fixture-driven clustering tests. + +Each fixture lives at `tests/fixtures/campaigns/{name}.yaml` with paired +`{name}.expected.yaml` bound file. The harness here keeps every per- +fixture test file down to "load corpus → predict → assert bounds" without +copy-pasting the bound-walk loop or reference clusterers across files. + +Two reference clusterers are provided: + +* `identity_clusterer` — every attacker is its own cluster. Trivially + passes any fixture whose ground truth is all singletons (lone_wolf, + shared_wordlist before merge, etc). Useful as a green baseline while + the real connected-components algorithm is under construction. + +* `credential_jaccard_clusterer` — deliberately-bad reference that + merges any two attackers whose credential-attempt sets overlap above + a threshold. Exists so fixtures like `shared_wordlist` can prove + they fail a clusterer that relies on credential overlap alone — the + whole point of fixture #1. 
+""" +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path + +import yaml + +from tests.clustering.metrics import score +from tests.factories.campaign_factory import GeneratedCorpus + +PredictFn = Callable[[GeneratedCorpus], dict[str, str]] + + +def assert_fixture_bounds( + corpus: GeneratedCorpus, + predict: PredictFn, + expected_path: str | Path, +) -> dict[str, float]: + """ + Run `predict` against the corpus, score against ground truth, and + assert every metric meets the floor declared in `expected_path`. + + Returns the observed metrics dict so callers can do additional + assertions (e.g. "homogeneity is *exactly* 1.0 for this fixture"). + """ + bounds = yaml.safe_load(Path(expected_path).read_text(encoding="utf-8")) + truth = corpus.truth_labels() + pred = predict(corpus) + metrics = score(truth, pred) + + failures = [] + for name, bound in bounds.items(): + observed = metrics[name] + floor = bound["min"] + if observed < floor: + failures.append(f"{name}={observed:.3f} < min {floor:.3f}") + assert not failures, ( + "fixture bounds violated: " + "; ".join(failures) + + f" (full metrics: {metrics})" + ) + return metrics + + +# ─── Reference clusterers ─────────────────────────────────────────────────── + + +def identity_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: + """Every attacker → its own cluster. Placeholder until §4 algorithm lands.""" + return {a.attacker_id: f"cluster-{a.attacker_id}" for a in corpus.attackers} + + +def credential_jaccard_clusterer( + corpus: GeneratedCorpus, *, threshold: float = 0.5 +) -> dict[str, str]: + """ + Deliberately-bad reference: union-find over attackers, edge whenever + two attackers' credential-attempt sets have Jaccard ≥ threshold. + + Used to demonstrate that fixtures targeting credential-overlap + failure modes (fixture 1: shared_wordlist) actually catch a clusterer + that leans on credential signals alone. NOT the real algorithm. 
+ """ + # Build per-attacker credential sets. + creds: dict[str, set[tuple[str, str]]] = {} + for att in corpus.attackers: + s: set[tuple[str, str]] = set() + for sess in att.sessions: + s.update(sess.credentials_tried) + creds[att.attacker_id] = s + + # Union-find. + parent: dict[str, str] = {aid: aid for aid in creds} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + ids = list(creds.keys()) + for i, a in enumerate(ids): + sa = creds[a] + if not sa: + continue + for b in ids[i + 1 :]: + sb = creds[b] + if not sb: + continue + inter = len(sa & sb) + union_size = len(sa | sb) + if union_size == 0: + continue + jaccard = inter / union_size + if jaccard >= threshold: + union(a, b) + + return {aid: find(aid) for aid in ids} diff --git a/tests/clustering/test_lone_wolf_fixture.py b/tests/clustering/test_lone_wolf_fixture.py index b2126a23..d92c4c71 100644 --- a/tests/clustering/test_lone_wolf_fixture.py +++ b/tests/clustering/test_lone_wolf_fixture.py @@ -1,50 +1,37 @@ """ End-to-end pipeline test for fixture 3 (lone_wolf). -Loads the YAML spec, runs the synthetic generator, applies a placeholder -identity clusterer (each attacker → its own cluster), scores against -the expected bounds. This is the simplest of the six fixtures and is -deliberately the first one wired up — its ground truth is all -singletons, so an identity clusterer trivially passes, which proves the -DSL→factory→metrics pipeline works before any real algorithm is built. +Loads the YAML spec, runs the synthetic generator, applies the +identity-clusterer placeholder (each attacker → its own cluster), and +scores against the expected bounds. 
This is the simplest of the six +fixtures and is deliberately the first one wired up — its ground truth +is all singletons, so an identity clusterer trivially passes, which +proves the DSL → factory → metrics pipeline works before any real +algorithm is built. Once the connected-components clusterer (CAMPAIGN_CLUSTERING.md §4) -lands, this test will swap the placeholder for the real implementation -and the same fixture must continue to pass. +lands, the same fixture must continue to pass. """ from __future__ import annotations from pathlib import Path import pytest -import yaml +from tests.clustering.fixture_harness import ( + assert_fixture_bounds, + identity_clusterer, +) from tests.clustering.metrics import score -from tests.factories.campaign_factory import GeneratedCorpus, generate, load_yaml +from tests.factories.campaign_factory import generate, load_yaml FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" -def _identity_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: - """Every attacker is its own cluster. 
Trivially correct on lone_wolf.""" - return {a.attacker_id: f"cluster-{a.attacker_id}" for a in corpus.attackers} - - def test_lone_wolf_pipeline_passes_bounds() -> None: spec = load_yaml(FIXTURE_DIR / "lone_wolf.yaml") - bounds = yaml.safe_load((FIXTURE_DIR / "lone_wolf.expected.yaml").read_text()) - corpus = generate(spec, seed=0) - truth = corpus.truth_labels() - pred = _identity_clusterer(corpus) - metrics = score(truth, pred) - - failures = [] - for name, bound in bounds.items(): - observed = metrics[name] - if observed < bound["min"]: - failures.append(f"{name}={observed:.3f} < min {bound['min']:.3f}") - assert not failures, "fixture bounds violated: " + "; ".join(failures) + assert_fixture_bounds(corpus, identity_clusterer, FIXTURE_DIR / "lone_wolf.expected.yaml") def test_lone_wolf_corpus_shape() -> None: @@ -53,7 +40,6 @@ def test_lone_wolf_corpus_shape() -> None: corpus = generate(spec, seed=0) assert len(corpus.attackers) == 9 assert len(corpus.sessions) == 9 - # Every attacker is a truth-singleton (its own campaign). truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} assert len(truth_campaigns) == 9 @@ -64,7 +50,7 @@ def test_identity_clusterer_fails_on_a_real_campaign() -> None: multi-actor campaign should make the placeholder identity clusterer fail completeness, since each truth-campaign gets fragmented into one-member clusters. If this didn't fail, our metrics would be - blind to false splits — and that's the entire point of fixture 4 + blind to false splits — and that's the entire point of fixtures 4 and 5 in the design doc. """ spec = { @@ -82,11 +68,7 @@ def test_identity_clusterer_fails_on_a_real_campaign() -> None: } } corpus = generate(spec, seed=0) - truth = corpus.truth_labels() - pred = _identity_clusterer(corpus) - metrics = score(truth, pred) - # Identity clusterer splits the one true campaign across 2 clusters - # → completeness drops below 1.0. 
This must hold or our metrics - # aren't catching what they're supposed to catch. + pred = identity_clusterer(corpus) + metrics = score(corpus.truth_labels(), pred) assert metrics["completeness"] < 1.0 - assert metrics["homogeneity"] == pytest.approx(1.0) # no false merges, just splits + assert metrics["homogeneity"] == pytest.approx(1.0) diff --git a/tests/clustering/test_shared_wordlist_fixture.py b/tests/clustering/test_shared_wordlist_fixture.py new file mode 100644 index 00000000..3822b726 --- /dev/null +++ b/tests/clustering/test_shared_wordlist_fixture.py @@ -0,0 +1,117 @@ +""" +End-to-end pipeline test for fixture 1 (shared_wordlist). + +Two campaigns. Same SSH credential wordlist. Everything else divergent +— ASN, IPs, JA3, HASSH, active hours. + +The fixture exists to defeat one specific failure mode: a clusterer +that leans on credential-list overlap as a primary signal. Commodity +wordlists (rockyou, defaults lists, top-1k common-credentials) are +shared by hundreds of unrelated actors — credential overlap alone +cannot identify a campaign. + +Two tests cover this: + +1. `test_shared_wordlist_pipeline_passes_bounds` — runs the placeholder + identity clusterer against the fixture. Trivially green (each + campaign has one actor → identity puts each in its own cluster). + This is the ratchet point: when the real algorithm replaces the + placeholder, this test must continue to pass. + +2. `test_credential_jaccard_clusterer_fails_homogeneity` — runs a + deliberately-bad clusterer that merges any two attackers whose + credential sets overlap above 50% Jaccard. Proves the fixture + actually catches what it's designed to catch: this clusterer DOES + merge the two campaigns, and the fixture's homogeneity floor (0.90) + is breached. If this test ever fails, our fixture or our metric + harness is broken. 
+""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.clustering.fixture_harness import ( + assert_fixture_bounds, + credential_jaccard_clusterer, + identity_clusterer, +) +from tests.clustering.metrics import score +from tests.factories.campaign_factory import generate, load_yaml + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" + + +def test_shared_wordlist_pipeline_passes_bounds() -> None: + spec = load_yaml(FIXTURE_DIR / "shared_wordlist.yaml") + corpus = generate(spec, seed=0) + assert_fixture_bounds( + corpus, identity_clusterer, FIXTURE_DIR / "shared_wordlist.expected.yaml" + ) + + +def test_shared_wordlist_corpus_shape() -> None: + """Sanity: 2 campaigns × 1 actor = 2 attackers, 4 sessions + (delivery + credential_access × 3 sessions per campaign).""" + spec = load_yaml(FIXTURE_DIR / "shared_wordlist.yaml") + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 2 + truth = corpus.truth_labels() + assert set(truth.values()) == {"shared-wordlist-A", "shared-wordlist-B"} + # Each attacker should have at least one credential_access session + # whose credentials_tried is the full shared list. + for att in corpus.attackers: + cred_sessions = [s for s in att.sessions if s.credentials_tried] + assert cred_sessions, f"attacker {att.attacker_id} has no credential sessions" + # All cred sessions should carry the same 8-entry wordlist. + for s in cred_sessions: + assert len(s.credentials_tried) == 8 + + +def test_credential_jaccard_clusterer_fails_homogeneity() -> None: + """ + The fixture's reason for being. A naive clusterer that merges on + credential-set Jaccard ≥ 0.5 will fuse the two campaigns (Jaccard + = 1.0 on shared wordlists). That fusion drives homogeneity to 0 + — exactly the failure mode the fixture protects against. + + If this test ever FAILS (i.e. 
the bad clusterer scores high on + this fixture), the fixture has lost its discrimination power and + needs to be re-examined. + """ + spec = load_yaml(FIXTURE_DIR / "shared_wordlist.yaml") + corpus = generate(spec, seed=0) + pred = credential_jaccard_clusterer(corpus, threshold=0.5) + metrics = score(corpus.truth_labels(), pred) + # The two campaigns must be merged by this clusterer. + assert len(set(pred.values())) == 1, ( + "credential-Jaccard clusterer should merge both campaigns into one" + ) + # And homogeneity must collapse — that's the signal a fixture-aware + # CI gate would use to reject the bad clusterer. + assert metrics["homogeneity"] == pytest.approx(0.0) + + +def test_naive_clusterer_does_not_fool_the_fixture() -> None: + """ + Belt-and-braces: even though the bad clusterer collapses + homogeneity, it might still pass *some* metrics (completeness is + actually 1.0 — all members of each true campaign land in the + single mega-cluster). The fixture's bound floor on homogeneity + (0.90) must reject it. + """ + spec = load_yaml(FIXTURE_DIR / "shared_wordlist.yaml") + corpus = generate(spec, seed=0) + pred = credential_jaccard_clusterer(corpus, threshold=0.5) + metrics = score(corpus.truth_labels(), pred) + bounds = { + "adjusted_rand_index": 0.85, + "homogeneity": 0.90, + "completeness": 0.80, + "singleton_recall": 0.95, + } + breaches = [k for k, floor in bounds.items() if metrics[k] < floor] + assert "homogeneity" in breaches, ( + f"fixture failed to catch the bad clusterer; observed metrics: {metrics}" + ) diff --git a/tests/fixtures/campaigns/shared_wordlist.expected.yaml b/tests/fixtures/campaigns/shared_wordlist.expected.yaml new file mode 100644 index 00000000..91b39da2 --- /dev/null +++ b/tests/fixtures/campaigns/shared_wordlist.expected.yaml @@ -0,0 +1,21 @@ +# Bounds for fixture 1 (shared_wordlist). +# +# Ground truth: two distinct campaigns, one actor each → 2 truth-labels +# of size 1. The clusterer must keep them separate. 
A correct algorithm +# scores 1.0 across every metric on this fixture. +# +# Homogeneity is the load-bearing metric here: a clusterer that merges +# the two campaigns based on shared credentials will tank homogeneity +# (one predicted cluster contains members of two true campaigns). +# +# Bounds are loose at v1; tighten as the algorithm matures. Loosening +# any bound to make CI pass requires PR-comment justification (per +# CAMPAIGN_CLUSTERING.md §2). +adjusted_rand_index: + min: 0.85 +homogeneity: + min: 0.90 +completeness: + min: 0.80 +singleton_recall: + min: 0.95 diff --git a/tests/fixtures/campaigns/shared_wordlist.yaml b/tests/fixtures/campaigns/shared_wordlist.yaml new file mode 100644 index 00000000..165fa58e --- /dev/null +++ b/tests/fixtures/campaigns/shared_wordlist.yaml @@ -0,0 +1,84 @@ +# Fixture 1 (shared_wordlist) — see development/CAMPAIGN_CLUSTERING.md §2. +# +# Two distinct campaigns, both bruteforcing SSH with the SAME credential +# wordlist (rockyou-top1k flavor). EVERYTHING ELSE diverges: +# - different ASNs (DigitalOcean vs Comcast residential) +# - different IP ranges (sticky pools, generated separately) +# - different JA3 / HASSH (different SSH client toolchains) +# - different active hours (UTC-day vs UTC-night) +# +# Pass condition: the clusterer must NOT merge these into one campaign. +# Credential overlap alone is not enough signal — commodity wordlists are +# shared by hundreds of unrelated actors. A clusterer that leans on +# credential-list Jaccard alone will fail this fixture (we prove this in +# the test file with a deliberately-bad credential-Jaccard reference +# clusterer). 
+corpus: + campaigns: + - campaign: + id: shared-wordlist-A + actors: + - id: actor-A + asn: 14061 # DigitalOcean + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200,0-23-65281-10-11-35-16-5-13-18-51-45-43-27-17513,29-23-24,0" + hassh: "alpha-aaaaaaaa-aaaaaaaa-aaaaaaaa" + hours_active_utc: [10, 11, 12, 13, 14] + jitter_seconds: 60 + phases: + - name: delivery + actor: actor-A + target_selector: { service: ssh, count: 1 } + dwell_seconds: 1 + - name: credential_access + actor: actor-A + tool_signature: + commands: [] + credentials: + - [admin, admin] + - [admin, password] + - [admin, "12345"] + - [root, root] + - [root, toor] + - [root, "123456"] + - [user, user] + - [test, test] + target_selector: { service: ssh, count: 3 } + dwell_seconds: 5 + duration_days: 1 + + - campaign: + id: shared-wordlist-B + actors: + - id: actor-B + asn: 7922 # Comcast residential + ip_pool: sticky + ja3: "769,49195-49199-156-49162-49161-49171-49172-51-50-47,0-10-11-13-23-65281,29-23-24-25,0" + hassh: "beta-bbbbbbbb-bbbbbbbb-bbbbbbbb" + hours_active_utc: [22, 23, 0, 1, 2] + jitter_seconds: 60 + phases: + - name: delivery + actor: actor-B + target_selector: { service: ssh, count: 1 } + dwell_seconds: 1 + - name: credential_access + actor: actor-B + tool_signature: + commands: [] + # IDENTICAL wordlist to campaign A — this is the trap. + credentials: + - [admin, admin] + - [admin, password] + - [admin, "12345"] + - [root, root] + - [root, toor] + - [root, "123456"] + - [user, user] + - [test, test] + target_selector: { service: ssh, count: 3 } + dwell_seconds: 5 + duration_days: 1 + + noise: + scanner_count: 0