test(clustering): F6 noise-floor ratchets for production clusterer

Two targeted invariants instead of a wholesale YAML-bounds re-use,
because the existing F6 bounds were tuned for the reference
composite_signals_clusterer (fingerprint OR C2). The production
clusterer trades that aggregation for tier discipline + the
fingerprint-disagreement veto, so its score profile differs even
when its judgments are correct — multi_operator stays as 2 truth
identities, paused_campaign's two DSL actors remain a single cluster
because they share fingerprints, etc. Wholesale bounds re-use would
fight the design.

The two production-side ratchets:

1. singleton_recall ≥ 0.95 at campaign-level scoring — truth-
   singleton noise scanners must not be absorbed into real campaigns.
   This is the F6 failure mode that motivates the fixture.

2. Intra-campaign recovery under cross-corpus interference:
   * vpn_hopping's 5 rotations consolidate to one cluster.
   * shared_wordlist A and B stay in disjoint clusters despite
     sharing credentials with each other (and with the noise floor).

A future commit can revisit when the production clusterer's identity-
level truth alignment improves (e.g. when paused_campaign's DSL is
extended to mark its two actors as one truth identity).
This commit is contained in:
2026-04-26 08:28:31 -04:00
parent 7923006203
commit 87412da1ca

View File

@@ -414,6 +414,96 @@ def test_cluster_observations_medium_alone_does_not_fuse():
assert labels["a"] != labels["b"]
def _build_noise_floor_corpus():
"""Expand noise_floor.yaml's include_fixtures block into one corpus."""
import yaml as _yaml
from typing import Any
from tests.factories.campaign_factory import generate, load_yaml
declared = _yaml.safe_load(
(FIXTURE_DIR / "noise_floor.yaml").read_text(encoding="utf-8")
)
campaigns: list[dict[str, Any]] = []
inherited_noise = 0
for fname in declared["include_fixtures"]:
sub = load_yaml(FIXTURE_DIR / fname)
if "corpus" in sub:
campaigns.extend(sub["corpus"].get("campaigns", []))
inherited_noise += int(
(sub["corpus"].get("noise") or {}).get("scanner_count", 0)
)
else:
campaigns.append({"campaign": sub["campaign"]})
extra = int(declared.get("extra_noise_scanners", 0))
spec = {"corpus": {
"campaigns": campaigns,
"noise": {"scanner_count": inherited_noise + extra},
}}
return generate(spec, seed=0)
def test_noise_floor_singleton_recall_holds_with_production_clusterer():
"""Fixture 6 ratchet — noise floor isolation.
The load-bearing F6 invariant for the *production* clusterer:
truth-singleton noise scanners must not be absorbed into real
campaigns. A clusterer that pulls noise into campaigns dilutes
attribution to nothing.
Scored at *campaign* level so the truth-singleton noise scanners
align with the prediction (each noise row has its own truth
campaign id). Identity-level scoring is muddier here — see
``test_noise_floor_intra_campaign_recovery`` below for the
constituent-campaign test that *is* identity-shaped.
"""
from tests.clustering.metrics import score
corpus = _build_noise_floor_corpus()
pred = _production_clusterer_predict(corpus)
metrics = score(corpus.truth_labels(level="campaign"), pred)
assert metrics["singleton_recall"] >= 0.95, metrics
def test_noise_floor_intra_campaign_recovery_with_production_clusterer():
"""The other half of F6: real campaigns must still resolve through
the noise. Specifically: vpn_hopping's 5 rotations land in one
cluster (its identity-level signature), and shared_wordlist's two
distinct campaigns stay un-merged despite sharing wordlists.
Demonstrates the production clusterer's tier discipline holds
under cross-corpus interference, not just per-fixture in
isolation."""
corpus = _build_noise_floor_corpus()
pred = _production_clusterer_predict(corpus)
# vpn_hopping: all 5 rotation rows fold into one predicted cluster.
vpn_obs = [
a.attacker_id for a in corpus.attackers
if a.truth_campaign_id == "vpn-hopping-001"
]
assert len(vpn_obs) == 5
vpn_clusters = {pred[oid] for oid in vpn_obs}
assert len(vpn_clusters) == 1, (
"vpn_hopping must consolidate to one cluster across rotations"
)
# shared_wordlist A and B: distinct fingerprints → must stay
# separate clusters despite shared credentials in the noise floor.
sw_a = [
a.attacker_id for a in corpus.attackers
if a.truth_campaign_id == "shared-wordlist-A"
]
sw_b = [
a.attacker_id for a in corpus.attackers
if a.truth_campaign_id == "shared-wordlist-B"
]
assert sw_a and sw_b
sw_a_clusters = {pred[oid] for oid in sw_a}
sw_b_clusters = {pred[oid] for oid in sw_b}
assert sw_a_clusters.isdisjoint(sw_b_clusters), (
"shared_wordlist A and B must not share a cluster"
)
def test_slow_burn_passes_with_production_clusterer():
"""Fixture 7 (slow_burn): one campaign across 3 multi-week operational
windows. Shared JA3 + HASSH + C2 across all 3 actors. The production