feat(prober-cert): roll up fingerprints onto AttackerIdentity

Brings the federation-gossip columns on AttackerIdentity to life —
ja3_hashes, hassh_hashes, and the new tls_cert_sha256 — by projecting
the union of every member observation's fingerprints JSON onto the
identity at clusterer create / link / merge time.

- decnet/profiler/identity_rollup.py: pure extract_fp_summaries()
  reads the production bounty shape (payload.fingerprint_type +
  payload.{ja3,hash,cert_sha256}) and returns deduped+sorted JSON
  list[str] per family, or None when a family has no signal so the
  column stays NULL instead of '[]'.
- BaseRepository.update_identity_fingerprints + SQLModel impl: one
  idempotent write that overwrites the three summary columns and
  bumps updated_at.
- ConnectedComponentsClusterer: after every per-component
  reconciliation (fresh-create OR existing-merge+link), recomputes
  and writes the rollup for the target identity. Wrapped in a
  best-effort helper so a write failure logs but never breaks the
  tick.
- Tests: extract_fp_summaries unit (dedup, sort determinism,
  unknown types ignored, malformed JSON, nested-stringified
  payloads, non-string values); end-to-end clusterer ticks
  populate the columns on create + on later observation links;
  no-fingerprint clusters keep the columns NULL.
This commit is contained in:
2026-04-28 11:28:54 -04:00
parent 9ab43b4ea4
commit 72cc928ebf
6 changed files with 416 additions and 2 deletions

View File

