test(clustering): fixture 1 (shared_wordlist) + fixture-harness extraction
Two campaigns share a credential wordlist; everything else (ASN, IPs,
JA3, HASSH, active hours) diverges. Pass condition: the clusterer must NOT
merge them. Protects against the "credential overlap is identity" failure
mode that commodity wordlists invite.
* tests/clustering/fixture_harness.py — shared assert_fixture_bounds
helper + identity_clusterer (placeholder, trivially correct on
all-singleton fixtures) + credential_jaccard_clusterer (deliberately-
bad reference used to PROVE the fixture catches what it should).
* tests/clustering/test_shared_wordlist_fixture.py — bounds pass with
identity, bounds FAIL (homogeneity → 0) with the bad credential
clusterer. The latter is the proof the fixture earns its keep.
* tests/fixtures/campaigns/shared_wordlist.{yaml,expected.yaml}.
* tests/clustering/test_lone_wolf_fixture.py — refactored onto the
shared harness. No behavior change.
This commit is contained in:
123
tests/clustering/fixture_harness.py
Normal file
123
tests/clustering/fixture_harness.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""
|
||||
Shared helpers for fixture-driven clustering tests.
|
||||
|
||||
Each fixture lives at `tests/fixtures/campaigns/<name>.yaml`, paired with a
`<name>.expected.yaml` bounds file. The harness here keeps every per-
|
||||
fixture test file down to "load corpus → predict → assert bounds" without
|
||||
copy-pasting the bound-walk loop or reference clusterers across files.
|
||||
|
||||
Two reference clusterers are provided:
|
||||
|
||||
* `identity_clusterer` — every attacker is its own cluster. Trivially
|
||||
passes any fixture whose ground truth is all singletons (lone_wolf,
|
||||
shared_wordlist before merge, etc). Useful as a green baseline while
|
||||
the real connected-components algorithm is under construction.
|
||||
|
||||
* `credential_jaccard_clusterer` — deliberately-bad reference that
|
||||
merges any two attackers whose credential-attempt sets overlap above
|
||||
a threshold. Exists so fixtures like `shared_wordlist` can prove
|
||||
they fail a clusterer that relies on credential overlap alone — the
|
||||
whole point of fixture #1.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from tests.clustering.metrics import score
|
||||
from tests.factories.campaign_factory import GeneratedCorpus
|
||||
|
||||
PredictFn = Callable[[GeneratedCorpus], dict[str, str]]
|
||||
|
||||
|
||||
def assert_fixture_bounds(
    corpus: GeneratedCorpus,
    predict: PredictFn,
    expected_path: str | Path,
) -> dict[str, float]:
    """
    Run `predict` against the corpus, score against ground truth, and
    assert every metric meets the floor declared in `expected_path`.

    Args:
        corpus: Generated corpus; must provide `truth_labels()`.
        predict: Clusterer under test; called once with `corpus`.
        expected_path: YAML file mapping metric name -> {"min": floor}.

    Returns:
        The observed metrics dict so callers can do additional
        assertions (e.g. "homogeneity is *exactly* 1.0 for this fixture").

    Raises:
        AssertionError: if the bounds file is not a mapping, if it names a
            metric the scorer did not return, or if any observed metric is
            below its declared floor.
    """
    bounds = yaml.safe_load(Path(expected_path).read_text(encoding="utf-8"))
    # An empty YAML document loads as None; report that readably instead of
    # crashing later with AttributeError on `.items()`.
    if not isinstance(bounds, dict):
        raise AssertionError(
            f"expected-bounds file {expected_path} must contain a mapping of "
            f"metric name -> {{min: ...}}, got {type(bounds).__name__}"
        )

    truth = corpus.truth_labels()
    pred = predict(corpus)
    metrics = score(truth, pred)

    failures = []
    for name, bound in bounds.items():
        # A bound on a metric the scorer never returned is a fixture error;
        # collect it with the other failures rather than raising KeyError.
        if name not in metrics:
            failures.append(f"{name}: metric not produced by scorer")
            continue
        observed = metrics[name]
        floor = bound["min"]
        if observed < floor:
            failures.append(f"{name}={observed:.3f} < min {floor:.3f}")
    assert not failures, (
        "fixture bounds violated: " + "; ".join(failures)
        + f" (full metrics: {metrics})"
    )
    return metrics
