test(clustering): low/very-low tier safety + F1/F2 ratchets
Pins down the tier-discipline contract end-to-end: - Credentials-only overlap doesn't fuse observations (F1 in miniature). - ASN-only overlap doesn't fuse observations (F2 in miniature). - All three weak tiers (medium + low + very-low) stacked still don't fuse — only a high-tier signal does. - F1 (shared_wordlist) at identity-level: no false merges, every row is its own predicted cluster, homogeneity = 1.0. - F2 (vpn_hopping): 5 distinct ASNs collapse into 1 predicted cluster, proving JA3 / HASSH dominate ASN as the design requires. The combination math itself was wired in commit 5; this commit is the failure-mode regression suite that gates future tuning of the tier weights.
This commit is contained in:
@@ -321,6 +321,84 @@ def test_multi_operator_keeps_distinct_identities_with_production_clusterer():
|
|||||||
assert metrics["homogeneity"] == pytest.approx(1.0)
|
assert metrics["homogeneity"] == pytest.approx(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_cluster_observations_credentials_alone_does_not_fuse():
|
||||||
|
"""Two observations sharing a credential set but nothing else
|
||||||
|
must stay distinct. Fixture 1's failure mode in miniature."""
|
||||||
|
a = Observation(
|
||||||
|
observation_id="a",
|
||||||
|
credentials=frozenset({("root", "toor"), ("admin", "admin")}),
|
||||||
|
)
|
||||||
|
b = Observation(
|
||||||
|
observation_id="b",
|
||||||
|
credentials=frozenset({("root", "toor"), ("admin", "admin")}),
|
||||||
|
)
|
||||||
|
labels = cluster_observations([a, b])
|
||||||
|
assert labels["a"] != labels["b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_cluster_observations_asn_alone_does_not_fuse():
|
||||||
|
"""Two observations sharing only ASN must stay distinct.
|
||||||
|
Fixture 2's failure mode in miniature — VPN/proxy hopping
|
||||||
|
fragments ASN within a single identity, and ASN sharing
|
||||||
|
across identities is common; can't drive clustering."""
|
||||||
|
a = Observation(observation_id="a", asn=64500)
|
||||||
|
b = Observation(observation_id="b", asn=64500)
|
||||||
|
labels = cluster_observations([a, b])
|
||||||
|
assert labels["a"] != labels["b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_cluster_observations_all_weak_signals_combined_does_not_fuse():
|
||||||
|
"""Even credentials + commands + ASN together don't drive
|
||||||
|
clustering — only a high-tier signal does. Stack everything
|
||||||
|
a campaign-level F1+F2 hybrid would have, confirm singletons."""
|
||||||
|
a = Observation(
|
||||||
|
observation_id="a",
|
||||||
|
asn=64500,
|
||||||
|
credentials=frozenset({("root", "toor"), ("admin", "admin")}),
|
||||||
|
commands_by_phase={"discovery": ("ls", "id")},
|
||||||
|
)
|
||||||
|
b = Observation(
|
||||||
|
observation_id="b",
|
||||||
|
asn=64500,
|
||||||
|
credentials=frozenset({("root", "toor"), ("admin", "admin")}),
|
||||||
|
commands_by_phase={"discovery": ("ls", "id")},
|
||||||
|
)
|
||||||
|
labels = cluster_observations([a, b])
|
||||||
|
assert labels["a"] != labels["b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_shared_wordlist_no_false_merge_at_identity_level():
|
||||||
|
"""F1 ratchet: even at identity level (where each row is its own
|
||||||
|
identity), the production clusterer must not fuse credential-
|
||||||
|
sharing observations. Tightens the F1 bound by asserting
|
||||||
|
completeness == 1.0 at identity-level scoring (no truth identity
|
||||||
|
is split, because every row is its own truth identity)."""
|
||||||
|
from tests.factories.campaign_factory import generate, load_yaml
|
||||||
|
from tests.clustering.metrics import score
|
||||||
|
|
||||||
|
corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)
|
||||||
|
pred = _production_clusterer_predict(corpus)
|
||||||
|
metrics = score(corpus.truth_labels(level="identity"), pred)
|
||||||
|
# Each row must land in its own predicted cluster — anything else
|
||||||
|
# is a false merge driven by the credential-overlap signal.
|
||||||
|
assert len(set(pred.values())) == len(corpus.attackers)
|
||||||
|
assert metrics["homogeneity"] == pytest.approx(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_vpn_hopping_asn_alone_would_have_fragmented_but_doesnt():
|
||||||
|
"""F2 ratchet: vpn_hopping has 5 distinct ASNs across one identity.
|
||||||
|
A clusterer that lets ASN drive would split into 5; the production
|
||||||
|
clusterer doesn't because ASN is very-low-tier and JA3 / HASSH
|
||||||
|
are stable. Confirms tier discipline holds end-to-end."""
|
||||||
|
from tests.factories.campaign_factory import generate, load_yaml
|
||||||
|
corpus = generate(load_yaml(FIXTURE_DIR / "vpn_hopping.yaml"), seed=0)
|
||||||
|
pred = _production_clusterer_predict(corpus)
|
||||||
|
asns = {a.asn for a in corpus.attackers}
|
||||||
|
assert len(asns) == 5, "fixture sanity: 5 distinct ASNs"
|
||||||
|
# All 5 land in one cluster, not 5.
|
||||||
|
assert len(set(pred.values())) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_cluster_observations_medium_alone_does_not_fuse():
|
def test_cluster_observations_medium_alone_does_not_fuse():
|
||||||
"""Two observations sharing only command-sequence (medium-tier)
|
"""Two observations sharing only command-sequence (medium-tier)
|
||||||
must stay in distinct clusters — medium is a supporting signal."""
|
must stay in distinct clusters — medium is a supporting signal."""
|
||||||
|
|||||||
Reference in New Issue
Block a user