feat(ttp): E.3.17 worker registration + scoped schemathesis suite

Wires decnet-ttp as a first-class worker:

* `decnet ttp` CLI command (master-only via MASTER_ONLY_COMMANDS)
* deploy/decnet-ttp.service.j2 systemd unit (After= identity / intel
  / reuse-correlator workers; ProtectHome=read-only since
  FilesystemRuleStore only reads ./rules/ttp/)
* deploy/decnet.target Wants= chain extended with decnet-ttp.service
* `ttp` was already in web/worker_registry.KNOWN_WORKERS

tests/api/test_schemathesis_ttp.py: TTP-routes-only schemathesis
suite, filtered via the OpenAPI tags=["TTP Tagging"] annotation
shared by the eight TTP routes. Reuses the live uvicorn subprocess
the wider test_schemathesis spawns; max_examples=400 keeps the
focused gate fast for E.3.13–E.3.16 iteration.

wiki-checkout/Service-Bus.md committed in its own repo: ttp.tagged
and ttp.rule.fired.<id> flipped from "reserved (TTP worker)" to
"decnet.ttp.worker" now that the worker publishes them.
This commit is contained in:
2026-05-01 21:26:46 -04:00
parent 07a609973b
commit 9a31d0e50c
6 changed files with 221 additions and 1 deletions

View File

