test(clustering): fixture 6 noise_floor (composite + cross-corpus)

Bundles all five prior fixtures' campaigns into one corpus alongside
10 fresh Delivery-only noise scanners (on top of lone_wolf's 8
inherited). The fixture covers cross-corpus interference — signal
collisions across fixtures' JA3/HASSH/C2 strings, factory ID re-use,
clusterer ambiguity that only manifests when multiple campaigns
score together. Each constituent fixture already ships its own
in-fixture adversarial test; this one is the control for the class
of failures that single-corpus fixtures cannot catch.

Composition is declared via a fixture-6-specific include_fixtures
block in noise_floor.yaml. The test file's loader expands it into
a full corpus.campaigns spec at runtime so the factory itself stays
unaware — no factory primitive added for what only this fixture
needs. The 8 noise scanners declared by lone_wolf flow through
naturally; the extra_noise_scanners count adds 10 more.

composite_signals_clusterer (added in the fixture-5 commit) is the
pass clusterer — union-find combining (ja3, hassh) match OR
overlapping C2 callback. Approximates the planned similarity graph
well enough that every campaign resolves and every singleton stays
singleton in the merged corpus.

Three tests: corpus integrity (every campaign id present, 12
campaign-driven attackers + 18 noise = 30 total), pipeline pass
against the global bounds, and an explicit singleton-recall
assertion (21 truth-singletons — 1 lone wolf, 18 noise, 2
shared_wordlist actors whose campaigns are size 1 — all kept
singleton by the composite clusterer). Singleton recall is the
load-bearing metric here: noise absorption is the failure mode
that makes campaign attribution useless in practice.
This commit is contained in:
2026-04-26 07:49:36 -04:00
parent 27f7de9886
commit 7021fda0e6
3 changed files with 225 additions and 0 deletions

View File

@@ -0,0 +1,167 @@
"""
End-to-end pipeline test for fixture 6 (noise_floor).
Composite corpus: bundles all five prior fixtures' campaigns + 10
Delivery-only noise scanners on top of lone_wolf's 8 inherited
ones. The fixture exists to catch cross-corpus interference —
signal collisions, factory ID re-use, clusterer ambiguity that
shows up only when multiple campaigns are scored together. Each
constituent fixture already ships its own in-fixture adversarial
test; fixture 6 covers a different failure class.
The composition is declared in `noise_floor.yaml` via an
``include_fixtures`` block (a fixture-6-specific format). The
loader in this test file expands it into a full
``corpus.campaigns`` spec at runtime, so the factory itself stays
unaware of the include mechanism.
Three tests cover this:
1. `test_noise_floor_corpus_integrity` — every constituent
fixture's campaigns + actors are present in the merged corpus
with their truth labels intact, and the 10 extra noise scanners
are present alongside lone_wolf's 8 (truth-singletons all).
2. `test_noise_floor_pipeline_passes_bounds` — runs
`composite_signals_clusterer` against the merged corpus.
Approximates the planned similarity graph well enough that
every campaign resolves and every singleton stays singleton.
Trips the bound floors if any cross-fixture interference creeps
in (signal collisions across fixtures' JA3/HASSH/C2 strings).
3. `test_noise_floor_singleton_recall_holds` — explicit assertion
that every truth-singleton (the lone wolf, the 8 inherited noise
scanners, the 10 extra noise scanners, and the 2 shared_wordlist
actors whose campaigns are size 1 — 21 total) ends up in a
singleton predicted cluster. Singleton recall is the load-bearing
metric for this fixture.
"""
from __future__ import annotations

from collections import Counter
from pathlib import Path
from typing import Any

import pytest
import yaml

from tests.clustering.fixture_harness import (
    assert_fixture_bounds,
    composite_signals_clusterer,
)
from tests.clustering.metrics import score
from tests.factories.campaign_factory import generate, load_yaml

# Campaign fixture directory, located two levels up from this module
# under ``fixtures/campaigns``.
FIXTURE_DIR = Path(__file__).parent.parent.joinpath("fixtures", "campaigns")
# Composite fixture declaration (carries the ``include_fixtures`` block).
FIXTURE_YAML = FIXTURE_DIR.joinpath("noise_floor.yaml")
# Bounds file handed to ``assert_fixture_bounds`` for this fixture.
EXPECTED_YAML = FIXTURE_DIR.joinpath("noise_floor.expected.yaml")
def _expand_noise_floor_spec() -> dict[str, Any]:
    """Build the merged corpus spec declared by ``noise_floor.yaml``.

    Reads the ``include_fixtures`` list from ``FIXTURE_YAML``, loads each
    named file from ``FIXTURE_DIR`` with ``load_yaml``, and folds the
    results into one corpus-shaped dict:

    * a fixture with a top-level ``corpus`` key contributes its
      ``corpus.campaigns`` entries and its ``corpus.noise.scanner_count``;
    * any other fixture is treated as a single-campaign file and
      contributes ``{"campaign": sub["campaign"]}`` (no noise is
      inherited from it).

    ``extra_noise_scanners`` from ``noise_floor.yaml`` is added on top of
    the inherited scanner count.

    Keys that are present but empty in the YAML (parsed as ``None``) are
    treated exactly like absent keys: no campaigns / zero scanners.

    Returns:
        ``{"corpus": {"campaigns": [...], "noise": {"scanner_count": N}}}``,
        the shape the tests in this module pass to ``generate()``.

    Raises:
        KeyError: if ``include_fixtures`` is missing from the declaration,
            or a non-corpus fixture has no ``campaign`` key.
    """
    declared = yaml.safe_load(FIXTURE_YAML.read_text(encoding="utf-8"))

    campaigns: list[dict[str, Any]] = []
    inherited_noise = 0
    for fname in declared["include_fixtures"]:
        sub = load_yaml(FIXTURE_DIR / fname)

        if "corpus" not in sub:
            # Single-campaign fixture: wrap it so it looks like one entry
            # of a corpus ``campaigns`` list.
            campaigns.append({"campaign": sub["campaign"]})
            continue

        # ``or`` fallbacks: an empty YAML value parses to None, which
        # must behave like a missing key rather than crash.
        corpus = sub["corpus"] or {}
        campaigns.extend(corpus.get("campaigns") or [])
        noise = corpus.get("noise") or {}
        inherited_noise += int(noise.get("scanner_count") or 0)

    extra = int(declared.get("extra_noise_scanners") or 0)
    return {
        "corpus": {
            "campaigns": campaigns,
            "noise": {"scanner_count": inherited_noise + extra},
        }
    }
