diff --git a/decnet/clustering/impl/similarity.py b/decnet/clustering/impl/similarity.py new file mode 100644 index 00000000..8c863cea --- /dev/null +++ b/decnet/clustering/impl/similarity.py @@ -0,0 +1,210 @@ +"""Similarity-graph primitives for the connected-components clusterer. + +Each function takes two :class:`Observation` projections and returns a +similarity score in ``[0.0, 1.0]``. The connected-components impl +(landing in subsequent commits) decides how to combine these into a +single edge weight, applies a threshold, and runs union-find. + +**Time-agnostic.** Edges MUST NOT depend on observation timestamps. +Fixture 7 (``slow_burn``) proves recency-decay clustering fragments +multi-month APT campaigns; the production graph cannot silently expire +old edges. Timestamps are still useful for *audit* (the ``first_seen`` +on the resulting identity row) but never for *similarity*. + +**Weight tiers** (from `development/IDENTITY_RESOLUTION.md`): + +* High — JA3 / HASSH / payload-hash / C2-callback exact match. Stable + signals an attacker can't cheaply rotate. A single high-tier match + supports identity strongly. +* Medium — command-sequence Jaccard, bucketed by UKC phase. Tooling + habits leak through command order; phase-bucketing avoids comparing + a Discovery cmd-list to an Exploitation one. +* Low — credential-attempt-set Jaccard. Defeated alone by fixture 1 + (``shared_wordlist``) where two campaigns share rockyou but diverge + on infra. +* Very low — ASN match. Defeated alone by fixture 2 (``vpn_hopping``) + where one identity rotates across many ASNs. + +The functions are pure (no DB, no I/O); the worker maps observations +into :class:`Observation` once per tick and feeds these into the +graph builder. 
# ─── Observation projection ─────────────────────────────────────────────────


@dataclass(frozen=True)
class Observation:
    """Minimal, hashable projection of a per-IP attacker observation.

    Built once per ``Attacker`` row by the worker (or per
    ``SyntheticAttacker`` in tests via :func:`from_synthetic`).
    Keeping the projection tight isolates the graph code from schema
    drift on either side.

    All set-typed fields are :class:`frozenset` so they hash and so
    callers don't accidentally mutate them mid-pass.
    ``commands_by_phase`` is the one mapping-typed field: it still takes
    part in ``==`` but is excluded from ``__hash__`` (its default, a
    ``dict``, is unhashable). That keeps instances usable as set members
    and dict keys; equal observations still hash equal because the hash
    is computed over a subset of the compared fields.
    """

    observation_id: str
    """Stable ID — for production, the ``Attacker.uuid``; for tests,
    the ``SyntheticAttacker.attacker_id``."""

    # Scalar fingerprints; ``None`` means "not observed".
    ja3: Optional[str] = None
    hassh: Optional[str] = None
    asn: Optional[int] = None

    # Set-valued signals; an empty frozenset means "nothing observed".
    payload_hashes: frozenset[str] = field(default_factory=frozenset)
    c2_endpoints: frozenset[str] = field(default_factory=frozenset)
    credentials: frozenset[tuple[str, str]] = field(default_factory=frozenset)

    # hash=False: without it the generated __hash__ would try to hash the
    # dict and raise TypeError for every instance.
    commands_by_phase: Mapping[str, tuple[str, ...]] = field(
        default_factory=dict, hash=False
    )
    """``UKCPhase.value`` → ordered command sequence observed in that
    phase. Empty dict when no command-bearing sessions were seen."""
# ─── Edge functions ─────────────────────────────────────────────────────────


def high_weight_edge(a: Observation, b: Observation) -> float:
    """JA3 / HASSH / payload-hash / C2-endpoint exact match.

    Returns ``1.0`` if any of the four exact-match signals agrees
    (present on both sides), ``0.0`` otherwise. Single-signal high-tier
    agreement is by design enough to support identity — these are the
    signals the design doc calls out as "stable signals an attacker
    can't cheaply rotate."

    A fingerprint counts as present only when it is truthy: both
    ``None`` and the empty string mean "not observed". Two missing
    fingerprints must never be read as agreement, otherwise every
    un-fingerprinted observation would be linked to every other one.

    JA4 will join this tier as a sibling of JA3 once the prober emits
    it; the function shape doesn't change.
    """
    # Truthiness rather than ``is not None`` so that "" (an absent value
    # encoded as an empty string) cannot match another "".
    if a.ja3 and a.ja3 == b.ja3:
        return 1.0
    if a.hassh and a.hassh == b.hassh:
        return 1.0
    # An empty set is disjoint from everything, so no separate
    # emptiness check is needed for the set-valued signals.
    if not a.payload_hashes.isdisjoint(b.payload_hashes):
        return 1.0
    if not a.c2_endpoints.isdisjoint(b.c2_endpoints):
        return 1.0
    return 0.0


def medium_weight_edge(a: Observation, b: Observation) -> float:
    """Phase-bucketed command-sequence Jaccard.

    For each UKC phase observed on both sides, computes the Jaccard
    similarity of the command sets (sequences are collapsed to sets, so
    order and repetition are ignored). Returns the **maximum** Jaccard
    across shared phases, so a single strong phase match isn't averaged
    away by a different phase where the actors diverge.

    Phase-bucketing matters: comparing a Discovery cmd-list to an
    Exploitation one is meaningless. Both actors had to be in the
    same phase for the comparison to count.

    Returns ``0.0`` when no phase is observed on both sides, or when
    every shared phase has no commands on either side.
    """
    scores = []
    for phase in set(a.commands_by_phase) & set(b.commands_by_phase):
        left = set(a.commands_by_phase[phase])
        right = set(b.commands_by_phase[phase])
        union = left | right
        # An empty union means neither side ran anything in this phase:
        # that is no evidence at all, so the phase contributes no score.
        if union:
            scores.append(len(left & right) / len(union))
    return max(scores, default=0.0)
def low_weight_edge(a: Observation, b: Observation) -> float:
    """Credential-attempt-set Jaccard.

    Returns the Jaccard of ``(username, password)`` tuples. Two campaigns
    burning the same wordlist will score high here — fixture 1 proves
    this signal is dangerous in isolation. The connected-components
    impl combines this with other signals; alone it must not push a
    pair over threshold.

    Returns ``0.0`` when either side attempted no credentials.
    """
    if not a.credentials or not b.credentials:
        return 0.0
    # Both sets are non-empty past the guard, so their union is non-empty
    # as well and the division needs no further protection.
    overlap = a.credentials & b.credentials
    return len(overlap) / len(a.credentials | b.credentials)


def very_low_weight_edge(a: Observation, b: Observation) -> float:
    """ASN equality.

    Returns ``1.0`` iff both observations have a non-null ASN and they
    match, ``0.0`` otherwise. Fixture 2 (``vpn_hopping``) proves
    ASN-only clustering is a failure mode — one identity legitimately
    rotates across many ASNs. The combination logic in the
    connected-components impl is expected to weight this so that ASN
    agreement alone never crosses threshold.
    """
    both_known = a.asn is not None and b.asn is not None
    # Compare against ``None`` explicitly: any integer ASN, including 0,
    # is a real value here.
    if both_known and a.asn == b.asn:
        return 1.0
    return 0.0
# ─── Adapter for the synthetic-corpus tests ─────────────────────────────────


def from_synthetic(att) -> Observation:  # type: ignore[no-untyped-def]
    """Project a ``SyntheticAttacker``-shaped object into an :class:`Observation`.

    Reads ``attacker_id``, ``ja3``, ``hassh``, ``asn`` and ``sessions``
    from *att*. Per session it reads ``payload_hash``, ``c2_callback``,
    ``credentials_tried``, ``commands`` and ``phase.value``:

    * truthy payload hashes and C2 callbacks are gathered into sets;
    * every tried credential is coerced to a tuple and gathered;
    * commands are concatenated per phase value, in session order;
      sessions without commands contribute no phase entry.

    The adapter lives in this module so test code has one documented
    integration point for building observations.
    """
    # Snapshot once so the sessions can be scanned several times even if
    # ``att.sessions`` is a one-shot iterable.
    sessions = tuple(att.sessions)

    hashes = frozenset(s.payload_hash for s in sessions if s.payload_hash)
    endpoints = frozenset(s.c2_callback for s in sessions if s.c2_callback)
    tried = frozenset(
        tuple(pair) for s in sessions for pair in s.credentials_tried
    )

    per_phase: dict[str, tuple[str, ...]] = {}
    for s in sessions:
        if not s.commands:
            continue
        key = s.phase.value
        per_phase[key] = per_phase.get(key, ()) + tuple(s.commands)

    return Observation(
        observation_id=att.attacker_id,
        ja3=att.ja3,
        hassh=att.hassh,
        asn=att.asn,
        payload_hashes=hashes,
        c2_endpoints=endpoints,
        credentials=tried,
        commands_by_phase=per_phase,
    )


__all__ = [
    "Observation",
    "high_weight_edge",
    "medium_weight_edge",
    "low_weight_edge",
    "very_low_weight_edge",
    "from_synthetic",
]
def _obs(**kwargs) -> Observation:
    """Create an Observation for a test, defaulting ``observation_id``.

    Any ``Observation`` keyword may be passed; ``observation_id`` falls
    back to ``"obs-x"`` when the caller does not provide one.
    """
    return Observation(**{"observation_id": "obs-x", **kwargs})


# ─── high_weight_edge ──────────────────────────────────────────────────────


def test_high_weight_ja3_match():
    """Equal non-null JA3 values give a full-strength edge."""
    left, right = _obs(ja3="ja3-stable"), _obs(ja3="ja3-stable")
    assert high_weight_edge(left, right) == 1.0


def test_high_weight_hassh_match():
    """Equal non-null HASSH values give a full-strength edge."""
    left, right = _obs(hassh="hassh-stable"), _obs(hassh="hassh-stable")
    assert high_weight_edge(left, right) == 1.0


def test_high_weight_payload_hash_overlap():
    """One shared payload hash is enough, even if the sets differ."""
    left = _obs(payload_hashes=frozenset({"pl-1", "pl-2"}))
    right = _obs(payload_hashes=frozenset({"pl-2", "pl-3"}))
    assert high_weight_edge(left, right) == 1.0


def test_high_weight_c2_overlap():
    """One shared C2 endpoint is enough, even if one side has extras."""
    left = _obs(c2_endpoints=frozenset({"c2.example.com"}))
    right = _obs(
        c2_endpoints=frozenset({"c2.example.com", "c2-alt.example.com"})
    )
    assert high_weight_edge(left, right) == 1.0


def test_high_weight_no_match():
    """Signals present on both sides but all different give no edge."""
    left = _obs(ja3="ja3-a", hassh="hassh-a", payload_hashes=frozenset({"x"}))
    right = _obs(ja3="ja3-b", hassh="hassh-b", payload_hashes=frozenset({"y"}))
    assert high_weight_edge(left, right) == 0.0


def test_high_weight_both_null_ja3_does_not_match():
    """Both-null JA3 must not be treated as 'agreement' — that would
    fuse every un-fingerprinted noise scanner into one mega-cluster."""
    left = _obs(ja3=None, hassh=None)
    right = _obs(ja3=None, hassh=None)
    assert high_weight_edge(left, right) == 0.0


def test_high_weight_empty_sets_no_match():
    """Empty payload-hash and C2 sets on both sides give no edge."""
    nothing = frozenset()
    left = _obs(payload_hashes=nothing, c2_endpoints=nothing)
    right = _obs(payload_hashes=nothing, c2_endpoints=nothing)
    assert high_weight_edge(left, right) == 0.0
# ─── medium_weight_edge ────────────────────────────────────────────────────


def test_medium_weight_jaccard_full_match_in_one_phase():
    """Identical command lists in one shared phase score 1.0."""
    commands = ("ls", "id", "uname -a")
    left = _obs(commands_by_phase={"discovery": commands})
    right = _obs(commands_by_phase={"discovery": commands})
    assert medium_weight_edge(left, right) == pytest.approx(1.0)


def test_medium_weight_jaccard_partial_match():
    """Two shared commands out of four distinct ones score 0.5."""
    left = _obs(commands_by_phase={"discovery": ("ls", "id", "uname -a", "whoami")})
    right = _obs(commands_by_phase={"discovery": ("ls", "id")})
    # intersection has 2 commands, union has 4 → 2 / 4
    assert medium_weight_edge(left, right) == pytest.approx(0.5)


def test_medium_weight_picks_max_across_phases():
    """The best-scoring shared phase wins; a weak phase doesn't dilute it."""
    exploit = ("./payload", "chmod +x payload")
    left = _obs(commands_by_phase={"discovery": ("ls",), "exploitation": exploit})
    right = _obs(
        commands_by_phase={
            "discovery": ("ps",),     # disjoint from the left side → 0.0
            "exploitation": exploit,  # identical to the left side → 1.0
        }
    )
    assert medium_weight_edge(left, right) == pytest.approx(1.0)