|
||||
|
||||
|
||||
# ─── Reference clusterers ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def identity_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
    """Assign each attacker a cluster of its own, labelled `cluster-<attacker_id>`.

    Placeholder until the §4 algorithm lands.
    """
    assignment: dict[str, str] = {}
    for attacker in corpus.attackers:
        assignment[attacker.attacker_id] = f"cluster-{attacker.attacker_id}"
    return assignment
|
||||
|
||||
|
||||
def credential_jaccard_clusterer(
    corpus: GeneratedCorpus, *, threshold: float = 0.5
) -> dict[str, str]:
    """
    Deliberately-bad reference clusterer driven by credential overlap only.

    Two attackers are linked whenever the Jaccard similarity of their
    credential-attempt sets is >= `threshold`; linked attackers are merged
    transitively with a disjoint-set forest. Attackers with no credential
    attempts are never linked to anyone.

    Exists so that fixtures targeting the credential-overlap failure mode
    (fixture 1: shared_wordlist) can show they reject a clusterer that
    relies on credential signals alone. NOT the real algorithm.

    Returns:
        Mapping of attacker id -> cluster label, where the label is the
        attacker id of the cluster's disjoint-set root.
    """
    # Pool every credential each attacker tried across all of its sessions.
    tried = {}
    for attacker in corpus.attackers:
        tried[attacker.attacker_id] = {
            cred
            for session in attacker.sessions
            for cred in session.credentials_tried
        }

    # Disjoint-set forest: every attacker id starts out as its own root.
    root_of = {aid: aid for aid in tried}

    def find(node: str) -> str:
        # First pass locates the root; second pass points the walked
        # nodes straight at it so later lookups are short.
        top = node
        while root_of[top] != top:
            top = root_of[top]
        while root_of[node] != top:
            root_of[node], node = top, root_of[node]
        return top

    all_ids = list(tried)
    # Only attackers that actually tried credentials can form edges.
    candidates = [aid for aid in all_ids if tried[aid]]
    for pos, left in enumerate(candidates):
        left_creds = tried[left]
        for right in candidates[pos + 1 :]:
            right_creds = tried[right]
            shared = len(left_creds & right_creds)
            # Both sets are non-empty here, so the union is never empty.
            total = len(left_creds | right_creds)
            if shared / total >= threshold:
                left_root, right_root = find(left), find(right)
                if left_root != right_root:
                    root_of[left_root] = right_root

    return {aid: find(aid) for aid in all_ids}
