168 lines
6.3 KiB
Python
168 lines
6.3 KiB
Python
"""
|
|
End-to-end pipeline test for fixture 6 (noise_floor).
|
|
|
|
Composite corpus: bundles all five prior fixtures' campaigns + 10
|
|
Delivery-only noise scanners on top of lone_wolf's 8 inherited
|
|
ones. The fixture exists to catch cross-corpus interference —
|
|
signal collisions, factory ID re-use, clusterer ambiguity that
|
|
shows up only when multiple campaigns are scored together. Each
|
|
constituent fixture already ships its own in-fixture adversarial
|
|
test; fixture 6 covers a different failure class.
|
|
|
|
The composition is declared in `noise_floor.yaml` via an
|
|
``include_fixtures`` block (a fixture-6-specific format). The
|
|
loader in this test file expands it into a full
|
|
``corpus.campaigns`` spec at runtime, so the factory itself stays
|
|
unaware of the include mechanism.
|
|
|
|
Three tests cover this:
|
|
|
|
1. `test_noise_floor_corpus_integrity` — every constituent
|
|
fixture's campaigns + actors are present in the merged corpus
|
|
with their truth labels intact, and the 10 extra noise scanners
|
|
are present alongside lone_wolf's 8 (truth-singletons all).
|
|
|
|
2. `test_noise_floor_pipeline_passes_bounds` — runs
|
|
`composite_signals_clusterer` against the merged corpus.
|
|
Approximates the planned similarity graph well enough that
|
|
every campaign resolves and every singleton stays singleton.
|
|
Trips the bound floors if any cross-fixture interference creeps
|
|
in (signal collisions across fixtures' JA3/HASSH/C2 strings).
|
|
|
|
3. `test_noise_floor_singleton_recall_holds` — explicit assertion
|
|
that every truth-singleton (the lone wolf, the 8 inherited noise
|
|
scanners, the 10 extra noise scanners — 19 total) ends up in a
|
|
singleton predicted cluster. Singleton recall is the load-
|
|
bearing metric for this fixture.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from tests.clustering.fixture_harness import (
|
|
assert_fixture_bounds,
|
|
composite_signals_clusterer,
|
|
)
|
|
from tests.clustering.metrics import score
|
|
from tests.factories.campaign_factory import generate, load_yaml
|
|
|
|
FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns"
|
|
FIXTURE_YAML = FIXTURE_DIR / "noise_floor.yaml"
|
|
EXPECTED_YAML = FIXTURE_DIR / "noise_floor.expected.yaml"
|
|
|
|
|
|
def _expand_noise_floor_spec() -> dict[str, Any]:
|
|
"""Read noise_floor.yaml's include_fixtures block, load each
|
|
constituent fixture, and merge their campaigns into one
|
|
corpus-shaped spec. Returns a dict the factory's ``generate()``
|
|
accepts as-is."""
|
|
declared = yaml.safe_load(FIXTURE_YAML.read_text(encoding="utf-8"))
|
|
campaigns: list[dict[str, Any]] = []
|
|
inherited_noise = 0
|
|
for fname in declared["include_fixtures"]:
|
|
sub = load_yaml(FIXTURE_DIR / fname)
|
|
if "corpus" in sub:
|
|
campaigns.extend(sub["corpus"].get("campaigns", []))
|
|
inherited_noise += int(
|
|
(sub["corpus"].get("noise") or {}).get("scanner_count", 0)
|
|
)
|
|
else:
|
|
campaigns.append({"campaign": sub["campaign"]})
|
|
extra = int(declared.get("extra_noise_scanners", 0))
|
|
return {
|
|
"corpus": {
|
|
"campaigns": campaigns,
|
|
"noise": {"scanner_count": inherited_noise + extra},
|
|
}
|
|
}
|
|
|
|
|
|
def test_noise_floor_corpus_integrity() -> None:
|
|
spec = _expand_noise_floor_spec()
|
|
corpus = generate(spec, seed=0)
|
|
|
|
truth_campaigns = {a.truth_campaign_id for a in corpus.attackers}
|
|
|
|
# Every constituent fixture's campaign id appears in the merged
|
|
# corpus. Any missing id means the loader dropped a fixture.
|
|
expected_campaign_ids = {
|
|
"shared-wordlist-A",
|
|
"shared-wordlist-B",
|
|
"vpn-hopping-001",
|
|
"lone-wolf-001",
|
|
"paused-campaign-001",
|
|
"multi-operator-001",
|
|
}
|
|
assert expected_campaign_ids <= truth_campaigns, (
|
|
f"missing campaign ids: {expected_campaign_ids - truth_campaigns}"
|
|
)
|
|
|
|
# Noise scanner count: 8 inherited from lone_wolf + 10 added.
|
|
noise_attackers = [
|
|
a for a in corpus.attackers
|
|
if a.truth_campaign_id.startswith("noise-scanner-")
|
|
]
|
|
assert len(noise_attackers) == 18
|
|
|
|
# Every noise scanner is its own truth-campaign (singleton).
|
|
noise_truth = {a.truth_campaign_id for a in noise_attackers}
|
|
assert len(noise_truth) == 18
|
|
|
|
# Real-campaign attackers: 2 (shared_wordlist) + 5 (vpn_hopping) +
|
|
# 1 (lone_wolf wolf) + 2 (paused_campaign) + 2 (multi_operator)
|
|
# = 12.
|
|
real_attackers = [
|
|
a for a in corpus.attackers
|
|
if not a.truth_campaign_id.startswith("noise-scanner-")
|
|
]
|
|
assert len(real_attackers) == 12, (
|
|
f"expected 12 campaign-driven attackers, got {len(real_attackers)}"
|
|
)
|
|
|
|
|
|
def test_noise_floor_pipeline_passes_bounds() -> None:
|
|
spec = _expand_noise_floor_spec()
|
|
corpus = generate(spec, seed=0)
|
|
metrics = assert_fixture_bounds(corpus, composite_signals_clusterer, EXPECTED_YAML)
|
|
# The combined corpus is heterogeneous — a perfect ARI is not
|
|
# required (and the bound is loose at 0.85). Verify the harness
|
|
# produced sensible numbers anyway.
|
|
assert metrics["adjusted_rand_index"] >= 0.85
|
|
assert metrics["singleton_recall"] >= 0.95
|
|
|
|
|
|
def test_noise_floor_singleton_recall_holds() -> None:
|
|
"""Every truth-singleton (lone wolf + 18 noise) must remain
|
|
singleton under the composite clusterer. Noise absorption is the
|
|
failure mode that makes campaign attribution useless in practice.
|
|
"""
|
|
spec = _expand_noise_floor_spec()
|
|
corpus = generate(spec, seed=0)
|
|
pred = composite_signals_clusterer(corpus)
|
|
|
|
truth = corpus.truth_labels(level="campaign")
|
|
from collections import Counter
|
|
truth_counts = Counter(truth.values())
|
|
pred_counts = Counter(pred.values())
|
|
|
|
true_singletons = [aid for aid, t in truth.items() if truth_counts[t] == 1]
|
|
# Truth-singletons in this composite:
|
|
# 1 lone wolf + 18 noise + 2 shared_wordlist actors (each
|
|
# campaign has one actor; campaign size 1 means truth-singleton)
|
|
# = 21.
|
|
assert len(true_singletons) == 21, (
|
|
f"expected 21 truth-singletons, got {len(true_singletons)}"
|
|
)
|
|
absorbed = [aid for aid in true_singletons if pred_counts[pred[aid]] != 1]
|
|
assert not absorbed, (
|
|
f"composite clusterer absorbed {len(absorbed)} singletons into "
|
|
f"larger clusters: {absorbed[:5]}…"
|
|
)
|
|
|
|
metrics = score(truth, pred)
|
|
assert metrics["singleton_recall"] == pytest.approx(1.0)
|