def recency_decay_clusterer(
    corpus: GeneratedCorpus,
    *,
    half_life_days: float = 14.0,
    threshold: float = 0.5,
) -> dict[str, str]:
    """Cluster on the composite signal graph with exponential time decay.

    Two attackers get a candidate edge when their ``(ja3, hassh)``
    fingerprints are equal or when they share at least one C2 callback.
    Each candidate edge has base strength 1.0 and is multiplied by
    ``exp(-gap_days / half_life_days)``, where ``gap_days`` is the
    distance (in days) between the two attackers' activity midpoints.
    The midpoint is halfway between an attacker's earliest session
    start and latest session end. Edges whose decayed weight is below
    ``threshold`` are dropped; the surviving edges are unioned into
    connected components.

    Deliberately-bad reference for fixture 7 (slow_burn): an APT
    campaign that operates over months will be fragmented by any
    clusterer that silently expires old edges.

    Args:
        corpus: Generated corpus; only ``corpus.attackers`` is read
            (``attacker_id``, ``ja3``, ``hassh``, ``sessions``).
        half_life_days: Decay constant in days; must be positive.
            NOTE: despite the name this is used as an e-folding time,
            not a true half-life. The weight is ``1/e`` (about 0.37)
            at ``gap_days == half_life_days`` and crosses 0.5 at
            ``half_life_days * ln(2)``.
        threshold: Minimum decayed weight for an edge to be kept.

    Returns:
        Mapping of attacker id to cluster label. Attackers with neither
        a fingerprint nor a C2 callback are labelled
        ``recency-singleton-<id>``; everyone else is labelled
        ``recency-<root>`` where ``<root>`` is whatever ``find`` returns
        for them. Attackers without sessions have no midpoint, so they
        are never unioned with anyone.

    Raises:
        ValueError: If ``half_life_days`` is not strictly positive.
    """
    import math
    from datetime import datetime, timedelta
    from itertools import combinations

    # Reject zero/negative/NaN up front: zero would divide by zero on the
    # first matching pair, and a negative value would *grow* weights with
    # distance and silently union everything.
    if not half_life_days > 0:
        raise ValueError(
            f"half_life_days must be positive, got {half_life_days!r}"
        )

    callbacks: dict[str, set[str]] = {}
    fingerprint: dict[str, tuple[str | None, str | None] | None] = {}
    midpoint: dict[str, datetime | None] = {}
    for att in corpus.attackers:
        aid = att.attacker_id
        callbacks[aid] = {s.c2_callback for s in att.sessions if s.c2_callback}
        has_fingerprint = att.ja3 is not None or att.hassh is not None
        fingerprint[aid] = (att.ja3, att.hassh) if has_fingerprint else None
        if att.sessions:
            first_start = min(s.started_at for s in att.sessions)
            last_end = max(
                s.started_at + timedelta(seconds=s.duration_s)
                for s in att.sessions
            )
            midpoint[aid] = first_start + (last_end - first_start) / 2
        else:
            midpoint[aid] = None

    ids = list(callbacks)
    _parent, find, union = _union_find(ids)

    def edge_strength(a: str, b: str) -> float:
        """Base signal strength before time decay; 1.0 on match, else 0."""
        fa, fb = fingerprint[a], fingerprint[b]
        if fa is not None and fb is not None and fa == fb:
            return 1.0
        if callbacks[a] & callbacks[b]:
            return 1.0
        return 0.0

    # Only attackers with a midpoint can carry a time-decayed edge.
    # combinations() walks the pairs in the same order as a nested
    # "for i, a ... for b in ids[i + 1:]" loop over the filtered list.
    timed_ids = [aid for aid in ids if midpoint[aid] is not None]
    for a, b in combinations(timed_ids, 2):
        base = edge_strength(a, b)
        if base <= 0.0:
            continue
        gap_days = abs((midpoint[a] - midpoint[b]).total_seconds()) / 86400.0
        weight = base * math.exp(-gap_days / half_life_days)
        if weight >= threshold:
            union(a, b)

    pred: dict[str, str] = {}
    for aid in ids:
        if fingerprint[aid] is None and not callbacks[aid]:
            pred[aid] = f"recency-singleton-{aid}"
        else:
            pred[aid] = f"recency-{find(aid)}"
    return pred
