test(clustering): factory honors ip_pool: rotating + 3-level truth labels

Fifth and final commit of the identity-resolution substrate. Unblocks
fixture 2 (vpn_hopping) by making the synthetic factory match
production shape: an actor rotating across N IPs produces N
SyntheticAttacker rows that share fingerprints + truth_identity_id but
differ on ip / asn — exactly the shape the future clusterer needs to
recover via JA3/HASSH match.

Factory:
* SyntheticSession + SyntheticAttacker gain truth_identity_id field.
* DSL: ip_pool: rotating + rotation_count: N produces N observation
  rows per actor. Optional rotation_asns: [...] cycles ASN per row;
  defaults to the actor's primary asn.
* Sessions distribute round-robin across the actor's rotated rows.
* Noise scanners get truth_identity_id == truth_actor_id ==
  truth_campaign_id (each is its own singleton at every level).
* GeneratedCorpus.truth_labels(level=) accepts "campaign" (default,
  back-compat), "identity", or "actor" — picks the oracle the
  metric harness scores against.

Harness:
* assert_fixture_bounds gains truth_level kwarg (default "campaign")
  so identity-resolution fixtures can score against truth_identity_id
  without churning the campaign-clustering test files.

Tests: 9 new (rotation_count emits N rows, shared identity +
fingerprints, distinct IPs, rotation_asns distribution + cycling,
round-robin session distribution, identity-level truth labels,
sticky default unchanged, sessions inherit identity label).
598 tests green across clustering / factories / db / web / bus /
profiler / correlation.
This commit is contained in:
2026-04-26 07:19:39 -04:00
parent 4f1077be72
commit f6b83755eb
3 changed files with 303 additions and 25 deletions

View File

@@ -45,10 +45,19 @@ class SyntheticSession:
c2_callback: str | None
truth_campaign_id: str
truth_actor_id: str
truth_identity_id: str = ""
@dataclass
class SyntheticAttacker:
"""One per-IP observation row. Multiple rows per DSL actor when
``ip_pool: rotating`` — they all share ``truth_identity_id`` /
``truth_actor_id`` / ``truth_campaign_id`` plus ``ja3`` / ``hassh``,
differ on ``ip`` and (optionally) ``asn``. This matches production
shape: DECNET creates one ``Attacker`` row per source IP, and the
clusterer recovers identity by joining on stable fingerprints.
See development/IDENTITY_RESOLUTION.md.
"""
attacker_id: str
ip: str
asn: int
@@ -58,6 +67,7 @@ class SyntheticAttacker:
last_seen: datetime
truth_campaign_id: str
truth_actor_id: str
truth_identity_id: str = ""
sessions: list[SyntheticSession] = field(default_factory=list)
@@ -68,9 +78,27 @@ class GeneratedCorpus:
# Convenience: flat list of every session across every attacker.
sessions: list[SyntheticSession]
def truth_labels(self) -> dict[str, str]:
"""attacker_id -> truth_campaign_id, the oracle the clusterer is scored against."""
return {a.attacker_id: a.truth_campaign_id for a in self.attackers}
def truth_labels(self, *, level: str = "campaign") -> dict[str, str]:
"""``attacker_id -> truth-{level}-id`` oracle the clusterer is scored against.
``level``:
- ``"campaign"`` (default) — campaign-clustering oracle.
- ``"identity"`` — identity-resolution oracle. Multiple
observations from a single rotating actor share an identity
label; campaign-level still groups them with whatever else
is in their campaign.
- ``"actor"`` — for completeness; equivalent to identity for
the single-campaign single-actor case but distinguishes
multi-actor campaigns where each operator is its own
identity (e.g. fixture 5 multi_operator).
"""
if level == "campaign":
return {a.attacker_id: a.truth_campaign_id for a in self.attackers}
if level == "identity":
return {a.attacker_id: a.truth_identity_id for a in self.attackers}
if level == "actor":
return {a.attacker_id: a.truth_actor_id for a in self.attackers}
raise ValueError(f"unknown truth-label level: {level!r}")
# ─── Phase defaults ─────────────────────────────────────────────────────────
@@ -229,26 +257,50 @@ def _emit_campaign(
# across runs regardless of wall clock.
epoch = datetime(2026, 1, 1, tzinfo=timezone.utc)
# One attacker record per actor — captures the cross-session identity
# the clusterer is supposed to recover. IPs may rotate per session
# for rotating ip_pool actors; we record the first/last observed IP
# on the attacker row and let session-level fields carry the rest.
actor_attackers: dict[str, SyntheticAttacker] = {}
# Per-actor SyntheticAttacker rows. One per actor for ``ip_pool:
# sticky`` (default); ``rotation_count`` rows for ``ip_pool:
# rotating`` — distinct IPs (and optionally distinct ASNs via
# ``rotation_asns``) but a SHARED ``truth_identity_id`` so the
# clusterer's job is to recover them as one. This matches
# production: an actor rotating across N IPs produces N ``Attacker``
# observation rows that the clusterer needs to fold into one
# ``AttackerIdentity``.
#
# ``actor_rows[actor_id]`` is the list the session scheduler
# round-robins over so an actor's sessions distribute across the
# rotated IPs naturally.
actor_rows: dict[str, list[SyntheticAttacker]] = {}
for actor in c["actors"]:
a_id = _stable_uuid(rng, "att")
att = SyntheticAttacker(
attacker_id=a_id,
ip=_stable_ip(rng),
asn=int(actor.get("asn", 0)),
ja3=actor.get("ja3"),
hassh=actor.get("hassh"),
first_seen=epoch,
last_seen=epoch,
truth_campaign_id=campaign_id,
truth_actor_id=actor["id"],
)
actor_attackers[actor["id"]] = att
attackers.append(att)
# One identity per DSL actor — shared across all rotated rows.
identity_id = _stable_uuid(rng, "id")
ip_pool = actor.get("ip_pool", "sticky")
rotation_count = int(actor.get("rotation_count", 1)) if ip_pool == "rotating" else 1
rotation_asns: list[int] = list(actor.get("rotation_asns", []) or [])
primary_asn = int(actor.get("asn", 0))
rows: list[SyntheticAttacker] = []
for r in range(rotation_count):
# Cycle rotation_asns if shorter than rotation_count; fall
# back to the actor's primary asn if no pool is given.
asn_for_row = (
rotation_asns[r % len(rotation_asns)]
if rotation_asns else primary_asn
)
row = SyntheticAttacker(
attacker_id=_stable_uuid(rng, "att"),
ip=_stable_ip(rng),
asn=asn_for_row,
ja3=actor.get("ja3"),
hassh=actor.get("hassh"),
first_seen=epoch,
last_seen=epoch,
truth_campaign_id=campaign_id,
truth_actor_id=actor["id"],
truth_identity_id=identity_id,
)
rows.append(row)
attackers.append(row)
actor_rows[actor["id"]] = rows
# Walk phases in declared order. Each phase produces N sessions
# against random deckies (or a sticky one if previous_success).
@@ -261,7 +313,7 @@ def _emit_campaign(
continue # pre-target phase; emit nothing
actor_id = ph.get("actor") or c["actors"][0]["id"]
att = actor_attackers[actor_id]
rows = actor_rows[actor_id]
actor_spec = next(a for a in c["actors"] if a["id"] == actor_id)
sig = ph.get("tool_signature", {}) or {}
@@ -299,6 +351,13 @@ def _emit_campaign(
started_at = _hour_to_offset(rng, day_start, hour, jitter)
duration_s = float(ph.get("dwell_seconds", 5))
# Distribute sessions across the actor's rotated rows by
# round-robin. With rotation_count=1 (sticky) every session
# lands on the same row — back-compat preserved. With N>1,
# sessions interleave so the clusterer sees N distinct
# observation rows each with their own session timeline,
# all sharing the actor's stable fingerprints.
att = rows[s_idx % len(rows)]
sess = SyntheticSession(
session_id=_stable_uuid(rng, "sess"),
attacker_id=att.attacker_id,
@@ -312,6 +371,7 @@ def _emit_campaign(
c2_callback=c2,
truth_campaign_id=campaign_id,
truth_actor_id=actor_id,
truth_identity_id=att.truth_identity_id,
)
sessions.append(sess)
att.sessions.append(sess)
@@ -338,6 +398,9 @@ def _emit_noise(
epoch = datetime(2026, 1, 1, tzinfo=timezone.utc)
for i in range(n_scanners):
scanner_id = f"noise-scanner-{i:04d}"
# Each noise scanner is its own truth-campaign AND its own
# truth-identity — opportunistic singletons share nothing with
# anyone, including each other.
att = SyntheticAttacker(
attacker_id=_stable_uuid(rng, "att"),
ip=_stable_ip(rng),
@@ -346,8 +409,9 @@ def _emit_noise(
hassh=None,
first_seen=epoch,
last_seen=epoch,
truth_campaign_id=scanner_id, # each scanner is its own truth-campaign
truth_campaign_id=scanner_id,
truth_actor_id=scanner_id,
truth_identity_id=scanner_id,
)
attackers.append(att)
# One Delivery-phase session, no follow-up.
@@ -365,6 +429,7 @@ def _emit_noise(
c2_callback=None,
truth_campaign_id=scanner_id,
truth_actor_id=scanner_id,
truth_identity_id=scanner_id,
)
sessions.append(sess)
att.sessions.append(sess)