Files
DECNET/tests/clustering/test_slow_burn_fixture.py

129 lines
5.2 KiB
Python

"""
End-to-end pipeline test for fixture 7 (slow_burn).
90-day APT campaign with three operational windows separated by
multi-week silences. Models the real operational tempo of an APT
working a deep nested topology (MazeNET-style): recon over weeks,
exploitation later, action-on-objectives later still. The unique
signal this fixture stresses is TIME-AGNOSTIC IDENTITY — a
clusterer that silently expires old edges fragments any campaign
that operates over months.
Three tests cover this:
1. `test_slow_burn_corpus_shape` — sanity: 3 attackers, all share
campaign id and operator fingerprint, sessions land in their
respective operational windows.
2. `test_slow_burn_pipeline_passes_bounds` —
`composite_signals_clusterer` (fingerprint OR C2 — time-agnostic)
folds all three windows into one cluster.
3. `test_recency_decay_clusterer_fragments_campaign` — runs the
deliberately-bad `recency_decay_clusterer` with a 14-day half-
life and a 0.5 weight threshold. Edges between adjacent
operational windows (24+ days apart) decay below threshold and
drop. The campaign splits into three clusters; completeness
collapses; the bound floor rejects the bad clusterer.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tests.clustering.fixture_harness import (
assert_fixture_bounds,
composite_signals_clusterer,
recency_decay_clusterer,
)
from tests.clustering.metrics import score
from tests.factories.campaign_factory import generate, load_yaml
FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns"
FIXTURE_YAML = FIXTURE_DIR / "slow_burn.yaml"
EXPECTED_YAML = FIXTURE_DIR / "slow_burn.expected.yaml"
def test_slow_burn_corpus_shape() -> None:
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
assert len(corpus.attackers) == 3
truth_campaigns = {a.truth_campaign_id for a in corpus.attackers}
assert truth_campaigns == {"slow-burn-001"}
# Operator fingerprint stays stable across all three windows.
ja3s = {a.ja3 for a in corpus.attackers}
hasshs = {a.hassh for a in corpus.attackers}
assert len(ja3s) == 1
assert len(hasshs) == 1
# Each row's sessions land in its operational window.
by_actor = {a.truth_actor_id: a for a in corpus.attackers}
recon_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-recon"].sessions}
exploit_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-exploit"].sessions}
action_days = {s.started_at.timetuple().tm_yday for s in by_actor["ops-action"].sessions}
# Epoch is 2026-01-01 (day-of-year 1). active_days [7-11] →
# day-of-year [8-12]; [35-39] → [36-40]; [75-79] → [76-80].
assert recon_days <= {8, 9, 10, 11, 12}, recon_days
assert exploit_days <= {36, 37, 38, 39, 40}, exploit_days
assert action_days <= {76, 77, 78, 79, 80}, action_days
def test_slow_burn_pipeline_passes_bounds() -> None:
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
metrics = assert_fixture_bounds(corpus, composite_signals_clusterer, EXPECTED_YAML)
pred = composite_signals_clusterer(corpus)
assert len(set(pred.values())) == 1, (
"composite_signals_clusterer should fold all three windows into one cluster"
)
assert metrics["adjusted_rand_index"] == pytest.approx(1.0)
def test_recency_decay_clusterer_fragments_campaign() -> None:
"""
The fixture's reason for being. Recency decay with a 14-day
half-life expires edges between operational windows that are
24+ days apart, dropping their weight below the 0.5 threshold.
The campaign fragments into three clusters; completeness
collapses.
If this test ever passes (the bad clusterer satisfies the
bounds), the fixture has lost its discrimination power.
"""
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
pred = recency_decay_clusterer(corpus, half_life_days=14.0, threshold=0.5)
assert len(set(pred.values())) == 3, (
f"recency-decay clusterer should split into 3 clusters, "
f"got {len(set(pred.values()))}"
)
metrics = score(corpus.truth_labels(level="campaign"), pred)
assert metrics["completeness"] == pytest.approx(0.0)
bounds = {
"adjusted_rand_index": 0.85,
"homogeneity": 0.90,
"completeness": 0.80,
"singleton_recall": 0.95,
}
breaches = [k for k, floor in bounds.items() if metrics[k] < floor]
assert "completeness" in breaches, (
f"fixture failed to catch the bad clusterer; observed metrics: {metrics}"
)
def test_recency_decay_clusterer_with_long_halflife_does_not_fragment() -> None:
"""
Sanity for the recency-decay reference: with a half-life longer
than the campaign duration, every edge survives the decay. The
three windows union into one. Confirms the clusterer's
behavior depends on the half-life parameter, not on something
unrelated. (Half-life 365 → edges across 40 days decay to
~0.93, well above the 0.5 threshold.)
"""
spec = load_yaml(FIXTURE_YAML)
corpus = generate(spec, seed=0)
pred = recency_decay_clusterer(corpus, half_life_days=365.0, threshold=0.5)
assert len(set(pred.values())) == 1