feat(ttp): E.1.3+E.1.4 Tagger ABC and composite factory contract

Third and fourth TTP-tagging contract commits, plus a scoped subset
of the E.2.4 conformance tests covering the contract surface shipped
here (full hypothesis-fuzz suite still lands with E.2.4).

E.1.3 — decnet/ttp/base.py
- TaggerEvent NamedTuple: source_kind, source_id, attacker_uuid,
  identity_uuid, session_id, decky_id, opaque payload.
- Tagger(ABC) with abstract async tag(); class-level name and
  HANDLES: frozenset[str] (default empty so a misconfigured subclass
  is loudly idle, not loudly noisy).
- TolerantTagger(Tagger): concrete tag() wraps abstract _tag_impl()
  in try/except Exception (deliberately not BaseException — so
  KeyboardInterrupt / SystemExit / asyncio.CancelledError propagate
  and the worker can shut down cleanly). Swallowed exceptions log
  at WARNING with exc_info, never ERROR — absence is the steady
  state, not a bug. Subclasses override _tag_impl, never tag — the
  tolerance contract is enforced in the base class, not on trust.
- KNOWN_SOURCE_KINDS: Final[frozenset[str]] enumerating every
  source_kind a producer is allowed to emit. Closed-by-enumeration
  at the runtime layer; the composite tagger keys its WARNING/INFO
  bridge off this constant to surface the silent-drop trap from
  the design doc (lines 160–195).

E.1.4 — decnet/ttp/factory.py
- get_tagger() reads DECNET_TTP_TAGGER_TYPE (default 'composite');
  unknown values raise ValueError with the known-list. Mirrors
  decnet.intel.factory and decnet.clustering.factory.
- _KNOWN = ('composite',). Per-lifter classes (E.1.6) are children
  of the composite, not standalone tagger types.
- CompositeTagger(Tagger): pre-computes a dict[str, list[Tagger]]
  dispatch index from each lifter's HANDLES; fans events out
  concurrently with asyncio.gather and concatenates results.
  Empty lifters=[] is the legal contract-phase state — E.1.6
  wires the real lifters in.
- Unhandled-event observability: source_kind in KNOWN_SOURCE_KINDS
  but no lifter claims it -> WARNING once per kind per process
  (missed E.1.6 update). Unknown kind -> INFO once per kind per
  process (future-feature telemetry, by design). Per-process dedup
  via plain set; E.1.6 may swap in a proper rate-limiter once
  production traffic shapes are known.

Tests — tests/ttp/test_base.py, tests/ttp/test_factory.py
- Tagger / TolerantTagger abstractness, missing-tag-impl rejection,
  WARNING-not-ERROR log level, propagation of KeyboardInterrupt /
  SystemExit / asyncio.CancelledError.
- Factory env-var routing, unknown-name ValueError, dispatch-index
  correctness, only-claiming-lifter invocation, WARNING-once for
  known-but-unclaimed kinds, INFO-once for unknown kinds, result
  concatenation across lifters.

Mypy clean under .311/bin/mypy --ignore-missing-imports.
This commit is contained in:
2026-05-01 06:20:10 -04:00
parent e395306dcb
commit c3c5813211
6 changed files with 514 additions and 0 deletions

