diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index 19b039c9..a5db1925 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -30,6 +30,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "mutate", "listener", "profiler", "services", "distros", "correlate", "archetypes", "web", "db-reset", "init", "webhook", "clusterer", "campaign-clusterer", + "ttp", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset( {"swarm", "topology", "geoip", "realism"} diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index 4d4ae7db..c126e3dc 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -295,3 +295,57 @@ def register(app: typer.Typer) -> None: asyncio.run(_run()) except KeyboardInterrupt: console.print("\n[yellow]Campaign clusterer stopped.[/]") + + @app.command(name="ttp") + def ttp( + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """TTP-tagging worker — MITRE ATT&CK technique tagging. + + Bus-woken on ``attacker.session.ended`` / ``attacker.observed`` + / ``attacker.intel.enriched`` / ``identity.formed`` / + ``identity.merged`` / ``credential.reuse.detected`` / + ``email.received`` / ``canary.>``. Dispatches each event + through the :class:`CompositeTagger` (RuleEngine + + Behavioral / Intel / Email / CanaryFingerprint / Identity / + Credential lifters), persists ``ttp_tag`` rows via the + idempotent ``INSERT OR IGNORE`` write, and publishes + ``ttp.tagged`` + per-technique ``ttp.rule.fired.*`` only when + the insert returned a non-zero rowcount (loop-prevention + invariant from TTP_TAGGING.md §"Bus topics"). + """ + import asyncio + from decnet.cli.gating import _require_master_mode + from decnet.ttp.worker import run_ttp_worker_loop + from decnet.web.dependencies import repo + + _require_master_mode("ttp") + + if daemon: + log.info("ttp daemonizing poll=%s", poll_interval_secs) + _utils._daemonize() + + log.info("ttp command invoked poll=%s", poll_interval_secs) + console.print( + f"[bold cyan]TTP tagging worker starting[/] " + f"poll={poll_interval_secs}s" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_ttp_worker_loop( + repo, poll_interval_secs=poll_interval_secs, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]TTP tagging worker stopped.[/]") diff --git a/deploy/decnet-ttp.service.j2 b/deploy/decnet-ttp.service.j2 new file mode 100644 index 00000000..e466084c --- /dev/null +++ b/deploy/decnet-ttp.service.j2 @@ -0,0 +1,57 @@ +[Unit] +Description=DECNET TTP Tagger (MITRE ATT&CK technique tagging) +Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#ttp-tagger +After=network-online.target decnet-bus.service decnet-clusterer.service decnet-enrich.service decnet-reuse-correlator.service +Wants=network-online.target decnet-bus.service + +[Service] +Type=simple +User={{ user }} +Group={{ group }} +WorkingDirectory={{ install_dir }} +EnvironmentFile=-{{ install_dir }}/.env.local +Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.ttp.log +# Subscribes to attacker.session.ended (primary), attacker.observed, +# attacker.intel.enriched, identity.formed, identity.merged, +# credential.reuse.detected, email.received, and canary.> ; falls back +# to a 60s slow-tick poll when the bus is idle or unavailable. Each +# event is dispatched through the CompositeTagger (RuleEngine + +# Behavioral / Intel / Email / CanaryFingerprint / Identity / +# Credential lifters), persisted via the idempotent INSERT OR IGNORE +# repo write, and ttp.tagged + ttp.rule.fired. are +# published only when the insert returned a non-zero rowcount +# (loop-prevention invariant — see TTP_TAGGING.md §"Bus topics"). +# +# Master-only: gated via MASTER_ONLY_COMMANDS in decnet/cli/gating.py. +# Sits one layer above the identity / intel / reuse-correlator +# workers — the After= dependencies ensure their bus topics are live +# before the TTP worker subscribes. +ExecStart={{ venv_dir }}/bin/decnet ttp +StandardOutput=append:/var/log/decnet/decnet.ttp.log +StandardError=append:/var/log/decnet/decnet.ttp.log + +CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +# Dev installs under /home need ProtectHome=read-only (the worker +# reads ./rules/ttp/ from the project root, which lives under /home +# on dev boxes — read-only suffices because the FilesystemRuleStore +# only reads YAMLs, never writes). +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +ReadWritePaths={{ install_dir }} /var/log/decnet + +Restart=on-failure +RestartSec=5 +TimeoutStopSec=15 + +[Install] +WantedBy=multi-user.target diff --git a/deploy/decnet.target b/deploy/decnet.target index 409b831c..71f0b25f 100644 --- a/deploy/decnet.target +++ b/deploy/decnet.target @@ -18,6 +18,7 @@ Wants=decnet-bus.service \ decnet-enrich.service \ decnet-clusterer.service \ decnet-campaign-clusterer.service \ + decnet-ttp.service \ decnet-webhook.service \ decnet-canary.service \ decnet-orchestrator.service diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index f9baf255..fa3d5e62 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -3050,6 +3050,14 @@ Order: deferred to E.3.14b — today the worker is 1:1 source-kind → lifter; the catch-up rewrite needs a session→intel join the repo doesn't expose yet. + Worker registration: `decnet ttp` CLI command lands in + `decnet/cli/workers.py` (master-only, gated through + `MASTER_ONLY_COMMANDS` in `decnet/cli/gating.py`); the + rendered systemd unit at `deploy/decnet-ttp.service.j2` + sits one layer above the identity / intel / reuse-correlator + workers via `After=` deps and is included in + `deploy/decnet.target`. `ProtectHome=read-only` suffices — + FilesystemRuleStore only reads `./rules/ttp/`. 15. **UKC bridge** — implement `tactic_to_ukc_phase` and inverse. Rewrite the campaign clusterer's `IdentityFeatures.commands_by_phase_on_decky` adapter to read @@ -3087,7 +3095,19 @@ Order: `tsc --noEmit` + `vite build` clean. 17. **Schemathesis pass** — full API fuzz including the new TTP routes. Document any new 4xx codes per the project's - "POST/PUT/PATCH 400" convention. + "POST/PUT/PATCH 400" convention. ✅ done. + `POST /ttp/rules/{rule_id}/state` already documents 400 + (manual-parse for malformed JSON, per + `feedback_schemathesis_400.md`); the GET rollups + (by-identity / by-attacker / by-campaign / by-session / + techniques / rules / export-navigator{,/identity}) + uniformly document 401 + 403 per the auth-gated convention. + `wiki-checkout/Service-Bus.md` updated to flip the TTP + worker topics from "_reserved (TTP worker)_" to actual + publisher attribution (`decnet.ttp.worker`) now that the + worker bootstrap publishes them. Suppression-event publish + stays deferred per the v0 contract — the repo drops + sub-floor confidence directly, no bus event. ### E.4 Out-of-band tasks (not gated on the above) diff --git a/tests/api/test_schemathesis_ttp.py b/tests/api/test_schemathesis_ttp.py new file mode 100644 index 00000000..1b789af8 --- /dev/null +++ b/tests/api/test_schemathesis_ttp.py @@ -0,0 +1,87 @@ +"""Schemathesis contract tests scoped to the TTP Tagging API surface. + +E.3.17 of ``development/TTP_TAGGING.md``. The full ``test_schemathesis`` +suite fuzzes every endpoint with ``max_examples=3000`` — slow and +overkill when iterating on TTP-routes-only changes (E.3.13–E.3.16). +This file filters by the OpenAPI ``tags=["TTP Tagging"]`` annotation +the eight TTP routes carry, runs against the same live uvicorn +subprocess the wider suite spins up, and applies the same check +battery so a 4xx-shape regression on a TTP route fails here without +waiting on the rest of the API. + +Routes covered (all decorated ``tags=["TTP Tagging"]``): + +* ``GET /api/v1/ttp/techniques`` +* ``GET /api/v1/ttp/by-identity/{identity_uuid}`` +* ``GET /api/v1/ttp/by-attacker/{attacker_uuid}`` +* ``GET /api/v1/ttp/by-campaign/{campaign_uuid}`` +* ``GET /api/v1/ttp/by-session/{session_id}`` +* ``GET /api/v1/ttp/rules`` +* ``POST /api/v1/ttp/rules/{rule_id}/state`` +* ``DELETE /api/v1/ttp/rules/{rule_id}/state`` +* ``GET /api/v1/ttp/export/navigator`` +* ``GET /api/v1/ttp/export/navigator/identity/{identity_uuid}`` +""" +from __future__ import annotations + +import pytest +import schemathesis as st +from hypothesis import HealthCheck, Verbosity, settings + +from tests.api.test_schemathesis import ( + ALL_CHECKS, + AUTH_CHECKS, + LIVE_SERVER_URL, + before_call as _shared_before_call, # noqa: F401 (registers @st.hook) +) + +# Reuse the schema fetched against the same uvicorn subprocess started +# by ``test_schemathesis``. Filtering by tag keeps the TTP suite a +# fast, focused contract gate without re-spinning the server. +TTP_SCHEMA = st.openapi.from_url( + f"{LIVE_SERVER_URL}/openapi.json", +).include(tag="TTP Tagging") + + +@pytest.mark.fuzz +@TTP_SCHEMA.parametrize() +@settings( + max_examples=400, + deadline=None, + verbosity=Verbosity.normal, + suppress_health_check=[ + HealthCheck.filter_too_much, + HealthCheck.too_slow, + HealthCheck.data_too_large, + ], +) +def test_ttp_schema_compliance(case): + """Per-TTP-route schema compliance — valid + invalid inputs.""" + case.call_and_validate(checks=ALL_CHECKS) + + +@pytest.mark.fuzz +@TTP_SCHEMA.parametrize() +@settings( + max_examples=120, + deadline=None, + verbosity=Verbosity.normal, + suppress_health_check=[ + HealthCheck.filter_too_much, + HealthCheck.too_slow, + ], +) +def test_ttp_auth_enforcement(case): + """Every TTP route rejects requests without a Bearer token (401). + + The mutation endpoints additionally require ``admin`` (server-side + ``require_admin``); the authless probe doesn't distinguish 401 vs + 403 here — the ``ignored_auth`` check just asserts that an absent + token never lands the request inside the handler with a usable + identity. + """ + case.headers = { + k: v for k, v in (case.headers or {}).items() + if k.lower() != "authorization" + } + case.call_and_validate(checks=AUTH_CHECKS)