# SPDX-License-Identifier: AGPL-3.0-or-later """Similarity-graph primitives for the connected-components clusterer. Each function takes two :class:`Observation` projections and returns a similarity score in ``[0.0, 1.0]``. The connected-components impl (landing in subsequent commits) decides how to combine these into a single edge weight, applies a threshold, and runs union-find. **Time-agnostic.** Edges MUST NOT depend on observation timestamps. Fixture 7 (``slow_burn``) proves recency-decay clustering fragments multi-month APT campaigns; the production graph cannot silently expire old edges. Timestamps are still useful for *audit* (the ``first_seen`` on the resulting identity row) but never for *similarity*. **Weight tiers** (from `development/IDENTITY_RESOLUTION.md`): * High — JA3 / HASSH / payload-hash / C2-callback exact match. Stable signals an attacker can't cheaply rotate. A single high-tier match supports identity strongly. * Medium — command-sequence Jaccard, bucketed by UKC phase. Tooling habits leak through command order; phase-bucketing avoids comparing a Discovery cmd-list to an Exploitation one. * Low — credential-attempt-set Jaccard. Defeated alone by fixture 1 (``shared_wordlist``) where two campaigns share rockyou but diverge on infra. * Very low — ASN match. Defeated alone by fixture 2 (``vpn_hopping``) where one identity rotates across many ASNs. The functions are pure (no DB, no I/O); the worker maps observations into :class:`Observation` once per tick and feeds these into the graph builder. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Mapping, Optional # ─── Observation projection ───────────────────────────────────────────────── @dataclass(frozen=True) class Observation: """Minimal projection of a per-IP attacker observation. Built once per ``Attacker`` row by the worker (or per ``SyntheticAttacker`` in tests via :func:`from_synthetic`). Keeping the projection tight isolates the graph code from schema drift on either side. All set-typed fields are :class:`frozenset` so they hash and so callers don't accidentally mutate them mid-pass. """ observation_id: str """Stable ID — for production, the ``Attacker.uuid``; for tests, the ``SyntheticAttacker.attacker_id``.""" ja3: Optional[str] = None hassh: Optional[str] = None asn: Optional[int] = None payload_hashes: frozenset[str] = field(default_factory=frozenset) c2_endpoints: frozenset[str] = field(default_factory=frozenset) credentials: frozenset[tuple[str, str]] = field(default_factory=frozenset) commands_by_phase: Mapping[str, tuple[str, ...]] = field(default_factory=dict) """``UKCPhase.value`` → ordered command sequence observed in that phase. Empty dict when no command-bearing sessions were seen.""" # ─── Edge functions ───────────────────────────────────────────────────────── def _fingerprints_fully_disagree(a: Observation, b: Observation) -> bool: """True iff every comparable fingerprint slot disagrees. "Comparable" = both sides have a non-null value for that slot. Used as a soft-veto on shared C2 / payload signals: when two observations have distinct stable TLS + SSH stacks, sharing a C2 endpoint is a *campaign*-level signal (cooperating operators, distinct identities) — not an identity-level one. Fixture 5 (``multi_operator``) is the canonical demonstration. Returns ``False`` when no fingerprint slot is comparable (any-null cases) — without evidence of disagreement we don't veto. Also ``False`` when at least one slot agrees. """ ja3_comparable = a.ja3 is not None and b.ja3 is not None hassh_comparable = a.hassh is not None and b.hassh is not None if not (ja3_comparable or hassh_comparable): return False if ja3_comparable and a.ja3 == b.ja3: return False if hassh_comparable and a.hassh == b.hassh: return False if ja3_comparable and hassh_comparable: return a.ja3 != b.ja3 and a.hassh != b.hassh return True # exactly one slot is comparable, and it disagrees def high_weight_edge(a: Observation, b: Observation) -> float: """JA3 / HASSH / payload-hash / C2-endpoint exact match. Returns ``1.0`` if any of the four exact-match signals agrees (non-null on both sides), ``0.0`` otherwise. Single-signal high-tier agreement is by design enough to support identity — these are the signals the design doc calls out as "stable signals an attacker can't cheaply rotate." **Fingerprint-disagreement veto.** Payload and C2 are infra signals that two cooperating operators (different identities) can share. JA3 + HASSH are tooling signals that differ when the operators are actually different humans with different tool stacks. So when the available fingerprint slots fully disagree, we drop the payload/C2 contribution to zero — preventing a campaign-level co-op signal from fusing two distinct identities. Fixture 5 (``multi_operator``) is the canonical demonstration: shared stage-1 payload + shared C2, distinct JA3/HASSH per operator — must stay two identities. JA3 / HASSH agreement still returns ``1.0`` directly, since by definition no veto applies when something agrees. JA4 will join this tier as a sibling of JA3 once the prober emits it (``ATTACKER_FINGERPRINTED`` already carries a JA4 slot in ``AttackerIdentity``); the function shape doesn't change. """ if a.ja3 is not None and a.ja3 == b.ja3: return 1.0 if a.hassh is not None and a.hassh == b.hassh: return 1.0 if _fingerprints_fully_disagree(a, b): # Stable-tool disagreement vetoes shared-infra signals. return 0.0 if a.payload_hashes and b.payload_hashes and (a.payload_hashes & b.payload_hashes): return 1.0 if a.c2_endpoints and b.c2_endpoints and (a.c2_endpoints & b.c2_endpoints): return 1.0 return 0.0 def medium_weight_edge(a: Observation, b: Observation) -> float: """Phase-bucketed command-sequence Jaccard. For each UKC phase observed on both sides, computes the Jaccard similarity of the command sets (multisets collapsed to sets — the *order* signal is reserved for a future feature, this commit is the scaffolding). Returns the **maximum** Jaccard across shared phases, so a single strong phase match isn't averaged away by a different phase where the actors diverge. Phase-bucketing matters: comparing a Discovery cmd-list to an Exploitation one is meaningless. Both actors had to be in the same phase for the comparison to count. Returns ``0.0`` when no phase is observed on both sides. """ shared_phases = set(a.commands_by_phase) & set(b.commands_by_phase) if not shared_phases: return 0.0 best = 0.0 for phase in shared_phases: sa = set(a.commands_by_phase[phase]) sb = set(b.commands_by_phase[phase]) if not sa and not sb: continue union = sa | sb if not union: continue j = len(sa & sb) / len(union) if j > best: best = j return best def low_weight_edge(a: Observation, b: Observation) -> float: """Credential-attempt-set Jaccard. Returns the Jaccard of ``(username, password)`` tuples. Two campaigns burning the same wordlist will score high here — fixture 1 proves this signal is dangerous in isolation. The connected-components impl combines this with other signals; alone it must not push a pair over threshold. Returns ``0.0`` when either side attempted no credentials, or when the union is empty. """ if not a.credentials or not b.credentials: return 0.0 union = a.credentials | b.credentials if not union: return 0.0 return len(a.credentials & b.credentials) / len(union) def very_low_weight_edge(a: Observation, b: Observation) -> float: """ASN equality. Returns ``1.0`` iff both observations have a non-null ASN and they match. Fixture 2 (``vpn_hopping``) proves ASN-only clustering is a failure mode — one identity legitimately rotates across many ASNs. The combination logic in the connected-components impl weights this so that ASN agreement alone never crosses threshold. """ if a.asn is None or b.asn is None: return 0.0 return 1.0 if a.asn == b.asn else 0.0 # ─── Combined weight ──────────────────────────────────────────────────────── #: Tier multipliers applied to the per-tier edge scores when combining #: into a single weight. Tuned so that: #: #: * High-tier agreement alone (1.0) crosses the 1.0 threshold. #: * Medium-tier alone (max 1.0) yields 0.6 — below threshold. #: * Low-tier alone (max 1.0) yields 0.2 — defeats fixture 1's #: credential-overlap-only failure mode. #: * Very-low alone (max 1.0) yields 0.05 — defeats fixture 2's #: ASN-rotation failure mode. #: #: The ratio between tiers matters more than the absolute values: a #: tier should never combine its way past threshold without help from #: a stronger one. TIER_WEIGHTS = { "high": 1.0, "medium": 0.6, "low": 0.2, "very_low": 0.05, } #: Threshold a combined edge weight must meet to survive into the #: similarity graph. The connected-components impl drops anything #: under this before running union-find. EDGE_THRESHOLD = 1.0 def combined_edge_weight(a: Observation, b: Observation) -> float: """Sum of all four tier scores, weighted by :data:`TIER_WEIGHTS`. Each per-tier function returns a score in ``[0, 1]``; the weighted sum lets stronger tiers dominate without letting weaker ones combine their way past threshold. The connected-components clusterer compares this against :data:`EDGE_THRESHOLD` to decide whether to draw an edge. Pure / time-agnostic — fixture 7 forbids recency-decay weighting. Commits 5–7 land each tier in the call site: * Commit 5 (this commit): high + medium. * Commit 6: + phase-handoff (a separate edge family, not a tier). * Commit 7: + low + very_low. Until commit 7 lands, the low / very_low contributions stay zero by virtue of the underlying functions returning ``0.0`` whenever their inputs are missing. The combination is forward-compatible. """ return ( TIER_WEIGHTS["high"] * high_weight_edge(a, b) + TIER_WEIGHTS["medium"] * medium_weight_edge(a, b) + TIER_WEIGHTS["low"] * low_weight_edge(a, b) + TIER_WEIGHTS["very_low"] * very_low_weight_edge(a, b) ) # ─── Adapter for the synthetic-corpus tests ───────────────────────────────── def from_synthetic(att) -> Observation: """Build an :class:`Observation` from a ``SyntheticAttacker``. Lives here so test code doesn't import the factory shape into the production module — the adapter is a documented integration point. Imported lazily by callers; the production worker uses a parallel adapter from :class:`Attacker` rows once that lands. """ payload_hashes: set[str] = set() c2_endpoints: set[str] = set() credentials: set[tuple[str, str]] = set() commands_by_phase: dict[str, list[str]] = {} for s in att.sessions: if s.payload_hash: payload_hashes.add(s.payload_hash) if s.c2_callback: c2_endpoints.add(s.c2_callback) for cred in s.credentials_tried: credentials.add(tuple(cred)) if s.commands: commands_by_phase.setdefault(s.phase.value, []).extend(s.commands) return Observation( observation_id=att.attacker_id, ja3=att.ja3, hassh=att.hassh, asn=att.asn, payload_hashes=frozenset(payload_hashes), c2_endpoints=frozenset(c2_endpoints), credentials=frozenset(credentials), commands_by_phase={k: tuple(v) for k, v in commands_by_phase.items()}, ) __all__ = [ "Observation", "high_weight_edge", "medium_weight_edge", "low_weight_edge", "very_low_weight_edge", "combined_edge_weight", "from_synthetic", "EDGE_THRESHOLD", "TIER_WEIGHTS", ]