feat(clustering): fingerprint-disagreement veto for fixture 5

Two operators cooperating on one campaign can share C2 endpoints +
stage-1 payloads while running distinct tooling — fixture 5
(multi_operator) is the canonical demonstration. The identity
clusterer must NOT fuse them: shared infra is a campaign-level
signal, not an identity-level one. The campaign clusterer (downstream
work) handles that grouping over identities.

Mechanism: when two observations have non-null fingerprints AND the
fingerprints fully disagree, the high-weight tier drops the payload
and C2 contributions to zero. JA3 / HASSH agreement still returns
1.0 directly — no veto applies when something agrees. Partial
agreement (one slot agrees, another disagrees) is treated as
agreement, since stable-tool partial overlap is more consistent
with one identity than two.

The veto only triggers when there is actual disagreement evidence —
two un-fingerprinted observations sharing a C2 still cluster, since
the absence of fingerprints is not the same as disagreement on them.

Fixture 5 production-clusterer assertion added at identity level:
ARI = 1.0, homogeneity = 1.0, exactly 2 predicted clusters from
2 truth identities. Phase-handoff edges (from the TODO) belong to
the downstream campaign clusterer, not this identity clusterer.
This commit is contained in:
2026-04-26 08:24:22 -04:00
parent f7da33726c
commit ed323581fe
3 changed files with 123 additions and 0 deletions

View File

@@ -302,6 +302,25 @@ def test_paused_campaign_passes_with_production_clusterer():
)
def test_multi_operator_keeps_distinct_identities_with_production_clusterer():
"""Fixture 5 at identity-level: two operators with distinct
JA3 + HASSH, sharing C2 + payload. The production clusterer's
fingerprint-disagreement veto must keep them as 2 identities."""
from tests.factories.campaign_factory import generate, load_yaml
from tests.clustering.metrics import score
corpus = generate(load_yaml(FIXTURE_DIR / "multi_operator.yaml"), seed=0)
pred = _production_clusterer_predict(corpus)
# Two distinct truth identities; the production clusterer must
# produce two distinct predicted clusters (no merge across
# fingerprint-disagreeing operators).
assert len(set(pred.values())) == 2
metrics = score(corpus.truth_labels(level="identity"), pred)
# Perfect identity-level recovery: ARI = 1.0, homogeneity = 1.0.
assert metrics["adjusted_rand_index"] == pytest.approx(1.0)
assert metrics["homogeneity"] == pytest.approx(1.0)
def test_cluster_observations_medium_alone_does_not_fuse():
"""Two observations sharing only command-sequence (medium-tier)
must stay in distinct clusters — medium is a supporting signal."""

View File

@@ -68,6 +68,67 @@ def test_high_weight_both_null_ja3_does_not_match():
assert high_weight_edge(a, b) == 0.0
# ─── fingerprint-disagreement veto on payload / C2 ──────────────────────────
def test_high_weight_veto_on_fingerprint_disagreement_with_shared_c2():
"""Fixture 5 protection: two operators with distinct JA3 + HASSH
sharing a C2 endpoint must NOT score as identity match."""
a = _obs(ja3="ja3-A", hassh="hassh-A",
c2_endpoints=frozenset({"c2.shared.example"}))
b = _obs(ja3="ja3-B", hassh="hassh-B",
c2_endpoints=frozenset({"c2.shared.example"}))
assert high_weight_edge(a, b) == 0.0
def test_high_weight_veto_on_fingerprint_disagreement_with_shared_payload():
"""Same shape, payload signal — also vetoed."""
a = _obs(ja3="ja3-A", hassh="hassh-A",
payload_hashes=frozenset({"stage1"}))
b = _obs(ja3="ja3-B", hassh="hassh-B",
payload_hashes=frozenset({"stage1"}))
assert high_weight_edge(a, b) == 0.0
def test_high_weight_no_veto_when_fingerprints_unknown():
"""Two un-fingerprinted observations sharing C2 still cluster —
we don't veto without evidence of disagreement."""
a = _obs(c2_endpoints=frozenset({"c2.shared.example"}))
b = _obs(c2_endpoints=frozenset({"c2.shared.example"}))
assert high_weight_edge(a, b) == 1.0
def test_high_weight_no_veto_when_one_side_unknown():
"""One observation without fingerprints + one with — no
disagreement evidence, so shared C2 still clusters."""
a = _obs(ja3="ja3-A", hassh="hassh-A",
c2_endpoints=frozenset({"c2.shared.example"}))
b = _obs(c2_endpoints=frozenset({"c2.shared.example"}))
assert high_weight_edge(a, b) == 1.0
def test_high_weight_partial_fingerprint_agreement_no_veto():
"""JA3 agrees, HASSH disagrees → some agreement → no veto. The
veto only triggers on FULL disagreement."""
a = _obs(ja3="ja3-shared", hassh="hassh-A",
c2_endpoints=frozenset({"c2.shared.example"}))
b = _obs(ja3="ja3-shared", hassh="hassh-B",
c2_endpoints=frozenset({"c2.shared.example"}))
# JA3 agreement returns 1.0 immediately; veto never reached.
assert high_weight_edge(a, b) == 1.0
def test_high_weight_partial_disagreement_one_slot_only_vetoes():
"""One slot comparable + disagrees, other slot uncomparable
(one side null) → veto triggers (only available evidence is
disagreement)."""
a = _obs(ja3="ja3-A", hassh=None,
c2_endpoints=frozenset({"c2.shared.example"}))
b = _obs(ja3="ja3-B", hassh=None,
c2_endpoints=frozenset({"c2.shared.example"}))
assert high_weight_edge(a, b) == 0.0
def test_high_weight_empty_sets_no_match():
a = _obs(payload_hashes=frozenset(), c2_endpoints=frozenset())
b = _obs(payload_hashes=frozenset(), c2_endpoints=frozenset())