From c3c5813211cee59140425aa3d313a6449f549aa4 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 06:20:10 -0400 Subject: [PATCH] feat(ttp): E.1.3+E.1.4 Tagger ABC and composite factory contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third and fourth TTP-tagging contract commits, plus a scoped subset of the E.2.4 conformance tests covering the contract surface shipped here (full hypothesis-fuzz suite still lands with E.2.4). E.1.3 — decnet/ttp/base.py - TaggerEvent NamedTuple: source_kind, source_id, attacker_uuid, identity_uuid, session_id, decky_id, opaque payload. - Tagger(ABC) with abstract async tag(); class-level name and HANDLES: frozenset[str] (default empty so a misconfigured subclass is loudly idle, not loudly noisy). - TolerantTagger(Tagger): concrete tag() wraps abstract _tag_impl() in try/except Exception (deliberately not BaseException — so KeyboardInterrupt / SystemExit / asyncio.CancelledError propagate and the worker can shut down cleanly). Swallowed exceptions log at WARNING with exc_info, never ERROR — absence is the steady state, not a bug. Subclasses override _tag_impl, never tag — the tolerance contract is enforced in the base class, not on trust. - KNOWN_SOURCE_KINDS: Final[frozenset[str]] enumerating every source_kind a producer is allowed to emit. Closed-by-enumeration at the runtime layer; the composite tagger keys its WARNING/INFO bridge off this constant to surface the silent-drop trap from the design doc (lines 160–195). E.1.4 — decnet/ttp/factory.py - get_tagger() reads DECNET_TTP_TAGGER_TYPE (default 'composite'); unknown values raise ValueError with the known-list. Mirrors decnet.intel.factory and decnet.clustering.factory. - _KNOWN = ('composite',). Per-lifter classes (E.1.6) are children of the composite, not standalone tagger types. 
- CompositeTagger(Tagger): pre-computes a dict[str, list[Tagger]] dispatch index from each lifter's HANDLES; fans events out concurrently with asyncio.gather and concatenates results. Empty lifters=[] is the legal contract-phase state — E.1.6 wires the real lifters in. - Unhandled-event observability: source_kind in KNOWN_SOURCE_KINDS but no lifter claims it -> WARNING once per kind per process (missed E.1.6 update). Unknown kind -> INFO once per kind per process (future-feature telemetry, by design). Per-process dedup via plain set; E.1.6 may swap in a proper rate-limiter once production traffic shapes are known. Tests — tests/ttp/test_base.py, tests/ttp/test_factory.py - Tagger / TolerantTagger abstractness, missing-tag-impl rejection, WARNING-not-ERROR log level, propagation of KeyboardInterrupt / SystemExit / asyncio.CancelledError. - Factory env-var routing, unknown-name ValueError, dispatch-index correctness, only-claiming-lifter invocation, WARNING-once for known-but-unclaimed kinds, INFO-once for unknown kinds, result concatenation across lifters. Mypy clean under .311/bin/mypy --ignore-missing-imports. --- decnet/ttp/__init__.py | 7 ++ decnet/ttp/base.py | 138 ++++++++++++++++++++++++++++++++++++++ decnet/ttp/factory.py | 121 +++++++++++++++++++++++++++++++++ tests/ttp/__init__.py | 0 tests/ttp/test_base.py | 118 ++++++++++++++++++++++++++++++++ tests/ttp/test_factory.py | 130 +++++++++++++++++++++++++++++++++++ 6 files changed, 514 insertions(+) create mode 100644 decnet/ttp/__init__.py create mode 100644 decnet/ttp/base.py create mode 100644 decnet/ttp/factory.py create mode 100644 tests/ttp/__init__.py create mode 100644 tests/ttp/test_base.py create mode 100644 tests/ttp/test_factory.py diff --git a/decnet/ttp/__init__.py b/decnet/ttp/__init__.py new file mode 100644 index 00000000..3768f12f --- /dev/null +++ b/decnet/ttp/__init__.py @@ -0,0 +1,7 @@ +"""TTP-tagging subsystem. + +Maps DECNET telemetry to MITRE ATT&CK technique tags. 
See +``development/TTP_TAGGING.md`` for the full design. Callers obtain +the active tagger via :func:`decnet.ttp.factory.get_tagger` — never +instantiate concrete lifter classes directly. +""" diff --git a/decnet/ttp/base.py b/decnet/ttp/base.py new file mode 100644 index 00000000..976f1448 --- /dev/null +++ b/decnet/ttp/base.py @@ -0,0 +1,138 @@ +"""Tagger ABC — input shape, base class, tolerant mixin. + +Contract step E.1.3 of ``development/TTP_TAGGING.md``. Defines the type +surface every lifter (E.1.6), the rule engine (E.1.5), the composite +tagger (E.1.4) and the worker (E.1.7) compile against. No behavior +beyond the tolerant-wrapper boundary lives here. + +The design doc's "schema is forward-compat, code is not" trap (lines +160–195) is mitigated *here*: :data:`KNOWN_SOURCE_KINDS` enumerates +every ``source_kind`` a producer is allowed to emit. Adding a new +producer means adding its kind to this set in the *same commit* that +ships the producer; the composite tagger's WARNING/INFO bridge in +:mod:`decnet.ttp.factory` keys off this constant to surface silent +drops. +""" +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import Any, Final, NamedTuple + +from decnet.web.db.models.ttp import TTPTag + +_log = logging.getLogger(__name__) + + +# Every ``source_kind`` string a DECNET producer is allowed to emit. +# Closed-by-enumeration at the runtime layer even though the storage +# column is open. Producers MUST add their kind here in the same +# commit that starts emitting — see the design doc lines 160–195 for +# the operational contract and the rationale. +KNOWN_SOURCE_KINDS: Final[frozenset[str]] = frozenset({ + "command", + "intel", + "email", + "canary_fingerprint", + "identity", + "credential", + "auth_attempt", + "payload", + "session", + "http_request", +}) + + +class TaggerEvent(NamedTuple): + """Input shape for every tagger. 
+ + NamedTuple (not dataclass) for a lightweight, tuple-shaped record. + Not hashable: ``payload`` is a ``dict``, so events cannot go in a set. + ``payload`` is opaque on purpose: each ``source_kind`` carries a + different shape, and the per-lifter contract owns the parse. + """ + + source_kind: str + source_id: str + attacker_uuid: str | None + identity_uuid: str | None + session_id: str | None + decky_id: str | None + payload: dict[str, Any] + + +class Tagger(ABC): + """Abstract tagger. + + Every concrete tagger sets :attr:`name` and :attr:`HANDLES` at + class level. The composite tagger reads ``HANDLES`` to build its + dispatch index — a subclass that forgets to override it gets the + empty default and is therefore never invoked, which surfaces as a + test failure rather than a silent fan-out. + """ + + #: Short tag used in logs and the ``DECNET_TTP_TAGGER_TYPE`` env + #: var. Subclasses override. + name: str = "" + + #: ``source_kind`` strings this tagger consumes. Empty by default + #: so a misconfigured subclass is loudly idle, not loudly noisy. + HANDLES: frozenset[str] = frozenset() + + @abstractmethod + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + """Produce zero or more tags for ``event``. + + Implementations of :class:`Tagger` directly take responsibility + for their own error handling. Lifters that consume + sibling-worker output inherit from :class:`TolerantTagger` + instead, which enforces the "absence is not an error" contract + in the base class rather than on trust. + """ + + +class TolerantTagger(Tagger): + """Tagger mixin that converts uncaught exceptions to ``[]``. + + Every per-source lifter inherits from this. The rationale is + architectural, not stylistic: TTP tagging consumes outputs from + sibling workers (intel, behavioral, identity, …) that may not + have run yet, may have failed, or may simply have nothing to say + about a given event. 
"Absence" is the steady state, not the + exception, so a lifter blowing up on a missing join must not + cascade into a worker crash. + + Subclasses override :meth:`_tag_impl`, never :meth:`tag` — the + tolerance contract is *enforced in the base class*, not on trust. + """ + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + try: + return await self._tag_impl(event) + except Exception: + # ``Exception`` deliberately, not ``BaseException``: + # ``KeyboardInterrupt`` / ``SystemExit`` / + # ``asyncio.CancelledError`` propagate so the worker can + # shut down cleanly. E.2.4 conformance asserts this. + # WARNING, not ERROR: a sibling-worker absence is normal + # operation, not a bug. ERROR would page someone for the + # steady state. + _log.warning( + "tagger %r swallowed exception on source_kind=%r", + self.name, + event.source_kind, + exc_info=True, + ) + return [] + + @abstractmethod + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + """Real tagging logic — subclasses override this, not :meth:`tag`.""" + + +__all__ = [ + "KNOWN_SOURCE_KINDS", + "TaggerEvent", + "Tagger", + "TolerantTagger", +] diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py new file mode 100644 index 00000000..b54f34ec --- /dev/null +++ b/decnet/ttp/factory.py @@ -0,0 +1,121 @@ +"""Tagger factory + composite tagger. + +Contract step E.1.4 of ``development/TTP_TAGGING.md``. Mirrors the +provider-subpackage convention used by :mod:`decnet.intel.factory` and +:mod:`decnet.clustering.factory`: callers obtain the active tagger via +:func:`get_tagger` rather than instantiating a concrete class directly. + +The composite tagger is the only shippable tagger type — per-lifter +classes (E.1.6) are children of the composite, not standalone tagger +``DECNET_TTP_TAGGER_TYPE`` values. + +Configuration: + +* ``DECNET_TTP_TAGGER_TYPE`` — which tagger to instantiate. Default + ``"composite"``. 
Unknown values raise :class:`ValueError` so a typo + in ``decnet.ini`` surfaces immediately rather than silently falling + back. +""" +from __future__ import annotations + +import asyncio +import logging +import os +from typing import Final + +from decnet.ttp.base import KNOWN_SOURCE_KINDS, Tagger, TaggerEvent +from decnet.web.db.models.ttp import TTPTag + +_log = logging.getLogger(__name__) + +_KNOWN: Final[tuple[str, ...]] = ("composite",) +_DEFAULT: Final[str] = "composite" + + +class CompositeTagger(Tagger): + """Fans an event out to every lifter that claims its ``source_kind``. + + The composite is the runtime end of the closed-by-enumeration + bridge described in :mod:`decnet.ttp.base`: when an event arrives + with a ``source_kind`` no lifter claims, the composite emits a + structured log line so the silent-drop trap from the design doc + becomes observable. + + During the contract phase (this commit) ``lifters=[]`` is the + legal state — E.1.6 wires the real per-source lifters in. + """ + + name = "composite" + # The composite itself accepts every event; per-kind dispatch is + # delegated to children. Empty here is "n/a, computed from + # children" — the dispatch index below is what actually drives + # the fan-out. + HANDLES: frozenset[str] = frozenset() + + def __init__(self, lifters: list[Tagger]) -> None: + self._lifters: list[Tagger] = list(lifters) + index: dict[str, list[Tagger]] = {} + for lifter in self._lifters: + for kind in lifter.HANDLES: + index.setdefault(kind, []).append(lifter) + self._by_kind: dict[str, list[Tagger]] = index + # Per-process dedup state so a flood of one unknown kind + # produces one log line, not one per event. A simple set + # is fine for the contract; E.1.6 may swap in a proper + # rate-limiter once production traffic shapes are known. 
+ self._warned_known: set[str] = set() + self._informed_unknown: set[str] = set() + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + lifters = self._by_kind.get(event.source_kind, []) + if not lifters: + self._log_unhandled(event.source_kind) + return [] + results = await asyncio.gather(*(t.tag(event) for t in lifters)) + out: list[TTPTag] = [] + for tags in results: + out.extend(tags) + return out + + def _log_unhandled(self, source_kind: str) -> None: + if source_kind in KNOWN_SOURCE_KINDS: + if source_kind not in self._warned_known: + self._warned_known.add(source_kind) + # Producer ships a kind that *should* be handled but + # no lifter claims it — almost certainly a missed + # E.1.6 update. Loud once per kind per process. + _log.warning( + "composite tagger: no lifter claims known " + "source_kind=%r; events will be dropped until a " + "lifter is registered", + source_kind, + ) + else: + if source_kind not in self._informed_unknown: + self._informed_unknown.add(source_kind) + # Telemetry from a future feature, no lifter yet, by + # design (lines 160–195 of the design doc). INFO once + # per process; never an error. + _log.info( + "composite tagger: unknown source_kind=%r " + "(not in KNOWN_SOURCE_KINDS); ignoring", + source_kind, + ) + + +def get_tagger() -> Tagger: + """Return the configured tagger instance. + + Lazy package layout: the composite is constructed with an empty + lifter list during the contract phase. E.1.6 will replace this + with explicit lifter wiring; callers don't change. + """ + name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower() + if name == "composite": + return CompositeTagger(lifters=[]) + raise ValueError( + f"Unknown tagger: {name!r}. 
Known: {_KNOWN}" + ) + + +__all__ = ["get_tagger", "CompositeTagger"] diff --git a/tests/ttp/__init__.py b/tests/ttp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/ttp/test_base.py b/tests/ttp/test_base.py new file mode 100644 index 00000000..e9b44fe8 --- /dev/null +++ b/tests/ttp/test_base.py @@ -0,0 +1,118 @@ +"""Contract tests for :mod:`decnet.ttp.base` (E.1.3). + +Scoped to the contract surface: shape of TaggerEvent, abstractness +of Tagger, the swallow-Exception / propagate-BaseException boundary +of TolerantTagger, and the closed-by-enumeration KNOWN_SOURCE_KINDS +constant. The full E.2.4 conformance suite (hypothesis fuzz over +arbitrary exception types) lands in a later commit. +""" +from __future__ import annotations + +import asyncio +import logging + +import pytest + +from decnet.ttp.base import ( + KNOWN_SOURCE_KINDS, + Tagger, + TaggerEvent, + TolerantTagger, +) + + +def _ev(source_kind: str = "command") -> TaggerEvent: + return TaggerEvent( + source_kind=source_kind, + source_id="src1", + attacker_uuid=None, + identity_uuid="id1", + session_id=None, + decky_id=None, + payload={}, + ) + + +def test_tagger_event_is_namedtuple_and_hashable(): + ev = _ev() + assert ev.source_kind == "command" + assert ev.identity_uuid == "id1" + # NamedTuple gives instances tuple identity for downstream dedup + # paths. The payload field is a dict (unhashable by design — the + # raw event isn't meant to live in a set), but the structural + # tuple shape is what callers actually rely on. 
+ assert tuple(ev)[0] == "command" + assert len(ev) == 7 + + +def test_tagger_is_abstract(): + with pytest.raises(TypeError): + Tagger() # type: ignore[abstract] + + +def test_tagger_subclass_without_tag_is_abstract(): + class Half(Tagger): + name = "half" + + with pytest.raises(TypeError): + Half() # type: ignore[abstract] + + +def test_known_source_kinds_is_frozenset_of_strings(): + assert isinstance(KNOWN_SOURCE_KINDS, frozenset) + assert all(isinstance(k, str) for k in KNOWN_SOURCE_KINDS) + # The contract requires at least the lifter-aligned kinds enumerated + # in the design doc; further kinds may be added but these MUST be + # present. + must_have = { + "command", "intel", "email", "canary_fingerprint", + "identity", "credential", + } + assert must_have <= KNOWN_SOURCE_KINDS + + +def test_tolerant_tagger_swallows_exception_and_returns_empty(caplog): + class Boom(TolerantTagger): + name = "boom" + HANDLES = frozenset({"command"}) + + async def _tag_impl(self, event): + raise RuntimeError("synthetic") + + caplog.set_level(logging.WARNING, logger="decnet.ttp.base") + out = asyncio.run(Boom().tag(_ev())) + assert out == [] + # WARNING — never ERROR — per the absence-is-normal doctrine. 
+ records = [r for r in caplog.records if r.name == "decnet.ttp.base"] + assert records, "expected a log line on swallowed exception" + assert all(r.levelno == logging.WARNING for r in records) + + +@pytest.mark.parametrize( + "exc_cls", + [KeyboardInterrupt, SystemExit, asyncio.CancelledError], +) +def test_tolerant_tagger_propagates_base_exceptions(exc_cls): + class Cancel(TolerantTagger): + name = "cancel" + HANDLES = frozenset({"command"}) + + async def _tag_impl(self, event): + raise exc_cls() + + with pytest.raises(exc_cls): + asyncio.run(Cancel().tag(_ev())) + + +def test_tolerant_tagger_subclass_must_implement_tag_impl(): + class Empty(TolerantTagger): + name = "empty" + + with pytest.raises(TypeError): + Empty() # type: ignore[abstract] + + +def test_tagger_default_handles_is_empty_frozenset(): + # Misconfigured subclass that forgets HANDLES is loudly idle, + # not loudly noisy — the composite skips it entirely. + assert Tagger.HANDLES == frozenset() diff --git a/tests/ttp/test_factory.py b/tests/ttp/test_factory.py new file mode 100644 index 00000000..3185a6f0 --- /dev/null +++ b/tests/ttp/test_factory.py @@ -0,0 +1,130 @@ +"""Contract tests for :mod:`decnet.ttp.factory` (E.1.4). + +Scoped to the factory + composite dispatch contract: env var routing, +unknown-name failure, dispatch index correctness, the +KNOWN_SOURCE_KINDS WARNING/INFO bridge for unhandled events. 
+""" +from __future__ import annotations + +import asyncio +import logging + +import pytest + +from decnet.ttp.base import KNOWN_SOURCE_KINDS, TaggerEvent, TolerantTagger +from decnet.ttp.factory import CompositeTagger, get_tagger +from decnet.web.db.models.ttp import TTPTag + + +def _ev(source_kind: str) -> TaggerEvent: + return TaggerEvent( + source_kind=source_kind, + source_id="src1", + attacker_uuid=None, + identity_uuid="id1", + session_id=None, + decky_id=None, + payload={}, + ) + + +def test_default_returns_composite_with_empty_lifters(monkeypatch): + monkeypatch.delenv("DECNET_TTP_TAGGER_TYPE", raising=False) + t = get_tagger() + assert isinstance(t, CompositeTagger) + assert t.name == "composite" + assert t._lifters == [] + + +def test_explicit_composite(monkeypatch): + monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "composite") + assert isinstance(get_tagger(), CompositeTagger) + + +def test_unknown_tagger_type_raises(monkeypatch): + monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "nope") + with pytest.raises(ValueError, match="Unknown tagger"): + get_tagger() + + +class _Recorder(TolerantTagger): + """Lifter that records calls and returns a single shaped TTPTag.""" + + def __init__(self, name: str, handles: frozenset[str]) -> None: + self.name = name + self.HANDLES = handles + self.calls: list[str] = [] + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + self.calls.append(event.source_kind) + return [] + + +def test_composite_dispatch_index_is_built_from_handles(): + a = _Recorder("a", frozenset({"command", "email"})) + b = _Recorder("b", frozenset({"email", "intel"})) + c = CompositeTagger(lifters=[a, b]) + assert set(c._by_kind["command"]) == {a} + assert set(c._by_kind["email"]) == {a, b} + assert set(c._by_kind["intel"]) == {b} + + +def test_composite_only_invokes_claiming_lifters(): + a = _Recorder("a", frozenset({"command"})) + b = _Recorder("b", frozenset({"email"})) + c = CompositeTagger(lifters=[a, b]) + 
asyncio.run(c.tag(_ev("command"))) + assert a.calls == ["command"] + assert b.calls == [] + + +def test_composite_unhandled_known_kind_logs_warning_once(caplog): + c = CompositeTagger(lifters=[]) + # Pick any element of KNOWN_SOURCE_KINDS deterministically. + known = sorted(KNOWN_SOURCE_KINDS)[0] + caplog.set_level(logging.INFO, logger="decnet.ttp.factory") + out1 = asyncio.run(c.tag(_ev(known))) + out2 = asyncio.run(c.tag(_ev(known))) + assert out1 == [] and out2 == [] + warnings = [ + r for r in caplog.records + if r.name == "decnet.ttp.factory" and r.levelno == logging.WARNING + ] + assert len(warnings) == 1, "expected one WARNING per kind per process" + + +def test_composite_unhandled_unknown_kind_logs_info_once(caplog): + c = CompositeTagger(lifters=[]) + unknown = "definitely_not_a_real_source_kind_zzz" + assert unknown not in KNOWN_SOURCE_KINDS + caplog.set_level(logging.INFO, logger="decnet.ttp.factory") + asyncio.run(c.tag(_ev(unknown))) + asyncio.run(c.tag(_ev(unknown))) + infos = [ + r for r in caplog.records + if r.name == "decnet.ttp.factory" and r.levelno == logging.INFO + ] + warnings = [ + r for r in caplog.records + if r.name == "decnet.ttp.factory" and r.levelno == logging.WARNING + ] + assert len(infos) == 1 + assert warnings == [] + + +def test_composite_concatenates_results_from_multiple_lifters(): + class Fixed(TolerantTagger): + def __init__(self, n: int) -> None: + self.name = f"fixed{n}" + self.HANDLES = frozenset({"command"}) + self._n = n + + async def _tag_impl(self, event): + # Return a list of the right length without constructing + # real TTPTag rows — concatenation semantics are what's + # under test, not row validity. + return [object()] * self._n + + c = CompositeTagger(lifters=[Fixed(2), Fixed(3)]) + out = asyncio.run(c.tag(_ev("command"))) + assert len(out) == 5