async def _roll_up_fingerprints(
    repo: BaseRepository,
    identity_uuid: str,
    member_rows: list[dict[str, Any]],
) -> None:
    """Project member observations' fingerprint blobs onto the identity's
    summary columns.

    Args:
        repo: Repository whose ``update_identity_fingerprints`` persists
            the rollup.
        identity_uuid: UUID of the identity row to update.
        member_rows: Observation rows belonging to the identity; they are
            handed to ``extract_fp_summaries`` unchanged.

    Best-effort: a failure while summarising the rows *or* while writing
    the result is logged but never breaks the clusterer tick — the
    columns just stay stale until the next pass.
    """
    # Summarising sits inside the guard as well: an unexpected row shape
    # must not abort the tick any more than a failed write does.
    # ``except Exception`` does not intercept task cancellation, which is
    # a ``BaseException`` on Python 3.8+.
    try:
        summaries = extract_fp_summaries(member_rows)
        await repo.update_identity_fingerprints(identity_uuid, **summaries)
    except Exception:  # noqa: BLE001
        log.exception(
            "clusterer: failed to roll up fingerprints for identity=%s",
            identity_uuid,
        )
# Payload key that carries the hash, keyed by ``fingerprint_type``.
_PAYLOAD_KEY_BY_FP_TYPE: dict[str, str] = {
    "ja3": "ja3",
    "hassh_server": "hash",
    "tls_certificate": "cert_sha256",
}

# Summary column each ``fingerprint_type`` rolls up into. A type missing
# from either table is ignored by ``extract_fp_summaries``.
_COLUMN_BY_FP_TYPE: dict[str, str] = {
    "ja3": "ja3_hashes",
    "hassh_server": "hassh_hashes",
    "tls_certificate": "tls_cert_sha256",
}


def _payload_of(entry: Any) -> dict[str, Any]:
    """Return the payload dict from a fingerprint bounty entry.

    Three shapes are understood:

    * ``entry["payload"]`` is already a dict — returned as-is;
    * ``entry["payload"]`` is a JSON string — decoded, and used only if
      it decodes to an object;
    * any other ``payload`` (including a missing one) — the entry itself
      is returned, treating its own keys as the payload.

    Non-dict entries and undecodable strings yield ``{}``.
    """
    if not isinstance(entry, dict):
        return {}
    payload = entry.get("payload")
    if isinstance(payload, dict):
        return payload
    if isinstance(payload, str):
        try:
            parsed = json.loads(payload)
        except (TypeError, ValueError):
            return {}
        return parsed if isinstance(parsed, dict) else {}
    # Some legacy callers may have flattened the payload onto the entry.
    return entry