7
decnet/ttp/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""TTP-tagging subsystem.
Maps DECNET telemetry to MITRE ATT&CK technique tags. See
``development/TTP_TAGGING.md`` for the full design. Callers obtain
the active tagger via :func:`decnet.ttp.factory.get_tagger` — never
instantiate concrete lifter classes directly.
"""

138
decnet/ttp/base.py Normal file
View File

@@ -0,0 +1,138 @@
"""Tagger ABC — input shape, base class, tolerant mixin.
Contract step E.1.3 of ``development/TTP_TAGGING.md``. Defines the type
surface every lifter (E.1.6), the rule engine (E.1.5), the composite
tagger (E.1.4) and the worker (E.1.7) compile against. No behavior
beyond the tolerant-wrapper boundary lives here.
The design doc's "schema is forward-compat, code is not" trap (lines
160195) is mitigated *here*: :data:`KNOWN_SOURCE_KINDS` enumerates
every ``source_kind`` a producer is allowed to emit. Adding a new
producer means adding its kind to this set in the *same commit* that
ships the producer; the composite tagger's WARNING/INFO bridge in
:mod:`decnet.ttp.factory` keys off this constant to surface silent
drops.
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any, Final, NamedTuple
from decnet.web.db.models.ttp import TTPTag
_log = logging.getLogger(__name__)
# Every ``source_kind`` string a DECNET producer is allowed to emit.
# Closed-by-enumeration at the runtime layer even though the storage
# column is open. Producers MUST add their kind here in the same
# commit that starts emitting — see the design doc lines 160195 for
# the operational contract and the rationale.
KNOWN_SOURCE_KINDS: Final[frozenset[str]] = frozenset({
"command",
"intel",
"email",
"canary_fingerprint",
"identity",
"credential",
"auth_attempt",
"payload",
"session",
"http_request",
})
class TaggerEvent(NamedTuple):
"""Input shape for every tagger.
NamedTuple (not dataclass) so instances are hashable — downstream
dedup paths can put them in sets without a custom ``__hash__``.
``payload`` is opaque on purpose: each ``source_kind`` carries a
different shape, and the per-lifter contract owns the parse.
"""
source_kind: str
source_id: str
attacker_uuid: str | None
identity_uuid: str | None
session_id: str | None
decky_id: str | None
payload: dict[str, Any]
class Tagger(ABC):
"""Abstract tagger.
Every concrete tagger sets :attr:`name` and :attr:`HANDLES` at
class level. The composite tagger reads ``HANDLES`` to build its
dispatch index — a subclass that forgets to override it gets the
empty default and is therefore never invoked, which surfaces as a
test failure rather than a silent fan-out.
"""
#: Short tag used in logs and the ``DECNET_TTP_TAGGER_TYPE`` env
#: var. Subclasses override.
name: str = ""
#: ``source_kind`` strings this tagger consumes. Empty by default
#: so a misconfigured subclass is loudly idle, not loudly noisy.
HANDLES: frozenset[str] = frozenset()
@abstractmethod
async def tag(self, event: TaggerEvent) -> list[TTPTag]:
"""Produce zero or more tags for ``event``.
Implementations of :class:`Tagger` directly take responsibility
for their own error handling. Lifters that consume
sibling-worker output inherit from :class:`TolerantTagger`
instead, which enforces the "absence is not an error" contract
in the base class rather than on trust.
"""
class TolerantTagger(Tagger):
"""Tagger mixin that converts uncaught exceptions to ``[]``.
Every per-source lifter inherits from this. The rationale is
architectural, not stylistic: TTP tagging consumes outputs from
sibling workers (intel, behavioral, identity, …) that may not
have run yet, may have failed, or may simply have nothing to say
about a given event. "Absence" is the steady state, not the
exception, so a lifter blowing up on a missing join must not
cascade into a worker crash.
Subclasses override :meth:`_tag_impl`, never :meth:`tag` — the
tolerance contract is *enforced in the base class*, not on trust.
"""
async def tag(self, event: TaggerEvent) -> list[TTPTag]:
try:
return await self._tag_impl(event)
except Exception:
# ``Exception`` deliberately, not ``BaseException``:
# ``KeyboardInterrupt`` / ``SystemExit`` /
# ``asyncio.CancelledError`` propagate so the worker can
# shut down cleanly. E.2.4 conformance asserts this.
# WARNING, not ERROR: a sibling-worker absence is normal
# operation, not a bug. ERROR would page someone for the
# steady state.
_log.warning(
"tagger %r swallowed exception on source_kind=%r",
self.name,
event.source_kind,
exc_info=True,
)
return []
@abstractmethod
async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
"""Real tagging logic — subclasses override this, not :meth:`tag`."""
__all__ = [
"KNOWN_SOURCE_KINDS",
"TaggerEvent",
"Tagger",
"TolerantTagger",
]

121
decnet/ttp/factory.py Normal file
View File

