test(clustering): fixture 5 multi_operator + c2/shift/composite refs

Three new reference clusterers in fixture_harness:

* c2_callback_clusterer — union-find on overlapping C2 callback
  sets across an attacker's sessions. Pass-clusterer for fixture 5
  where two operators with distinct tooling share a C2 endpoint as
  the campaign signal.

* shift_clusterer — deliberately-bad reference that buckets
  attackers by majority session-start hour into night/day/swing.
  Adversarial reference for fixture 5; proves operational schedule
  is NOT a campaign signal.

* composite_signals_clusterer — union-find combining (ja3, hassh)
  match OR overlapping C2 callback. Will serve as the pass-
  clusterer for fixture 6 (noise_floor) where multiple campaigns
  with heterogeneous signal types are scored together.

Also factored a small _union_find helper for the new clusterers
(existing time_window/credential_jaccard left untouched to avoid
mixing refactor with feature work).

Fixture 5 (multi_operator): one campaign, two operators with
distinct UKC roles. Actor A (broker, night shift): Delivery →
Exploitation → Persistence → C2. Actor B (post-ex, day shift):
Discovery → Lateral Movement → Collection → Exfiltration.
Distinct JA3/HASSH/ASN/IPs; shared C2 + payload hash.

Four tests: corpus shape (distinct fingerprints, shared C2,
disjoint shifts), pipeline pass via c2_callback_clusterer,
explicit harness sanity that fingerprint_clusterer cannot resolve
this fixture (documents which signal carries the campaign), and
adversarial shift_clusterer fragmentation.

Phase-handoff edges (the real load-bearing signal per the design
doc) wait for the production clusterer; this fixture will prove
they're needed when it ships.
This commit is contained in:
2026-04-26 07:46:14 -04:00
parent 304592abfe
commit 27f7de9886
4 changed files with 428 additions and 0 deletions

View File