def _parse_fingerprints(raw: Any) -> list[dict[str, Any]]:
    """Best-effort parse of a ``fingerprints`` column value.

    Accepts an already-decoded list or a JSON document given as ``str``,
    ``bytes`` or ``bytearray``. Anything else — including JSON that does
    not decode to a list — yields ``[]``. Non-dict list items are dropped.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        try:
            raw = json.loads(raw)
        except (TypeError, ValueError):
            # UnicodeDecodeError (bad bytes) is a ValueError subclass.
            return []
    if not isinstance(raw, list):
        return []
    return [item for item in raw if isinstance(item, dict)]


def extract_fp_summaries(
    member_rows: Iterable[dict[str, Any]],
) -> dict[str, Optional[str]]:
    """Aggregate fingerprint hashes across the given rows.

    Args:
        member_rows: Rows exposing a ``fingerprints`` value through
            ``.get``. Rows without a callable ``.get`` (for example
            ``None``) are skipped rather than raising, and ``None`` in
            place of the iterable is treated as "no rows".

    Returns:
        A dict with keys ``ja3_hashes``, ``hassh_hashes`` and
        ``tls_cert_sha256``. Each value is a JSON-encoded ``list[str]``
        (deduplicated, sorted) or ``None`` when no hash was found for
        that column — ``None`` rather than ``"[]"``.

    Only non-empty string hashes are collected. Pure: no DB, no clock,
    no I/O.
    """
    buckets: dict[str, set[str]] = {
        column: set() for column in _COLUMN_BY_FP_TYPE.values()
    }

    for row in member_rows or ():
        # Duck-typed so dict-like row objects keep working while junk
        # entries are ignored instead of aborting the whole rollup.
        getter = getattr(row, "get", None)
        if not callable(getter):
            continue
        for entry in _parse_fingerprints(getter("fingerprints")):
            payload = _payload_of(entry)
            fp_type = payload.get("fingerprint_type")
            if not isinstance(fp_type, str):
                continue
            payload_key = _PAYLOAD_KEY_BY_FP_TYPE.get(fp_type)
            column = _COLUMN_BY_FP_TYPE.get(fp_type)
            if payload_key is None or column is None:
                continue
            value = payload.get(payload_key)
            if isinstance(value, str) and value:
                buckets[column].add(value)

    # Sorting before encoding makes the output independent of set order.
    return {
        column: (json.dumps(sorted(values)) if values else None)
        for column, values in buckets.items()
    }
async def update_identity_fingerprints(
    self,
    identity_uuid: str,
    *,
    ja3_hashes: Optional[str] = None,
    hassh_hashes: Optional[str] = None,
    tls_cert_sha256: Optional[str] = None,
) -> None:
    """Overwrite the fingerprint summary columns of one identity row.

    All three columns are written on every call, so an argument left at
    ``None`` is stored as ``None`` and replaces any previous value.
    ``updated_at`` is set to the current UTC time in the same UPDATE.
    """
    # Built before the session opens, so the timestamp reflects when the
    # call started rather than when a session became available.
    new_values = {
        "ja3_hashes": ja3_hashes,
        "hassh_hashes": hassh_hashes,
        "tls_cert_sha256": tls_cert_sha256,
        "updated_at": datetime.now(timezone.utc),
    }
    stmt = (
        update(AttackerIdentity)
        .where(AttackerIdentity.uuid == identity_uuid)
        .values(**new_values)
    )
    async with self._session() as db:
        await db.execute(stmt)
        await db.commit()
# ─── identity fingerprint rollup ───────────────────────────────────────────


@pytest.mark.anyio
async def test_tick_rolls_up_fingerprint_columns_on_create(repo):
    """A tick that mints a new identity fills ja3_hashes / hassh_hashes /
    tls_cert_sha256 on it, deduplicated and sorted across its members."""
    cert_a, cert_b = "ab" * 32, "cd" * 32
    for ip, cert in (("1.1.1.1", cert_a), ("2.2.2.2", cert_b)):
        await _seed_attacker(
            repo, ip, ja3="ja3-x", hassh="hassh-y", cert_sha256=cert,
        )

    outcome = await ConnectedComponentsClusterer().tick(repo)
    formed = outcome.identities_formed
    assert len(formed) == 1
    target_uuid = formed[0]["identity_uuid"]

    by_uuid = {}
    for identity_row in await repo.list_all_identities():
        by_uuid[identity_row["uuid"]] = identity_row
    summary = by_uuid[target_uuid]
    assert json.loads(summary["ja3_hashes"]) == ["ja3-x"]
    assert json.loads(summary["hassh_hashes"]) == ["hassh-y"]
    assert json.loads(summary["tls_cert_sha256"]) == sorted([cert_a, cert_b])


@pytest.mark.anyio
async def test_tick_rolls_up_fingerprints_on_link(repo):
    """An observation linked into an existing identity on a later tick
    contributes its cert SHA-256 to that identity's rollup."""
    cert_a, cert_b = "ab" * 32, "cd" * 32
    clusterer = ConnectedComponentsClusterer()

    await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x", cert_sha256=cert_a)
    first_pass = await clusterer.tick(repo)
    target_uuid = first_pass.identities_formed[0]["identity_uuid"]

    # Second observation shares the JA3 but presents a different cert.
    await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x", cert_sha256=cert_b)
    await clusterer.tick(repo)

    by_uuid = {}
    for identity_row in await repo.list_all_identities():
        by_uuid[identity_row["uuid"]] = identity_row
    summary = by_uuid[target_uuid]
    assert json.loads(summary["tls_cert_sha256"]) == sorted([cert_a, cert_b])


@pytest.mark.anyio
async def test_tick_leaves_columns_null_when_no_fingerprints(repo):
    """Attackers seeded without any fingerprint leave every rollup column
    on every identity row as None (not "[]")."""
    for ip in ("1.1.1.1", "2.2.2.2"):
        await _seed_attacker(repo, ip)
    await ConnectedComponentsClusterer().tick(repo)

    for identity_row in await repo.list_all_identities():
        for column in ("ja3_hashes", "hassh_hashes", "tls_cert_sha256"):
            assert identity_row[column] is None
from __future__ import annotations

import json

from decnet.profiler.identity_rollup import extract_fp_summaries


def _bounty(fp_type: str, **payload_extras) -> dict:
    """Build a fingerprint bounty dict with ``fp_type`` under
    ``payload.fingerprint_type`` plus any extra payload fields."""
    payload = {"fingerprint_type": fp_type}
    payload.update(payload_extras)
    return {"bounty_type": "fingerprint", "payload": payload}