@@ -0,0 +1,121 @@
"""Tagger factory + composite tagger.
Contract step E.1.4 of ``development/TTP_TAGGING.md``. Mirrors the
provider-subpackage convention used by :mod:`decnet.intel.factory` and
:mod:`decnet.clustering.factory`: callers obtain the active tagger via
:func:`get_tagger` rather than instantiating a concrete class directly.
The composite tagger is the only shippable tagger type — per-lifter
classes (E.1.6) are children of the composite, not standalone tagger
``DECNET_TTP_TAGGER_TYPE`` values.
Configuration:
* ``DECNET_TTP_TAGGER_TYPE`` — which tagger to instantiate. Default
``"composite"``. Unknown values raise :class:`ValueError` so a typo
in ``decnet.ini`` surfaces immediately rather than silently falling
back.
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Final
from decnet.ttp.base import KNOWN_SOURCE_KINDS, Tagger, TaggerEvent
from decnet.web.db.models.ttp import TTPTag
_log = logging.getLogger(__name__)
_KNOWN: Final[tuple[str, ...]] = ("composite",)
_DEFAULT: Final[str] = "composite"
class CompositeTagger(Tagger):
"""Fans an event out to every lifter that claims its ``source_kind``.
The composite is the runtime end of the closed-by-enumeration
bridge described in :mod:`decnet.ttp.base`: when an event arrives
with a ``source_kind`` no lifter claims, the composite emits a
structured log line so the silent-drop trap from the design doc
becomes observable.
During the contract phase (this commit) ``lifters=[]`` is the
legal state — E.1.6 wires the real per-source lifters in.
"""
name = "composite"
# The composite itself accepts every event; per-kind dispatch is
# delegated to children. Empty here is "n/a, computed from
# children" — the dispatch index below is what actually drives
# the fan-out.
HANDLES: frozenset[str] = frozenset()
def __init__(self, lifters: list[Tagger]) -> None:
self._lifters: list[Tagger] = list(lifters)
index: dict[str, list[Tagger]] = {}
for lifter in self._lifters:
for kind in lifter.HANDLES:
index.setdefault(kind, []).append(lifter)
self._by_kind: dict[str, list[Tagger]] = index
# Per-process dedup state so a flood of one unknown kind
# produces one log line, not one per event. A simple set
# is fine for the contract; E.1.6 may swap in a proper
# rate-limiter once production traffic shapes are known.
self._warned_known: set[str] = set()
self._informed_unknown: set[str] = set()
async def tag(self, event: TaggerEvent) -> list[TTPTag]:
lifters = self._by_kind.get(event.source_kind, [])
if not lifters:
self._log_unhandled(event.source_kind)
return []
results = await asyncio.gather(*(t.tag(event) for t in lifters))
out: list[TTPTag] = []
for tags in results:
out.extend(tags)
return out
def _log_unhandled(self, source_kind: str) -> None:
if source_kind in KNOWN_SOURCE_KINDS:
if source_kind not in self._warned_known:
self._warned_known.add(source_kind)
# Producer ships a kind that *should* be handled but
# no lifter claims it — almost certainly a missed
# E.1.6 update. Loud once per kind per process.
_log.warning(
"composite tagger: no lifter claims known "
"source_kind=%r; events will be dropped until a "
"lifter is registered",
source_kind,
)
else:
if source_kind not in self._informed_unknown:
self._informed_unknown.add(source_kind)
# Telemetry from a future feature, no lifter yet, by
# design (lines 160195 of the design doc). INFO once
# per process; never an error.
_log.info(
"composite tagger: unknown source_kind=%r "
"(not in KNOWN_SOURCE_KINDS); ignoring",
source_kind,
)
def get_tagger() -> Tagger:
"""Return the configured tagger instance.
Lazy package layout: the composite is constructed with an empty
lifter list during the contract phase. E.1.6 will replace this
with explicit lifter wiring; callers don't change.
"""
name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower()
if name == "composite":
return CompositeTagger(lifters=[])
raise ValueError(
f"Unknown tagger: {name!r}. Known: {_KNOWN}"
)
__all__ = ["get_tagger", "CompositeTagger"]