|
||||
@@ -1,50 +1,37 @@
|
||||
"""
|
||||
End-to-end pipeline test for fixture 3 (lone_wolf).
|
||||
|
||||
Loads the YAML spec, runs the synthetic generator, applies a placeholder
|
||||
identity clusterer (each attacker → its own cluster), scores against
|
||||
the expected bounds. This is the simplest of the six fixtures and is
|
||||
deliberately the first one wired up — its ground truth is all
|
||||
singletons, so an identity clusterer trivially passes, which proves the
|
||||
DSL→factory→metrics pipeline works before any real algorithm is built.
|
||||
Loads the YAML spec, runs the synthetic generator, applies the
|
||||
identity-clusterer placeholder (each attacker → its own cluster), and
|
||||
scores against the expected bounds. This is the simplest of the six
|
||||
fixtures and is deliberately the first one wired up — its ground truth
|
||||
is all singletons, so an identity clusterer trivially passes, which
|
||||
proves the DSL → factory → metrics pipeline works before any real
|
||||
algorithm is built.
|
||||
|
||||
Once the connected-components clusterer (CAMPAIGN_CLUSTERING.md §4)
|
||||
lands, this test will swap the placeholder for the real implementation
|
||||
and the same fixture must continue to pass.
|
||||
lands, the same fixture must continue to pass.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from tests.clustering.fixture_harness import (
|
||||
assert_fixture_bounds,
|
||||
identity_clusterer,
|
||||
)
|
||||
from tests.clustering.metrics import score
|
||||
from tests.factories.campaign_factory import GeneratedCorpus, generate, load_yaml
|
||||
from tests.factories.campaign_factory import generate, load_yaml
|
||||
|
||||
FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns"
|
||||
|
||||
|
||||
def _identity_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
|
||||
"""Every attacker is its own cluster. Trivially correct on lone_wolf."""
|
||||
return {a.attacker_id: f"cluster-{a.attacker_id}" for a in corpus.attackers}
|
||||
|
||||
|
||||
def test_lone_wolf_pipeline_passes_bounds() -> None:
|
||||
spec = load_yaml(FIXTURE_DIR / "lone_wolf.yaml")
|
||||
bounds = yaml.safe_load((FIXTURE_DIR / "lone_wolf.expected.yaml").read_text())
|
||||
|
||||
corpus = generate(spec, seed=0)
|
||||
truth = corpus.truth_labels()
|
||||
pred = _identity_clusterer(corpus)
|
||||
metrics = score(truth, pred)
|
||||
|
||||
failures = []
|
||||
for name, bound in bounds.items():
|
||||
observed = metrics[name]
|
||||
if observed < bound["min"]:
|
||||
failures.append(f"{name}={observed:.3f} < min {bound['min']:.3f}")
|
||||
assert not failures, "fixture bounds violated: " + "; ".join(failures)
|
||||
assert_fixture_bounds(corpus, identity_clusterer, FIXTURE_DIR / "lone_wolf.expected.yaml")
|
||||
|
||||
|
||||
def test_lone_wolf_corpus_shape() -> None:
|
||||
@@ -53,7 +40,6 @@ def test_lone_wolf_corpus_shape() -> None:
|
||||
corpus = generate(spec, seed=0)
|
||||
assert len(corpus.attackers) == 9
|
||||
assert len(corpus.sessions) == 9
|
||||
# Every attacker is a truth-singleton (its own campaign).
|
||||
truth_campaigns = {a.truth_campaign_id for a in corpus.attackers}
|
||||
assert len(truth_campaigns) == 9
|
||||
|
||||
@@ -64,7 +50,7 @@ def test_identity_clusterer_fails_on_a_real_campaign() -> None:
|
||||
multi-actor campaign should make the placeholder identity clusterer
|
||||
fail completeness, since each truth-campaign gets fragmented into
|
||||
one-member clusters. If this didn't fail, our metrics would be
|
||||
blind to false splits — and that's the entire point of fixture 4
|
||||
blind to false splits — and that's the entire point of fixtures 4
|
||||
and 5 in the design doc.
|
||||
"""
|
||||
spec = {
|
||||
@@ -82,11 +68,7 @@ def test_identity_clusterer_fails_on_a_real_campaign() -> None:
|
||||
}
|
||||
}
|
||||
corpus = generate(spec, seed=0)
|
||||
truth = corpus.truth_labels()
|
||||
pred = _identity_clusterer(corpus)
|
||||
metrics = score(truth, pred)
|
||||
# Identity clusterer splits the one true campaign across 2 clusters
|
||||
# → completeness drops below 1.0. This must hold or our metrics
|
||||
# aren't catching what they're supposed to catch.
|
||||
pred = identity_clusterer(corpus)
|
||||
metrics = score(corpus.truth_labels(), pred)
|
||||
assert metrics["completeness"] < 1.0
|
||||
assert metrics["homogeneity"] == pytest.approx(1.0) # no false merges, just splits
|
||||
assert metrics["homogeneity"] == pytest.approx(1.0)
|
||||
|
||||
117
tests/clustering/test_shared_wordlist_fixture.py
Normal file
117
tests/clustering/test_shared_wordlist_fixture.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""
|
||||
End-to-end pipeline test for fixture 1 (shared_wordlist).
|
||||
|
||||
Two campaigns. Same SSH credential wordlist. Everything else divergent
|
||||
— ASN, IPs, JA3, HASSH, active hours.
|
||||
|
||||
The fixture exists to defeat one specific failure mode: a clusterer
|
||||
that leans on credential-list overlap as a primary signal. Commodity
|
||||
wordlists (rockyou, defaults lists, top-1k common-credentials) are
|
||||
shared by hundreds of unrelated actors — credential overlap alone
|
||||
cannot identify a campaign.
|
||||
|
||||
Two tests cover this:
|
||||
|
||||
1. `test_shared_wordlist_pipeline_passes_bounds` — runs the placeholder
|
||||
identity clusterer against the fixture. Trivially green (each
|
||||
campaign has one actor → identity puts each in its own cluster).
|
||||
This is the ratchet point: when the real algorithm replaces the
|
||||
placeholder, this test must continue to pass.
|
||||
|
||||
2. `test_credential_jaccard_clusterer_fails_homogeneity` — runs a
|
||||
deliberately-bad clusterer that merges any two attackers whose
|
||||
credential sets overlap above 50% Jaccard. Proves the fixture
|
||||
actually catches what it's designed to catch: this clusterer DOES
|
||||
merge the two campaigns, and the fixture's homogeneity floor (0.90)
|
||||
is breached. If this test ever passes, our fixture or our metric
|
||||
harness is broken.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.clustering.fixture_harness import (
|
||||
assert_fixture_bounds,
|
||||
credential_jaccard_clusterer,
|
||||
identity_clusterer,
|
||||
)
|
||||
from tests.clustering.metrics import score
|
||||
from tests.factories.campaign_factory import generate, load_yaml
|
||||
|
||||
FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns"
|
||||
|
||||
|
||||
def test_shared_wordlist_pipeline_passes_bounds() -> None:
    """The placeholder identity clusterer must meet every floor in the
    shared_wordlist expected-bounds file."""
    corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)
    expected_path = FIXTURE_DIR / "shared_wordlist.expected.yaml"
    assert_fixture_bounds(corpus, identity_clusterer, expected_path)
