async def _build_intel_catchup_event(
    repo: "BaseRepository",
    base: TaggerEvent,
) -> TaggerEvent | None:
    """Derive an intel ``TaggerEvent`` from the stored ``AttackerIntel`` row.

    Invoked for ``attacker.session.ended`` so that intel-derived tags are
    still produced when ``attacker.intel.enriched`` was dropped, or was
    published before the TTP worker came up. In keeping with the no-SPOF
    contract (TTP_TAGGING.md lines 212–219) only the ``AttackerIntel`` data
    shape is touched here; no ``decnet.intel.*`` provider client is used.

    Args:
        repo: Repository used to look up the persisted intel row.
        base: The session-level event whose attacker/identity/session/decky
            identifiers are copied onto the synthesized event.

    Returns:
        A new event with ``source_kind="intel"``, or ``None`` when ``base``
        carries no attacker UUID, when no intel row is stored for that
        attacker (normal for a freshly-observed attacker), or when the
        repository lookup raises.
    """
    attacker_uuid = base.attacker_uuid
    if attacker_uuid is None:
        return None

    intel_row = None
    # NOTE(review): the span is assumed to cover only the repository lookup;
    # confirm against the un-collapsed file.
    with _span("ttp.worker.intel_catchup", attacker_uuid=attacker_uuid):
        try:
            intel_row = await repo.get_attacker_intel_row_by_uuid(attacker_uuid)
        except Exception as err:  # noqa: BLE001
            # Best-effort path: a failed lookup must not break tagging of the
            # session itself, so log and fall through to the "no row" result.
            log.warning(
                "ttp worker: intel catch-up lookup failed for "
                "attacker_uuid=%r: %s",
                attacker_uuid, err,
            )

    if intel_row is None:
        return None

    session_id = base.session_id
    return TaggerEvent(
        source_kind="intel",
        # Prefer the session id for the source id; fall back to the attacker.
        source_id=f"intel-catchup:{session_id or attacker_uuid}",
        attacker_uuid=attacker_uuid,
        identity_uuid=base.identity_uuid,
        session_id=session_id,
        decky_id=base.decky_id,
        payload=intel_row.to_intel_event_payload(),
    )
def _decode_json_list(value: Any) -> list[Any]:
    """Coerce a column value into a list, never raising.

    A ``list`` is returned unchanged (the same object). A non-empty string
    is parsed as JSON and returned only if it decodes to a list. Every other
    input — ``None``, an empty string, malformed JSON, a JSON value that is
    not an array, or any other type — yields an empty list.
    """
    if isinstance(value, list):
        return value
    if not isinstance(value, str) or not value:
        return []
    try:
        parsed = _json.loads(value)
    except (_json.JSONDecodeError, TypeError):
        # Malformed stored data is treated the same as "no data".
        return []
    if isinstance(parsed, list):
        return parsed
    return []
def to_intel_event_payload(
    self,
    *,
    providers: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Render this row as the payload dict consumed by the IntelLifter.

    Used by the intel worker when it publishes ``attacker.intel.enriched``
    and by the TTP worker's ``attacker.session.ended`` catch-up. Both paths
    yield the same payload for the same row, which keeps IntelLifter tag
    UUIDs deterministic no matter which path delivered the event.

    Args:
        providers: Names of the contributing providers, when the caller
            knows them. The TTP catch-up path leaves this unset (the
            IntelLifter does not predicate on ``providers``), in which case
            the ``"providers"`` key is absent from the result.

    Returns:
        A dict of the row's verdict fields; JSON-list columns are decoded
        into Python lists via ``_decode_json_list``.
    """
    as_list = _decode_json_list
    # Key order is kept identical to the column grouping below so that the
    # payload is the same regardless of caller.
    payload: dict[str, Any] = dict(
        attacker_uuid=self.attacker_uuid,
        attacker_ip=self.attacker_ip,
        aggregate_verdict=self.aggregate_verdict,
        # AbuseIPDB
        abuseipdb_score=self.abuseipdb_score,
        abuseipdb_categories=as_list(self.abuseipdb_categories),
        # GreyNoise
        greynoise_classification=self.greynoise_classification,
        greynoise_name=self.greynoise_name,
        greynoise_tags=as_list(self.greynoise_tags),
        # Feodo
        feodo_listed=self.feodo_listed,
        feodo_malware_family=self.feodo_malware_family,
        # ThreatFox
        threatfox_listed=self.threatfox_listed,
        threatfox_threat_types=as_list(self.threatfox_threat_types),
        threatfox_ioc_types=as_list(self.threatfox_ioc_types),
        threatfox_malware_families=as_list(self.threatfox_malware_families),
    )
    if providers is None:
        return payload
    payload["providers"] = providers
    return payload
async def get_attacker_intel_row_by_uuid(
    self,
    uuid: str,
) -> Optional[AttackerIntel]:
    """Return the :class:`AttackerIntel` instance for ``uuid``, if any.

    Unlike :meth:`get_attacker_intel_by_uuid`, this hands back the SQLModel
    row itself, so callers can invoke
    :meth:`~AttackerIntel.to_intel_event_payload` on it (e.g. the TTP
    worker's intel catch-up path on session.ended).

    Args:
        uuid: The ``attacker_uuid`` to look up.

    Returns:
        The matching row, or ``None`` when no row exists.
    """
    query = select(AttackerIntel).where(AttackerIntel.attacker_uuid == uuid)
    async with self._session() as session:
        outcome = await session.execute(query)
        # NOTE(review): presumably at most one row per attacker_uuid —
        # confirm the uniqueness constraint on the table.
        return outcome.scalar_one_or_none()
resolution (this PR) @@ -228,6 +229,7 @@ async def test_base_repo_coverage(): await dr.get_session_log("a") await dr.find_credential_reuse_candidates() await dr.get_attacker_intel_by_uuid("a") + await dr.get_attacker_intel_row_by_uuid("a") await dr.get_unenriched_attackers() await dr.upsert_attacker_intel({"attacker_uuid": "a", "attacker_ip": "1.1.1.1"}) await dr.get_identity_by_uuid("a")