diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py
index 1aa44d19..f40d1eaf 100644
--- a/decnet/clustering/impl/connected_components.py
+++ b/decnet/clustering/impl/connected_components.py
@@ -34,8 +34,9 @@ from typing import Any, Iterable, Optional
 
 from decnet.clustering.base import Clusterer, ClusterResult
 from decnet.clustering.impl.similarity import (
+    EDGE_THRESHOLD,
     Observation,
-    high_weight_edge,
+    combined_edge_weight,
 )
 from decnet.logging import get_logger
 from decnet.web.db.repository import BaseRepository
@@ -43,13 +44,6 @@ from decnet.web.db.repository import BaseRepository
 log = get_logger("clustering.connected_components")
 
 
-# Threshold above which an edge survives into the graph. The high-tier
-# functions return 1.0 on agreement, so a literal >= 1.0 cutoff means
-# "exact match required." Once medium-tier edges combine, this becomes
-# a tunable.
-_EDGE_THRESHOLD = 1.0
-
-
 def cluster_observations(
     observations: Iterable[Observation],
 ) -> dict[str, str]:
@@ -81,7 +75,7 @@ def cluster_observations(
 
     for i, a in enumerate(obs_list):
         for b in obs_list[i + 1:]:
-            if high_weight_edge(a, b) >= _EDGE_THRESHOLD:
+            if combined_edge_weight(a, b) >= EDGE_THRESHOLD:
                 union(a.observation_id, b.observation_id)
 
     # Roots: each unique find(o) is a component representative. Use
diff --git a/decnet/clustering/impl/similarity.py b/decnet/clustering/impl/similarity.py
index 8c863cea..a22c1f9c 100644
--- a/decnet/clustering/impl/similarity.py
+++ b/decnet/clustering/impl/similarity.py
@@ -162,6 +162,63 @@ def very_low_weight_edge(a: Observation, b: Observation) -> float:
     return 1.0 if a.asn == b.asn else 0.0
 
 
+# ─── Combined weight ────────────────────────────────────────────────────────
+
+#: Tier multipliers applied to the per-tier edge scores when combining
+#: into a single weight. Tuned so that:
+#:
+#:   * High-tier agreement alone (1.0) meets the 1.0 threshold (``>=``).
+#:   * Medium-tier alone (max 1.0) yields 0.6 — below threshold.
+#:   * Low-tier alone (max 1.0) yields 0.2 — defeats fixture 1's
+#:     credential-overlap-only failure mode.
+#:   * Very-low alone (max 1.0) yields 0.05 — defeats fixture 2's
+#:     ASN-rotation failure mode.
+#:
+#: The ratio between tiers matters more than the absolute values: a
+#: tier should never combine its way past threshold without help from
+#: a stronger one.
+TIER_WEIGHTS = {
+    "high": 1.0,
+    "medium": 0.6,
+    "low": 0.2,
+    "very_low": 0.05,
+}
+
+#: Threshold a combined edge weight must meet to survive into the
+#: similarity graph. The connected-components impl only unions a pair
+#: whose combined weight is ``>=`` this value.
+EDGE_THRESHOLD = 1.0
+
+
+def combined_edge_weight(a: Observation, b: Observation) -> float:
+    """Sum of all four tier scores, weighted by :data:`TIER_WEIGHTS`.
+
+    Each per-tier function returns a score in ``[0, 1]``; the
+    weighted sum lets stronger tiers dominate without letting weaker
+    ones combine their way past threshold.
+
+    The connected-components clusterer compares this against
+    :data:`EDGE_THRESHOLD` to decide whether to draw an edge. Pure /
+    time-agnostic — fixture 7 forbids recency-decay weighting.
+
+    Commits 5–7 land each tier in the call site:
+
+    * Commit 5 (this commit): high + medium.
+    * Commit 6: + phase-handoff (a separate edge family, not a tier).
+    * Commit 7: + low + very_low.
+
+    Until commit 7 lands, the low / very_low contributions stay zero
+    by virtue of the underlying functions returning ``0.0`` whenever
+    their inputs are missing. The combination is forward-compatible.
+    """
+    return (
+        TIER_WEIGHTS["high"] * high_weight_edge(a, b)
+        + TIER_WEIGHTS["medium"] * medium_weight_edge(a, b)
+        + TIER_WEIGHTS["low"] * low_weight_edge(a, b)
+        + TIER_WEIGHTS["very_low"] * very_low_weight_edge(a, b)
+    )
+
+
 # ─── Adapter for the synthetic-corpus tests ─────────────────────────────────
 
 
@@ -206,5 +263,8 @@ __all__ = [
     "medium_weight_edge",
     "low_weight_edge",
     "very_low_weight_edge",
+    "combined_edge_weight",
     "from_synthetic",
+    "EDGE_THRESHOLD",
+    "TIER_WEIGHTS",
 ]
diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py
index 89b0a4c1..7a56fe06 100644
--- a/tests/clustering/test_connected_components.py
+++ b/tests/clustering/test_connected_components.py
@@ -287,6 +287,36 @@ def test_shared_wordlist_passes_with_production_clusterer():
     )
 
 
