Third and fourth TTP-tagging contract commits, plus a scoped subset
of the E.2.4 conformance tests covering the contract surface shipped
here (full hypothesis-fuzz suite still lands with E.2.4).
E.1.3 — decnet/ttp/base.py
- TaggerEvent NamedTuple: source_kind, source_id, attacker_uuid,
identity_uuid, session_id, decky_id, opaque payload.
- Tagger(ABC) with abstract async tag(); class-level name and
HANDLES: frozenset[str] (default empty so a misconfigured subclass
is loudly idle, not loudly noisy).
- TolerantTagger(Tagger): concrete tag() wraps abstract _tag_impl()
in try/except Exception (deliberately not BaseException — so
KeyboardInterrupt / SystemExit / asyncio.CancelledError propagate
and the worker can shut down cleanly). Swallowed exceptions log
at WARNING with exc_info, never ERROR — absence is the steady
state, not a bug. Subclasses override _tag_impl, never tag — the
tolerance contract is enforced in the base class, not on trust.
- KNOWN_SOURCE_KINDS: Final[frozenset[str]] enumerating every
source_kind a producer is allowed to emit. Closed-by-enumeration
at the runtime layer; the composite tagger keys its WARNING/INFO
bridge off this constant to surface the silent-drop trap from
the design doc (lines 160–195).
E.1.4 — decnet/ttp/factory.py
- get_tagger() reads DECNET_TTP_TAGGER_TYPE (default 'composite');
unknown values raise ValueError with the known-list. Mirrors
decnet.intel.factory and decnet.clustering.factory.
- _KNOWN = ('composite',). Per-lifter classes (E.1.6) are children
of the composite, not standalone tagger types.
- CompositeTagger(Tagger): pre-computes a dict[str, list[Tagger]]
dispatch index from each lifter's HANDLES; fans events out
concurrently with asyncio.gather and concatenates results.
Empty lifters=[] is the legal contract-phase state — E.1.6
wires the real lifters in.
- Unhandled-event observability: source_kind in KNOWN_SOURCE_KINDS
but no lifter claims it -> WARNING once per kind per process
(missed E.1.6 update). Unknown kind -> INFO once per kind per
process (future-feature telemetry, by design). Per-process dedup
via plain set; E.1.6 may swap in a proper rate-limiter once
production traffic shapes are known.
Tests — tests/ttp/test_base.py, tests/ttp/test_factory.py
- Tagger / TolerantTagger abstractness, missing-tag-impl rejection,
WARNING-not-ERROR log level, propagation of KeyboardInterrupt /
SystemExit / asyncio.CancelledError.
- Factory env-var routing, unknown-name ValueError, dispatch-index
correctness, only-claiming-lifter invocation, WARNING-once for
known-but-unclaimed kinds, INFO-once for unknown kinds, result
concatenation across lifters.
Mypy clean under .311/bin/mypy --ignore-missing-imports.
131 lines
4.2 KiB
Python
131 lines
4.2 KiB
Python
"""Contract tests for :mod:`decnet.ttp.factory` (E.1.4).
|
|
|
|
Scoped to the factory + composite dispatch contract: env var routing,
|
|
unknown-name failure, dispatch index correctness, and the
|
|
KNOWN_SOURCE_KINDS WARNING/INFO bridge for unhandled events.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
import pytest
|
|
|
|
from decnet.ttp.base import KNOWN_SOURCE_KINDS, TaggerEvent, TolerantTagger
|
|
from decnet.ttp.factory import CompositeTagger, get_tagger
|
|
from decnet.web.db.models.ttp import TTPTag
|
|
|
|
|
|
def _ev(source_kind: str) -> TaggerEvent:
    """Build a minimal ``TaggerEvent`` carrying the given ``source_kind``.

    All other fields are fixed placeholders; the tests in this module only
    vary ``source_kind`` to exercise dispatch.
    """
    event = TaggerEvent(
        source_kind=source_kind,
        # Populated placeholder identifiers.
        source_id="src1",
        identity_uuid="id1",
        payload={},
        # Identifiers deliberately left unset.
        attacker_uuid=None,
        session_id=None,
        decky_id=None,
    )
    return event
|
|
|
|
|
|
def test_default_returns_composite_with_empty_lifters(monkeypatch):
    """With the env var unset, the factory yields a composite with no lifters."""
    # raising=False: the variable may or may not exist in the test environment.
    monkeypatch.delenv("DECNET_TTP_TAGGER_TYPE", raising=False)

    tagger = get_tagger()

    assert isinstance(tagger, CompositeTagger)
    assert tagger.name == "composite"
    assert tagger._lifters == []
|
|
|
|
|
|
def test_explicit_composite(monkeypatch):
    """Setting the env var to 'composite' selects ``CompositeTagger``."""
    monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "composite")

    tagger = get_tagger()

    assert isinstance(tagger, CompositeTagger)
|
|
|
|
|
|
def test_unknown_tagger_type_raises(monkeypatch):
    """An unrecognised tagger type makes ``get_tagger`` raise ``ValueError``."""
    monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "nope")

    with pytest.raises(ValueError) as excinfo:
        get_tagger()

    # Same regex search against the exception text as ``raises(match=...)``.
    excinfo.match("Unknown tagger")
|
|
|
|
|
|
class _Recorder(TolerantTagger):
    """Lifter test double that records which events it was asked to tag.

    Each ``_tag_impl`` call appends the event's ``source_kind`` to ``calls``
    and returns an empty list, so tests can assert on dispatch without
    constructing real ``TTPTag`` rows.
    """

    def __init__(self, name: str, handles: frozenset[str]) -> None:
        """Configure the recorder.

        Args:
            name: Value stored on the instance as the lifter's ``name``.
            handles: Source kinds this recorder claims; stored as ``HANDLES``
                on the instance so each recorder can claim a different set.
        """
        self.name = name
        self.HANDLES = handles
        # source_kind of every event received, in call order.
        self.calls: list[str] = []

    async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
        """Record ``event.source_kind`` and produce no tags."""
        self.calls.append(event.source_kind)
        return []
|
|
|
|
|
|
def test_composite_dispatch_index_is_built_from_handles():
    """The composite's ``_by_kind`` index maps each kind to its claiming lifters."""
    first = _Recorder("a", frozenset({"command", "email"}))
    second = _Recorder("b", frozenset({"email", "intel"}))
    composite = CompositeTagger(lifters=[first, second])

    # "email" is claimed by both lifters; the other kinds by exactly one.
    expected = {
        "command": {first},
        "email": {first, second},
        "intel": {second},
    }
    for kind, lifters in expected.items():
        assert set(composite._by_kind[kind]) == lifters