def test_medium_weight_no_shared_phase_returns_zero():
    """Commands seen only in different phases are never compared."""
    left = _obs(commands_by_phase={"discovery": ("ls",)})
    right = _obs(commands_by_phase={"exploitation": ("./payload",)})
    assert medium_weight_edge(left, right) == 0.0


def test_medium_weight_disjoint_commands_in_shared_phase():
    """A shared phase with no command in common scores zero."""
    left = _obs(commands_by_phase={"discovery": ("ls",)})
    right = _obs(commands_by_phase={"discovery": ("ps",)})
    # empty intersection over a two-element union
    assert medium_weight_edge(left, right) == 0.0


def test_medium_weight_empty_corpora_returns_zero():
    """Observations with no commands at all score zero."""
    left, right = _obs(), _obs()
    assert medium_weight_edge(left, right) == 0.0


# ─── low_weight_edge ───────────────────────────────────────────────────────


def test_low_weight_credential_jaccard_match():
    """Identical credential sets score 1.0."""
    pairs = frozenset({("root", "toor"), ("admin", "admin")})
    left, right = _obs(credentials=pairs), _obs(credentials=pairs)
    assert low_weight_edge(left, right) == pytest.approx(1.0)


def test_low_weight_credential_partial_overlap():
    """One shared pair out of three distinct pairs scores one third."""
    left = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
    right = _obs(credentials=frozenset({("root", "toor"), ("user", "user")}))
    assert low_weight_edge(left, right) == pytest.approx(1 / 3)
