test(clustering): F7 slow-burn time-agnostic invariant

Fixture 7 ratchet: one campaign across 3 multi-week operational
windows with stable JA3 + HASSH + C2. The production clusterer must
fold all 3 into one cluster despite multi-week silence between
windows; completeness = 1.0.

Time-shift invariance test: applying a +90 day delta to every
session start (and the per-attacker first/last seen) must produce
the same cluster membership as the baseline. This is the runtime
counterpart of the static no-time-fields check on Observation. If
either check ever fails, the clusterer has accidentally grown a
recency-aware edge — fixture 7's whole reason for existing.
This commit is contained in:
2026-04-26 08:26:23 -04:00
parent 6a4592a8f5
commit 7923006203

View File

@@ -414,6 +414,57 @@ def test_cluster_observations_medium_alone_does_not_fuse():
assert labels["a"] != labels["b"]
def test_slow_burn_passes_with_production_clusterer():
"""Fixture 7 (slow_burn): one campaign across 3 multi-week operational
windows. Shared JA3 + HASSH + C2 across all 3 actors. The production
clusterer must fold them into one cluster — *despite* the multi-week
silence between windows. Time-agnostic invariant in action."""
from tests.clustering.fixture_harness import assert_fixture_bounds
from tests.factories.campaign_factory import generate, load_yaml
corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0)
metrics = assert_fixture_bounds(
corpus, _production_clusterer_predict,
FIXTURE_DIR / "slow_burn.expected.yaml",
)
pred = _production_clusterer_predict(corpus)
# All three operational windows in one cluster — the F7 contract.
assert len(set(pred.values())) == 1
assert metrics["completeness"] == pytest.approx(1.0)
def test_slow_burn_time_shift_invariance():
"""Time-agnostic invariant in execution: shifting every observation's
session timestamps by an arbitrary delta must not change the
predicted clusters. This is the runtime counterpart of the
Observation-no-time-fields static check in test_similarity.py."""
from datetime import timedelta
from tests.factories.campaign_factory import generate, load_yaml
corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0)
baseline = _production_clusterer_predict(corpus)
# Shift every session by +90 days (a full multi-month gap) and
# re-cluster. Predicted membership must be identical.
for att in corpus.attackers:
att.first_seen += timedelta(days=90)
att.last_seen += timedelta(days=90)
for s in att.sessions:
s.started_at += timedelta(days=90)
shifted = _production_clusterer_predict(corpus)
# Cluster ids may differ as opaque labels but membership groupings
# must match. Convert each prediction to canonical form: a set of
# frozensets of co-clustered observation_ids.
def _canonical(pred: dict[str, str]) -> set[frozenset[str]]:
groups: dict[str, set[str]] = {}
for oid, cid in pred.items():
groups.setdefault(cid, set()).add(oid)
return {frozenset(g) for g in groups.values()}
assert _canonical(baseline) == _canonical(shifted)
def test_vpn_hopping_passes_at_identity_level_with_production_clusterer():
"""Fixture 2: one rotating actor with stable JA3 + HASSH across
5 ASNs. The production clusterer must fold all 5 observations into