|
||||
|
||||
|
||||
def test_shared_wordlist_corpus_shape() -> None:
    """Sanity-check the generated corpus.

    Expects two attackers whose truth labels cover both campaigns, and
    at least one credential-bearing session per attacker, each carrying
    exactly 8 credentials.
    """
    corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)

    assert len(corpus.attackers) == 2
    campaigns = set(corpus.truth_labels().values())
    assert campaigns == {"shared-wordlist-A", "shared-wordlist-B"}

    for attacker in corpus.attackers:
        # Keep only sessions that attempted credentials at all.
        with_creds = [
            session for session in attacker.sessions if session.credentials_tried
        ]
        assert with_creds, f"attacker {attacker.attacker_id} has no credential sessions"
        # Every credential session should carry the 8-entry wordlist.
        for session in with_creds:
            assert len(session.credentials_tried) == 8
|
||||
|
||||
|
||||
def test_credential_jaccard_clusterer_fails_homogeneity() -> None:
    """
    The fixture's reason for being: a clusterer that merges on
    credential-set Jaccard >= 0.5 fuses the two campaigns, because
    identical wordlists give a Jaccard of 1.0. That single merged cluster
    drives homogeneity to 0, exactly the failure mode the fixture
    protects against.

    If the bad clusterer ever scores well here, the fixture has lost its
    discrimination power and needs to be re-examined.
    """
    corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)
    pred = credential_jaccard_clusterer(corpus, threshold=0.5)
    metrics = score(corpus.truth_labels(), pred)

    # Both campaigns must have collapsed into one predicted cluster.
    distinct_clusters = set(pred.values())
    assert len(distinct_clusters) == 1, (
        "credential-Jaccard clusterer should merge both campaigns into one"
    )
    # Homogeneity collapsing is the signal a fixture-aware CI gate would
    # use to reject the bad clusterer.
    assert metrics["homogeneity"] == pytest.approx(0.0)
|
||||
|
||||
|
||||
def test_naive_clusterer_does_not_fool_the_fixture() -> None:
    """
    Belt-and-braces: the bad clusterer collapses homogeneity but may still
    pass *some* metrics, so the fixture's own homogeneity floor must be
    what rejects it.

    This runs the credential-Jaccard clusterer through
    `assert_fixture_bounds` against the real
    `shared_wordlist.expected.yaml`, not a hand-copied table of floors,
    so the check cannot drift from the bounds file.
    """
    corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)

    def bad_clusterer(generated):
        # Pin the threshold explicitly rather than relying on the default.
        return credential_jaccard_clusterer(generated, threshold=0.5)

    # Match the per-metric failure entry ("homogeneity=<observed> < min
    # <floor>"). The full-metrics dump in the same message also mentions
    # homogeneity, so a looser pattern would match on any breach.
    with pytest.raises(AssertionError, match=r"homogeneity=[-\d.]+ < min"):
        assert_fixture_bounds(
            corpus, bad_clusterer, FIXTURE_DIR / "shared_wordlist.expected.yaml"
        )
|
||||
Reference in New Issue
Block a user