feat(ttp): E.4.a extract decnet/cli/ttp.py with worker run + backfill CLI
The TTP worker entry moved out of decnet/cli/workers.py into its own module so the TTP CLI surface (worker + admin verbs) is colocated, mirroring decnet/cli/canary.py / webhook.py / swarm.py. - New `decnet/cli/ttp.py` with `decnet ttp` (worker, ExecStart-stable for decnet-ttp.service) and `decnet ttp-backfill --since-days N`. - `decnet ttp-backfill` walks Attacker.commands and CanaryTrigger history, dispatches each row through the live CompositeTagger, persists tags via repo.insert_tags (idempotent INSERT OR IGNORE). --dry-run / --source command|canary|all / --batch-size supported. - Backfill deliberately bypasses bus publish — historical replay must not re-trigger SIEM/webhook fan-out per TTP_TAGGING.md §"Bus topics" loop-prevention invariant. - Added `iter_attacker_commands_since` / `iter_canary_triggers_since` read-only iterators on TTPMixin + abstract bindings on BaseRepository. - Master-only via gating; both `ttp` and `ttp-backfill` listed in MASTER_ONLY_COMMANDS.
This commit is contained in:
312
decnet/cli/ttp.py
Normal file
312
decnet/cli/ttp.py
Normal file
@@ -0,0 +1,312 @@
|
||||
"""``decnet ttp`` — TTP-tagging worker and admin commands.
|
||||
|
||||
Two flat commands share this module:
|
||||
|
||||
* ``decnet ttp`` — runs the long-running tagger worker. Bus-woken on
|
||||
``attacker.session.ended`` / ``attacker.observed`` /
|
||||
``attacker.intel.enriched`` / ``identity.{formed,merged}`` /
|
||||
``credential.reuse.detected`` / ``email.received`` / ``canary.>``;
|
||||
dispatches each event through :class:`CompositeTagger` (RuleEngine +
|
||||
Behavioral / Intel / CanaryFingerprint / Email / Identity / Credential
|
||||
lifters), persists ``ttp_tag`` rows via the idempotent
|
||||
``INSERT OR IGNORE`` write, and publishes ``ttp.tagged`` +
|
||||
``ttp.rule.fired.<technique_id>`` only when the insert returned a
|
||||
non-zero rowcount (loop-prevention invariant from TTP_TAGGING.md
|
||||
§"Bus topics"). Invoked by the ``decnet-ttp.service`` systemd unit
|
||||
so its argv must stay stable.
|
||||
|
||||
* ``decnet ttp-backfill`` — replays historical events (shell commands
|
||||
recorded on :class:`Attacker.commands`, :class:`CanaryTrigger` rows)
|
||||
through the live tagger. Writes ``ttp_tag`` rows using the same
|
||||
idempotent insert path. **Does not publish** to the bus — replay must
|
||||
not re-trigger SIEM/webhook fan-out on already-attributed events.
|
||||
|
||||
Both are master-only — gated via ``MASTER_ONLY_COMMANDS`` in
|
||||
:mod:`decnet.cli.gating`.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import typer
|
||||
|
||||
from decnet.ttp.factory import CompositeTagger, get_tagger
|
||||
|
||||
from . import utils as _utils
|
||||
from .utils import console, log
|
||||
|
||||
|
||||
_BACKFILL_SOURCES = ("command", "canary", "all")
|
||||
|
||||
|
||||
def register(app: typer.Typer) -> None:
|
||||
@app.command(name="ttp")
|
||||
def ttp(
|
||||
poll_interval_secs: float = typer.Option(
|
||||
60.0, "--poll-interval", "-i",
|
||||
help="Slow-tick fallback when the bus is idle or unavailable (seconds)",
|
||||
),
|
||||
daemon: bool = typer.Option(
|
||||
False, "--daemon", "-d",
|
||||
help="Detach to background as a daemon process",
|
||||
),
|
||||
) -> None:
|
||||
"""TTP-tagging worker — MITRE ATT&CK technique tagging."""
|
||||
from decnet.cli.gating import _require_master_mode
|
||||
from decnet.ttp.worker import run_ttp_worker_loop
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
_require_master_mode("ttp")
|
||||
|
||||
if daemon:
|
||||
log.info("ttp daemonizing poll=%s", poll_interval_secs)
|
||||
_utils._daemonize()
|
||||
|
||||
log.info("ttp command invoked poll=%s", poll_interval_secs)
|
||||
console.print(
|
||||
f"[bold cyan]TTP tagging worker starting[/] "
|
||||
f"poll={poll_interval_secs}s"
|
||||
)
|
||||
console.print("[dim]Press Ctrl+C to stop[/]")
|
||||
|
||||
async def _run() -> None:
|
||||
await repo.initialize()
|
||||
await run_ttp_worker_loop(
|
||||
repo, poll_interval_secs=poll_interval_secs,
|
||||
)
|
||||
|
||||
try:
|
||||
asyncio.run(_run())
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[yellow]TTP tagging worker stopped.[/]")
|
||||
|
||||
@app.command(name="ttp-backfill")
|
||||
def ttp_backfill(
|
||||
since_days: int = typer.Option(
|
||||
7, "--since-days", "-s",
|
||||
min=1, max=3650,
|
||||
help="Replay events whose source row is newer than N days ago.",
|
||||
),
|
||||
source: str = typer.Option(
|
||||
"all", "--source",
|
||||
help=f"Source slice to replay. One of: {', '.join(_BACKFILL_SOURCES)}.",
|
||||
),
|
||||
dry_run: bool = typer.Option(
|
||||
False, "--dry-run",
|
||||
help="Run the tagger but skip insert_tags. Reports counts only.",
|
||||
),
|
||||
batch_size: int = typer.Option(
|
||||
500, "--batch-size",
|
||||
min=1, max=100_000,
|
||||
help="Number of tags accumulated before each repo.insert_tags call.",
|
||||
),
|
||||
) -> None:
|
||||
"""Replay historical attacker activity through the live tagger.
|
||||
|
||||
Walks ``Attacker.commands`` (per-IP shell-command history) and
|
||||
``CanaryTrigger`` (canary callback log) since N days ago,
|
||||
builds the same :class:`TaggerEvent` shape the live worker
|
||||
emits, and persists tags via the idempotent INSERT OR IGNORE
|
||||
write. Re-running is safe — a second pass over identical
|
||||
source rows reports ``inserted=0``.
|
||||
|
||||
Bus publish is intentionally suppressed; SIEM / webhook fan-out
|
||||
sees only live events, never replays.
|
||||
"""
|
||||
from decnet.cli.gating import _require_master_mode
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
_require_master_mode("ttp-backfill")
|
||||
|
||||
if source not in _BACKFILL_SOURCES:
|
||||
console.print(
|
||||
f"[red]invalid --source {source!r}; expected one of "
|
||||
f"{_BACKFILL_SOURCES}[/]"
|
||||
)
|
||||
raise typer.Exit(code=2)
|
||||
|
||||
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=since_days)
|
||||
console.print(
|
||||
f"[bold cyan]TTP backfill[/] since={cutoff.isoformat()} "
|
||||
f"source={source} dry_run={dry_run} batch_size={batch_size}"
|
||||
)
|
||||
|
||||
async def _run() -> None:
|
||||
await repo.initialize()
|
||||
await _backfill(
|
||||
repo,
|
||||
cutoff=cutoff,
|
||||
sources=_resolve_sources(source),
|
||||
dry_run=dry_run,
|
||||
batch_size=batch_size,
|
||||
)
|
||||
|
||||
try:
|
||||
asyncio.run(_run())
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[yellow]Backfill interrupted.[/]")
|
||||
|
||||
|
||||
def _resolve_sources(name: str) -> tuple[str, ...]:
|
||||
if name == "all":
|
||||
return ("command", "canary")
|
||||
return (name,)
|
||||
|
||||
|
||||
async def _backfill(
|
||||
repo: Any,
|
||||
*,
|
||||
cutoff: datetime,
|
||||
sources: tuple[str, ...],
|
||||
dry_run: bool,
|
||||
batch_size: int,
|
||||
) -> None:
|
||||
"""Drive the per-source backfill loops and report structured counts.
|
||||
|
||||
One :class:`CompositeTagger` is built once and reused for every
|
||||
source — the per-lifter watch fan-out the live worker performs is
|
||||
inlined here as a `watch_store()` startup task per
|
||||
:class:`WatchableTagger`, so the dispatch indexes hydrate before
|
||||
we start feeding events.
|
||||
"""
|
||||
# Import-time bound so tests can monkeypatch ``decnet.cli.ttp.get_tagger``
|
||||
# to inject a recording fake without touching the global factory.
|
||||
tagger = get_tagger()
|
||||
watch_tasks: list[asyncio.Task[None]] = []
|
||||
if isinstance(tagger, CompositeTagger):
|
||||
for watchable in tagger.iter_watchables():
|
||||
watch_tasks.append(asyncio.create_task(watchable.watch_store()))
|
||||
# Yield once so each watch_store gets a chance to run its
|
||||
# initial `load_compiled` before we feed the first event.
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
try:
|
||||
if "command" in sources:
|
||||
await _backfill_commands(
|
||||
repo, tagger, cutoff=cutoff,
|
||||
dry_run=dry_run, batch_size=batch_size,
|
||||
)
|
||||
if "canary" in sources:
|
||||
await _backfill_canaries(
|
||||
repo, tagger, cutoff=cutoff,
|
||||
dry_run=dry_run, batch_size=batch_size,
|
||||
)
|
||||
finally:
|
||||
for task in watch_tasks:
|
||||
task.cancel()
|
||||
for task in watch_tasks:
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception): # noqa: BLE001
|
||||
pass
|
||||
|
||||
|
||||
async def _backfill_commands(
|
||||
repo: Any,
|
||||
tagger: Any,
|
||||
*,
|
||||
cutoff: datetime,
|
||||
dry_run: bool,
|
||||
batch_size: int,
|
||||
) -> None:
|
||||
from decnet.ttp.base import TaggerEvent
|
||||
|
||||
started = time.monotonic()
|
||||
rows_seen = 0
|
||||
cmds_seen = 0
|
||||
inserted = 0
|
||||
pending: list[Any] = []
|
||||
|
||||
async for attacker, commands in repo.iter_attacker_commands_since(cutoff):
|
||||
rows_seen += 1
|
||||
for idx, cmd in enumerate(commands):
|
||||
cmds_seen += 1
|
||||
text = cmd.get("command_text") or cmd.get("text")
|
||||
if not isinstance(text, str):
|
||||
continue
|
||||
cmd_id = (
|
||||
cmd.get("id")
|
||||
or cmd.get("uuid")
|
||||
or cmd.get("command_id")
|
||||
or f"{attacker.uuid}#cmd{idx}"
|
||||
)
|
||||
event = TaggerEvent(
|
||||
source_kind="command",
|
||||
source_id=str(cmd_id),
|
||||
attacker_uuid=attacker.uuid,
|
||||
identity_uuid=getattr(attacker, "identity_id", None),
|
||||
session_id=cmd.get("session_id"),
|
||||
decky_id=cmd.get("decky_id") or cmd.get("decky"),
|
||||
payload={**cmd, "command_text": text},
|
||||
)
|
||||
tags = await tagger.tag(event)
|
||||
if tags:
|
||||
pending.extend(tags)
|
||||
if len(pending) >= batch_size:
|
||||
inserted += await _flush(repo, pending, dry_run)
|
||||
pending = []
|
||||
if pending:
|
||||
inserted += await _flush(repo, pending, dry_run)
|
||||
elapsed = time.monotonic() - started
|
||||
console.print(
|
||||
f"source=command rows={rows_seen} commands={cmds_seen} "
|
||||
f"inserted={inserted} dry_run={dry_run} elapsed_s={elapsed:.2f}"
|
||||
)
|
||||
|
||||
|
||||
async def _backfill_canaries(
|
||||
repo: Any,
|
||||
tagger: Any,
|
||||
*,
|
||||
cutoff: datetime,
|
||||
dry_run: bool,
|
||||
batch_size: int,
|
||||
) -> None:
|
||||
from decnet.ttp.base import TaggerEvent
|
||||
|
||||
started = time.monotonic()
|
||||
rows_seen = 0
|
||||
inserted = 0
|
||||
pending: list[Any] = []
|
||||
|
||||
async for trigger in repo.iter_canary_triggers_since(cutoff):
|
||||
rows_seen += 1
|
||||
event = TaggerEvent(
|
||||
source_kind="canary_fingerprint",
|
||||
source_id=trigger.uuid,
|
||||
attacker_uuid=trigger.attacker_id,
|
||||
identity_uuid=None,
|
||||
session_id=None,
|
||||
decky_id=None,
|
||||
payload={
|
||||
"token_uuid": trigger.token_uuid,
|
||||
"src_ip": trigger.src_ip,
|
||||
"ua_signature": trigger.user_agent or "",
|
||||
"user_agent": trigger.user_agent,
|
||||
"request_path": trigger.request_path,
|
||||
"dns_qname": trigger.dns_qname,
|
||||
"headers": trigger.headers(),
|
||||
},
|
||||
)
|
||||
tags = await tagger.tag(event)
|
||||
if tags:
|
||||
pending.extend(tags)
|
||||
if len(pending) >= batch_size:
|
||||
inserted += await _flush(repo, pending, dry_run)
|
||||
pending = []
|
||||
if pending:
|
||||
inserted += await _flush(repo, pending, dry_run)
|
||||
elapsed = time.monotonic() - started
|
||||
console.print(
|
||||
f"source=canary rows={rows_seen} inserted={inserted} "
|
||||
f"dry_run={dry_run} elapsed_s={elapsed:.2f}"
|
||||
)
|
||||
|
||||
|
||||
async def _flush(repo: Any, tags: list[Any], dry_run: bool) -> int:
|
||||
if dry_run:
|
||||
return 0
|
||||
return int(await repo.insert_tags(tags))
|
||||
Reference in New Issue
Block a user