def _row_with(*entries) -> dict:
    """Wrap bounty entries in a row whose ``fingerprints`` is JSON text."""
    encoded = json.dumps(list(entries))
    return {"fingerprints": encoded}


class TestExtractFpSummaries:
    """Unit tests for ``extract_fp_summaries`` over JSON-encoded rows."""

    def test_empty_input_returns_all_none(self):
        expected = dict.fromkeys(
            ("ja3_hashes", "hassh_hashes", "tls_cert_sha256")
        )
        assert extract_fp_summaries([]) == expected

    def test_single_row_single_cert(self):
        cert = "ab" * 32
        summary = extract_fp_summaries(
            [_row_with(_bounty("tls_certificate", cert_sha256=cert))]
        )
        assert summary["ja3_hashes"] is None
        assert summary["hassh_hashes"] is None
        assert json.loads(summary["tls_cert_sha256"]) == [cert]

    def test_dedupe_across_rows(self):
        cert = "ab" * 32
        rows = [
            _row_with(_bounty("tls_certificate", cert_sha256=cert))
            for _ in range(2)
        ]
        summary = extract_fp_summaries(rows)
        assert json.loads(summary["tls_cert_sha256"]) == [cert]

    def test_sorted_output_is_deterministic(self):
        certs = ["ff" * 32, "11" * 32, "aa" * 32]
        row = _row_with(
            *(_bounty("tls_certificate", cert_sha256=c) for c in certs)
        )
        summary = extract_fp_summaries([row])
        # A second run over the same input must give an equal result.
        assert summary == extract_fp_summaries([row])
        assert json.loads(summary["tls_cert_sha256"]) == sorted(certs)

    def test_all_three_families_at_once(self):
        entries = (
            _bounty("ja3", ja3="ja3-abc"),
            _bounty("hassh_server", hash="hassh-def"),
            _bounty("tls_certificate", cert_sha256="ab" * 32),
        )
        summary = extract_fp_summaries([_row_with(*entries)])
        assert json.loads(summary["ja3_hashes"]) == ["ja3-abc"]
        assert json.loads(summary["hassh_hashes"]) == ["hassh-def"]
        assert json.loads(summary["tls_cert_sha256"]) == ["ab" * 32]

    def test_unknown_fingerprint_type_ignored(self):
        # Types without a rollup column must not leak into the three
        # columns that exist.
        entries = (
            _bounty("tcpfp", hash="tcpfp-x"),
            _bounty("ja4l", ja4l="ja4l-y"),
            _bounty("http_quirks", quirks="..."),
        )
        summary = extract_fp_summaries([_row_with(*entries)])
        assert summary["ja3_hashes"] is None
        assert summary["hassh_hashes"] is None
        assert summary["tls_cert_sha256"] is None

    def test_missing_payload_key_skipped(self):
        # A tls_certificate payload without cert_sha256 must neither
        # crash nor record an entry.
        entry = _bounty("tls_certificate", subject_cn="x")
        summary = extract_fp_summaries([_row_with(entry)])
        assert summary["tls_cert_sha256"] is None

    def test_malformed_fingerprints_json_returns_all_none(self):
        summary = extract_fp_summaries([{"fingerprints": "not json"}])
        assert all(value is None for value in summary.values())

    def test_missing_fingerprints_field_returns_all_none(self):
        summary = extract_fp_summaries([{"some_other_field": True}])
        assert all(value is None for value in summary.values())

    def test_payload_as_string_is_json_decoded(self):
        # The payload itself is JSON text nested inside the JSON list.
        inner = json.dumps({
            "fingerprint_type": "tls_certificate",
            "cert_sha256": "cd" * 32,
        })
        entry = {"bounty_type": "fingerprint", "payload": inner}
        row = {"fingerprints": json.dumps([entry])}
        summary = extract_fp_summaries([row])
        assert json.loads(summary["tls_cert_sha256"]) == ["cd" * 32]

    def test_non_string_hash_values_skipped(self):
        entry = _bounty("tls_certificate", cert_sha256=12345)
        summary = extract_fp_summaries([_row_with(entry)])
        assert summary["tls_cert_sha256"] is None

    def test_dedup_across_many_rows_with_overlap(self):
        per_row = (
            ("ja3-shared",),
            ("ja3-shared", "ja3-second"),
            ("ja3-third",),
        )
        rows = [
            _row_with(*(_bounty("ja3", ja3=h) for h in hashes))
            for hashes in per_row
        ]
        summary = extract_fp_summaries(rows)
        assert json.loads(summary["ja3_hashes"]) == sorted(
            ["ja3-shared", "ja3-second", "ja3-third"]
        )