def test_noise_floor_corpus_integrity() -> None:
    """Check that the merged corpus contains every expected attacker.

    Generates the corpus from the expanded spec with ``seed=0`` and
    asserts, in order: all six constituent campaign ids are present,
    there are 18 noise scanners carrying 18 distinct truth-campaign
    ids, and 12 attackers belong to non-noise campaigns.
    """
    spec = _expand_noise_floor_spec()
    corpus = generate(spec, seed=0)

    # Partition every attacker's truth-campaign id into noise scanners
    # versus campaign-driven attackers.
    noise_prefix = "noise-scanner-"
    noise_ids = []
    real_ids = []
    for attacker in corpus.attackers:
        campaign_id = attacker.truth_campaign_id
        bucket = noise_ids if campaign_id.startswith(noise_prefix) else real_ids
        bucket.append(campaign_id)

    # Each constituent fixture must be represented; an absent id means
    # the loader dropped that fixture.
    required_ids = {
        "shared-wordlist-A",
        "shared-wordlist-B",
        "vpn-hopping-001",
        "lone-wolf-001",
        "paused-campaign-001",
        "multi-operator-001",
    }
    present_ids = set(noise_ids) | set(real_ids)
    assert required_ids <= present_ids, (
        f"missing campaign ids: {required_ids - present_ids}"
    )

    # 8 scanners inherited from lone_wolf + 10 added by this fixture.
    assert len(noise_ids) == 18

    # Each noise scanner is its own truth-campaign (a singleton), so
    # the ids must all be distinct.
    assert len(set(noise_ids)) == 18

    # Campaign-driven attackers: 2 (shared_wordlist) + 5 (vpn_hopping)
    # + 1 (lone_wolf wolf) + 2 (paused_campaign) + 2 (multi_operator)
    # = 12.
    real_total = len(real_ids)
    assert real_total == 12, (
        f"expected 12 campaign-driven attackers, got {real_total}"
    )
def test_noise_floor_pipeline_passes_bounds() -> None:
    """Run the composite clusterer through the bounds harness.

    ``assert_fixture_bounds`` is called with the merged corpus, the
    ``composite_signals_clusterer`` and ``EXPECTED_YAML``; the metrics
    it returns are then re-checked against explicit floors.
    """
    merged = generate(_expand_noise_floor_spec(), seed=0)
    metrics = assert_fixture_bounds(
        merged, composite_signals_clusterer, EXPECTED_YAML
    )

    # The combined corpus is heterogeneous, so a perfect ARI is not
    # demanded (the bound is loose at 0.85). These floors confirm the
    # harness still produced sensible numbers.
    floors = (
        ("adjusted_rand_index", 0.85),
        ("singleton_recall", 0.95),
    )
    for metric_name, floor in floors:
        assert metrics[metric_name] >= floor
def test_noise_floor_singleton_recall_holds() -> None:
    """Every truth-singleton must stay singleton under the composite clusterer.

    The merged corpus is expected to hold 21 truth-singletons: the lone
    wolf, 18 noise scanners, and the 2 shared_wordlist actors (each of
    those campaigns has a single actor). Noise absorption is the
    failure mode that makes campaign attribution useless in practice,
    so any truth-singleton landing in a predicted cluster of size > 1
    fails the test.
    """
    spec = _expand_noise_floor_spec()
    corpus = generate(spec, seed=0)
    pred = composite_signals_clusterer(corpus)
    truth = corpus.truth_labels(level="campaign")

    # Cluster sizes on each side, keyed by cluster label.
    truth_counts = Counter(truth.values())
    pred_counts = Counter(pred.values())

    # Attackers whose truth campaign contains only themselves.
    true_singletons = [aid for aid, t in truth.items() if truth_counts[t] == 1]

    # Truth-singletons in this composite:
    #   1 lone wolf + 18 noise + 2 shared_wordlist actors (each
    #   campaign has one actor; campaign size 1 means truth-singleton)
    #   = 21.
    assert len(true_singletons) == 21, (
        f"expected 21 truth-singletons, got {len(true_singletons)}"
    )

    # A truth-singleton is "absorbed" when its predicted cluster has
    # any other member.
    absorbed = [aid for aid in true_singletons if pred_counts[pred[aid]] != 1]
    assert not absorbed, (
        f"composite clusterer absorbed {len(absorbed)} singletons into "
        f"larger clusters: {absorbed[:5]}"
    )

    # Cross-check against the shared metric implementation.
    metrics = score(truth, pred)
    assert metrics["singleton_recall"] == pytest.approx(1.0)