@@ -30,6 +30,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({
"mutate", "listener", "profiler",
"services", "distros", "correlate", "archetypes", "web",
"db-reset", "init", "webhook", "clusterer", "campaign-clusterer",
"ttp",
})
MASTER_ONLY_GROUPS: frozenset[str] = frozenset(
{"swarm", "topology", "geoip", "realism"}

View File

@@ -295,3 +295,57 @@ def register(app: typer.Typer) -> None:
asyncio.run(_run())
except KeyboardInterrupt:
console.print("\n[yellow]Campaign clusterer stopped.[/]")
@app.command(name="ttp")
def ttp(
poll_interval_secs: float = typer.Option(
60.0, "--poll-interval", "-i",
help="Slow-tick fallback when the bus is idle or unavailable (seconds)",
),
daemon: bool = typer.Option(
False, "--daemon", "-d",
help="Detach to background as a daemon process",
),
) -> None:
"""TTP-tagging worker — MITRE ATT&CK technique tagging.
Bus-woken on ``attacker.session.ended`` / ``attacker.observed``
/ ``attacker.intel.enriched`` / ``identity.formed`` /
``identity.merged`` / ``credential.reuse.detected`` /
``email.received`` / ``canary.>``. Dispatches each event
through the :class:`CompositeTagger` (RuleEngine +
Behavioral / Intel / Email / CanaryFingerprint / Identity /
Credential lifters), persists ``ttp_tag`` rows via the
idempotent ``INSERT OR IGNORE`` write, and publishes
``ttp.tagged`` + per-technique ``ttp.rule.fired.*`` only when
the insert returned a non-zero rowcount (loop-prevention
invariant from TTP_TAGGING.md §"Bus topics").
"""
import asyncio
from decnet.cli.gating import _require_master_mode
from decnet.ttp.worker import run_ttp_worker_loop
from decnet.web.dependencies import repo
_require_master_mode("ttp")
if daemon:
log.info("ttp daemonizing poll=%s", poll_interval_secs)
_utils._daemonize()
log.info("ttp command invoked poll=%s", poll_interval_secs)
console.print(
f"[bold cyan]TTP tagging worker starting[/] "
f"poll={poll_interval_secs}s"
)
console.print("[dim]Press Ctrl+C to stop[/]")
async def _run() -> None:
await repo.initialize()
await run_ttp_worker_loop(
repo, poll_interval_secs=poll_interval_secs,
)
try:
asyncio.run(_run())
except KeyboardInterrupt:
console.print("\n[yellow]TTP tagging worker stopped.[/]")

View File

@@ -0,0 +1,57 @@
[Unit]
Description=DECNET TTP Tagger (MITRE ATT&CK technique tagging)
Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#ttp-tagger
After=network-online.target decnet-bus.service decnet-clusterer.service decnet-enrich.service decnet-reuse-correlator.service
Wants=network-online.target decnet-bus.service
[Service]
Type=simple
User={{ user }}
Group={{ group }}
WorkingDirectory={{ install_dir }}
EnvironmentFile=-{{ install_dir }}/.env.local
Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.ttp.log
# Subscribes to attacker.session.ended (primary), attacker.observed,
# attacker.intel.enriched, identity.formed, identity.merged,
# credential.reuse.detected, email.received, and canary.> ; falls back
# to a 60s slow-tick poll when the bus is idle or unavailable. Each
# event is dispatched through the CompositeTagger (RuleEngine +
# Behavioral / Intel / Email / CanaryFingerprint / Identity /
# Credential lifters), persisted via the idempotent INSERT OR IGNORE
# repo write, and ttp.tagged + ttp.rule.fired.<technique_id> are
# published only when the insert returned a non-zero rowcount
# (loop-prevention invariant — see TTP_TAGGING.md §"Bus topics").
#
# Master-only: gated via MASTER_ONLY_COMMANDS in decnet/cli/gating.py.
# Sits one layer above the identity / intel / reuse-correlator
# workers — the After= dependencies ensure their bus topics are live
# before the TTP worker subscribes.
ExecStart={{ venv_dir }}/bin/decnet ttp
StandardOutput=append:/var/log/decnet/decnet.ttp.log
StandardError=append:/var/log/decnet/decnet.ttp.log
CapabilityBoundingSet=
AmbientCapabilities=
# Security Hardening
NoNewPrivileges=yes
ProtectSystem=full
# Dev installs under /home need ProtectHome=read-only (the worker
# reads ./rules/ttp/ from the project root, which lives under /home
# on dev boxes — read-only suffices because the FilesystemRuleStore
# only reads YAMLs, never writes).
ProtectHome=read-only
PrivateTmp=yes
ProtectKernelTunables=yes
ProtectKernelModules=yes
ProtectControlGroups=yes
RestrictSUIDSGID=yes
LockPersonality=yes
ReadWritePaths={{ install_dir }} /var/log/decnet
Restart=on-failure
RestartSec=5
TimeoutStopSec=15
[Install]
WantedBy=multi-user.target

View File

@@ -18,6 +18,7 @@ Wants=decnet-bus.service \
decnet-enrich.service \
decnet-clusterer.service \
decnet-campaign-clusterer.service \
decnet-ttp.service \
decnet-webhook.service \
decnet-canary.service \
decnet-orchestrator.service

View File

@@ -3050,6 +3050,14 @@ Order:
deferred to E.3.14b — today the worker is 1:1 source-kind →
lifter; the catch-up rewrite needs a session→intel join the
repo doesn't expose yet.
Worker registration: `decnet ttp` CLI command lands in
`decnet/cli/workers.py` (master-only, gated through
`MASTER_ONLY_COMMANDS` in `decnet/cli/gating.py`); the
rendered systemd unit at `deploy/decnet-ttp.service.j2`
sits one layer above the identity / intel / reuse-correlator
workers via `After=` deps and is included in
`deploy/decnet.target`. `ProtectHome=read-only` suffices —
FilesystemRuleStore only reads `./rules/ttp/`.
15. **UKC bridge** — implement `tactic_to_ukc_phase` and inverse.
Rewrite the campaign clusterer's
`IdentityFeatures.commands_by_phase_on_decky` adapter to read
@@ -3087,7 +3095,19 @@ Order:
`tsc --noEmit` + `vite build` clean.
17. **Schemathesis pass** — full API fuzz including the new TTP
routes. Document any new 4xx codes per the project's
"POST/PUT/PATCH 400" convention.
"POST/PUT/PATCH 400" convention. ✅ done.
`POST /ttp/rules/{rule_id}/state` already documents 400
(manual-parse for malformed JSON, per
`feedback_schemathesis_400.md`); the GET rollups
(by-identity / by-attacker / by-campaign / by-session /
techniques / rules / export-navigator{,/identity})
uniformly document 401 + 403 per the auth-gated convention.
`wiki-checkout/Service-Bus.md` updated to flip the TTP
worker topics from "_reserved (TTP worker)_" to actual
publisher attribution (`decnet.ttp.worker`) now that the
worker bootstrap publishes them. Suppression-event publish
stays deferred per the v0 contract — the repo drops
sub-floor confidence directly, no bus event.
### E.4 Out-of-band tasks (not gated on the above)

View File

@@ -0,0 +1,87 @@
"""Schemathesis contract tests scoped to the TTP Tagging API surface.
E.3.17 of ``development/TTP_TAGGING.md``. The full ``test_schemathesis``
suite fuzzes every endpoint with ``max_examples=3000`` — slow and
overkill when iterating on TTP-routes-only changes (E.3.13E.3.16).
This file filters by the OpenAPI ``tags=["TTP Tagging"]`` annotation
the eight TTP routes carry, runs against the same live uvicorn
subprocess the wider suite spins up, and applies the same check
battery so a 4xx-shape regression on a TTP route fails here without
waiting on the rest of the API.
Routes covered (all decorated ``tags=["TTP Tagging"]``):
* ``GET /api/v1/ttp/techniques``
* ``GET /api/v1/ttp/by-identity/{identity_uuid}``
* ``GET /api/v1/ttp/by-attacker/{attacker_uuid}``
* ``GET /api/v1/ttp/by-campaign/{campaign_uuid}``
* ``GET /api/v1/ttp/by-session/{session_id}``
* ``GET /api/v1/ttp/rules``
* ``POST /api/v1/ttp/rules/{rule_id}/state``
* ``DELETE /api/v1/ttp/rules/{rule_id}/state``
* ``GET /api/v1/ttp/export/navigator``
* ``GET /api/v1/ttp/export/navigator/identity/{identity_uuid}``
"""
from __future__ import annotations
import pytest
import schemathesis as st
from hypothesis import HealthCheck, Verbosity, settings
from tests.api.test_schemathesis import (
ALL_CHECKS,
AUTH_CHECKS,
LIVE_SERVER_URL,
before_call as _shared_before_call, # noqa: F401 (registers @st.hook)
)
# Reuse the schema fetched against the same uvicorn subprocess started
# by ``test_schemathesis``. Filtering by tag keeps the TTP suite a
# fast, focused contract gate without re-spinning the server.
TTP_SCHEMA = st.openapi.from_url(
f"{LIVE_SERVER_URL}/openapi.json",
).include(tag="TTP Tagging")
@pytest.mark.fuzz
@TTP_SCHEMA.parametrize()
@settings(
max_examples=400,
deadline=None,
verbosity=Verbosity.normal,
suppress_health_check=[
HealthCheck.filter_too_much,
HealthCheck.too_slow,
HealthCheck.data_too_large,
],
)
def test_ttp_schema_compliance(case):
"""Per-TTP-route schema compliance — valid + invalid inputs."""
case.call_and_validate(checks=ALL_CHECKS)
@pytest.mark.fuzz
@TTP_SCHEMA.parametrize()
@settings(
max_examples=120,
deadline=None,
verbosity=Verbosity.normal,
suppress_health_check=[
HealthCheck.filter_too_much,
HealthCheck.too_slow,
],
)
def test_ttp_auth_enforcement(case):
"""Every TTP route rejects requests without a Bearer token (401).
The mutation endpoints additionally require ``admin`` (server-side
``require_admin``); the authless probe doesn't distinguish 401 vs
403 here — the ``ignored_auth`` check just asserts that an absent
token never lands the request inside the handler with a usable
identity.
"""
case.headers = {
k: v for k, v in (case.headers or {}).items()
if k.lower() != "authorization"
}
case.call_and_validate(checks=AUTH_CHECKS)