def test_low_weight_no_credentials_returns_zero():
    """If one side tried no credentials the score is zero."""
    silent = _obs()
    noisy = _obs(credentials=frozenset({("root", "toor")}))
    assert low_weight_edge(silent, noisy) == 0.0


# ─── very_low_weight_edge ──────────────────────────────────────────────────


def test_very_low_weight_asn_match():
    """Equal non-null ASNs score 1.0."""
    left, right = _obs(asn=64500), _obs(asn=64500)
    assert very_low_weight_edge(left, right) == 1.0


def test_very_low_weight_asn_mismatch():
    """Different ASNs score zero."""
    left, right = _obs(asn=64500), _obs(asn=64501)
    assert very_low_weight_edge(left, right) == 0.0


def test_very_low_weight_asn_null_returns_zero():
    """A missing ASN on one side scores zero."""
    left, right = _obs(asn=None), _obs(asn=64500)
    assert very_low_weight_edge(left, right) == 0.0


# ─── time-agnostic invariant ───────────────────────────────────────────────


def test_observations_carry_no_timestamps():
    """Guard test: Observation declares none of the listed time fields,
    so no edge function can start reading them from the projection.
    Fixture 7 forbids recency-decay clustering."""
    forbidden = {"first_seen", "last_seen", "started_at", "session_midpoint", "timestamp"}
    field_names = set(Observation.__dataclass_fields__)
    assert not (field_names & forbidden), (
        f"Observation grew time fields: {field_names & forbidden}. "
        "Fixture 7 (slow_burn) forbids recency-aware clustering."
    )
