""" End-to-end pipeline test for fixture 6 (noise_floor). Composite corpus: bundles all five prior fixtures' campaigns + 10 Delivery-only noise scanners on top of lone_wolf's 8 inherited ones. The fixture exists to catch cross-corpus interference — signal collisions, factory ID re-use, clusterer ambiguity that shows up only when multiple campaigns are scored together. Each constituent fixture already ships its own in-fixture adversarial test; fixture 6 covers a different failure class. The composition is declared in `noise_floor.yaml` via an ``include_fixtures`` block (a fixture-6-specific format). The loader in this test file expands it into a full ``corpus.campaigns`` spec at runtime, so the factory itself stays unaware of the include mechanism. Three tests cover this: 1. `test_noise_floor_corpus_integrity` — every constituent fixture's campaigns + actors are present in the merged corpus with their truth labels intact, and the 10 extra noise scanners are present alongside lone_wolf's 8 (truth-singletons all). 2. `test_noise_floor_pipeline_passes_bounds` — runs `composite_signals_clusterer` against the merged corpus. Approximates the planned similarity graph well enough that every campaign resolves and every singleton stays singleton. Trips the bound floors if any cross-fixture interference creeps in (signal collisions across fixtures' JA3/HASSH/C2 strings). 3. `test_noise_floor_singleton_recall_holds` — explicit assertion that every truth-singleton (the lone wolf, the 8 inherited noise scanners, the 10 extra noise scanners — 19 total) ends up in a singleton predicted cluster. Singleton recall is the load- bearing metric for this fixture. """ from __future__ import annotations from pathlib import Path from typing import Any import pytest import yaml from tests.clustering.fixture_harness import ( assert_fixture_bounds, composite_signals_clusterer, ) from tests.clustering.metrics import score from tests.factories.campaign_factory import generate, load_yaml FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" FIXTURE_YAML = FIXTURE_DIR / "noise_floor.yaml" EXPECTED_YAML = FIXTURE_DIR / "noise_floor.expected.yaml" def _expand_noise_floor_spec() -> dict[str, Any]: """Read noise_floor.yaml's include_fixtures block, load each constituent fixture, and merge their campaigns into one corpus-shaped spec. Returns a dict the factory's ``generate()`` accepts as-is.""" declared = yaml.safe_load(FIXTURE_YAML.read_text(encoding="utf-8")) campaigns: list[dict[str, Any]] = [] inherited_noise = 0 for fname in declared["include_fixtures"]: sub = load_yaml(FIXTURE_DIR / fname) if "corpus" in sub: campaigns.extend(sub["corpus"].get("campaigns", [])) inherited_noise += int( (sub["corpus"].get("noise") or {}).get("scanner_count", 0) ) else: campaigns.append({"campaign": sub["campaign"]}) extra = int(declared.get("extra_noise_scanners", 0)) return { "corpus": { "campaigns": campaigns, "noise": {"scanner_count": inherited_noise + extra}, } } def test_noise_floor_corpus_integrity() -> None: spec = _expand_noise_floor_spec() corpus = generate(spec, seed=0) truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} # Every constituent fixture's campaign id appears in the merged # corpus. Any missing id means the loader dropped a fixture. expected_campaign_ids = { "shared-wordlist-A", "shared-wordlist-B", "vpn-hopping-001", "lone-wolf-001", "paused-campaign-001", "multi-operator-001", } assert expected_campaign_ids <= truth_campaigns, ( f"missing campaign ids: {expected_campaign_ids - truth_campaigns}" ) # Noise scanner count: 8 inherited from lone_wolf + 10 added. noise_attackers = [ a for a in corpus.attackers if a.truth_campaign_id.startswith("noise-scanner-") ] assert len(noise_attackers) == 18 # Every noise scanner is its own truth-campaign (singleton). noise_truth = {a.truth_campaign_id for a in noise_attackers} assert len(noise_truth) == 18 # Real-campaign attackers: 2 (shared_wordlist) + 5 (vpn_hopping) + # 1 (lone_wolf wolf) + 2 (paused_campaign) + 2 (multi_operator) # = 12. real_attackers = [ a for a in corpus.attackers if not a.truth_campaign_id.startswith("noise-scanner-") ] assert len(real_attackers) == 12, ( f"expected 12 campaign-driven attackers, got {len(real_attackers)}" ) def test_noise_floor_pipeline_passes_bounds() -> None: spec = _expand_noise_floor_spec() corpus = generate(spec, seed=0) metrics = assert_fixture_bounds(corpus, composite_signals_clusterer, EXPECTED_YAML) # The combined corpus is heterogeneous — a perfect ARI is not # required (and the bound is loose at 0.85). Verify the harness # produced sensible numbers anyway. assert metrics["adjusted_rand_index"] >= 0.85 assert metrics["singleton_recall"] >= 0.95 def test_noise_floor_singleton_recall_holds() -> None: """Every truth-singleton (lone wolf + 18 noise) must remain singleton under the composite clusterer. Noise absorption is the failure mode that makes campaign attribution useless in practice. """ spec = _expand_noise_floor_spec() corpus = generate(spec, seed=0) pred = composite_signals_clusterer(corpus) truth = corpus.truth_labels(level="campaign") from collections import Counter truth_counts = Counter(truth.values()) pred_counts = Counter(pred.values()) true_singletons = [aid for aid, t in truth.items() if truth_counts[t] == 1] # Truth-singletons in this composite: # 1 lone wolf + 18 noise + 2 shared_wordlist actors (each # campaign has one actor; campaign size 1 means truth-singleton) # = 21. assert len(true_singletons) == 21, ( f"expected 21 truth-singletons, got {len(true_singletons)}" ) absorbed = [aid for aid in true_singletons if pred_counts[pred[aid]] != 1] assert not absorbed, ( f"composite clusterer absorbed {len(absorbed)} singletons into " f"larger clusters: {absorbed[:5]}…" ) metrics = score(truth, pred) assert metrics["singleton_recall"] == pytest.approx(1.0)