diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py index 38559c40..589cf3ac 100644 --- a/tests/clustering/test_connected_components.py +++ b/tests/clustering/test_connected_components.py @@ -414,6 +414,96 @@ def test_cluster_observations_medium_alone_does_not_fuse(): assert labels["a"] != labels["b"] +def _build_noise_floor_corpus(): + """Expand noise_floor.yaml's include_fixtures block into one corpus.""" + import yaml as _yaml + from typing import Any + from tests.factories.campaign_factory import generate, load_yaml + + declared = _yaml.safe_load( + (FIXTURE_DIR / "noise_floor.yaml").read_text(encoding="utf-8") + ) + campaigns: list[dict[str, Any]] = [] + inherited_noise = 0 + for fname in declared["include_fixtures"]: + sub = load_yaml(FIXTURE_DIR / fname) + if "corpus" in sub: + campaigns.extend(sub["corpus"].get("campaigns", [])) + inherited_noise += int( + (sub["corpus"].get("noise") or {}).get("scanner_count", 0) + ) + else: + campaigns.append({"campaign": sub["campaign"]}) + extra = int(declared.get("extra_noise_scanners", 0)) + spec = {"corpus": { + "campaigns": campaigns, + "noise": {"scanner_count": inherited_noise + extra}, + }} + return generate(spec, seed=0) + + +def test_noise_floor_singleton_recall_holds_with_production_clusterer(): + """Fixture 6 ratchet — noise floor isolation. + + The load-bearing F6 invariant for the *production* clusterer: + truth-singleton noise scanners must not be absorbed into real + campaigns. A clusterer that pulls noise into campaigns dilutes + attribution to nothing. + + Scored at *campaign* level so the truth-singleton noise scanners + align with the prediction (each noise row has its own truth + campaign id). Identity-level scoring is muddier here — see + ``test_noise_floor_intra_campaign_recovery`` below for the + constituent-campaign test that *is* identity-shaped. + """ + from tests.clustering.metrics import score + + corpus = _build_noise_floor_corpus() + pred = _production_clusterer_predict(corpus) + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["singleton_recall"] >= 0.95, metrics + + +def test_noise_floor_intra_campaign_recovery_with_production_clusterer(): + """The other half of F6: real campaigns must still resolve through + the noise. Specifically: vpn_hopping's 5 rotations land in one + cluster (its identity-level signature), and shared_wordlist's two + distinct campaigns stay un-merged despite sharing wordlists. + Demonstrates the production clusterer's tier discipline holds + under cross-corpus interference, not just per-fixture in + isolation.""" + corpus = _build_noise_floor_corpus() + pred = _production_clusterer_predict(corpus) + + # vpn_hopping: all 5 rotation rows fold into one predicted cluster. + vpn_obs = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "vpn-hopping-001" + ] + assert len(vpn_obs) == 5 + vpn_clusters = {pred[oid] for oid in vpn_obs} + assert len(vpn_clusters) == 1, ( + "vpn_hopping must consolidate to one cluster across rotations" + ) + + # shared_wordlist A and B: distinct fingerprints → must stay + # separate clusters despite shared credentials in the noise floor. + sw_a = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "shared-wordlist-A" + ] + sw_b = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "shared-wordlist-B" + ] + assert sw_a and sw_b + sw_a_clusters = {pred[oid] for oid in sw_a} + sw_b_clusters = {pred[oid] for oid in sw_b} + assert sw_a_clusters.isdisjoint(sw_b_clusters), ( + "shared_wordlist A and B must not share a cluster" + ) + + def test_slow_burn_passes_with_production_clusterer(): """Fixture 7 (slow_burn): one campaign across 3 multi-week operational windows. Shared JA3 + HASSH + C2 across all 3 actors. The production