@@ -42,6 +42,23 @@ cluster on, not the quality of the result.
other. Exists so fixtures like `paused_campaign` (fixture #4) can
prove they fail a clusterer that treats short-window time proximity
as a primary signal — operators pause, sleep, take weekends.
* `c2_callback_clusterer` — union-find on overlapping C2 callback
sets. Pass-clusterer for fixture 5 (multi_operator), where two
operators with distinct tooling share a C2 endpoint as the
load-bearing campaign signal. Attackers with no C2 endpoints
become their own singleton.
* `shift_clusterer` — deliberately-bad reference that buckets
attackers by majority session-start hour into night/day/swing.
Exists so fixture 5 can prove they fail a clusterer that treats
shift schedule as a primary signal — operators on different
schedules can still share a campaign.
* `composite_signals_clusterer` — union-find that combines
``(ja3, hassh)`` match OR shared C2 callback into the same
cluster. Approximates the planned similarity graph well enough
to score the combined-corpus fixture (fixture 6, noise_floor).
"""
from __future__ import annotations
@@ -123,6 +140,150 @@ def asn_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
return {a.attacker_id: f"asn-{a.asn}" for a in corpus.attackers}
def _union_find(ids: list[str]) -> tuple[
dict[str, str], Callable[[str], str], Callable[[str, str], None]
]:
"""Return (parent, find, union) for a fresh union-find over ``ids``."""
parent: dict[str, str] = {aid: aid for aid in ids}
def find(x: str) -> str:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(x: str, y: str) -> None:
rx, ry = find(x), find(y)
if rx != ry:
parent[rx] = ry
return parent, find, union
def c2_callback_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
"""Union attackers whose session-collected C2 callback sets overlap.
Attackers with no C2 callbacks become their own singleton (an
un-fingerprinted opportunistic scanner has no link to anyone).
"""
callbacks: dict[str, set[str]] = {}
for att in corpus.attackers:
callbacks[att.attacker_id] = {
s.c2_callback for s in att.sessions if s.c2_callback
}
ids = list(callbacks.keys())
_parent, find, union = _union_find(ids)
for i, a in enumerate(ids):
sa = callbacks[a]
if not sa:
continue
for b in ids[i + 1 :]:
sb = callbacks[b]
if not sb:
continue
if sa & sb:
union(a, b)
pred: dict[str, str] = {}
for aid in ids:
if not callbacks[aid]:
pred[aid] = f"c2-none-{aid}"
else:
pred[aid] = f"c2-{find(aid)}"
return pred
def shift_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
"""Bucket attackers by majority session-start hour into night /
day / swing. Deliberately-bad — see fixture 5.
Buckets:
* night — hours [22, 23, 0, 1, 2, 3, 4, 5]
* day — hours [6, 7, 8, 9, 10, 11, 12, 13]
* swing — hours [14, 15, 16, 17, 18, 19, 20, 21]
Attackers with no sessions become their own singleton.
"""
night = {22, 23, 0, 1, 2, 3, 4, 5}
day = {6, 7, 8, 9, 10, 11, 12, 13}
def bucket(hour: int) -> str:
if hour in night:
return "night"
if hour in day:
return "day"
return "swing"
pred: dict[str, str] = {}
for att in corpus.attackers:
if not att.sessions:
pred[att.attacker_id] = f"shift-none-{att.attacker_id}"
continue
counts: dict[str, int] = {}
for s in att.sessions:
b = bucket(s.started_at.hour)
counts[b] = counts.get(b, 0) + 1
majority = max(counts, key=lambda k: counts[k])
pred[att.attacker_id] = f"shift-{majority}"
return pred
def composite_signals_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
"""Union-find combining ``(ja3, hassh)`` match OR overlapping C2
callback sets. Approximates the stable-signals + C2-overlap arms
of the planned similarity graph; used as the pass-clusterer for
fixture 6 where multiple campaigns + noise are scored together.
Attackers with NO signals (no fingerprint, no C2) stay singleton.
"""
callbacks: dict[str, set[str]] = {}
fingerprint: dict[str, tuple[str | None, str | None] | None] = {}
for att in corpus.attackers:
callbacks[att.attacker_id] = {
s.c2_callback for s in att.sessions if s.c2_callback
}
if att.ja3 is None and att.hassh is None:
fingerprint[att.attacker_id] = None
else:
fingerprint[att.attacker_id] = (att.ja3, att.hassh)
ids = list(callbacks.keys())
_parent, find, union = _union_find(ids)
# Fingerprint edges.
by_fp: dict[tuple[str | None, str | None], list[str]] = {}
for aid, fp in fingerprint.items():
if fp is None:
continue
by_fp.setdefault(fp, []).append(aid)
for group in by_fp.values():
anchor = group[0]
for other in group[1:]:
union(anchor, other)
# C2 overlap edges.
for i, a in enumerate(ids):
sa = callbacks[a]
if not sa:
continue
for b in ids[i + 1 :]:
sb = callbacks[b]
if not sb:
continue
if sa & sb:
union(a, b)
pred: dict[str, str] = {}
for aid in ids:
if fingerprint[aid] is None and not callbacks[aid]:
pred[aid] = f"composite-singleton-{aid}"
else:
pred[aid] = f"composite-{find(aid)}"
return pred
def time_window_clusterer(
corpus: GeneratedCorpus, *, gap_days: float = 1.0
) -> dict[str, str]:

View File

@@ -0,0 +1,134 @@
"""
End-to-end pipeline test for fixture 5 (multi_operator).
One campaign, two operators with distinct UKC roles, distinct
tooling (different JA3 + HASSH), distinct ASNs and IPs, on
opposite shift schedules. What ties them is shared C2 callback +
shared stage-1 payload hash — the planned similarity graph's
"payload simhash + C2 endpoint match" arms are what should resolve
them as one campaign.
Three tests cover this:
1. `test_multi_operator_corpus_shape` — sanity: two attackers, one
campaign, distinct fingerprints, shared C2 callback present in
both rows' sessions, distinct shift hours.
2. `test_multi_operator_pipeline_passes_bounds` — runs
`c2_callback_clusterer` (the appropriate pass-clusterer for
this fixture, since fingerprint_clusterer would split the two
distinct operators). Folds both rows into one cluster via the
shared C2 endpoint.
3. `test_shift_clusterer_fragments_campaign` — runs the deliberately
bad `shift_clusterer`. Actor A on night shift and Actor B on day
shift split into two clusters → completeness collapses → the
bound floor on completeness rejects the bad clusterer. This is
the canonical proof that operational-schedule overlap is NOT a
campaign signal.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tests.clustering.fixture_harness import (
assert_fixture_bounds,
c2_callback_clusterer,
fingerprint_clusterer,
shift_clusterer,
)
from tests.clustering.metrics import score
from tests.factories.campaign_factory import generate, load_yaml
FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns"
FIXTURE_YAML = FIXTURE_DIR / "multi_operator.yaml"
EXPECTED_YAML = FIXTURE_DIR / "multi_operator.expected.yaml"
def test_multi_operator_corpus_shape() -> None:
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
assert len(corpus.attackers) == 2
truth_campaigns = {a.truth_campaign_id for a in corpus.attackers}
assert truth_campaigns == {"multi-operator-001"}
# Two distinct fingerprints — the operators are different people
# using different tools.
ja3s = {a.ja3 for a in corpus.attackers}
hasshs = {a.hassh for a in corpus.attackers}
assert len(ja3s) == 2
assert len(hasshs) == 2
# Shared C2 callback across both rows' sessions.
by_actor = {a.truth_actor_id: a for a in corpus.attackers}
broker = by_actor["ops-broker-night"]
postex = by_actor["ops-postex-day"]
broker_c2s = {s.c2_callback for s in broker.sessions if s.c2_callback}
postex_c2s = {s.c2_callback for s in postex.sessions if s.c2_callback}
assert "c2.shared-op.example" in broker_c2s
assert "c2.shared-op.example" in postex_c2s
# Shifts are disjoint — load-bearing for the adversarial test.
broker_hours = {s.started_at.hour for s in broker.sessions}
postex_hours = {s.started_at.hour for s in postex.sessions}
assert broker_hours <= {22, 23, 0, 1, 2, 3}
assert postex_hours <= {9, 10, 11, 12, 13}
def test_multi_operator_pipeline_passes_bounds() -> None:
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
metrics = assert_fixture_bounds(corpus, c2_callback_clusterer, EXPECTED_YAML)
pred = c2_callback_clusterer(corpus)
assert len(set(pred.values())) == 1, (
"c2_callback_clusterer should fold both operators into one cluster"
)
assert metrics["adjusted_rand_index"] == pytest.approx(1.0)
def test_fingerprint_clusterer_cannot_resolve_this_fixture() -> None:
"""
Sanity for the harness, NOT a test of the clusterer: with two
distinct fingerprints and one truth campaign,
`fingerprint_clusterer` produces 2 clusters → completeness
collapses. This is *why* the fixture's pass-clusterer is
`c2_callback_clusterer` instead. Documents which signal
actually carries the campaign here.
"""
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
pred = fingerprint_clusterer(corpus)
assert len(set(pred.values())) == 2
metrics = score(corpus.truth_labels(level="campaign"), pred)
assert metrics["completeness"] == pytest.approx(0.0)
def test_shift_clusterer_fragments_campaign() -> None:
"""
The fixture's reason for being. Bucket attackers by shift and
the two operators land in 'night' and 'day' clusters → 2
predicted clusters. Truth = 1 campaign → completeness collapses.
If this test ever passes (shift_clusterer satisfies the bounds),
the fixture has lost its discrimination power.
"""
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
pred = shift_clusterer(corpus)
buckets = set(pred.values())
assert buckets == {"shift-night", "shift-day"}, (
f"expected one night cluster + one day cluster, got {buckets}"
)
metrics = score(corpus.truth_labels(level="campaign"), pred)
assert metrics["completeness"] == pytest.approx(0.0)
bounds = {
"adjusted_rand_index": 0.85,
"homogeneity": 0.90,
"completeness": 0.80,
"singleton_recall": 0.95,
}
breaches = [k for k, floor in bounds.items() if metrics[k] < floor]
assert "completeness" in breaches, (
f"fixture failed to catch the bad clusterer; observed metrics: {metrics}"
)