Files
DECNET/decnet/ttp/base.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

186 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Tagger ABC — input shape, base class, tolerant mixin.
Contract step E.1.3 of ``development/TTP_TAGGING.md``. Defines the type
surface every lifter (E.1.6), the rule engine (E.1.5), the composite
tagger (E.1.4) and the worker (E.1.7) compile against. No behavior
beyond the tolerant-wrapper boundary lives here.
The design doc's "schema is forward-compat, code is not" trap (lines
160-195) is mitigated *here*: :data:`KNOWN_SOURCE_KINDS` enumerates
every ``source_kind`` a producer is allowed to emit. Adding a new
producer means adding its kind to this set in the *same commit* that
ships the producer; the composite tagger's WARNING/INFO bridge in
:mod:`decnet.ttp.factory` keys off this constant to surface silent
drops.
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any, Final, NamedTuple, Protocol, runtime_checkable
from decnet.web.db.models.ttp import EVIDENCE_SCHEMA, TTPTag
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)

# The complete set of ``source_kind`` strings a DECNET producer may emit.
# The storage column is open, so this enumeration is what closes the set
# at the runtime layer. A producer MUST register its kind here in the
# same commit that starts emitting it — the operational contract and its
# rationale live in the design doc, lines 160-195.
KNOWN_SOURCE_KINDS: Final[frozenset[str]] = frozenset(
    (
        "command",
        "intel",
        "email",
        "canary_fingerprint",
        "identity",
        "credential",
        "auth_attempt",
        "payload",
        "session",
        "http_request",
        "http_fingerprint",
        "ipv6_leak",
    )
)
class TaggerEvent(NamedTuple):
"""Input shape for every tagger.
NamedTuple (not dataclass) so instances are hashable — downstream
dedup paths can put them in sets without a custom ``__hash__``.
``payload`` is opaque on purpose: each ``source_kind`` carries a
different shape, and the per-lifter contract owns the parse.
"""
source_kind: str
source_id: str
attacker_uuid: str | None
identity_uuid: str | None
session_id: str | None
decky_id: str | None
payload: dict[str, Any]
class Tagger(ABC):
    """Base interface implemented by every tagger.

    Concrete taggers assign :attr:`name` and :attr:`HANDLES` as class
    attributes. ``HANDLES`` is what the composite tagger reads to build
    its dispatch index, so a subclass that leaves the empty default in
    place is never invoked — that shows up as a test failure instead of
    a silent fan-out.
    """

    #: Short label used in log lines and in the
    #: ``DECNET_TTP_TAGGER_TYPE`` env var. Subclasses override it.
    name: str = ""

    #: The ``source_kind`` strings this tagger consumes. Defaults to the
    #: empty set so a misconfigured subclass is loudly idle, not loudly
    #: noisy.
    HANDLES: frozenset[str] = frozenset()

    @abstractmethod
    async def tag(self, event: TaggerEvent) -> list[TTPTag]:
        """Return zero or more tags derived from ``event``.

        A class that subclasses :class:`Tagger` directly owns its own
        error handling. Lifters that consume sibling-worker output
        should derive from :class:`TolerantTagger` instead, where the
        "absence is not an error" contract is enforced by the base
        class rather than taken on trust.
        """
class TolerantTagger(Tagger):
    """Tagger base that turns uncaught exceptions into an empty result.

    All per-source lifters derive from this class. The motivation is
    architectural rather than stylistic: TTP tagging reads the output of
    sibling workers (intel, behavioral, identity, …) which may not have
    run yet, may have failed, or may have nothing to say about a given
    event. "Absence" is the steady state, so a lifter that blows up on a
    missing join must not take the worker down with it.

    Subclasses implement :meth:`_tag_impl` and never override
    :meth:`tag` — the tolerance contract is *enforced in the base
    class*, not on trust.
    """

    async def tag(self, event: TaggerEvent) -> list[TTPTag]:
        """Run :meth:`_tag_impl`, vet evidence keys, absorb other failures.

        Returns the tags produced by :meth:`_tag_impl` unchanged, or
        ``[]`` when any ``Exception`` other than ``TypeError`` escapes.

        Raises:
            TypeError: when a tag carries evidence keys that the schema
                registered for ``event.source_kind`` does not declare,
                or when any other ``TypeError`` is raised while tagging.
        """
        try:
            produced = await self._tag_impl(event)

            # No schema registered for this kind: nothing to validate.
            schema = EVIDENCE_SCHEMA.get(event.source_kind)
            if schema is None:
                return produced

            # An undeclared evidence key is a programmer error, not a
            # runtime absence, so it is reported via TypeError instead
            # of silently dropping the tag.
            required = getattr(schema, "__required_keys__", frozenset())
            optional = getattr(schema, "__optional_keys__", frozenset())
            allowed = required | optional
            for item in produced:
                evidence = getattr(item, "evidence", None)
                if evidence is None:
                    continue
                extra = set(evidence) - allowed
                if extra:
                    raise TypeError(
                        f"lifter {self.name!r} emitted evidence keys "
                        f"{extra!r} not declared in "
                        f"{schema.__name__} for source_kind={event.source_kind!r}"
                    )
            return produced
        except TypeError:
            # Bad evidence shape or a type mismatch is a bug in the
            # lifter: let it surface, never swallow it.
            raise
        except Exception:
            # Catching ``Exception`` and not ``BaseException`` is
            # deliberate: ``KeyboardInterrupt`` / ``SystemExit`` /
            # ``asyncio.CancelledError`` must keep propagating so the
            # worker can shut down cleanly (E.2.4 conformance asserts
            # this). Logged at WARNING rather than ERROR because a
            # sibling-worker absence is normal operation; ERROR would
            # page someone for the steady state.
            _log.warning(
                "tagger %r swallowed exception on source_kind=%r",
                self.name,
                event.source_kind,
                exc_info=True,
            )
            return []

    @abstractmethod
    async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
        """Do the actual tagging; subclasses override this, not :meth:`tag`."""
@runtime_checkable
class WatchableTagger(Protocol):
    """Structural protocol for taggers that hot-reload from a RuleStore.

    Every per-source lifter (and :class:`RuleEngineTagger`) owns a
    :class:`~decnet.ttp.impl._rule_index.RuleIndex` and provides an
    ``async def watch_store()`` coroutine that loads the initial corpus
    and then drains store change events forever. The worker (E.3.14)
    starts one task per ``WatchableTagger`` so dispatch indexes hydrate
    at startup; without that the indexes stay empty and no rule fires.

    The protocol is ``runtime_checkable`` so the worker can fan out via
    :func:`isinstance` without leaking the protocol into the abstract
    :class:`Tagger` base.
    """

    async def watch_store(self) -> None:
        """Load the initial corpus, then drain store change events."""
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "KNOWN_SOURCE_KINDS",
    "TaggerEvent",
    "Tagger",
    "TolerantTagger",
    "WatchableTagger",
]