diff --git a/decnet/ttp/impl/behavioral_lifter.py b/decnet/ttp/impl/behavioral_lifter.py new file mode 100644 index 00000000..81e951cd --- /dev/null +++ b/decnet/ttp/impl/behavioral_lifter.py @@ -0,0 +1,26 @@ +"""Behavioral lifter — derives techniques from cross-event session signal. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase reads ``AttackerBehavior`` rows assembled by the +profiler and emits techniques the rule engine cannot see (timing, +ordering, command-graph shape). Inherits :class:`TolerantTagger` so a +missing ``AttackerBehavior`` join silently returns ``[]`` — sibling +worker absence is the steady state, not an error. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class BehavioralLifter(TolerantTagger): + name = "behavioral" + #: Session-level events triggering a behavior-graph lookup. The + #: lifter reads ``AttackerBehavior`` keyed on the session. + HANDLES = frozenset({"session"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["BehavioralLifter"] diff --git a/decnet/ttp/impl/canary_fingerprint_lifter.py b/decnet/ttp/impl/canary_fingerprint_lifter.py new file mode 100644 index 00000000..bf38fd8c --- /dev/null +++ b/decnet/ttp/impl/canary_fingerprint_lifter.py @@ -0,0 +1,25 @@ +"""Canary fingerprint lifter — browser-payload derived technique tagger. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase reads canary-payload fingerprints (navigator +properties, canvas hashes, proxy/VPN leakage signatures) and emits +Discovery / Defense-Evasion techniques. The evidence shape is pinned +to :class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` +(``metric`` + ``matched_signature``) — raw fingerprint blobs never +land in evidence. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class CanaryFingerprintLifter(TolerantTagger): + name = "canary_fingerprint" + HANDLES = frozenset({"canary_fingerprint"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["CanaryFingerprintLifter"] diff --git a/decnet/ttp/impl/credential_lifter.py b/decnet/ttp/impl/credential_lifter.py new file mode 100644 index 00000000..81f8509c --- /dev/null +++ b/decnet/ttp/impl/credential_lifter.py @@ -0,0 +1,24 @@ +"""Credential lifter — credential-capture / reuse technique tagger. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase reads ``Credential`` and ``CredentialReuse`` rows +populated by the reuse-correlator and emits Credential-Access / +Lateral-Movement techniques. Tolerates absence of the reuse-correlator +output by inheriting :class:`TolerantTagger` — the correlator is a +sibling worker, not a hard dependency. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class CredentialLifter(TolerantTagger): + name = "credential" + HANDLES = frozenset({"credential"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["CredentialLifter"] diff --git a/decnet/ttp/impl/email_lifter.py b/decnet/ttp/impl/email_lifter.py new file mode 100644 index 00000000..6fc2af31 --- /dev/null +++ b/decnet/ttp/impl/email_lifter.py @@ -0,0 +1,25 @@ +"""Email lifter — SMTP message-level technique tagger. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase parses message-level SMTP signal (headers, +attachment hashes, body sha) and emits Initial-Access / Phishing +techniques. PII discipline (design doc "Hard parts §6") is enforced at +the *type* layer: :class:`~decnet.web.db.models.ttp.EmailEvidence` +intentionally has no fields for raw rcpt addresses or body bytes, so +this lifter cannot leak them even by accident. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class EmailLifter(TolerantTagger): + name = "email" + HANDLES = frozenset({"email"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["EmailLifter"] diff --git a/decnet/ttp/impl/identity_lifter.py b/decnet/ttp/impl/identity_lifter.py new file mode 100644 index 00000000..09f8ff21 --- /dev/null +++ b/decnet/ttp/impl/identity_lifter.py @@ -0,0 +1,26 @@ +"""Identity lifter — cross-attacker identity-rollup tagger. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase reads identity-formation events (the clusterer +publishing ``identity.formed``) and emits techniques that are only +visible at the identity scope, never per-attacker — for example, +infrastructure rotation or credential reuse across IPs that were +clustered into one identity. Tags carry ``identity_uuid`` and a NULL +``attacker_uuid`` per the design doc's "identity rollup" worked +example. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class IdentityLifter(TolerantTagger): + name = "identity" + HANDLES = frozenset({"identity"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["IdentityLifter"] diff --git a/decnet/ttp/impl/intel_lifter.py b/decnet/ttp/impl/intel_lifter.py new file mode 100644 index 00000000..84c0e103 --- /dev/null +++ b/decnet/ttp/impl/intel_lifter.py @@ -0,0 +1,30 @@ +"""Intel lifter — opportunistic third-party verdict translator. + +Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. +Implementation phase reads ``AttackerIntel`` rows and translates +provider verdicts (AbuseIPDB categories, GreyNoise classification, +Feodo / ThreatFox membership) into ATT&CK technique tags with +confidence scaled by per-provider reliability. + +The decoupling rule (design doc §"Decoupling: bus-driven, never a +hard dependency") is enforced statically by E.2.7: this module MUST +NOT import from ``decnet.intel.{abuseipdb,greynoise,feodo,threatfox}``. +Only ``decnet.web.db.models`` symbols are permitted. +""" +from __future__ import annotations + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.web.db.models.ttp import TTPTag + + +class IntelLifter(TolerantTagger): + name = "intel" + #: ``intel`` events are bus-published when an ``AttackerIntel`` row + #: is upserted; the lifter treats absence as the steady state. + HANDLES = frozenset({"intel"}) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +__all__ = ["IntelLifter"] diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 48eb0c85..0d437cb5 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2306,6 +2306,8 @@ Contracts ship in this order, one commit per step: **E.1.6 — Per-lifter contracts** (one file each, all empty bodies) +**Status:** ✅ done. + - `decnet/ttp/impl/behavioral_lifter.py` — `BehavioralLifter(TolerantTagger)`. - `decnet/ttp/impl/intel_lifter.py` — `IntelLifter(TolerantTagger)`. - `decnet/ttp/impl/email_lifter.py` — `EmailLifter(TolerantTagger)`. diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py new file mode 100644 index 00000000..a97e525d --- /dev/null +++ b/tests/ttp/test_lifters.py @@ -0,0 +1,90 @@ +"""Contract tests for the six per-source lifters (E.1.6). + +Scoped to the contract surface: each lifter is a :class:`TolerantTagger` +subclass with a non-empty ``HANDLES`` ⊆ :data:`KNOWN_SOURCE_KINDS`, +unique ``name``, and an empty-list return from ``_tag_impl``. Behavioral +absence-tolerance assertions from E.2.6 (per-provider null patterns, +session-without-AttackerBehavior, etc.) are present but xfail-strict +pending E.3. +""" +from __future__ import annotations + +import asyncio + +import pytest + +from decnet.ttp.base import KNOWN_SOURCE_KINDS, TaggerEvent, TolerantTagger +from decnet.ttp.impl.behavioral_lifter import BehavioralLifter +from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter +from decnet.ttp.impl.credential_lifter import CredentialLifter +from decnet.ttp.impl.email_lifter import EmailLifter +from decnet.ttp.impl.identity_lifter import IdentityLifter +from decnet.ttp.impl.intel_lifter import IntelLifter + +ALL_LIFTERS = [ + BehavioralLifter, + IntelLifter, + EmailLifter, + CanaryFingerprintLifter, + IdentityLifter, + CredentialLifter, +] + + +def _ev(source_kind: str) -> TaggerEvent: + return TaggerEvent( + source_kind=source_kind, + source_id="src1", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload={}, + ) + + +@pytest.mark.parametrize("cls", ALL_LIFTERS) +def test_lifter_subclasses_tolerant_tagger(cls): + assert issubclass(cls, TolerantTagger) + + +@pytest.mark.parametrize("cls", ALL_LIFTERS) +def test_lifter_handles_is_non_empty_frozenset_subset_of_known(cls): + assert isinstance(cls.HANDLES, frozenset) + assert cls.HANDLES, f"{cls.__name__}.HANDLES must not be empty" + assert cls.HANDLES <= KNOWN_SOURCE_KINDS, ( + f"{cls.__name__}.HANDLES contains kinds not in KNOWN_SOURCE_KINDS" + ) + + +def test_lifter_names_are_unique_and_non_empty(): + names = [cls.name for cls in ALL_LIFTERS] + assert all(n for n in names), "every lifter needs a non-empty name" + assert len(set(names)) == len(names), "lifter names must be unique" + + +@pytest.mark.parametrize("cls", ALL_LIFTERS) +def test_lifter_tag_returns_empty_list_for_handled_event(cls): + lifter = cls() + kind = next(iter(cls.HANDLES)) + out = asyncio.run(lifter.tag(_ev(kind))) + assert out == [] + + +@pytest.mark.parametrize("cls", ALL_LIFTERS) +def test_lifter_instantiable(cls): + # No abstract methods left — concrete subclass must be constructible. + cls() + + +# ── E.2.6 deferred absence-tolerance behavior ────────────────────── + + +@pytest.mark.xfail(strict=True, reason="impl phase E.3 — IntelLifter null patterns") +def test_e26_intel_lifter_partial_provider_nulls(): + raise AssertionError("not yet implemented") + + +@pytest.mark.xfail(strict=True, reason="impl phase E.3 — BehavioralLifter empty join") +def test_e26_behavioral_lifter_no_attacker_behavior_row(): + raise AssertionError("not yet implemented")