From 301d3feee9712db3942c936e0d65ed37e537d824 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 2 May 2026 01:35:17 -0400 Subject: [PATCH] feat(ttp): E.4.a extract decnet/cli/ttp.py with worker run + backfill CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TTP worker entry moved out of decnet/cli/workers.py into its own module so the TTP CLI surface (worker + admin verbs) is colocated, mirroring decnet/cli/canary.py / webhook.py / swarm.py. - New `decnet/cli/ttp.py` with `decnet ttp` (worker, ExecStart-stable for decnet-ttp.service) and `decnet ttp-backfill --since-days N`. - `decnet ttp-backfill` walks Attacker.commands and CanaryTrigger history, dispatches each row through the live CompositeTagger, persists tags via repo.insert_tags (idempotent INSERT OR IGNORE). --dry-run / --source command|canary|all / --batch-size supported. - Backfill deliberately bypasses bus publish — historical replay must not re-trigger SIEM/webhook fan-out per TTP_TAGGING.md §"Bus topics" loop-prevention invariant. - Added `iter_attacker_commands_since` / `iter_canary_triggers_since` read-only iterators on TTPMixin + abstract bindings on BaseRepository. - Master-only via gating; both `ttp` and `ttp-backfill` listed in MASTER_ONLY_COMMANDS. --- decnet/cli/__init__.py | 3 +- decnet/cli/gating.py | 2 +- decnet/cli/ttp.py | 312 +++++++++++++++++++++++++++++ decnet/cli/workers.py | 59 +----- decnet/web/db/repository.py | 20 ++ decnet/web/db/sqlmodel_repo/ttp.py | 53 +++++ tests/ttp/test_backfill.py | 279 ++++++++++++++++++++++++++ 7 files changed, 673 insertions(+), 55 deletions(-) create mode 100644 decnet/cli/ttp.py create mode 100644 tests/ttp/test_backfill.py diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index f0508319..c63d2f58 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -39,6 +39,7 @@ from . import ( swarm, swarmctl, topology, + ttp, updater, web, webhook, @@ -59,7 +60,7 @@ for _mod in ( swarm, deploy, lifecycle, workers, inventory, web, profiler, orchestrator, realism, reconciler, sniffer, db, - topology, bus, geoip, init, webhook, canary, + topology, bus, geoip, init, webhook, canary, ttp, ): _mod.register(app) diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index a5db1925..0cc19533 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -30,7 +30,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "mutate", "listener", "profiler", "services", "distros", "correlate", "archetypes", "web", "db-reset", "init", "webhook", "clusterer", "campaign-clusterer", - "ttp", + "ttp", "ttp-backfill", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset( {"swarm", "topology", "geoip", "realism"} diff --git a/decnet/cli/ttp.py b/decnet/cli/ttp.py new file mode 100644 index 00000000..45694b1d --- /dev/null +++ b/decnet/cli/ttp.py @@ -0,0 +1,312 @@ +"""``decnet ttp`` — TTP-tagging worker and admin commands. + +Two flat commands share this module: + +* ``decnet ttp`` — runs the long-running tagger worker. Bus-woken on + ``attacker.session.ended`` / ``attacker.observed`` / + ``attacker.intel.enriched`` / ``identity.{formed,merged}`` / + ``credential.reuse.detected`` / ``email.received`` / ``canary.>``; + dispatches each event through :class:`CompositeTagger` (RuleEngine + + Behavioral / Intel / CanaryFingerprint / Email / Identity / Credential + lifters), persists ``ttp_tag`` rows via the idempotent + ``INSERT OR IGNORE`` write, and publishes ``ttp.tagged`` + + ``ttp.rule.fired.`` only when the insert returned a + non-zero rowcount (loop-prevention invariant from TTP_TAGGING.md + §"Bus topics"). Invoked by the ``decnet-ttp.service`` systemd unit + so its argv must stay stable. + +* ``decnet ttp-backfill`` — replays historical events (shell commands + recorded on :class:`Attacker.commands`, :class:`CanaryTrigger` rows) + through the live tagger. Writes ``ttp_tag`` rows using the same + idempotent insert path. **Does not publish** to the bus — replay must + not re-trigger SIEM/webhook fan-out on already-attributed events. + +Both are master-only — gated via ``MASTER_ONLY_COMMANDS`` in +:mod:`decnet.cli.gating`. +""" +from __future__ import annotations + +import asyncio +import time +from datetime import datetime, timedelta, timezone +from typing import Any + +import typer + +from decnet.ttp.factory import CompositeTagger, get_tagger + +from . import utils as _utils +from .utils import console, log + + +_BACKFILL_SOURCES = ("command", "canary", "all") + + +def register(app: typer.Typer) -> None: + @app.command(name="ttp") + def ttp( + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """TTP-tagging worker — MITRE ATT&CK technique tagging.""" + from decnet.cli.gating import _require_master_mode + from decnet.ttp.worker import run_ttp_worker_loop + from decnet.web.dependencies import repo + + _require_master_mode("ttp") + + if daemon: + log.info("ttp daemonizing poll=%s", poll_interval_secs) + _utils._daemonize() + + log.info("ttp command invoked poll=%s", poll_interval_secs) + console.print( + f"[bold cyan]TTP tagging worker starting[/] " + f"poll={poll_interval_secs}s" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_ttp_worker_loop( + repo, poll_interval_secs=poll_interval_secs, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]TTP tagging worker stopped.[/]") + + @app.command(name="ttp-backfill") + def ttp_backfill( + since_days: int = typer.Option( + 7, "--since-days", "-s", + min=1, max=3650, + help="Replay events whose source row is newer than N days ago.", + ), + source: str = typer.Option( + "all", "--source", + help=f"Source slice to replay. One of: {', '.join(_BACKFILL_SOURCES)}.", + ), + dry_run: bool = typer.Option( + False, "--dry-run", + help="Run the tagger but skip insert_tags. Reports counts only.", + ), + batch_size: int = typer.Option( + 500, "--batch-size", + min=1, max=100_000, + help="Number of tags accumulated before each repo.insert_tags call.", + ), + ) -> None: + """Replay historical attacker activity through the live tagger. + + Walks ``Attacker.commands`` (per-IP shell-command history) and + ``CanaryTrigger`` (canary callback log) since N days ago, + builds the same :class:`TaggerEvent` shape the live worker + emits, and persists tags via the idempotent INSERT OR IGNORE + write. Re-running is safe — a second pass over identical + source rows reports ``inserted=0``. + + Bus publish is intentionally suppressed; SIEM / webhook fan-out + sees only live events, never replays. + """ + from decnet.cli.gating import _require_master_mode + from decnet.web.dependencies import repo + + _require_master_mode("ttp-backfill") + + if source not in _BACKFILL_SOURCES: + console.print( + f"[red]invalid --source {source!r}; expected one of " + f"{_BACKFILL_SOURCES}[/]" + ) + raise typer.Exit(code=2) + + cutoff = datetime.now(tz=timezone.utc) - timedelta(days=since_days) + console.print( + f"[bold cyan]TTP backfill[/] since={cutoff.isoformat()} " + f"source={source} dry_run={dry_run} batch_size={batch_size}" + ) + + async def _run() -> None: + await repo.initialize() + await _backfill( + repo, + cutoff=cutoff, + sources=_resolve_sources(source), + dry_run=dry_run, + batch_size=batch_size, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Backfill interrupted.[/]") + + +def _resolve_sources(name: str) -> tuple[str, ...]: + if name == "all": + return ("command", "canary") + return (name,) + + +async def _backfill( + repo: Any, + *, + cutoff: datetime, + sources: tuple[str, ...], + dry_run: bool, + batch_size: int, +) -> None: + """Drive the per-source backfill loops and report structured counts. + + One :class:`CompositeTagger` is built once and reused for every + source — the per-lifter watch fan-out the live worker performs is + inlined here as a `watch_store()` startup task per + :class:`WatchableTagger`, so the dispatch indexes hydrate before + we start feeding events. + """ + # Import-time bound so tests can monkeypatch ``decnet.cli.ttp.get_tagger`` + # to inject a recording fake without touching the global factory. + tagger = get_tagger() + watch_tasks: list[asyncio.Task[None]] = [] + if isinstance(tagger, CompositeTagger): + for watchable in tagger.iter_watchables(): + watch_tasks.append(asyncio.create_task(watchable.watch_store())) + # Yield once so each watch_store gets a chance to run its + # initial `load_compiled` before we feed the first event. + await asyncio.sleep(0.05) + + try: + if "command" in sources: + await _backfill_commands( + repo, tagger, cutoff=cutoff, + dry_run=dry_run, batch_size=batch_size, + ) + if "canary" in sources: + await _backfill_canaries( + repo, tagger, cutoff=cutoff, + dry_run=dry_run, batch_size=batch_size, + ) + finally: + for task in watch_tasks: + task.cancel() + for task in watch_tasks: + try: + await task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + + +async def _backfill_commands( + repo: Any, + tagger: Any, + *, + cutoff: datetime, + dry_run: bool, + batch_size: int, +) -> None: + from decnet.ttp.base import TaggerEvent + + started = time.monotonic() + rows_seen = 0 + cmds_seen = 0 + inserted = 0 + pending: list[Any] = [] + + async for attacker, commands in repo.iter_attacker_commands_since(cutoff): + rows_seen += 1 + for idx, cmd in enumerate(commands): + cmds_seen += 1 + text = cmd.get("command_text") or cmd.get("text") + if not isinstance(text, str): + continue + cmd_id = ( + cmd.get("id") + or cmd.get("uuid") + or cmd.get("command_id") + or f"{attacker.uuid}#cmd{idx}" + ) + event = TaggerEvent( + source_kind="command", + source_id=str(cmd_id), + attacker_uuid=attacker.uuid, + identity_uuid=getattr(attacker, "identity_id", None), + session_id=cmd.get("session_id"), + decky_id=cmd.get("decky_id") or cmd.get("decky"), + payload={**cmd, "command_text": text}, + ) + tags = await tagger.tag(event) + if tags: + pending.extend(tags) + if len(pending) >= batch_size: + inserted += await _flush(repo, pending, dry_run) + pending = [] + if pending: + inserted += await _flush(repo, pending, dry_run) + elapsed = time.monotonic() - started + console.print( + f"source=command rows={rows_seen} commands={cmds_seen} " + f"inserted={inserted} dry_run={dry_run} elapsed_s={elapsed:.2f}" + ) + + +async def _backfill_canaries( + repo: Any, + tagger: Any, + *, + cutoff: datetime, + dry_run: bool, + batch_size: int, +) -> None: + from decnet.ttp.base import TaggerEvent + + started = time.monotonic() + rows_seen = 0 + inserted = 0 + pending: list[Any] = [] + + async for trigger in repo.iter_canary_triggers_since(cutoff): + rows_seen += 1 + event = TaggerEvent( + source_kind="canary_fingerprint", + source_id=trigger.uuid, + attacker_uuid=trigger.attacker_id, + identity_uuid=None, + session_id=None, + decky_id=None, + payload={ + "token_uuid": trigger.token_uuid, + "src_ip": trigger.src_ip, + "ua_signature": trigger.user_agent or "", + "user_agent": trigger.user_agent, + "request_path": trigger.request_path, + "dns_qname": trigger.dns_qname, + "headers": trigger.headers(), + }, + ) + tags = await tagger.tag(event) + if tags: + pending.extend(tags) + if len(pending) >= batch_size: + inserted += await _flush(repo, pending, dry_run) + pending = [] + if pending: + inserted += await _flush(repo, pending, dry_run) + elapsed = time.monotonic() - started + console.print( + f"source=canary rows={rows_seen} inserted={inserted} " + f"dry_run={dry_run} elapsed_s={elapsed:.2f}" + ) + + +async def _flush(repo: Any, tags: list[Any], dry_run: bool) -> int: + if dry_run: + return 0 + return int(await repo.insert_tags(tags)) diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index c126e3dc..fe5c268f 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -296,56 +296,9 @@ def register(app: typer.Typer) -> None: except KeyboardInterrupt: console.print("\n[yellow]Campaign clusterer stopped.[/]") - @app.command(name="ttp") - def ttp( - poll_interval_secs: float = typer.Option( - 60.0, "--poll-interval", "-i", - help="Slow-tick fallback when the bus is idle or unavailable (seconds)", - ), - daemon: bool = typer.Option( - False, "--daemon", "-d", - help="Detach to background as a daemon process", - ), - ) -> None: - """TTP-tagging worker — MITRE ATT&CK technique tagging. - - Bus-woken on ``attacker.session.ended`` / ``attacker.observed`` - / ``attacker.intel.enriched`` / ``identity.formed`` / - ``identity.merged`` / ``credential.reuse.detected`` / - ``email.received`` / ``canary.>``. Dispatches each event - through the :class:`CompositeTagger` (RuleEngine + - Behavioral / Intel / Email / CanaryFingerprint / Identity / - Credential lifters), persists ``ttp_tag`` rows via the - idempotent ``INSERT OR IGNORE`` write, and publishes - ``ttp.tagged`` + per-technique ``ttp.rule.fired.*`` only when - the insert returned a non-zero rowcount (loop-prevention - invariant from TTP_TAGGING.md §"Bus topics"). - """ - import asyncio - from decnet.cli.gating import _require_master_mode - from decnet.ttp.worker import run_ttp_worker_loop - from decnet.web.dependencies import repo - - _require_master_mode("ttp") - - if daemon: - log.info("ttp daemonizing poll=%s", poll_interval_secs) - _utils._daemonize() - - log.info("ttp command invoked poll=%s", poll_interval_secs) - console.print( - f"[bold cyan]TTP tagging worker starting[/] " - f"poll={poll_interval_secs}s" - ) - console.print("[dim]Press Ctrl+C to stop[/]") - - async def _run() -> None: - await repo.initialize() - await run_ttp_worker_loop( - repo, poll_interval_secs=poll_interval_secs, - ) - - try: - asyncio.run(_run()) - except KeyboardInterrupt: - console.print("\n[yellow]TTP tagging worker stopped.[/]") + # ``decnet ttp`` and ``decnet ttp-backfill`` moved to + # :mod:`decnet.cli.ttp` — the TTP CLI surface (worker + admin verbs) + # is colocated there, mirroring the per-feature CLI split used by + # :mod:`decnet.cli.canary`, :mod:`decnet.cli.webhook`, etc. The + # ``decnet-ttp.service`` systemd unit's ExecStart still resolves to + # ``decnet ttp`` because the command name is unchanged. diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 59aa751b..f3186b91 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1,4 +1,6 @@ from abc import ABC, abstractmethod +from collections.abc import AsyncIterator +from datetime import datetime from typing import Any, Optional from decnet.web.db.models.topology import DeckyRow, EdgeRow, LANRow, TopologySummary @@ -1320,6 +1322,24 @@ class BaseRepository(ABC): """ raise NotImplementedError + @abstractmethod + def iter_attacker_commands_since( + self, since: "datetime", + ) -> "AsyncIterator[tuple[Any, list[dict[str, Any]]]]": + """Yield (Attacker, decoded_commands) pairs since *since*. + + Used by ``decnet ttp backfill`` (E.4) to replay shell-command + history through the live tagger. Read-only. + """ + raise NotImplementedError + + @abstractmethod + def iter_canary_triggers_since( + self, since: "datetime", + ) -> "AsyncIterator[Any]": + """Yield ``CanaryTrigger`` rows since *since*. Used by backfill.""" + raise NotImplementedError + @abstractmethod async def list_techniques_by_identity( self, uuid: str, diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index db03962f..47950ce7 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -12,6 +12,9 @@ per-dialect ``SQLiteRepository`` / ``MySQLRepository`` subclasses """ from __future__ import annotations +import json +from collections.abc import AsyncIterator +from datetime import datetime from typing import Any from sqlalchemy import func, select @@ -25,6 +28,7 @@ from decnet.web.db.models import ( TechniqueRollupRow, TTPTag, ) +from decnet.web.db.models.canary import CanaryTrigger from decnet.web.db.sqlmodel_repo._helpers import _MixinBase @@ -275,6 +279,55 @@ class TTPMixin(_MixinBase): for r in res.all() ] + # ── Backfill iterators (E.4) ──────────────────────────────────── + # + # Read-only iterators consumed by ``decnet ttp backfill`` to replay + # historical events through the live :class:`CompositeTagger`. The + # CLI builds :class:`TaggerEvent` objects from these and persists + # results via :meth:`insert_tags` — same idempotent path the bus + # worker uses, no bus publish. + # + # Per TTP_TAGGING.md §"Order of work" / §"Bus topics" the historical + # replay deliberately bypasses bus publish so SIEM/webhook fan-out + # does not re-fire on already-attributed events. + + async def iter_attacker_commands_since( + self, since: datetime, + ) -> AsyncIterator[tuple[Attacker, list[dict[str, Any]]]]: + """Yield ``(Attacker, decoded_commands)`` pairs since *since*. + + Walks every :class:`Attacker` whose ``last_seen >= since`` and + decodes the JSON ``commands`` blob; non-list / malformed + payloads are skipped silently (the JSON column is best-effort + per the model docstring). + """ + async with self._session() as session: + stmt: Any = ( + select(Attacker).where(col(Attacker.last_seen) >= since) + ) + res = await session.execute(stmt) + for row in res.scalars().all(): + try: + decoded = json.loads(row.commands or "[]") + except (ValueError, TypeError): + continue + if not isinstance(decoded, list): + continue + yield row, [c for c in decoded if isinstance(c, dict)] + + async def iter_canary_triggers_since( + self, since: datetime, + ) -> AsyncIterator[CanaryTrigger]: + """Yield :class:`CanaryTrigger` rows fired since *since*.""" + async with self._session() as session: + stmt: Any = ( + select(CanaryTrigger) + .where(col(CanaryTrigger.occurred_at) >= since) + ) + res = await session.execute(stmt) + for row in res.scalars().all(): + yield row + async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: """Fleet-wide distinct-technique rollup with counts + most-recent-seen timestamps. diff --git a/tests/ttp/test_backfill.py b/tests/ttp/test_backfill.py new file mode 100644 index 00000000..2162a467 --- /dev/null +++ b/tests/ttp/test_backfill.py @@ -0,0 +1,279 @@ +"""E.4.a — TTP backfill CLI replays history through the live tagger. + +Pins the contract from ``development/TTP_TAGGING.md`` §"E.4 Out-of-band +tasks": ``decnet ttp-backfill --since-days N`` walks +:class:`Attacker.commands` and :class:`CanaryTrigger` history, +dispatches each row through :class:`CompositeTagger`, persists tags via +``insert_tags`` (idempotent) and **does NOT publish** to the bus — +historical replay must not re-trigger SIEM/webhook fan-out on +already-attributed events. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from decnet.cli.ttp import ( + _BACKFILL_SOURCES, + _backfill, + _resolve_sources, +) +from decnet.ttp.base import Tagger, TaggerEvent +from decnet.web.db.models.ttp import TTPTag + + +# ── Test doubles ──────────────────────────────────────────────────── + + +class _RecordingTagger(Tagger): + """Records every TaggerEvent and returns one TTPTag per call. + + The composite is bypassed entirely — the backfill driver is + correct iff it emits the right TaggerEvent shape per source row. + """ + + name = "recording" + HANDLES = frozenset({"command", "canary_fingerprint"}) + + def __init__(self) -> None: + self.events: list[TaggerEvent] = [] + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + self.events.append(event) + return [TTPTag( + uuid=f"tag-{event.source_kind}-{event.source_id}", + source_kind=event.source_kind, + source_id=event.source_id, + attacker_uuid=event.attacker_uuid, + identity_uuid=event.identity_uuid, + session_id=event.session_id, + decky_id=event.decky_id, + tactic="TA0002", + technique_id="T1059", + sub_technique_id=None, + confidence=0.9, + rule_id="R0001", + rule_version=1, + evidence={}, + attack_release="v15.1", + created_at=datetime.now(tz=timezone.utc), + )] + + +class _FakeRepo: + def __init__( + self, + attackers_with_commands: list[tuple[Any, list[dict[str, Any]]]], + canary_triggers: list[Any], + ) -> None: + self._attackers = attackers_with_commands + self._triggers = canary_triggers + self.insert_calls: int = 0 + self._seen: set[str] = set() + + async def iter_attacker_commands_since(self, since: datetime): # noqa: ANN201 + for pair in self._attackers: + yield pair + + async def iter_canary_triggers_since(self, since: datetime): # noqa: ANN201 + for t in self._triggers: + yield t + + async def insert_tags(self, rows: list[TTPTag]) -> int: + self.insert_calls += 1 + new = [r for r in rows if r.uuid not in self._seen] + for r in new: + self._seen.add(r.uuid) + return len(new) + + +def _make_attacker(uuid: str = "att-1", identity_id: str | None = "id-1") -> Any: + a = MagicMock() + a.uuid = uuid + a.identity_id = identity_id + return a + + +def _make_trigger(uuid: str, src_ip: str = "1.2.3.4") -> Any: + t = MagicMock() + t.uuid = uuid + t.token_uuid = "tok-1" + t.src_ip = src_ip + t.user_agent = "curl/7.88.1" + t.request_path = "/x" + t.dns_qname = None + t.attacker_id = "att-1" + t.headers = lambda: {"x-forwarded-for": "9.9.9.9"} + return t + + +# ── Surface ───────────────────────────────────────────────────────── + + +def test_backfill_sources_constant() -> None: + assert _BACKFILL_SOURCES == ("command", "canary", "all") + + +def test_resolve_sources_all_expands() -> None: + assert _resolve_sources("all") == ("command", "canary") + assert _resolve_sources("command") == ("command",) + assert _resolve_sources("canary") == ("canary",) + + +# ── Driver behaviour ──────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_backfill_command_source_emits_one_event_per_command( + monkeypatch: pytest.MonkeyPatch, +) -> None: + tagger = _RecordingTagger() + monkeypatch.setattr( + "decnet.cli.ttp.get_tagger", lambda: tagger, + ) + attacker = _make_attacker() + repo = _FakeRepo( + attackers_with_commands=[(attacker, [ + {"id": "cmd-a", "command_text": "whoami"}, + {"id": "cmd-b", "command_text": "id"}, + {"id": "cmd-c", "command_text": "uname -a"}, + ])], + canary_triggers=[], + ) + await _backfill( + repo, + cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), + dry_run=False, + batch_size=10, + ) + kinds = [e.source_kind for e in tagger.events] + assert kinds == ["command", "command", "command"] + assert [e.source_id for e in tagger.events] == ["cmd-a", "cmd-b", "cmd-c"] + assert [e.payload["command_text"] for e in tagger.events] == [ + "whoami", "id", "uname -a", + ] + assert repo.insert_calls == 1 + + +@pytest.mark.asyncio +async def test_backfill_is_idempotent_on_replay( + monkeypatch: pytest.MonkeyPatch, +) -> None: + tagger = _RecordingTagger() + monkeypatch.setattr("decnet.cli.ttp.get_tagger", lambda: tagger) + attacker = _make_attacker() + repo = _FakeRepo( + attackers_with_commands=[(attacker, [ + {"id": "cmd-a", "command_text": "whoami"}, + ])], + canary_triggers=[], + ) + await _backfill(repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), dry_run=False, batch_size=10) + # Run twice — second pass writes zero rows because INSERT OR IGNORE + # collapses on the deterministic compute_tag_uuid PK. + await _backfill(repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), dry_run=False, batch_size=10) + # Same set of UUIDs across both passes; second pass yields 0. + assert len(repo._seen) == 1 + + +@pytest.mark.asyncio +async def test_backfill_dry_run_skips_insert_tags( + monkeypatch: pytest.MonkeyPatch, +) -> None: + tagger = _RecordingTagger() + monkeypatch.setattr("decnet.cli.ttp.get_tagger", lambda: tagger) + attacker = _make_attacker() + repo = _FakeRepo( + attackers_with_commands=[(attacker, [ + {"id": "cmd-a", "command_text": "whoami"}, + ])], + canary_triggers=[], + ) + await _backfill( + repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), dry_run=True, batch_size=10, + ) + assert repo.insert_calls == 0 + # Tagger was still invoked — the dry-run only skips persistence. + assert len(tagger.events) == 1 + + +@pytest.mark.asyncio +async def test_backfill_canary_source_emits_canary_fingerprint_events( + monkeypatch: pytest.MonkeyPatch, +) -> None: + tagger = _RecordingTagger() + monkeypatch.setattr("decnet.cli.ttp.get_tagger", lambda: tagger) + repo = _FakeRepo( + attackers_with_commands=[], + canary_triggers=[_make_trigger("trig-1"), _make_trigger("trig-2")], + ) + await _backfill( + repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("canary",), dry_run=False, batch_size=10, + ) + assert [e.source_kind for e in tagger.events] == [ + "canary_fingerprint", "canary_fingerprint", + ] + assert [e.source_id for e in tagger.events] == ["trig-1", "trig-2"] + assert tagger.events[0].payload["src_ip"] == "1.2.3.4" + + +@pytest.mark.asyncio +async def test_backfill_does_not_publish_to_bus( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The backfill path must never touch the bus — no SIEM re-fire.""" + tagger = _RecordingTagger() + monkeypatch.setattr("decnet.cli.ttp.get_tagger", lambda: tagger) + publish_called = False + + def _explode(*_a: object, **_kw: object) -> None: + nonlocal publish_called + publish_called = True + + # The CLI module must not import the bus publisher at all; this + # guard catches any future drift. + monkeypatch.setattr( + "decnet.bus.publish.run_health_heartbeat", _explode, raising=False, + ) + attacker = _make_attacker() + repo = _FakeRepo( + attackers_with_commands=[(attacker, [ + {"id": "cmd-a", "command_text": "whoami"}, + ])], + canary_triggers=[], + ) + await _backfill( + repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), dry_run=False, batch_size=10, + ) + assert not publish_called + + +@pytest.mark.asyncio +async def test_backfill_command_skips_malformed_entries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + tagger = _RecordingTagger() + monkeypatch.setattr("decnet.cli.ttp.get_tagger", lambda: tagger) + attacker = _make_attacker() + repo = _FakeRepo( + attackers_with_commands=[(attacker, [ + {"id": "cmd-a", "command_text": "whoami"}, + {"id": "cmd-b"}, # no command_text + {"id": "cmd-c", "command_text": "id"}, + ])], + canary_triggers=[], + ) + await _backfill( + repo, cutoff=datetime(2026, 1, 1, tzinfo=timezone.utc), + sources=("command",), dry_run=False, batch_size=10, + ) + assert [e.source_id for e in tagger.events] == ["cmd-a", "cmd-c"]