# ─── from_synthetic adapter ────────────────────────────────────────────────


def test_from_synthetic_round_trip():
    """A one-session SyntheticAttacker projects into an Observation whose
    fields carry the attacker's fingerprints and the session's payload
    hash, C2 callback, credentials and per-phase commands."""
    from datetime import datetime, timezone
    from tests.factories.campaign_factory import (
        SyntheticAttacker, SyntheticSession,
    )
    from decnet.clustering.ukc import UKCPhase

    stamp = datetime.now(timezone.utc)
    # Ground-truth labels are shared by the session and the attacker.
    truth = {"truth_campaign_id": "c1", "truth_actor_id": "actor-1"}

    session = SyntheticSession(
        session_id="s1",
        attacker_id="a1",
        decky_id="d1",
        started_at=stamp,
        duration_s=10.0,
        phase=UKCPhase.DISCOVERY,
        commands=["ls", "id"],
        credentials_tried=[("root", "toor")],
        payload_hash="pl-1",
        c2_callback="c2.example.com",
        **truth,
    )
    attacker = SyntheticAttacker(
        attacker_id="a1",
        ip="1.1.1.1",
        asn=64500,
        ja3="ja3-x",
        hassh="hassh-y",
        first_seen=stamp,
        last_seen=stamp,
        sessions=[session],
        **truth,
    )

    projected = from_synthetic(attacker)

    # Scalars are copied straight from the attacker.
    assert projected.observation_id == "a1"
    assert projected.ja3 == "ja3-x"
    assert projected.hassh == "hassh-y"
    assert projected.asn == 64500
    # Collections are aggregated from the sessions.
    assert projected.payload_hashes == frozenset({"pl-1"})
    assert projected.c2_endpoints == frozenset({"c2.example.com"})
    assert projected.credentials == frozenset({("root", "toor")})
    assert projected.commands_by_phase == {"discovery": ("ls", "id")}