From 6a4592a8f54944bb5866189a4de92688bfcdeaae Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 08:25:23 -0400 Subject: [PATCH] test(clustering): low/very-low tier safety + F1/F2 ratchets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pins down the tier-discipline contract end-to-end: - Credentials-only overlap doesn't fuse observations (F1 in miniature). - ASN-only overlap doesn't fuse observations (F2 in miniature). - All three weak tiers (medium + low + very-low) stacked still don't fuse — only a high-tier signal does. - F1 (shared_wordlist) at identity-level: no false merges, every row is its own predicted cluster, homogeneity = 1.0. - F2 (vpn_hopping): 5 distinct ASNs collapse into 1 predicted cluster, proving JA3 / HASSH dominate ASN as the design requires. The combination math itself was wired in commit 5; this commit is the failure-mode regression suite that gates future tuning of the tier weights. --- tests/clustering/test_connected_components.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py index 75b50be6..8f65e67c 100644 --- a/tests/clustering/test_connected_components.py +++ b/tests/clustering/test_connected_components.py @@ -321,6 +321,84 @@ def test_multi_operator_keeps_distinct_identities_with_production_clusterer(): assert metrics["homogeneity"] == pytest.approx(1.0) +def test_cluster_observations_credentials_alone_does_not_fuse(): + """Two observations sharing a credential set but nothing else + must stay distinct. Fixture 1's failure mode in miniature.""" + a = Observation( + observation_id="a", + credentials=frozenset({("root", "toor"), ("admin", "admin")}), + ) + b = Observation( + observation_id="b", + credentials=frozenset({("root", "toor"), ("admin", "admin")}), + ) + labels = cluster_observations([a, b]) + assert labels["a"] != labels["b"] + + +def test_cluster_observations_asn_alone_does_not_fuse(): + """Two observations sharing only ASN must stay distinct. + Fixture 2's failure mode in miniature — VPN/proxy hopping + fragments ASN within a single identity, and ASN sharing + across identities is common; can't drive clustering.""" + a = Observation(observation_id="a", asn=64500) + b = Observation(observation_id="b", asn=64500) + labels = cluster_observations([a, b]) + assert labels["a"] != labels["b"] + + +def test_cluster_observations_all_weak_signals_combined_does_not_fuse(): + """Even credentials + commands + ASN together don't drive + clustering — only a high-tier signal does. Stack everything + a campaign-level F1+F2 hybrid would have, confirm singletons.""" + a = Observation( + observation_id="a", + asn=64500, + credentials=frozenset({("root", "toor"), ("admin", "admin")}), + commands_by_phase={"discovery": ("ls", "id")}, + ) + b = Observation( + observation_id="b", + asn=64500, + credentials=frozenset({("root", "toor"), ("admin", "admin")}), + commands_by_phase={"discovery": ("ls", "id")}, + ) + labels = cluster_observations([a, b]) + assert labels["a"] != labels["b"] + + +def test_shared_wordlist_no_false_merge_at_identity_level(): + """F1 ratchet: even at identity level (where each row is its own + identity), the production clusterer must not fuse credential- + sharing observations. Tightens the F1 bound by asserting + completeness == 1.0 at identity-level scoring (no truth identity + is split, because every row is its own truth identity).""" + from tests.factories.campaign_factory import generate, load_yaml + from tests.clustering.metrics import score + + corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0) + pred = _production_clusterer_predict(corpus) + metrics = score(corpus.truth_labels(level="identity"), pred) + # Each row must land in its own predicted cluster — anything else + # is a false merge driven by the credential-overlap signal. + assert len(set(pred.values())) == len(corpus.attackers) + assert metrics["homogeneity"] == pytest.approx(1.0) + + +def test_vpn_hopping_asn_alone_would_have_fragmented_but_doesnt(): + """F2 ratchet: vpn_hopping has 5 distinct ASNs across one identity. + A clusterer that lets ASN drive would split into 5; the production + clusterer doesn't because ASN is very-low-tier and JA3 / HASSH + are stable. Confirms tier discipline holds end-to-end.""" + from tests.factories.campaign_factory import generate, load_yaml + corpus = generate(load_yaml(FIXTURE_DIR / "vpn_hopping.yaml"), seed=0) + pred = _production_clusterer_predict(corpus) + asns = {a.asn for a in corpus.attackers} + assert len(asns) == 5, "fixture sanity: 5 distinct ASNs" + # All 5 land in one cluster, not 5. + assert len(set(pred.values())) == 1 + + def test_cluster_observations_medium_alone_does_not_fuse(): """Two observations sharing only command-sequence (medium-tier) must stay in distinct clusters — medium is a supporting signal."""