+def test_paused_campaign_passes_with_production_clusterer():
+    """Fixture 4: one campaign split across two operational windows by
+    a multi-day silence. Both halves share JA3 + HASSH + payload + C2;
+    the production clusterer must fold them into one identity. Time-
+    agnostic invariant: the silence window is irrelevant to clustering."""
+    from tests.clustering.fixture_harness import assert_fixture_bounds
+    from tests.factories.campaign_factory import generate, load_yaml
+
+    corpus = generate(load_yaml(FIXTURE_DIR / "paused_campaign.yaml"), seed=0)
+    assert_fixture_bounds(
+        corpus, _production_clusterer_predict,
+        FIXTURE_DIR / "paused_campaign.expected.yaml",
+    )
+
+
+def test_cluster_observations_medium_alone_does_not_fuse():
+    """Two observations sharing only command-sequence (medium-tier)
+    must stay in distinct clusters — medium is a supporting signal."""
+    a = Observation(
+        observation_id="a",
+        commands_by_phase={"discovery": ("ls", "id", "uname")},
+    )
+    b = Observation(
+        observation_id="b",
+        commands_by_phase={"discovery": ("ls", "id", "uname")},
+    )
+    labels = cluster_observations([a, b])
+    assert labels["a"] != labels["b"]
+
+
 def test_vpn_hopping_passes_at_identity_level_with_production_clusterer():
     """Fixture 2: one rotating actor with stable JA3 + HASSH across 5
     ASNs. The production clusterer must fold all 5 observations into
diff --git a/tests/clustering/test_similarity.py b/tests/clustering/test_similarity.py
index 74e4f1db..24f21a7d 100644
--- a/tests/clustering/test_similarity.py
+++ b/tests/clustering/test_similarity.py
@@ -10,7 +10,9 @@ from __future__ import annotations
 import pytest
 
 from decnet.clustering.impl.similarity import (
+    EDGE_THRESHOLD,
     Observation,
+    combined_edge_weight,
     from_synthetic,
     high_weight_edge,
     low_weight_edge,
@@ -179,6 +181,70 @@ def test_observations_carry_no_timestamps():
 # ─── from_synthetic adapter ────────────────────────────────────────────────
 
 
+# ─── combined_edge_weight tier discipline ─────────────────────────────────
+
+
+def test_combined_high_alone_crosses_threshold():
+    a = _obs(ja3="ja3-shared")
+    b = _obs(ja3="ja3-shared")
+    assert combined_edge_weight(a, b) >= EDGE_THRESHOLD
+
+
+def test_combined_medium_alone_below_threshold():
+    """Single medium-tier match must NOT cluster — medium is a
+    supporting signal, never a clustering driver on its own."""
+    a = _obs(commands_by_phase={"discovery": ("ls", "id", "uname")})
+    b = _obs(commands_by_phase={"discovery": ("ls", "id", "uname")})
+    weight = combined_edge_weight(a, b)
+    assert 0 < weight < EDGE_THRESHOLD
+
+
+def test_combined_low_alone_below_threshold():
+    """Credential-only overlap must NOT cluster — fixture 1's failure mode."""
+    a = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
+    b = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
+    weight = combined_edge_weight(a, b)
+    assert 0 < weight < EDGE_THRESHOLD
+
+
+def test_combined_very_low_alone_below_threshold():
+    """ASN-only overlap must NOT cluster — fixture 2's failure mode."""
+    a = _obs(asn=64500)
+    b = _obs(asn=64500)
+    weight = combined_edge_weight(a, b)
+    assert 0 < weight < EDGE_THRESHOLD
+
+
+def test_combined_all_weak_tiers_still_below_threshold():
+    """Even all three weaker tiers stacked don't reach threshold —
+    only a high-tier signal does."""
+    a = _obs(
+        asn=64500,
+        credentials=frozenset({("root", "toor")}),
+        commands_by_phase={"discovery": ("ls",)},
+    )
+    b = _obs(
+        asn=64500,
+        credentials=frozenset({("root", "toor")}),
+        commands_by_phase={"discovery": ("ls",)},
+    )
+    # 0.6*1.0 (medium) + 0.2*1.0 (low) + 0.05*1.0 (very_low) = 0.85
+    weight = combined_edge_weight(a, b)
+    assert weight < EDGE_THRESHOLD
+
+
+def test_combined_high_plus_medium_clusters():
+    a = _obs(ja3="ja3-x", commands_by_phase={"discovery": ("ls",)})
+    b = _obs(ja3="ja3-x", commands_by_phase={"discovery": ("ls",)})
+    assert combined_edge_weight(a, b) >= EDGE_THRESHOLD
+
+
+def test_combined_no_signal_returns_zero():
+    a = _obs()
+    b = _obs()
+    assert combined_edge_weight(a, b) == 0.0
+
+
 def test_from_synthetic_round_trip():
     """The adapter projects a SyntheticAttacker into an Observation
     that the edge functions can score over."""