From c4e29e3bf9b6efbd6a6d2e9e78906447458fed88 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 2 May 2026 02:44:30 -0400 Subject: [PATCH] fix(ttp): resolve attacker_uuid from attacker_ip on bus-event consume MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The collector's `attacker.session.ended` envelope carries `attacker_uuid: null` and `attacker_ip: ` because the collector doesn't talk to the DB. The TTP worker passed that null straight through, and `TTPTag.__init__` raised the documented invariant: ValueError: ttp_tag requires at least one of attacker_uuid / identity_uuid; both NULL is not a valid anchor. The worker now resolves `attacker_uuid` from `attacker_ip` via `BaseRepository.get_attacker_uuid_by_ip` before fanning out the event. When the IP isn't in the DB yet (profiler hasn't ingested the row), the event is dropped with one log line — better than exploding mid-tag. - New `get_attacker_uuid_by_ip(ip) -> str | None` on the repo (BaseRepository abstract + AttackersCoreMixin impl). - `_resolve_attacker_uuid` helper in `decnet/ttp/worker.py` runs before `_build_events`. Short-circuits when the payload already has either anchor; drops the event when neither anchor is resolvable. - Tests pin: short-circuit on existing uuid/identity, repo lookup, drop on unknown IP, drop on "Unknown" sentinel, drop on no-anchor payload, drop on repo failure. --- decnet/ttp/worker.py | 50 ++++++++- decnet/web/db/repository.py | 9 ++ .../web/db/sqlmodel_repo/attackers/_core.py | 14 +++ tests/ttp/test_worker_resolve_attacker.py | 102 ++++++++++++++++++ 4 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 tests/ttp/test_worker_resolve_attacker.py diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index f797b8d5..886bf9df 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -344,6 +344,47 @@ async def run_ttp_worker_loop( await bus.close() +async def _resolve_attacker_uuid( + repo: BaseRepository, payload: dict[str, Any], +) -> dict[str, Any] | None: + """Inject ``attacker_uuid`` into *payload* via repo lookup if missing. + + Collector-side producers (notably ``attacker.session.ended`` from + the session aggregator) carry ``attacker_ip`` but cannot fill + ``attacker_uuid`` because the collector doesn't talk to the DB. + The TTP worker resolves it here so ``compute_tag_uuid`` and the + ``ttp_tag_has_anchor`` model invariant always have something to + work with. + + Returns the (possibly mutated) payload, or ``None`` if neither + ``attacker_uuid`` nor ``identity_uuid`` could be set — emitting a + tag with both NULL would raise inside :class:`TTPTag.__init__`. + """ + if payload.get("attacker_uuid") or payload.get("identity_uuid"): + return payload + ip = payload.get("attacker_ip") + if not isinstance(ip, str) or not ip or ip == "Unknown": + log.debug( + "ttp worker: dropping event with no anchor " + "(no attacker_uuid / identity_uuid / attacker_ip)", + ) + return None + try: + resolved = await repo.get_attacker_uuid_by_ip(ip) + except Exception: # noqa: BLE001 + log.exception( + "ttp worker: get_attacker_uuid_by_ip(%r) failed", ip, + ) + return None + if not resolved: + log.info( + "ttp worker: no Attacker row for ip=%r yet; " + "skipping until profiler catches up", ip, + ) + return None + return {**payload, "attacker_uuid": resolved} + + async def _process_event( topic: str, event: Event, @@ -358,7 +399,14 @@ async def _process_event( replay of the same upstream event hits the idempotent ``INSERT OR IGNORE`` and writes zero rows → publishes zero events. """ - tagger_events = _build_events(topic, event.payload) + payload = await _resolve_attacker_uuid(repo, event.payload) + if payload is None: + # Both attacker_uuid and identity_uuid are missing and we + # couldn't resolve from attacker_ip — the TTPTag invariant + # requires at least one anchor, so emitting any tag would + # raise. Drop the event with one log line per cold IP. + return + tagger_events = _build_events(topic, payload) if not tagger_events: return # Aggregate tags across the session-level event AND any per-command diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index f3186b91..b3b12822 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -382,6 +382,15 @@ class BaseRepository(ABC): """Retrieve a single attacker profile by UUID.""" pass + @abstractmethod + async def get_attacker_uuid_by_ip(self, ip: str) -> Optional[str]: + """Return the :class:`Attacker` UUID for *ip*, or ``None``. + + Used by the TTP worker to resolve ``attacker_uuid`` from the + ``attacker_ip`` carried by collector-side bus events. + """ + raise NotImplementedError + @abstractmethod async def get_attackers( self, diff --git a/decnet/web/db/sqlmodel_repo/attackers/_core.py b/decnet/web/db/sqlmodel_repo/attackers/_core.py index 22e1739d..b1e3bcc2 100644 --- a/decnet/web/db/sqlmodel_repo/attackers/_core.py +++ b/decnet/web/db/sqlmodel_repo/attackers/_core.py @@ -48,6 +48,20 @@ class AttackersCoreMixin(_MixinBase): await session.commit() return row_uuid + async def get_attacker_uuid_by_ip(self, ip: str) -> Optional[str]: + """Return the :class:`Attacker` UUID for *ip*, or ``None``. + + Used by the TTP worker to resolve ``attacker_uuid`` from the + ``attacker_ip`` carried by collector-side bus events + (``attacker.session.ended`` etc.). Cheaper than + :meth:`get_attacker_by_uuid` because it scalars a single column. + """ + async with self._session() as session: + result = await session.execute( + select(col(Attacker.uuid)).where(Attacker.ip == ip) + ) + return result.scalar_one_or_none() + async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: async with self._session() as session: result = await session.execute( diff --git a/tests/ttp/test_worker_resolve_attacker.py b/tests/ttp/test_worker_resolve_attacker.py new file mode 100644 index 00000000..cb86ffb4 --- /dev/null +++ b/tests/ttp/test_worker_resolve_attacker.py @@ -0,0 +1,102 @@ +"""TTP worker resolves ``attacker_uuid`` from ``attacker_ip`` per repo lookup. + +The collector publishes ``attacker.session.ended`` with +``attacker_uuid: null`` because it doesn't talk to the DB. +:class:`TTPTag` rejects rows whose ``attacker_uuid`` AND +``identity_uuid`` are both NULL — so the worker must resolve via +:meth:`BaseRepository.get_attacker_uuid_by_ip` before fanning the +event out, and drop the event entirely when no anchor can be set. +""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.ttp.worker import _resolve_attacker_uuid + + +class _FakeRepo: + def __init__(self, mapping: dict[str, str | None]) -> None: + self._mapping = mapping + self.calls: list[str] = [] + + async def get_attacker_uuid_by_ip(self, ip: str) -> str | None: + self.calls.append(ip) + return self._mapping.get(ip) + + +@pytest.mark.asyncio +async def test_payload_with_attacker_uuid_is_returned_unchanged() -> None: + repo = _FakeRepo({}) + payload = {"attacker_uuid": "att-1", "attacker_ip": "1.2.3.4"} + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is payload # short-circuit, no DB lookup + assert repo.calls == [] + + +@pytest.mark.asyncio +async def test_payload_with_identity_uuid_is_returned_unchanged() -> None: + repo = _FakeRepo({}) + payload = {"identity_uuid": "id-1", "attacker_ip": "1.2.3.4"} + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is payload + assert repo.calls == [] + + +@pytest.mark.asyncio +async def test_payload_resolves_uuid_via_attacker_ip() -> None: + repo = _FakeRepo({"192.168.1.5": "att-7"}) + payload: dict[str, Any] = { + "attacker_ip": "192.168.1.5", + "session_id": "sess-1", + "commands": [{"command_text": "whoami"}], + } + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is not None + assert out["attacker_uuid"] == "att-7" + assert out["attacker_ip"] == "192.168.1.5" + # Other fields preserved. + assert out["session_id"] == "sess-1" + assert repo.calls == ["192.168.1.5"] + + +@pytest.mark.asyncio +async def test_payload_dropped_when_ip_unknown_to_repo() -> None: + """Profiler hasn't seen this IP yet → no Attacker row → drop.""" + repo = _FakeRepo({}) + payload = {"attacker_ip": "10.0.0.99"} + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is None + + +@pytest.mark.asyncio +async def test_payload_dropped_when_no_anchor_fields_present() -> None: + repo = _FakeRepo({}) + payload: dict[str, Any] = {"foo": "bar"} + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is None + assert repo.calls == [] + + +@pytest.mark.asyncio +async def test_payload_dropped_when_attacker_ip_is_unknown_sentinel() -> None: + repo = _FakeRepo({"Unknown": "should-not-resolve"}) + payload = {"attacker_ip": "Unknown"} + out = await _resolve_attacker_uuid(repo, payload) # type: ignore[arg-type] + assert out is None + # We must not even ask the repo about the literal "Unknown" sentinel. + assert repo.calls == [] + + +@pytest.mark.asyncio +async def test_payload_dropped_when_repo_lookup_raises() -> None: + class _RaisingRepo: + async def get_attacker_uuid_by_ip(self, _ip: str) -> str | None: + raise RuntimeError("db gone") + + out = await _resolve_attacker_uuid( + _RaisingRepo(), # type: ignore[arg-type] + {"attacker_ip": "1.2.3.4"}, + ) + assert out is None