From 3977f06374288bc5b6e2ae9a31647f310a337348 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 17 May 2026 20:41:55 -0400 Subject: [PATCH] feat(ttp/ipv6_leak): wire Ipv6LeakLifter into composite tagger and worker - Add "ipv6_leak" to KNOWN_SOURCE_KINDS in ttp/base.py - Register Ipv6LeakLifter(store) in factory.py get_tagger() - Subscribe worker to attacker.fingerprinted; route by Event.type so JARM/HASSH/ipv6_leak share the topic without source_kind collision - Add bump_attacker_ipv6_leak() to BaseRepository (abstract) + TTPMixin (implementation): increments ipv6_leak_count, sets last_ipv6_* denorm fields, appends-with-dedup to AttackerIdentity.ipv6_link_local_iids - Call bump_attacker_ipv6_leak from _process_event after insert_tags - Add DummyRepo stub + coverage call in tests/db/test_base_repo.py --- decnet/ttp/base.py | 1 + decnet/ttp/factory.py | 2 + decnet/ttp/worker.py | 61 +++++++++++++++++++++++++++--- decnet/web/db/repository.py | 17 +++++++++ decnet/web/db/sqlmodel_repo/ttp.py | 57 +++++++++++++++++++++++++++- tests/db/test_base_repo.py | 4 ++ 6 files changed, 135 insertions(+), 7 deletions(-) diff --git a/decnet/ttp/base.py b/decnet/ttp/base.py index 351e7aad..282684a5 100644 --- a/decnet/ttp/base.py +++ b/decnet/ttp/base.py @@ -41,6 +41,7 @@ KNOWN_SOURCE_KINDS: Final[frozenset[str]] = frozenset({ "session", "http_request", "http_fingerprint", + "ipv6_leak", }) diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index cbab0679..ed6390e3 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -165,6 +165,7 @@ def get_tagger() -> Tagger: from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter from decnet.ttp.impl.identity_lifter import IdentityLifter from decnet.ttp.impl.intel_lifter import IntelLifter + from decnet.ttp.impl.ipv6_leak_lifter import Ipv6LeakLifter from decnet.ttp.impl.rule_engine import RuleEngineTagger from decnet.ttp.store.factory import get_rule_store store = get_rule_store() @@ -182,6 +183,7 @@ def get_tagger() -> Tagger: IdentityLifter(store), CredentialLifter(store), HttpFingerprintLifter(store), + Ipv6LeakLifter(store), ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index 4f3ad14a..62900548 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -60,6 +60,10 @@ _TOPICS: tuple[str, ...] = ( _topics.attacker(_topics.ATTACKER_SESSION_ENDED), _topics.attacker(_topics.ATTACKER_OBSERVED), _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED), + # attacker.fingerprinted carries JARM/HASSH/tcpfp/ipv6_leak results from + # the prober and sniffer. Event.type discriminates the kind; lifters that + # don't recognise the source_kind derived from Event.type are no-ops. + _topics.attacker(_topics.ATTACKER_FINGERPRINTED), _topics.identity(_topics.IDENTITY_FORMED), _topics.identity(_topics.IDENTITY_MERGED), _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED), @@ -113,7 +117,9 @@ def _span(name: str, **attrs: Any) -> Iterator[Any]: yield span -def _build_events(topic: str, payload: dict[str, Any]) -> list[TaggerEvent]: +def _build_events( + topic: str, payload: dict[str, Any], event_type: str = "", +) -> list[TaggerEvent]: """Translate one bus payload into one OR MORE :class:`TaggerEvent`s. A single ``attacker.session.ended`` event carries a *bag* of commands @@ -134,8 +140,12 @@ def _build_events(topic: str, payload: dict[str, Any]) -> list[TaggerEvent]: * ``commands: list[{"command_text": str, "id": str?, ...}]`` — dicts with at least a ``command_text`` field; any ``id`` / ``uuid`` / ``command_id`` becomes the ``source_id`` for idempotency. + + *event_type* is forwarded from ``Event.type``; used by multiplex + topics (``attacker.fingerprinted``) where the kind discriminator lives + in the envelope rather than the topic path. """ - base = _build_event(topic, payload) + base = _build_event(topic, payload, event_type=event_type) if base is None: return [] out = [base] @@ -183,7 +193,9 @@ def _build_command_event( ) -def _build_event(topic: str, payload: dict[str, Any]) -> TaggerEvent | None: +def _build_event( + topic: str, payload: dict[str, Any], event_type: str = "", +) -> TaggerEvent | None: """Translate one bus payload into a :class:`TaggerEvent`. Returns ``None`` if the topic isn't one we know how to dispatch @@ -197,10 +209,18 @@ def _build_event(topic: str, payload: dict[str, Any]) -> TaggerEvent | None: same :func:`compute_tag_uuid` and the ``INSERT OR IGNORE`` write becomes a no-op the second time around. The order below is the same priority list the lifters use internally. + + *event_type* is used as ``source_kind`` when ``_source_kind_for`` + has no static mapping for *topic* — this covers multiplex topics + such as ``attacker.fingerprinted`` where the kind discriminator is + carried in ``Event.type`` rather than the topic path itself. """ source_kind = _source_kind_for(topic) if source_kind is None: - return None + if event_type: + source_kind = event_type + else: + return None source_id = ( payload.get("source_id") or payload.get("session_id") @@ -466,7 +486,7 @@ async def _process_event( # requires at least one anchor, so emitting any tag would # raise. Drop the event with one log line per cold IP. return - tagger_events = _build_events(topic, payload) + tagger_events = _build_events(topic, payload, event_type=event.type) if not tagger_events: return # Intel catch-up: on session.ended, read the persisted intel row (if @@ -514,10 +534,41 @@ async def _process_event( # Idempotent re-eval — the loop-prevention invariant # forbids publishing here. return + await _bump_ipv6_leak_denorm(repo, all_tags) if bus is not None: await _publish_tagged(bus, all_tags) +async def _bump_ipv6_leak_denorm( + repo: BaseRepository, tags: list[TTPTag], +) -> None: + """Update Attacker / AttackerIdentity denorm columns for ipv6_leak tags. + + Called once per successful insert_tags batch. Takes the first tag + per attacker_uuid (all tags in a batch share the same attacker context). + Silently skips if the repo method is unavailable (pre-migration DBs). + """ + ipv6_tags = [t for t in tags if t.source_kind == "ipv6_leak"] + if not ipv6_tags: + return + seen: set[str] = set() + for tag in ipv6_tags: + if tag.attacker_uuid is None or tag.attacker_uuid in seen: + continue + seen.add(tag.attacker_uuid) + try: + await repo.bump_attacker_ipv6_leak( + attacker_uuid=tag.attacker_uuid, + identity_uuid=tag.identity_uuid, + evidence=tag.evidence or {}, + ) + except Exception: # noqa: BLE001 + log.warning( + "ttp worker: bump_attacker_ipv6_leak failed for " + "attacker_uuid=%r", tag.attacker_uuid, + ) + + async def _publish_tagged(bus: BaseBus, tags: list[TTPTag]) -> None: """Publish ``ttp.tagged`` + per-technique ``ttp.rule.fired.*``. diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index d2c40a24..fc8d7c04 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1549,6 +1549,23 @@ class BaseRepository(ABC): """Fleet-wide distinct-technique rollup.""" raise NotImplementedError + @abstractmethod + async def bump_attacker_ipv6_leak( + self, + attacker_uuid: str, + identity_uuid: Optional[str], + evidence: dict[str, Any], + ) -> None: + """Increment ``Attacker.ipv6_leak_count``, set ``last_ipv6_*`` denorm + fields, and append-with-dedup to ``AttackerIdentity.ipv6_link_local_iids``. + + *evidence* is an ``Ipv6LinkLocalLeakEvidence``-shaped dict carrying + ``addr``, ``iid_kind``, ``mac_oui``, and ``observed_at``. Missing + keys default to empty string. The method is idempotent for the + count but deduplicates IID entries by ``addr``. + """ + raise NotImplementedError + @abstractmethod async def list_ttp_tags_by_attacker( self, uuid: str, limit: int = 2000, diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index 03d18a2e..e6b6aa35 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -14,8 +14,8 @@ from __future__ import annotations import json from collections.abc import AsyncIterator -from datetime import datetime -from typing import Any +from datetime import datetime, timezone +from typing import Any, Optional from sqlalchemy import func, select from sqlmodel import col @@ -453,6 +453,59 @@ class TTPMixin(_MixinBase): for row in res.scalars().all(): yield row + async def bump_attacker_ipv6_leak( + self, + attacker_uuid: str, + identity_uuid: Optional[str], + evidence: dict[str, Any], + ) -> None: + """Increment ``Attacker.ipv6_leak_count`` + set last_ipv6_* denorm fields. + + Also appends-with-dedup to ``AttackerIdentity.ipv6_link_local_iids`` + (JSON text column, keyed by ``addr``). Both updates run in a single + session; missing rows are silently skipped. + """ + now = datetime.now(timezone.utc) + addr = evidence.get("addr", "") + async with self._session() as session: + res = await session.execute( + select(Attacker).where(Attacker.uuid == attacker_uuid) + ) + attacker = res.scalar_one_or_none() + if attacker is not None: + attacker.ipv6_leak_count = (attacker.ipv6_leak_count or 0) + 1 + attacker.last_ipv6_leak_at = now + attacker.last_ipv6_link_local = addr or None + attacker.last_ipv6_iid_kind = evidence.get("iid_kind") or None + attacker.last_ipv6_mac_oui = evidence.get("mac_oui") or None + session.add(attacker) + + if identity_uuid: + id_res = await session.execute( + select(AttackerIdentity).where( + AttackerIdentity.uuid == identity_uuid + ) + ) + identity = id_res.scalar_one_or_none() + if identity is not None and addr: + try: + iids: list[dict[str, Any]] = json.loads( + identity.ipv6_link_local_iids or "[]" + ) + except (json.JSONDecodeError, TypeError): + iids = [] + if not any(e.get("iid") == addr for e in iids): + iids.append({ + "iid": addr, + "oui": evidence.get("mac_oui", ""), + "kind": evidence.get("iid_kind", "unknown"), + "first_seen": now.isoformat(), + }) + identity.ipv6_link_local_iids = json.dumps(iids) + session.add(identity) + + await session.commit() + async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: """Fleet-wide distinct-technique rollup with counts + most-recent-seen timestamps. diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 746c157a..d3e2fbcd 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -136,6 +136,8 @@ class DummyRepo(BaseRepository): await super().list_tags_by_scope_and_technique(**kw); return [] async def list_distinct_techniques(self): await super().list_distinct_techniques(); return [] + async def bump_attacker_ipv6_leak(self, attacker_uuid, identity_uuid, evidence): + await super().bump_attacker_ipv6_leak(attacker_uuid, identity_uuid, evidence) async def list_ttp_tags_by_attacker(self, uuid, limit=2000): return [] async def list_attacker_commands_deduped(self, uuid): @@ -289,6 +291,8 @@ async def test_base_repo_coverage(): ) with pytest.raises(NotImplementedError): await dr.list_distinct_techniques() + with pytest.raises(NotImplementedError): + await dr.bump_attacker_ipv6_leak("uuid-1", None, {}) with pytest.raises(NotImplementedError): from decnet.web.db.repository import BaseRepository await BaseRepository.list_ttp_tags_by_attacker(dr, "a")