|
|
|
|
|
|
def test_composite_only_invokes_claiming_lifters():
    """Only the lifter whose ``HANDLES`` contains the event's kind is called."""
    command_lifter = _Recorder("a", frozenset({"command"}))
    email_lifter = _Recorder("b", frozenset({"email"}))
    composite = CompositeTagger(lifters=[command_lifter, email_lifter])

    asyncio.run(composite.tag(_ev("command")))

    assert command_lifter.calls == ["command"]
    assert email_lifter.calls == []
|
|
|
|
|
|
def test_composite_unhandled_known_kind_logs_warning_once(caplog):
    """A known but unclaimed kind logs exactly one WARNING across two events."""
    composite = CompositeTagger(lifters=[])
    # Sorting makes the pick from the frozenset deterministic.
    known = sorted(KNOWN_SOURCE_KINDS)[0]
    caplog.set_level(logging.INFO, logger="decnet.ttp.factory")

    # Tag the same kind twice; the second event must not log again.
    first_out = asyncio.run(composite.tag(_ev(known)))
    second_out = asyncio.run(composite.tag(_ev(known)))
    assert all(out == [] for out in (first_out, second_out))

    factory_records = [
        rec for rec in caplog.records if rec.name == "decnet.ttp.factory"
    ]
    warning_count = sum(
        1 for rec in factory_records if rec.levelno == logging.WARNING
    )
    assert warning_count == 1, "expected one WARNING per kind per process"
|
|
|
|
|
|
def test_composite_unhandled_unknown_kind_logs_info_once(caplog):
    """An unknown kind logs exactly one INFO and no WARNING across two events."""
    composite = CompositeTagger(lifters=[])
    unknown = "definitely_not_a_real_source_kind_zzz"
    # Guard the premise: the test is meaningless if this kind becomes known.
    assert unknown not in KNOWN_SOURCE_KINDS
    caplog.set_level(logging.INFO, logger="decnet.ttp.factory")

    for _ in range(2):
        asyncio.run(composite.tag(_ev(unknown)))

    # Levels of every record emitted by the factory logger, in order.
    factory_levels = [
        rec.levelno for rec in caplog.records if rec.name == "decnet.ttp.factory"
    ]
    assert factory_levels.count(logging.INFO) == 1
    assert logging.WARNING not in factory_levels
|
|
|
|
|
|
def test_composite_concatenates_results_from_multiple_lifters():
    """Results from every claiming lifter end up in one combined list."""

    class Fixed(TolerantTagger):
        """Lifter claiming "command" that returns ``n`` placeholder items."""

        def __init__(self, n: int) -> None:
            self._n = n
            self.HANDLES = frozenset({"command"})
            self.name = f"fixed{n}"

        async def _tag_impl(self, event):
            # Placeholders stand in for real TTPTag rows: only the length
            # of the concatenated result is under test, not row validity.
            return [object()] * self._n

    lifters = [Fixed(2), Fixed(3)]
    composite = CompositeTagger(lifters=lifters)

    combined = asyncio.run(composite.tag(_ev("command")))

    # 2 items from the first lifter plus 3 from the second.
    assert len(combined) == 5
|