@@ -41,6 +41,7 @@ from decnet.clustering.impl.similarity import (
combined_edge_weight,
)
from decnet.logging import get_logger
from decnet.profiler.identity_rollup import extract_fp_summaries
from decnet.web.db.repository import BaseRepository
log = get_logger("clustering.connected_components")
@@ -217,6 +218,9 @@ class ConnectedComponentsClusterer(Clusterer):
"identity_uuid": identity_uuid,
"observation_uuids": linked,
})
await _roll_up_fingerprints(
repo, identity_uuid, [row_by_id[m] for m in member_ids],
)
continue
# Deterministic winner so two clusterer runs produce the
@@ -250,6 +254,14 @@ class ConnectedComponentsClusterer(Clusterer):
"observation_uuid": obs_id,
})
# Re-roll the winner's fingerprint summary across every
# observation now in this component (including the loser
# side — the merge unifies their evidence even though the
# loser's identity row stays FK'd via merged_into_uuid).
await _roll_up_fingerprints(
repo, winner_uuid, [row_by_id[m] for m in member_ids],
)
# Pass 2 — revocable-merge undo. For each currently-merged-out
# identity, check whether its observations still cluster with
# the winner's. If not, the merge is contradicted by new
@@ -341,6 +353,25 @@ async def _link(
return False
async def _roll_up_fingerprints(
    repo: BaseRepository,
    identity_uuid: str,
    member_rows: list[dict[str, Any]],
) -> None:
    """Write the members' fingerprint rollup onto one identity.

    The summary columns are computed from ``member_rows`` with
    :func:`extract_fp_summaries` and handed to
    ``repo.update_identity_fingerprints`` as keyword arguments.

    The repository write is best-effort: any exception it raises is
    logged with its traceback and swallowed, so the caller carries on
    and the columns simply keep their previous contents. The summary
    computation itself runs outside the ``try`` and is not covered by
    that guarantee.
    """
    column_values = extract_fp_summaries(member_rows)
    try:
        await repo.update_identity_fingerprints(identity_uuid, **column_values)
    except Exception:  # noqa: BLE001
        # Deliberately broad: a failed rollup must not abort the caller.
        log.exception(
            "clusterer: failed to roll up fingerprints for identity=%s",
            identity_uuid,
        )
        return
__all__ = [
"ConnectedComponentsClusterer",
"cluster_observations",

View File

@@ -0,0 +1,109 @@
"""Identity-level fingerprint rollup.
The clusterer mints :class:`AttackerIdentity` rows (and merges them) from
union-find over per-IP :class:`Attacker` observations. Each ``Attacker``
row already carries a ``fingerprints`` JSON list — produced when the
profiler's ``_build_record`` flattens every ``bounty_type='fingerprint'``
bounty seen for that IP. This module distils that per-observation list
into the cross-observation summary columns on ``AttackerIdentity``:
* ``ja3_hashes`` — TLS ClientHello fingerprints
* ``hassh_hashes`` — SSH KEX fingerprints
* ``tls_cert_sha256`` — leaf cert SHA-256s presented by attacker-run
TLS servers (active-prober capture)
These are JSON-serialised ``list[str]`` columns shaped for federation
gossip — same wire format the campaign clusterer reads. The values are
deduplicated and sorted so two clusterer runs over the same input produce
byte-identical column writes.
"""
from __future__ import annotations
import json
from typing import Any, Iterable, Optional
# One row per fingerprint family that rolls up onto AttackerIdentity:
#   (fingerprint_type, payload key holding the hash, summary column).
# Only families whose payload carries a stable scalar identifier belong
# here — tcpfp / http_quirks / ja4l etc. don't fit the "list of hashes"
# shape and stay out of the rollup until they get their own columns.
# Row order matters: it fixes the key order of both lookup tables below.
_ROLLUP_FAMILIES: tuple[tuple[str, str, str], ...] = (
    ("ja3", "ja3", "ja3_hashes"),
    ("hassh_server", "hash", "hassh_hashes"),
    ("tls_certificate", "cert_sha256", "tls_cert_sha256"),
)

# fingerprint_type -> key inside the bounty payload that holds the hash.
_PAYLOAD_KEY_BY_FP_TYPE: dict[str, str] = {
    fp_type: payload_key for fp_type, payload_key, _column in _ROLLUP_FAMILIES
}

# fingerprint_type -> AttackerIdentity summary column it feeds.
_COLUMN_BY_FP_TYPE: dict[str, str] = {
    fp_type: column for fp_type, _payload_key, column in _ROLLUP_FAMILIES
}
def _payload_of(entry: Any) -> dict[str, Any]:
    """Extract the payload mapping from one fingerprint bounty entry.

    Resolution order for ``entry["payload"]``:

    * a dict is returned as-is;
    * a string is JSON-decoded, and the result is returned only if it
      is a dict (undecodable or non-dict JSON yields ``{}``);
    * anything else, including a missing key, returns ``entry`` itself,
      on the assumption that the payload was flattened onto the entry.

    A non-dict ``entry`` always yields ``{}``.
    """
    if not isinstance(entry, dict):
        return {}

    candidate = entry.get("payload")
    if isinstance(candidate, dict):
        return candidate
    if not isinstance(candidate, str):
        # Some legacy callers may have flattened the payload onto the entry.
        return entry

    # Payload arrived as a nested, stringified JSON document.
    try:
        decoded = json.loads(candidate)
    except (TypeError, ValueError):
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def _parse_fingerprints(raw: Any) -> list[dict[str, Any]]:
    """Leniently turn a ``fingerprints`` value into a list of dict entries.

    ``raw`` may already be a list or may be a JSON string encoding one.
    Non-dict elements are dropped. Every other input (``None``, invalid
    JSON, JSON that is not a list, unrelated types) yields ``[]``.
    """
    candidates = raw
    if isinstance(candidates, str):
        # Stored as text: decode once, then fall through to the list check.
        try:
            candidates = json.loads(candidates)
        except (TypeError, ValueError):
            return []

    if not isinstance(candidates, list):
        return []
    return [item for item in candidates if isinstance(item, dict)]
def extract_fp_summaries(
    member_rows: Iterable[dict[str, Any]],
) -> dict[str, Optional[str]]:
    """Collect per-family fingerprint hashes across Attacker rows.

    Each row's ``fingerprints`` value is parsed leniently. An entry
    contributes only when its payload names a known
    ``fingerprint_type`` and carries a non-empty string under that
    family's payload key; everything else is ignored.

    Returns one key per summary column (``ja3_hashes``,
    ``hassh_hashes``, ``tls_cert_sha256``). A column with at least one
    hash maps to a JSON-encoded, deduplicated, sorted ``list[str]``.
    A column with none maps to ``None`` rather than ``"[]"``, so the
    column stays NULL and readers can tell "no data yet" from
    "actively known to be empty".

    Pure: no DB, no clock, no I/O.
    """
    collected: dict[str, set[str]] = {}
    for summary_column in _COLUMN_BY_FP_TYPE.values():
        collected[summary_column] = set()

    for attacker_row in member_rows:
        raw_fingerprints = attacker_row.get("fingerprints")
        for bounty in _parse_fingerprints(raw_fingerprints):
            body = _payload_of(bounty)
            family = body.get("fingerprint_type")
            # Non-string types can't be a known family (and may be unhashable).
            if not isinstance(family, str):
                continue

            hash_key = _PAYLOAD_KEY_BY_FP_TYPE.get(family)
            target_column = _COLUMN_BY_FP_TYPE.get(family)
            if hash_key is None or target_column is None:
                continue

            fingerprint = body.get(hash_key)
            if not isinstance(fingerprint, str) or fingerprint == "":
                continue
            collected[target_column].add(fingerprint)

    summaries: dict[str, Optional[str]] = {}
    for summary_column, hashes in collected.items():
        # Sorting makes the encoded value independent of set iteration order.
        summaries[summary_column] = json.dumps(sorted(hashes)) if hashes else None
    return summaries

View File

@@ -474,6 +474,26 @@ class BaseRepository(ABC):
"""
pass
@abstractmethod
async def update_identity_fingerprints(
    self,
    identity_uuid: str,
    *,
    ja3_hashes: Optional[str] = None,
    hassh_hashes: Optional[str] = None,
    tls_cert_sha256: Optional[str] = None,
) -> None:
    """Overwrite the fingerprint summary columns of one ``AttackerIdentity``.

    Every keyword argument is either a JSON-encoded ``list[str]`` (the
    federation wire shape) or ``None``. The write is unconditional, so
    passing ``None`` stores ``NULL`` in that column and does not keep a
    previously stored value. The rollup writer is the source of truth
    for these columns and recomputes them deterministically from the
    identity's member observations on every clusterer tick.

    Implementations also bump ``updated_at`` so cache subscribers can
    invalidate.
    """
# ─── Campaign clustering reads ────────────────────────────────────────
# Layer above identity resolution: campaigns group identities into
# operations. Populated by ``decnet campaign-clusterer``. The

View File

@@ -1545,6 +1545,28 @@ class SQLModelRepository(BaseRepository):
await session.execute(statement)
await session.commit()
async def update_identity_fingerprints(
    self,
    identity_uuid: str,
    *,
    ja3_hashes: Optional[str] = None,
    hassh_hashes: Optional[str] = None,
    tls_cert_sha256: Optional[str] = None,
) -> None:
    """Overwrite the three fingerprint summary columns for one identity.

    Issues a single ``UPDATE`` on ``AttackerIdentity`` filtered by
    ``uuid == identity_uuid``. All three columns are always written, so
    an argument left at ``None`` stores ``NULL``. ``updated_at`` is set
    to the current UTC time in the same statement.
    """
    new_values = {
        "ja3_hashes": ja3_hashes,
        "hassh_hashes": hassh_hashes,
        "tls_cert_sha256": tls_cert_sha256,
        "updated_at": datetime.now(timezone.utc),
    }
    stmt = (
        update(AttackerIdentity)
        .where(AttackerIdentity.uuid == identity_uuid)
        .values(**new_values)
    )
    async with self._session() as db:
        await db.execute(stmt)
        await db.commit()
# ─── Campaign clustering reads ────────────────────────────────────────
async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: