diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py index 8f65e67c..38559c40 100644 --- a/tests/clustering/test_connected_components.py +++ b/tests/clustering/test_connected_components.py @@ -414,6 +414,57 @@ def test_cluster_observations_medium_alone_does_not_fuse(): assert labels["a"] != labels["b"] +def test_slow_burn_passes_with_production_clusterer(): + """Fixture 7 (slow_burn): one campaign across 3 multi-week operational + windows. Shared JA3 + HASSH + C2 across all 3 actors. The production + clusterer must fold them into one cluster — *despite* the multi-week + silence between windows. Time-agnostic invariant in action.""" + from tests.clustering.fixture_harness import assert_fixture_bounds + from tests.factories.campaign_factory import generate, load_yaml + + corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0) + metrics = assert_fixture_bounds( + corpus, _production_clusterer_predict, + FIXTURE_DIR / "slow_burn.expected.yaml", + ) + pred = _production_clusterer_predict(corpus) + # All three operational windows in one cluster — the F7 contract. + assert len(set(pred.values())) == 1 + assert metrics["completeness"] == pytest.approx(1.0) + + +def test_slow_burn_time_shift_invariance(): + """Time-agnostic invariant in execution: shifting every observation's + session timestamps by an arbitrary delta must not change the + predicted clusters. This is the runtime counterpart of the + Observation-no-time-fields static check in test_similarity.py.""" + from datetime import timedelta + from tests.factories.campaign_factory import generate, load_yaml + + corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0) + baseline = _production_clusterer_predict(corpus) + + # Shift every session by +90 days (a full multi-month gap) and + # re-cluster. Predicted membership must be identical. + for att in corpus.attackers: + att.first_seen += timedelta(days=90) + att.last_seen += timedelta(days=90) + for s in att.sessions: + s.started_at += timedelta(days=90) + + shifted = _production_clusterer_predict(corpus) + # Cluster ids may differ as opaque labels but membership groupings + # must match. Convert each prediction to canonical form: a set of + # frozensets of co-clustered observation_ids. + def _canonical(pred: dict[str, str]) -> set[frozenset[str]]: + groups: dict[str, set[str]] = {} + for oid, cid in pred.items(): + groups.setdefault(cid, set()).add(oid) + return {frozenset(g) for g in groups.values()} + + assert _canonical(baseline) == _canonical(shifted) + + def test_vpn_hopping_passes_at_identity_level_with_production_clusterer(): """Fixture 2: one rotating actor with stable JA3 + HASSH across 5 ASNs. The production clusterer must fold all 5 observations into