diff --git a/tests/clustering/fixture_harness.py b/tests/clustering/fixture_harness.py index fe33224f..64e860aa 100644 --- a/tests/clustering/fixture_harness.py +++ b/tests/clustering/fixture_harness.py @@ -36,16 +36,23 @@ def assert_fixture_bounds( corpus: GeneratedCorpus, predict: PredictFn, expected_path: str | Path, + *, + truth_level: str = "campaign", ) -> dict[str, float]: """ Run `predict` against the corpus, score against ground truth, and assert every metric meets the floor declared in `expected_path`. + ``truth_level`` selects the oracle: ``"campaign"`` (default) for + campaign-clustering fixtures, ``"identity"`` for identity-resolution + fixtures (where the clusterer's job is to fold N rotated-IP + observations into one identity), or ``"actor"`` for completeness. + Returns the observed metrics dict so callers can do additional assertions (e.g. "homogeneity is *exactly* 1.0 for this fixture"). """ bounds = yaml.safe_load(Path(expected_path).read_text(encoding="utf-8")) - truth = corpus.truth_labels() + truth = corpus.truth_labels(level=truth_level) pred = predict(corpus) metrics = score(truth, pred) diff --git a/tests/clustering/test_campaign_factory.py b/tests/clustering/test_campaign_factory.py index 2782dbe4..cfa61ba9 100644 --- a/tests/clustering/test_campaign_factory.py +++ b/tests/clustering/test_campaign_factory.py @@ -110,3 +110,209 @@ def test_multi_actor_campaign_shares_campaign_id() -> None: # Both attacker rows must point to the SAME truth_campaign_id — # this is the property fixture 5 (multi_operator) hinges on. assert set(truth.values()) == {"c-shared"} + + +# ─── ip_pool: rotating — identity-resolution fixture support ──────────────── + + +def test_rotating_ip_pool_emits_one_row_per_rotation_count() -> None: + """ + ``rotation_count: 5`` produces 5 SyntheticAttacker rows for that + one DSL actor. Sticky default still produces 1. 
+ """ + spec = { + "campaign": { + "id": "c-rotating", + "actors": [{ + "id": "a-1", + "asn": 14061, + "ip_pool": "rotating", + "rotation_count": 5, + "ja3": "JA3-fixed", + "hassh": "HASSH-fixed", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 10}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 5 + + +def test_rotating_rows_share_identity_and_fingerprints_but_differ_on_ip() -> None: + """ + All rotated rows MUST share truth_identity_id, truth_actor_id, + truth_campaign_id, ja3, hassh — these are the stable signals the + clusterer uses to recover identity. They MUST differ on ip — that's + what makes the test interesting. + """ + spec = { + "campaign": { + "id": "c-vpn-hop", + "actors": [{ + "id": "a-1", + "asn": 14061, + "ip_pool": "rotating", + "rotation_count": 5, + "ja3": "JA3-fixed", + "hassh": "HASSH-fixed", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 5}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + rows = corpus.attackers + # Stable: shared across all 5 rows. + assert len({r.truth_identity_id for r in rows}) == 1 + assert len({r.truth_actor_id for r in rows}) == 1 + assert len({r.truth_campaign_id for r in rows}) == 1 + assert len({r.ja3 for r in rows}) == 1 + assert len({r.hassh for r in rows}) == 1 + # Rotating: 5 distinct IPs. + assert len({r.ip for r in rows}) == 5 + + +def test_rotation_asns_distributed_across_rows() -> None: + """ + When ``rotation_asns`` is provided, each rotated row gets the + corresponding ASN (cycling if shorter than rotation_count). 
+ """ + spec = { + "campaign": { + "id": "c-multi-asn", + "actors": [{ + "id": "a-1", + "asn": 14061, # primary, ignored when rotation_asns is set + "ip_pool": "rotating", + "rotation_count": 5, + "rotation_asns": [14061, 7922, 16509, 14618, 13335], + "ja3": "x", "hassh": "y", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 5}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + asns = [r.asn for r in corpus.attackers] + assert asns == [14061, 7922, 16509, 14618, 13335] + + +def test_rotation_asns_cycle_when_shorter_than_count() -> None: + """rotation_asns of length 2 with rotation_count=5 cycles.""" + spec = { + "campaign": { + "id": "c-cycle", + "actors": [{ + "id": "a-1", + "ip_pool": "rotating", + "rotation_count": 5, + "rotation_asns": [100, 200], + "ja3": "x", "hassh": "y", + }], + "phases": [{"name": "delivery", "actor": "a-1"}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + assert [r.asn for r in corpus.attackers] == [100, 200, 100, 200, 100] + + +def test_sessions_distribute_round_robin_across_rotated_rows() -> None: + """ + With rotation_count=3 and 9 sessions in a phase, each row should + receive 3 sessions (round-robin). This is what makes the clusterer + job realistic — every observation row carries its own session + timeline that the clusterer joins via shared fingerprints. + """ + spec = { + "campaign": { + "id": "c-rr", + "actors": [{ + "id": "a-1", + "ip_pool": "rotating", + "rotation_count": 3, + "ja3": "x", "hassh": "y", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 9}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + counts = sorted(len(r.sessions) for r in corpus.attackers) + assert counts == [3, 3, 3] + + +def test_truth_labels_at_identity_level() -> None: + """ + corpus.truth_labels(level="identity") returns the identity-level + oracle the clusterer is scored against. 
Rotated rows for one DSL + actor share an identity label even though they have distinct + attacker_ids. + """ + spec = { + "campaign": { + "id": "c-rot", + "actors": [{ + "id": "a-1", + "ip_pool": "rotating", + "rotation_count": 4, + "ja3": "x", "hassh": "y", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 4}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + identity_labels = corpus.truth_labels(level="identity") + assert len(identity_labels) == 4 # one per attacker row + # All 4 attackers share one identity label. + assert len(set(identity_labels.values())) == 1 + + +def test_truth_labels_unknown_level_raises() -> None: + spec = _minimal_spec() + corpus = generate(spec, seed=0) + with pytest.raises(ValueError, match="unknown truth-label level"): + corpus.truth_labels(level="campaign-but-spelled-wrong") + + +def test_sticky_default_unchanged_back_compat() -> None: + """ + The pre-existing sticky-default path produces exactly one row per + actor and assigns truth_identity_id. Smoke-tests that the + refactor didn't break the back-compat case. + """ + corpus = generate(_minimal_spec(), seed=0) + assert len(corpus.attackers) == 1 + assert corpus.attackers[0].truth_identity_id != "" + # Default truth_labels still returns campaign labels. 
+ labels = corpus.truth_labels() + assert set(labels.values()) == {"c-test"} + + +def test_rotated_sessions_carry_identity_label() -> None: + """SyntheticSession.truth_identity_id matches its parent attacker.""" + spec = { + "campaign": { + "id": "c-rot", + "actors": [{ + "id": "a-1", + "ip_pool": "rotating", + "rotation_count": 3, + "ja3": "x", "hassh": "y", + }], + "phases": [{"name": "delivery", "actor": "a-1", + "target_selector": {"count": 6}}], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + by_id = {a.attacker_id: a for a in corpus.attackers} + for sess in corpus.sessions: + assert sess.truth_identity_id == by_id[sess.attacker_id].truth_identity_id diff --git a/tests/factories/campaign_factory.py b/tests/factories/campaign_factory.py index 6e6f4e51..62556422 100644 --- a/tests/factories/campaign_factory.py +++ b/tests/factories/campaign_factory.py @@ -45,10 +45,19 @@ class SyntheticSession: c2_callback: str | None truth_campaign_id: str truth_actor_id: str + truth_identity_id: str = "" @dataclass class SyntheticAttacker: + """One per-IP observation row. Multiple rows per DSL actor when + ``ip_pool: rotating`` — they all share ``truth_identity_id`` / + ``truth_actor_id`` / ``truth_campaign_id`` plus ``ja3`` / ``hassh``, + but differ on ``ip`` and (optionally) ``asn``. This matches production + shape: DECNET creates one ``Attacker`` row per source IP, and the + clusterer recovers identity by joining on stable fingerprints. + See development/IDENTITY_RESOLUTION.md. + """ attacker_id: str ip: str asn: int @@ -58,6 +67,7 @@ class SyntheticAttacker: last_seen: datetime truth_campaign_id: str truth_actor_id: str + truth_identity_id: str = "" sessions: list[SyntheticSession] = field(default_factory=list) @@ -68,9 +78,27 @@ class GeneratedCorpus: # Convenience: flat list of every session across every attacker. 
+ sessions: list[SyntheticSession] - def truth_labels(self) -> dict[str, str]: - """attacker_id -> truth_campaign_id, the oracle the clusterer is scored against.""" - return {a.attacker_id: a.truth_campaign_id for a in self.attackers} + def truth_labels(self, *, level: str = "campaign") -> dict[str, str]: + """Return the ``attacker_id -> truth_<level>_id`` oracle the clusterer is scored against. + + ``level`` (any other value raises ``ValueError``): + - ``"campaign"`` (default) — campaign-clustering oracle. + - ``"identity"`` — identity-resolution oracle. Multiple + observations from a single rotating actor share an identity + label; campaign-level still groups them with whatever else + is in their campaign. + - ``"actor"`` — groups rows by ``truth_actor_id``. Unlike + ``"campaign"``, it keeps the operators of a multi-actor + campaign apart (e.g. fixture 5 multi_operator); for a + single-actor campaign it matches ``"identity"``. + """ + if level == "campaign": + return {a.attacker_id: a.truth_campaign_id for a in self.attackers} + if level == "identity": + return {a.attacker_id: a.truth_identity_id for a in self.attackers} + if level == "actor": + return {a.attacker_id: a.truth_actor_id for a in self.attackers} + raise ValueError(f"unknown truth-label level: {level!r}") # ─── Phase defaults ───────────────────────────────────────────────────────── @@ -229,26 +257,50 @@ def _emit_campaign( # across runs regardless of wall clock. epoch = datetime(2026, 1, 1, tzinfo=timezone.utc) - # One attacker record per actor — captures the cross-session identity - # the clusterer is supposed to recover. IPs may rotate per session - # for rotating ip_pool actors; we record the first/last observed IP - # on the attacker row and let session-level fields carry the rest. - actor_attackers: dict[str, SyntheticAttacker] = {} + # Per-actor SyntheticAttacker rows. 
One per actor for ``ip_pool: + # sticky`` (default); ``rotation_count`` rows for ``ip_pool: + # rotating`` — distinct IPs (and optionally distinct ASNs via + # ``rotation_asns``) but a SHARED ``truth_identity_id`` so the + # clusterer's job is to recover them as one. This matches + # production: an actor rotating across N IPs produces N ``Attacker`` + # observation rows that the clusterer needs to fold into one + # ``AttackerIdentity``. + # + # ``actor_rows[actor_id]`` is the list the session scheduler + # round-robins over so an actor's sessions distribute across the + # rotated IPs naturally. + actor_rows: dict[str, list[SyntheticAttacker]] = {} for actor in c["actors"]: - a_id = _stable_uuid(rng, "att") - att = SyntheticAttacker( - attacker_id=a_id, - ip=_stable_ip(rng), - asn=int(actor.get("asn", 0)), - ja3=actor.get("ja3"), - hassh=actor.get("hassh"), - first_seen=epoch, - last_seen=epoch, - truth_campaign_id=campaign_id, - truth_actor_id=actor["id"], - ) - actor_attackers[actor["id"]] = att - attackers.append(att) + # One identity per DSL actor — shared across all rotated rows. + identity_id = _stable_uuid(rng, "id") + ip_pool = actor.get("ip_pool", "sticky") + rotation_count = int(actor.get("rotation_count", 1)) if ip_pool == "rotating" else 1 + rotation_asns: list[int] = list(actor.get("rotation_asns", []) or []) + primary_asn = int(actor.get("asn", 0)) + + rows: list[SyntheticAttacker] = [] + for r in range(rotation_count): + # Cycle rotation_asns if shorter than rotation_count; fall + # back to the actor's primary asn if no pool is given. 
+ asn_for_row = ( + rotation_asns[r % len(rotation_asns)] + if rotation_asns else primary_asn + ) + row = SyntheticAttacker( + attacker_id=_stable_uuid(rng, "att"), + ip=_stable_ip(rng), + asn=asn_for_row, + ja3=actor.get("ja3"), + hassh=actor.get("hassh"), + first_seen=epoch, + last_seen=epoch, + truth_campaign_id=campaign_id, + truth_actor_id=actor["id"], + truth_identity_id=identity_id, + ) + rows.append(row) + attackers.append(row) + actor_rows[actor["id"]] = rows # Walk phases in declared order. Each phase produces N sessions # against random deckies (or a sticky one if previous_success). @@ -261,7 +313,7 @@ def _emit_campaign( continue # pre-target phase; emit nothing actor_id = ph.get("actor") or c["actors"][0]["id"] - att = actor_attackers[actor_id] + rows = actor_rows[actor_id] actor_spec = next(a for a in c["actors"] if a["id"] == actor_id) sig = ph.get("tool_signature", {}) or {} @@ -299,6 +351,13 @@ def _emit_campaign( started_at = _hour_to_offset(rng, day_start, hour, jitter) duration_s = float(ph.get("dwell_seconds", 5)) + # Distribute sessions across the actor's rotated rows by + # round-robin. With rotation_count=1 (sticky) every session + # lands on the same row — back-compat preserved. With N>1, + # sessions interleave so the clusterer sees N distinct + # observation rows each with their own session timeline, + # all sharing the actor's stable fingerprints. 
+ att = rows[s_idx % len(rows)] sess = SyntheticSession( session_id=_stable_uuid(rng, "sess"), attacker_id=att.attacker_id, @@ -312,6 +371,7 @@ def _emit_campaign( c2_callback=c2, truth_campaign_id=campaign_id, truth_actor_id=actor_id, + truth_identity_id=att.truth_identity_id, ) sessions.append(sess) att.sessions.append(sess) @@ -338,6 +398,9 @@ def _emit_noise( epoch = datetime(2026, 1, 1, tzinfo=timezone.utc) for i in range(n_scanners): scanner_id = f"noise-scanner-{i:04d}" + # Each noise scanner is its own truth-campaign AND its own + # truth-identity — opportunistic singletons share nothing with + # anyone, including each other. att = SyntheticAttacker( attacker_id=_stable_uuid(rng, "att"), ip=_stable_ip(rng), @@ -346,8 +409,9 @@ def _emit_noise( hassh=None, first_seen=epoch, last_seen=epoch, - truth_campaign_id=scanner_id, # each scanner is its own truth-campaign + truth_campaign_id=scanner_id, truth_actor_id=scanner_id, + truth_identity_id=scanner_id, ) attackers.append(att) # One Delivery-phase session, no follow-up. @@ -365,6 +429,7 @@ def _emit_noise( c2_callback=None, truth_campaign_id=scanner_id, truth_actor_id=scanner_id, + truth_identity_id=scanner_id, ) sessions.append(sess) att.sessions.append(sess)