diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py index 811f4343..3f511f1f 100644 --- a/decnet/clustering/impl/connected_components.py +++ b/decnet/clustering/impl/connected_components.py @@ -43,10 +43,15 @@ from decnet.clustering.impl.similarity import ( ) from decnet.logging import get_logger from decnet.profiler.identity_rollup import extract_fp_summaries +from decnet.util.simhash import from_bytes8, to_bytes8 from decnet.web.db.repository import BaseRepository log = get_logger("clustering.connected_components") +# Per-session SimHash observations of the keystroke-rhythm biometric; the +# rollup folds them into one identity-level centroid. +_DIGRAPH_PRIMITIVE = "motor.digraph_simhash" + def cluster_observations( observations: Iterable[Observation], @@ -354,6 +359,38 @@ async def _link( return False +async def _digraph_centroid( + repo: BaseRepository, identity_uuid: str, +) -> Optional[bytes]: + """Fold the identity's session-level ``motor.digraph_simhash`` + observations into one 8-byte bitwise-majority centroid. + + Bit *i* is set iff a majority of the identity's session SimHashes + have it set — denoises per-session jitter so the centroid is the + stable keystroke-rhythm fingerprint for Hamming comparison. Returns + ``None`` when the identity has no usable digraph observations yet. + """ + obs = await repo.observations_for_identity_primitive( + identity_uuid, _DIGRAPH_PRIMITIVE, + ) + hashes: list[int] = [] + for o in obs: + value = o.get("value") + if isinstance(value, str) and len(value) == 16: + try: + hashes.append(from_bytes8(bytes.fromhex(value))) + except ValueError: + continue + if not hashes: + return None + n = len(hashes) + centroid = 0 + for i in range(64): + if sum((h >> i) & 1 for h in hashes) * 2 > n: + centroid |= (1 << i) + return to_bytes8(centroid) + + async def _roll_up_fingerprints( repo: BaseRepository, identity_uuid: str, @@ -365,8 +402,11 @@ async def _roll_up_fingerprints( next pass.""" summaries = extract_fp_summaries(member_rows) fp_kwargs = {k: v for k, v in summaries.items() if k in {"ja3_hashes", "hassh_hashes", "tls_cert_sha256"}} + kd_centroid = await _digraph_centroid(repo, identity_uuid) try: - await repo.update_identity_fingerprints(identity_uuid, **fp_kwargs) + await repo.update_identity_fingerprints( + identity_uuid, kd_digraph_simhash=kd_centroid, **fp_kwargs, + ) except Exception: # noqa: BLE001 log.exception( "clusterer: failed to roll up fingerprints for identity=%s", diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 7325b1f4..56f89e84 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -683,15 +683,19 @@ class BaseRepository(ABC): ja3_hashes: Optional[str] = None, hassh_hashes: Optional[str] = None, tls_cert_sha256: Optional[str] = None, + kd_digraph_simhash: Optional[bytes] = None, ) -> None: """Set the fingerprint summary columns on one ``AttackerIdentity``. - Each argument is a JSON-encoded ``list[str]`` (the federation - wire shape) or ``None`` to leave the corresponding column at - ``NULL``. Always overwrites — the rollup writer is the source - of truth for these columns, computed deterministically from - the identity's member observations every clusterer tick. Also - bumps ``updated_at`` so cache subscribers can invalidate. + ``ja3_hashes`` / ``hassh_hashes`` / ``tls_cert_sha256`` are + JSON-encoded ``list[str]`` (the federation wire shape) or + ``None``. ``kd_digraph_simhash`` is the 8-byte keystroke-rhythm + centroid (bitwise majority over the identity's session-level + ``motor.digraph_simhash`` observations) or ``None``. Always + overwrites — the rollup writer is the source of truth for these + columns, recomputed deterministically from the identity's member + observations every clusterer tick. Also bumps ``updated_at`` so + cache subscribers can invalidate. """ pass diff --git a/decnet/web/db/sqlmodel_repo/campaigns.py b/decnet/web/db/sqlmodel_repo/campaigns.py index 78c04273..00f21bfe 100644 --- a/decnet/web/db/sqlmodel_repo/campaigns.py +++ b/decnet/web/db/sqlmodel_repo/campaigns.py @@ -105,6 +105,7 @@ class CampaignsMixin(_MixinBase): AttackerIdentity.hassh_hashes, AttackerIdentity.payload_simhashes, AttackerIdentity.c2_endpoints, + AttackerIdentity.kd_digraph_simhash, ).order_by(AttackerIdentity.created_at) if limit is not None: statement = statement.limit(limit) @@ -129,6 +130,7 @@ class CampaignsMixin(_MixinBase): "hassh_hashes": row.hassh_hashes, "payload_simhashes": row.payload_simhashes, "c2_endpoints": row.c2_endpoints, + "kd_digraph_simhash": row.kd_digraph_simhash, } for row in result.all() ] diff --git a/decnet/web/db/sqlmodel_repo/identities.py b/decnet/web/db/sqlmodel_repo/identities.py index 6b9ca639..321c469a 100644 --- a/decnet/web/db/sqlmodel_repo/identities.py +++ b/decnet/web/db/sqlmodel_repo/identities.py @@ -173,6 +173,7 @@ class IdentitiesMixin(_MixinBase): ja3_hashes: Optional[str] = None, hassh_hashes: Optional[str] = None, tls_cert_sha256: Optional[str] = None, + kd_digraph_simhash: Optional[bytes] = None, ) -> None: statement = ( update(AttackerIdentity) @@ -181,6 +182,7 @@ class IdentitiesMixin(_MixinBase): ja3_hashes=ja3_hashes, hassh_hashes=hassh_hashes, tls_cert_sha256=tls_cert_sha256, + kd_digraph_simhash=kd_digraph_simhash, updated_at=datetime.now(timezone.utc), ) ) diff --git a/tests/clustering/test_kd_digraph_centroid.py b/tests/clustering/test_kd_digraph_centroid.py new file mode 100644 index 00000000..d85e531c --- /dev/null +++ b/tests/clustering/test_kd_digraph_centroid.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""``_digraph_centroid`` — bitwise-majority rollup of session SimHashes.""" +from __future__ import annotations + +from decnet.clustering.impl.connected_components import _digraph_centroid +from decnet.util.simhash import from_bytes8, to_bytes8 + +_ALL_ONES = (1 << 64) - 1 + + +class _FakeRepo: + """Returns canned digraph observations for one identity.""" + + def __init__(self, hash_ints: list[int]) -> None: + self._values = [to_bytes8(h).hex() for h in hash_ints] + + async def observations_for_identity_primitive(self, identity_uuid, primitive): + assert primitive == "motor.digraph_simhash" + return [{"value": v} for v in self._values] + + +async def test_no_observations_returns_none() -> None: + assert await _digraph_centroid(_FakeRepo([]), "id") is None + + +async def test_single_session_centroid_is_that_hash() -> None: + out = await _digraph_centroid(_FakeRepo([0xDEADBEEFCAFEF00D]), "id") + assert from_bytes8(out) == 0xDEADBEEFCAFEF00D + + +async def test_majority_wins_per_bit() -> None: + # 2 of 3 sessions all-ones → every bit majority-set → all ones. + out = await _digraph_centroid(_FakeRepo([_ALL_ONES, _ALL_ONES, 0]), "id") + assert from_bytes8(out) == _ALL_ONES + + +async def test_tie_is_not_set() -> None: + # 1-1 tie per bit: majority requires strictly more than half → 0. + out = await _digraph_centroid(_FakeRepo([_ALL_ONES, 0]), "id") + assert from_bytes8(out) == 0 + + +async def test_garbage_values_skipped() -> None: + repo = _FakeRepo([]) + repo._values = ["not-hex-zz", "deadbeef", to_bytes8(_ALL_ONES).hex()] # only the last is valid + out = await _digraph_centroid(repo, "id") + assert from_bytes8(out) == _ALL_ONES diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 92825ca7..11f682b9 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -96,8 +96,8 @@ class DummyRepo(BaseRepository): async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i) async def list_all_identities(self): await super().list_all_identities(); return [] async def update_identity_merged_into(self, u, w): await super().update_identity_merged_into(u, w) - async def update_identity_fingerprints(self, u, *, ja3_hashes=None, hassh_hashes=None, tls_cert_sha256=None): - await super().update_identity_fingerprints(u, ja3_hashes=ja3_hashes, hassh_hashes=hassh_hashes, tls_cert_sha256=tls_cert_sha256) + async def update_identity_fingerprints(self, u, *, ja3_hashes=None, hassh_hashes=None, tls_cert_sha256=None, kd_digraph_simhash=None): + await super().update_identity_fingerprints(u, ja3_hashes=ja3_hashes, hassh_hashes=hassh_hashes, tls_cert_sha256=tls_cert_sha256, kd_digraph_simhash=kd_digraph_simhash) # Campaign clustering (this PR) async def get_campaign_by_uuid(self, u): await super().get_campaign_by_uuid(u) async def list_campaigns(self, limit=50, offset=0): await super().list_campaigns(limit, offset); return [] @@ -260,7 +260,7 @@ async def test_base_repo_coverage(): await dr.list_all_identities() await dr.update_identity_merged_into("a", "b") await dr.update_identity_merged_into("a", None) - await dr.update_identity_fingerprints("a", ja3_hashes='["x"]', hassh_hashes=None, tls_cert_sha256='["y"]') + await dr.update_identity_fingerprints("a", ja3_hashes='["x"]', hassh_hashes=None, tls_cert_sha256='["y"]', kd_digraph_simhash=b"\x00" * 8) await dr.get_campaign_by_uuid("a") await dr.list_campaigns() await dr.count_campaigns() diff --git a/tests/db/test_kd_digraph_rollup.py b/tests/db/test_kd_digraph_rollup.py new file mode 100644 index 00000000..43034a91 --- /dev/null +++ b/tests/db/test_kd_digraph_rollup.py @@ -0,0 +1,45 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""kd_digraph_simhash round-trips through update_identity_fingerprints +and the campaign-clustering projection.""" +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "kd.db")) + await r.initialize() + return r + + +@pytest.mark.asyncio +async def test_fingerprint_write_and_clustering_read(repo): + now = datetime.now(timezone.utc) + await repo.create_attacker_identity({ + "uuid": "id-kd", "first_seen_at": now, "last_seen_at": now, + }) + raw = bytes.fromhex("deadbeefcafef00d") + await repo.update_identity_fingerprints("id-kd", kd_digraph_simhash=raw) + + rows = await repo.list_identities_for_clustering() + row = next(r for r in rows if r["uuid"] == "id-kd") + assert bytes(row["kd_digraph_simhash"]) == raw + + +@pytest.mark.asyncio +async def test_fingerprint_overwrite_to_none(repo): + now = datetime.now(timezone.utc) + await repo.create_attacker_identity({ + "uuid": "id-kd2", "first_seen_at": now, "last_seen_at": now, + }) + await repo.update_identity_fingerprints("id-kd2", kd_digraph_simhash=b"\x01" * 8) + # A later pass with no biometric clears it (full-overwrite contract). + await repo.update_identity_fingerprints("id-kd2", kd_digraph_simhash=None) + rows = await repo.list_identities_for_clustering() + row = next(r for r in rows if r["uuid"] == "id-kd2") + assert row["kd_digraph_simhash"] is None