From 304592abfeba467ead50f0f9ff58717705c65c52 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 07:39:46 -0400 Subject: [PATCH] test(clustering): fixture 4 paused_campaign + active_days/time_window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the actor.active_days primitive to the campaign factory so a DSL actor can be bound to specific day indexes. Falls back to the non-paused day pool when absent (existing fixtures unchanged). Intersects with pause_windows so the campaign-wide silence still wins if both are set. Adds time_window_clusterer reference to fixture_harness — union-find over attackers, edge if their session time-ranges are within gap_days of each other. Deliberately-bad reference for fixture 4: multi-day silent stretches fragment a single campaign because the clusterer has no signal that bridges the gap. Fixture 4 (paused_campaign): one campaign modeled as two DSL actors representing the operator's two operational windows (active days 1-2 and 6-7), separated by a silent stretch (days 3-5). Both share JA3 + HASSH + payload + C2 callback; only their active_days differ. Five tests: corpus shape (rows in their windows, shared signals), pipeline pass via fingerprint_clusterer at level=campaign, adversarial fragmentation via time_window_clusterer (1-day union threshold cannot bridge the 4-day silence → completeness collapses), huge-gap sanity (gap_days=10 unions both halves), silent-stretch invariant (no session leaks into the configured pause window). Identity-level scoring is fixture 2's job; this fixture is campaign-level only — modeling caveat documented in the YAML. 
--- tests/clustering/fixture_harness.py | 65 ++++++++ .../test_paused_campaign_fixture.py | 140 ++++++++++++++++++ tests/factories/campaign_factory.py | 31 ++-- .../campaigns/paused_campaign.expected.yaml | 24 +++ tests/fixtures/campaigns/paused_campaign.yaml | 85 +++++++++++ 5 files changed, 334 insertions(+), 11 deletions(-) create mode 100644 tests/clustering/test_paused_campaign_fixture.py create mode 100644 tests/fixtures/campaigns/paused_campaign.expected.yaml create mode 100644 tests/fixtures/campaigns/paused_campaign.yaml diff --git a/tests/clustering/fixture_harness.py b/tests/clustering/fixture_harness.py index a2a95df7..04fd8d7e 100644 --- a/tests/clustering/fixture_harness.py +++ b/tests/clustering/fixture_harness.py @@ -36,6 +36,12 @@ cluster on, not the quality of the result. can prove they fail a clusterer that treats ASN match as a high-weight signal — VPN/proxy hopping shatters ASN within a single identity and a clusterer that leans on it tanks completeness. + +* `time_window_clusterer` — deliberately-bad reference that unions + attackers whose session time-ranges are within ``gap_days`` of each + other. Exists so fixtures like `paused_campaign` (fixture #4) can + prove they fail a clusterer that treats short-window time proximity + as a primary signal — operators pause, sleep, take weekends. """ from __future__ import annotations @@ -117,6 +123,65 @@ def asn_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: return {a.attacker_id: f"asn-{a.asn}" for a in corpus.attackers} +def time_window_clusterer( + corpus: GeneratedCorpus, *, gap_days: float = 1.0 +) -> dict[str, str]: + """Union-find over attackers, edge if their session time-ranges + overlap or are within ``gap_days`` of each other. + + Deliberately-bad reference for fixture 4 (paused_campaign): a + campaign that goes silent for several days will be split into + "before pause" and "after pause" clusters by this clusterer, + breaching completeness. 
The real algorithm must not lean on + short-window time proximity as a primary signal — operators + pause, sleep, switch shifts, take weekends. Time bursts are a + weak hint, not a hard partition. + + Attackers with no sessions become their own singleton cluster. + """ + from datetime import timedelta + + gap = timedelta(days=gap_days) + ids = [a.attacker_id for a in corpus.attackers] + ranges: dict[str, tuple] = {} + for att in corpus.attackers: + if not att.sessions: + continue + starts = [s.started_at for s in att.sessions] + ends = [s.started_at + timedelta(seconds=s.duration_s) for s in att.sessions] + ranges[att.attacker_id] = (min(starts), max(ends)) + + parent: dict[str, str] = {aid: aid for aid in ids} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + keys = list(ranges.keys()) + for i, a in enumerate(keys): + a_start, a_end = ranges[a] + for b in keys[i + 1 :]: + b_start, b_end = ranges[b] + # Time-distance between the two ranges (0 if they overlap). + if a_end < b_start: + separation = b_start - a_end + elif b_end < a_start: + separation = a_start - b_end + else: + separation = timedelta(0) + if separation <= gap: + union(a, b) + + return {aid: find(aid) for aid in ids} + + def credential_jaccard_clusterer( corpus: GeneratedCorpus, *, threshold: float = 0.5 ) -> dict[str, str]: diff --git a/tests/clustering/test_paused_campaign_fixture.py b/tests/clustering/test_paused_campaign_fixture.py new file mode 100644 index 00000000..e212615d --- /dev/null +++ b/tests/clustering/test_paused_campaign_fixture.py @@ -0,0 +1,140 @@ +""" +End-to-end pipeline test for fixture 4 (paused_campaign). + +One campaign, two operational windows separated by a multi-day +silent stretch (days 3-5, 0-indexed [2, 4]). 
Modeled as two DSL +actors sharing JA3 + HASSH + payload + C2 callback — the +fingerprint-stable signals a real clusterer should resolve on. +Their ``active_days`` differ so each row's sessions land in +disjoint time ranges; this is what gives the adversarial +``time_window_clusterer`` something to fragment. + +Five tests cover this; the three core ones are: + +1. `test_paused_campaign_corpus_shape` — sanity: 2 attackers, both + share campaign id, sessions are time-disjoint across the pause + window. + +2. `test_paused_campaign_pipeline_passes_bounds` — + `fingerprint_clusterer` reference folds both rows into one + cluster (shared JA3 + HASSH). Trivially green at campaign-level + scoring; the test is a ratchet point for the real algorithm. + +3. `test_time_window_clusterer_fragments_campaign` — runs the + deliberately-bad `time_window_clusterer`. With a 4-day silent + stretch and a 1-day union threshold, the two halves cannot be + bridged → 2 clusters → completeness collapses → bound rejected. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.clustering.fixture_harness import ( + assert_fixture_bounds, + fingerprint_clusterer, + time_window_clusterer, +) +from tests.clustering.metrics import score +from tests.factories.campaign_factory import generate, load_yaml + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" +FIXTURE_YAML = FIXTURE_DIR / "paused_campaign.yaml" +EXPECTED_YAML = FIXTURE_DIR / "paused_campaign.expected.yaml" + + +def test_paused_campaign_corpus_shape() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 2 + truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} + assert truth_campaigns == {"paused-campaign-001"} + # Both rows share the operator's JA3 and HASSH — load-bearing + # signal for fingerprint_clusterer to fold them. 
+ ja3s = {a.ja3 for a in corpus.attackers} + hasshs = {a.hassh for a in corpus.attackers} + assert len(ja3s) == 1 + assert len(hasshs) == 1 + # Each row's session timeline lives in its actor's active_days. + rows_by_actor = {a.truth_actor_id: a for a in corpus.attackers} + sprint_1 = rows_by_actor["ops-sprint-1"] + sprint_2 = rows_by_actor["ops-sprint-2"] + sprint_1_days = {s.started_at.day for s in sprint_1.sessions} + sprint_2_days = {s.started_at.day for s in sprint_2.sessions} + # Epoch is 2026-01-01; active_days [0,1] → calendar days 1,2; + # active_days [5,6] → calendar days 6,7. + assert sprint_1_days <= {1, 2}, f"sprint-1 leaked outside its window: {sprint_1_days}" + assert sprint_2_days <= {6, 7}, f"sprint-2 leaked outside its window: {sprint_2_days}" + + +def test_paused_campaign_pipeline_passes_bounds() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + metrics = assert_fixture_bounds(corpus, fingerprint_clusterer, EXPECTED_YAML) + # Both rows share fingerprints → one predicted cluster. + pred = fingerprint_clusterer(corpus) + assert len(set(pred.values())) == 1 + # Truth = 1 campaign of 2 rows; pred = 1 cluster of 2 rows → ARI 1.0. + assert metrics["adjusted_rand_index"] == pytest.approx(1.0) + + +def test_time_window_clusterer_fragments_campaign() -> None: + """ + The fixture's reason for being. With a 4-day silence between + the two operational windows and a 1-day union threshold, the + bad clusterer cannot bridge the gap. The campaign splits in + two and completeness collapses. + + If this test ever fails (time_window_clusterer satisfies the + bounds), the fixture has lost its discrimination power. 
+ """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = time_window_clusterer(corpus, gap_days=1.0) + assert len(set(pred.values())) == 2, ( + f"time-window clusterer should split into 2 clusters, got {len(set(pred.values()))}" + ) + + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["completeness"] == pytest.approx(0.0) + + bounds = { + "adjusted_rand_index": 0.85, + "homogeneity": 0.90, + "completeness": 0.80, + "singleton_recall": 0.95, + } + breaches = [k for k, floor in bounds.items() if metrics[k] < floor] + assert "completeness" in breaches, ( + f"fixture failed to catch the bad clusterer; observed metrics: {metrics}" + ) + + +def test_time_window_clusterer_with_huge_gap_does_not_fragment() -> None: + """ + Sanity for the time-window reference: with a gap larger than + the campaign's silent stretch, the two halves union into one. + Confirms the clusterer's behavior depends on the threshold, + not on something unrelated. (Pause is days 3-5 and the campaign + spans only 7 days, so any separation is < 10 days; gap_days=10 must bridge.) 
+ """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = time_window_clusterer(corpus, gap_days=10.0) + assert len(set(pred.values())) == 1 + + +def test_silent_stretch_actually_silent() -> None: + """No session may land inside the configured pause window.""" + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pause_calendar_days = {3, 4, 5} # 1-indexed; pause_windows [[2,4]] in 0-indexed + leaked = [ + s for s in corpus.sessions + if s.started_at.day in pause_calendar_days + ] + assert not leaked, ( + f"sessions leaked into the silent stretch: " + f"{[(s.session_id, s.started_at) for s in leaked]}" + ) diff --git a/tests/factories/campaign_factory.py b/tests/factories/campaign_factory.py index 62556422..ab734f8a 100644 --- a/tests/factories/campaign_factory.py +++ b/tests/factories/campaign_factory.py @@ -331,21 +331,30 @@ def _emit_campaign( decky_choices = decky_pool # Schedule sessions across the campaign window, respecting the - # actor's hours_active_utc and pause_windows. + # actor's hours_active_utc, pause_windows, and (if specified) + # the actor's active_days. ``active_days`` (per-actor list of + # day indexes) lets a fixture bind an actor to specific days + # without affecting siblings — used by fixture 4 to model an + # operator who pauses operations between sprints. active_hours = actor_spec.get("hours_active_utc", list(range(24))) jitter = int(actor_spec.get("jitter_seconds", 60)) + non_paused = [ + d for d in range(duration_days) + if not any(s <= d <= e for s, e in pause_windows) + ] + actor_active_days = actor_spec.get("active_days") + if actor_active_days is not None: + # Intersect with non-paused so pause_windows still wins + # globally if the fixture sets both (defensive). 
+ day_pool = [d for d in actor_active_days if d in non_paused] + else: + day_pool = non_paused + for s_idx in range(n_sessions): - day = rng.randint(0, max(0, duration_days - 1)) - if any(start <= day <= end for start, end in pause_windows): - # Skip into post-pause day. - later_days = [ - d for d in range(duration_days) - if not any(s <= d <= e for s, e in pause_windows) - ] - if not later_days: - continue - day = rng.choice(later_days) + if not day_pool: + continue + day = rng.choice(day_pool) hour = rng.choice(active_hours) day_start = epoch + timedelta(days=day) started_at = _hour_to_offset(rng, day_start, hour, jitter) diff --git a/tests/fixtures/campaigns/paused_campaign.expected.yaml b/tests/fixtures/campaigns/paused_campaign.expected.yaml new file mode 100644 index 00000000..6083334a --- /dev/null +++ b/tests/fixtures/campaigns/paused_campaign.expected.yaml @@ -0,0 +1,24 @@ +# Bounds for fixture 4 (paused_campaign). +# +# Ground truth at campaign-level: 1 campaign of 2 observation rows +# (one per DSL actor — modeling the operator's two operational +# windows). A correct algorithm scores 1.0 on every metric. +# +# Completeness is the load-bearing metric: a clusterer that lets a +# multi-day silent period split the campaign tanks completeness +# (the one true class is split across two predicted clusters, +# matching the gap). The adversarial time_window_clusterer +# demonstrates this and the bound below rejects it. +# +# This fixture is CAMPAIGN-LEVEL ONLY (see the fixture YAML for +# why). No identity-level scoring. +# +# Bounds are loose at v1; tighten as the algorithm matures. 
+adjusted_rand_index: + min: 0.85 +homogeneity: + min: 0.90 +completeness: + min: 0.80 +singleton_recall: + min: 0.95 diff --git a/tests/fixtures/campaigns/paused_campaign.yaml b/tests/fixtures/campaigns/paused_campaign.yaml new file mode 100644 index 00000000..ed1b4dc1 --- /dev/null +++ b/tests/fixtures/campaigns/paused_campaign.yaml @@ -0,0 +1,85 @@ +# Fixture 4 (paused_campaign) — see development/CAMPAIGN_CLUSTERING.md §2. +# +# One campaign that operates in two sprints with a multi-day silence +# between them: +# +# active days 1-2 (0-indexed [0, 1]) — Delivery, Exploitation +# silent days 3-5 (0-indexed [2, 3, 4]) — pause window +# active days 6-7 (0-indexed [5, 6]) — Discovery, Lateral Movement, +# Exfiltration +# +# Modeled as TWO DSL actors representing the same operator's two +# operational windows. Both share JA3, HASSH, payload, and C2 +# callback — the stable signals a fingerprint-driven clusterer +# resolves on. Their ``active_days`` differ so each operator-half +# emits sessions in disjoint time ranges, which is what makes the +# adversarial time-window clusterer fragment the campaign. +# +# Two-actor modeling caveat: the factory mints a separate +# ``truth_identity_id`` per DSL actor by design (see IDENTITY_ +# RESOLUTION.md — identities are recovered from signals, not +# declared in the DSL). This is a CAMPAIGN-LEVEL fixture only; +# identity-level scoring is fixture 2's job. The bound floors below +# apply at level=campaign. +# +# Pass condition: a fingerprint-driven clusterer must fold both +# operational windows into one cluster (shared JA3 + HASSH + +# payload). A clusterer that lets a multi-day quiet period split +# the campaign fails the completeness floor. +# +# Adversarial condition: ``time_window_clusterer`` (union sessions +# within ≤1 day of each other) is unable to bridge the 4-day silent +# stretch and splits the campaign into "before pause" and "after +# pause" clusters. 
Completeness collapses; the bound floor rejects +# this clusterer. +campaign: + id: paused-campaign-001 + duration_days: 7 + pause_windows: + - [2, 4] # campaign-wide silence days 3-5 (0-indexed) + actors: + - id: ops-sprint-1 + asn: 64520 + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200,0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0" + hassh: "paused-op-dddddddd-dddddddd-dddddddd" + hours_active_utc: [9, 10, 11, 12, 13, 14, 15, 16] + jitter_seconds: 60 + active_days: [0, 1] + - id: ops-sprint-2 + asn: 64520 # same ASN — operator stays on same egress + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200,0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0" + hassh: "paused-op-dddddddd-dddddddd-dddddddd" + hours_active_utc: [9, 10, 11, 12, 13, 14, 15, 16] + jitter_seconds: 60 + active_days: [5, 6] + phases: + - name: delivery + actor: ops-sprint-1 + target_selector: { service: ssh, count: 2 } + dwell_seconds: 1 + - name: exploitation + actor: ops-sprint-1 + tool_signature: + payload_hash: "paused-op-stage1-payload" + c2_callback: "c2.paused-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: discovery + actor: ops-sprint-2 + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: lateral_movement + actor: ops-sprint-2 + tool_signature: + payload_hash: "paused-op-stage1-payload" + c2_callback: "c2.paused-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5 + - name: exfiltration + actor: ops-sprint-2 + tool_signature: + c2_callback: "c2.paused-op.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 5