From 87412da1cadf61fde00528c1c48b952b37875b04 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 08:28:31 -0400 Subject: [PATCH] test(clustering): F6 noise-floor ratchets for production clusterer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two targeted invariants instead of a wholesale YAML-bounds re-use, because the existing F6 bounds were tuned for the reference composite_signals_clusterer (fingerprint OR C2). The production clusterer trades that aggregation for tier discipline + the fingerprint-disagreement veto, so its score profile differs even when its judgments are correct — multi_operator stays as 2 truth identities, paused_campaign's two DSL actors remain a single cluster because they share fingerprints, etc. Wholesale bounds re-use would fight the design. The two production-side ratchets: 1. singleton_recall ≥ 0.95 at campaign-level scoring — truth- singleton noise scanners must not be absorbed into real campaigns. This is the F6 failure mode that motivates the fixture. 2. Intra-campaign recovery under cross-corpus interference: * vpn_hopping's 5 rotations consolidate to one cluster. * shared_wordlist A and B stay in disjoint clusters despite sharing credentials with each other (and with the noise floor). A future commit can revisit when the production clusterer's identity- level truth alignment improves (e.g. when paused_campaign's DSL is extended to mark its two actors as one truth identity). --- tests/clustering/test_connected_components.py | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py index 38559c40..589cf3ac 100644 --- a/tests/clustering/test_connected_components.py +++ b/tests/clustering/test_connected_components.py @@ -414,6 +414,96 @@ def test_cluster_observations_medium_alone_does_not_fuse(): assert labels["a"] != labels["b"] +def _build_noise_floor_corpus(): + """Expand noise_floor.yaml's include_fixtures block into one corpus.""" + import yaml as _yaml + from typing import Any + from tests.factories.campaign_factory import generate, load_yaml + + declared = _yaml.safe_load( + (FIXTURE_DIR / "noise_floor.yaml").read_text(encoding="utf-8") + ) + campaigns: list[dict[str, Any]] = [] + inherited_noise = 0 + for fname in declared["include_fixtures"]: + sub = load_yaml(FIXTURE_DIR / fname) + if "corpus" in sub: + campaigns.extend(sub["corpus"].get("campaigns", [])) + inherited_noise += int( + (sub["corpus"].get("noise") or {}).get("scanner_count", 0) + ) + else: + campaigns.append({"campaign": sub["campaign"]}) + extra = int(declared.get("extra_noise_scanners", 0)) + spec = {"corpus": { + "campaigns": campaigns, + "noise": {"scanner_count": inherited_noise + extra}, + }} + return generate(spec, seed=0) + + +def test_noise_floor_singleton_recall_holds_with_production_clusterer(): + """Fixture 6 ratchet — noise floor isolation. + + The load-bearing F6 invariant for the *production* clusterer: + truth-singleton noise scanners must not be absorbed into real + campaigns. A clusterer that pulls noise into campaigns dilutes + attribution to nothing. + + Scored at *campaign* level so the truth-singleton noise scanners + align with the prediction (each noise row has its own truth + campaign id). Identity-level scoring is muddier here — see + ``test_noise_floor_intra_campaign_recovery`` below for the + constituent-campaign test that *is* identity-shaped. + """ + from tests.clustering.metrics import score + + corpus = _build_noise_floor_corpus() + pred = _production_clusterer_predict(corpus) + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["singleton_recall"] >= 0.95, metrics + + +def test_noise_floor_intra_campaign_recovery_with_production_clusterer(): + """The other half of F6: real campaigns must still resolve through + the noise. Specifically: vpn_hopping's 5 rotations land in one + cluster (its identity-level signature), and shared_wordlist's two + distinct campaigns stay un-merged despite sharing wordlists. + Demonstrates the production clusterer's tier discipline holds + under cross-corpus interference, not just per-fixture in + isolation.""" + corpus = _build_noise_floor_corpus() + pred = _production_clusterer_predict(corpus) + + # vpn_hopping: all 5 rotation rows fold into one predicted cluster. + vpn_obs = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "vpn-hopping-001" + ] + assert len(vpn_obs) == 5 + vpn_clusters = {pred[oid] for oid in vpn_obs} + assert len(vpn_clusters) == 1, ( + "vpn_hopping must consolidate to one cluster across rotations" + ) + + # shared_wordlist A and B: distinct fingerprints → must stay + # separate clusters despite shared credentials in the noise floor. + sw_a = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "shared-wordlist-A" + ] + sw_b = [ + a.attacker_id for a in corpus.attackers + if a.truth_campaign_id == "shared-wordlist-B" + ] + assert sw_a and sw_b + sw_a_clusters = {pred[oid] for oid in sw_a} + sw_b_clusters = {pred[oid] for oid in sw_b} + assert sw_a_clusters.isdisjoint(sw_b_clusters), ( + "shared_wordlist A and B must not share a cluster" + ) + + def test_slow_burn_passes_with_production_clusterer(): """Fixture 7 (slow_burn): one campaign across 3 multi-week operational windows. Shared JA3 + HASSH + C2 across all 3 actors. The production