b/tests/clustering/test_slow_burn_fixture.py @@ -0,0 +1,128 @@ +""" +End-to-end pipeline test for fixture 7 (slow_burn). + +90-day APT campaign with three operational windows separated by +multi-week silences. Models the real operational tempo of an APT +working a deep nested topology (MazeNET-style): recon over weeks, +exploitation later, action-on-objectives later still. The unique +signal this fixture stresses is TIME-AGNOSTIC IDENTITY — a +clusterer that silently expires old edges fragments any campaign +that operates over months. + +Three tests cover this: + +1. `test_slow_burn_corpus_shape` — sanity: 3 attackers, all share + campaign id and operator fingerprint, sessions land in their + respective operational windows. + +2. `test_slow_burn_pipeline_passes_bounds` — + `composite_signals_clusterer` (fingerprint OR C2 — time-agnostic) + folds all three windows into one cluster. + +3. `test_recency_decay_clusterer_fragments_campaign` — runs the + deliberately-bad `recency_decay_clusterer` with a 14-day half- + life and a 0.5 weight threshold. Edges between adjacent + operational windows (24+ days apart) decay below threshold and + drop. The campaign splits into three clusters; completeness + collapses; the bound floor rejects the bad clusterer. 
+""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.clustering.fixture_harness import ( + assert_fixture_bounds, + composite_signals_clusterer, + recency_decay_clusterer, +) +from tests.clustering.metrics import score +from tests.factories.campaign_factory import generate, load_yaml + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" +FIXTURE_YAML = FIXTURE_DIR / "slow_burn.yaml" +EXPECTED_YAML = FIXTURE_DIR / "slow_burn.expected.yaml" + + +def test_slow_burn_corpus_shape() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 3 + truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} + assert truth_campaigns == {"slow-burn-001"} + # Operator fingerprint stays stable across all three windows. + ja3s = {a.ja3 for a in corpus.attackers} + hasshs = {a.hassh for a in corpus.attackers} + assert len(ja3s) == 1 + assert len(hasshs) == 1 + # Each row's sessions land in its operational window. + by_actor = {a.truth_actor_id: a for a in corpus.attackers} + recon_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-recon"].sessions} + exploit_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-exploit"].sessions} + action_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-action"].sessions} + # Epoch is 2026-01-01 (day-of-year 1). active_days [7-11] → + # day-of-year [8-12]; [35-39] → [36-40]; [75-79] → [76-80]. 
+ assert recon_days <= {8, 9, 10, 11, 12}, recon_days + assert exploit_days <= {36, 37, 38, 39, 40}, exploit_days + assert action_days <= {76, 77, 78, 79, 80}, action_days + + +def test_slow_burn_pipeline_passes_bounds() -> None: + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + metrics = assert_fixture_bounds(corpus, composite_signals_clusterer, EXPECTED_YAML) + pred = composite_signals_clusterer(corpus) + assert len(set(pred.values())) == 1, ( + "composite_signals_clusterer should fold all three windows into one cluster" + ) + assert metrics["adjusted_rand_index"] == pytest.approx(1.0) + + +def test_recency_decay_clusterer_fragments_campaign() -> None: + """ + The fixture's reason for being. Recency decay with a 14-day + half-life expires edges between operational windows that are + 24+ days apart, dropping their weight below the 0.5 threshold. + The campaign fragments into three clusters; completeness + collapses. + + If this test ever passes (the bad clusterer satisfies the + bounds), the fixture has lost its discrimination power. 
+ """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = recency_decay_clusterer(corpus, half_life_days=14.0, threshold=0.5) + assert len(set(pred.values())) == 3, ( + f"recency-decay clusterer should split into 3 clusters, " + f"got {len(set(pred.values()))}" + ) + + metrics = score(corpus.truth_labels(level="campaign"), pred) + assert metrics["completeness"] == pytest.approx(0.0) + + bounds = { + "adjusted_rand_index": 0.85, + "homogeneity": 0.90, + "completeness": 0.80, + "singleton_recall": 0.95, + } + breaches = [k for k, floor in bounds.items() if metrics[k] < floor] + assert "completeness" in breaches, ( + f"fixture failed to catch the bad clusterer; observed metrics: {metrics}" + ) + + +def test_recency_decay_clusterer_with_long_halflife_does_not_fragment() -> None: + """ + Sanity for the recency-decay reference: with a half-life longer + than the campaign duration, every edge survives the decay. The + three windows union into one. Confirms the clusterer's + behavior depends on the half-life parameter, not on something + unrelated. (Half-life 365 → edges across 40 days decay to + ~0.93, well above the 0.5 threshold.) + """ + spec = load_yaml(FIXTURE_YAML) + corpus = generate(spec, seed=0) + pred = recency_decay_clusterer(corpus, half_life_days=365.0, threshold=0.5) + assert len(set(pred.values())) == 1 diff --git a/tests/fixtures/campaigns/slow_burn.expected.yaml b/tests/fixtures/campaigns/slow_burn.expected.yaml new file mode 100644 index 00000000..5a847638 --- /dev/null +++ b/tests/fixtures/campaigns/slow_burn.expected.yaml @@ -0,0 +1,24 @@ +# Bounds for fixture 7 (slow_burn). +# +# Ground truth at campaign-level: 1 campaign of 3 observation rows +# (one per operational window — recon, exploit, action). A correct +# algorithm scores 1.0 across every metric on this fixture. 
+# +# Completeness is the load-bearing metric: a clusterer that lets +# multi-week silence fragment the campaign tanks completeness (the +# one true class is split across the operational windows). The +# adversarial recency_decay_clusterer demonstrates this and the +# bound below rejects it. +# +# Campaign-level fixture only — the three DSL actors model the +# operator's three operational windows by design. +# +# Bounds are loose at v1; tighten as the algorithm matures. +adjusted_rand_index: + min: 0.85 +homogeneity: + min: 0.90 +completeness: + min: 0.80 +singleton_recall: + min: 0.95 diff --git a/tests/fixtures/campaigns/slow_burn.yaml b/tests/fixtures/campaigns/slow_burn.yaml new file mode 100644 index 00000000..59d195fb --- /dev/null +++ b/tests/fixtures/campaigns/slow_burn.yaml @@ -0,0 +1,119 @@ +# Fixture 7 (slow_burn) — see development/CAMPAIGN_CLUSTERING.md §2. +# +# Multi-month APT campaign. The unique signal this fixture stresses +# is OPERATIONAL TEMPO: APTs (real ones, not skiddies) take their +# time. Recon over weeks, exploitation later, action-on-objectives +# later still. Long stretches of true silence between phases. +# This is not an adversary that compresses a campaign into three days. +# +# A MazeNET-style deep nested topology (DECNET's recursive DAG mode) +# is exactly what an APT operator burns weeks against — mapping +# decoy networks, working out which subnet looks productive, only +# then committing to exploitation. This fixture encodes that tempo +# as a 90-day campaign with three operational windows: +# +# week 2 (days 7-11) Delivery, Discovery +# month 2 (days 35-39) Exploitation, Persistence +# month 3 (days 75-79) Lateral Movement, Collection, Exfiltration +# +# Modeled as three DSL actors representing the same operator's three +# operational phases (same modeling caveat as fixtures 4 and 5: the +# factory mints a separate truth_identity_id per DSL actor; this is +# a CAMPAIGN-LEVEL fixture only). 
All three share JA3 + HASSH + +# C2 callback; the exploit and action windows also share the +# stage-1 payload hash. The operator's toolchain stays stable +# across the campaign. +# +# Pass condition: composite_signals_clusterer (fingerprint OR C2) +# folds all three windows into one cluster regardless of when they +# happened. Time-agnostic edge construction is what makes this work. +# +# Adversarial condition: recency_decay_clusterer with a 14-day +# half-life and a 0.5 weight threshold cannot bridge the multi-week +# silences. Edges between week-2 and month-2 (≥24 days) decay to +# at most exp(-24/14) ≈ 0.18 < 0.5 → dropped. Edges between month-2 +# and month-3 (≥36 days) decay to at most exp(-36/14) ≈ 0.076 → +# dropped. The campaign fragments into three clusters; completeness +# collapses. +# +# This is the canonical production failure mode for graph-based +# clusterers that silently expire old edges to bound memory or +# bias toward "what's hot." Catching it in synthetic data is what +# this fixture exists for. +campaign: + id: slow-burn-001 + duration_days: 90 + actors: + - id: ops-recon + asn: 64540 + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200-156-157-47-53,0-23-65281-10-11-35-16-5-13-18-51-45-43-27-17513,29-23-24,0" + hassh: "slow-burn-gggggggg-gggggggg-gggggggg" + hours_active_utc: [3, 4, 5] + jitter_seconds: 60 + active_days: [7, 8, 9, 10, 11] + - id: ops-exploit + asn: 64541 + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200-156-157-47-53,0-23-65281-10-11-35-16-5-13-18-51-45-43-27-17513,29-23-24,0" + hassh: "slow-burn-gggggggg-gggggggg-gggggggg" + hours_active_utc: [3, 4, 5] + jitter_seconds: 60 + active_days: [35, 36, 37, 38, 39] + - id: ops-action + asn: 64542 + ip_pool: sticky + ja3: "771,4865-4866-4867-49195-49199-49196-49200-156-157-47-53,0-23-65281-10-11-35-16-5-13-18-51-45-43-27-17513,29-23-24,0" + hassh: "slow-burn-gggggggg-gggggggg-gggggggg" + hours_active_utc: [3, 4, 5] + jitter_seconds: 60 + active_days: [75, 76, 77, 78, 79] + phases: + # Week 2 — recon window. 
Delivery probes, discovery against the + # MazeNET surface to identify productive subnets. + - name: delivery + actor: ops-recon + tool_signature: + c2_callback: "c2.slow-burn.example" + target_selector: { service: any, count: 3 } + dwell_seconds: 1 + - name: discovery + actor: ops-recon + tool_signature: + c2_callback: "c2.slow-burn.example" + target_selector: { service: any, count: 3 } + dwell_seconds: 5 + # Month 2 — exploitation. Operator commits to one of the + # productive subnets identified during recon. + - name: exploitation + actor: ops-exploit + tool_signature: + payload_hash: "slow-burn-stage1-payload" + c2_callback: "c2.slow-burn.example" + target_selector: { service: ssh, count: 3 } + dwell_seconds: 10 + - name: persistence + actor: ops-exploit + tool_signature: + c2_callback: "c2.slow-burn.example" + target_selector: { decky: previous_success, count: 2 } + dwell_seconds: 10 + # Month 3 — actions on objectives. Lateral movement, collection, + # exfil — only after the operator has confidence in the foothold. + - name: lateral_movement + actor: ops-action + tool_signature: + c2_callback: "c2.slow-burn.example" + target_selector: { service: ssh, count: 3 } + dwell_seconds: 10 + - name: collection + actor: ops-action + tool_signature: + payload_hash: "slow-burn-stage1-payload" + c2_callback: "c2.slow-burn.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 10 + - name: exfiltration + actor: ops-action + tool_signature: + c2_callback: "c2.slow-burn.example" + target_selector: { service: ssh, count: 2 } + dwell_seconds: 10