diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index b54f34ec..65bd1bad 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -106,13 +106,18 @@ class CompositeTagger(Tagger): def get_tagger() -> Tagger: """Return the configured tagger instance. - Lazy package layout: the composite is constructed with an empty - lifter list during the contract phase. E.1.6 will replace this - with explicit lifter wiring; callers don't change. + Synchronous construction: each shipped lifter takes the shared + :class:`RuleStore` reference, but the per-lifter watch loops are + started by the worker (E.3.14), not by this factory. Tests that + instantiate via this path get an idle composite — exercising the + watch loop is the worker's contract. """ name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower() if name == "composite": - return CompositeTagger(lifters=[]) + from decnet.ttp.impl.behavioral_lifter import BehavioralLifter + from decnet.ttp.store.factory import get_rule_store + store = get_rule_store() + return CompositeTagger(lifters=[BehavioralLifter(store)]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" ) diff --git a/decnet/ttp/impl/_emit.py b/decnet/ttp/impl/_emit.py new file mode 100644 index 00000000..ac17e5f9 --- /dev/null +++ b/decnet/ttp/impl/_emit.py @@ -0,0 +1,68 @@ +"""Shared TTPTag emission helper used by per-source lifters. + +The rule engine assembles a tag inline inside ``_evaluate_rules``; the +four lifters (E.3.9–E.3.13) emit tags from the same shape but never +go through the engine's regex matcher. Pulling the assembly into one +helper keeps the ``compute_tag_uuid`` call signature, the +``apply_ceiling`` clamp, and the ``attack_release`` stamping +single-sourced. +""" +from __future__ import annotations + +from typing import Any + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl._state import apply_ceiling +from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule +from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid + + +def emit_tags( + rule: CompiledRule, + event: TaggerEvent, + evidence: dict[str, Any], +) -> list[TTPTag]: + """Materialise one TTPTag per ``rule.emits`` entry. + + Caller is responsible for having checked ``is_active(rule.state)`` + and the per-rule predicate before calling. ``evidence`` is the + fully-assembled evidence dict the lifter wants on each emitted + tag — caller honours ``rule.evidence_fields`` and any per-rule + PII discipline (e.g. EmailEvidence) before passing it in. + + The tag UUID is deterministic over (source_kind, source_id, rule_id, + rule_version, technique_id, sub_technique_id). Replay-safe: a worker + re-processing the same source events writes idempotent rows. + """ + out: list[TTPTag] = [] + for technique_id, sub_technique_id, tactic, base_conf in rule.emits: + confidence = apply_ceiling(base_conf, rule.state) + tag_uuid = compute_tag_uuid( + source_kind=event.source_kind, + source_id=event.source_id, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + ) + out.append(TTPTag( + uuid=tag_uuid, + source_kind=event.source_kind, + source_id=event.source_id, + attacker_uuid=event.attacker_uuid, + identity_uuid=event.identity_uuid, + session_id=event.session_id, + decky_id=event.decky_id, + tactic=tactic, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + confidence=confidence, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + evidence=dict(evidence), + attack_release=_ATTACK_RELEASE, + )) + return out + + +__all__ = ["emit_tags"] diff --git a/decnet/ttp/impl/behavioral_lifter.py b/decnet/ttp/impl/behavioral_lifter.py index 81e951cd..4a7f2e07 100644 --- a/decnet/ttp/impl/behavioral_lifter.py +++ b/decnet/ttp/impl/behavioral_lifter.py @@ -1,26 +1,275 @@ """Behavioral lifter — derives techniques from cross-event session signal. -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase reads ``AttackerBehavior`` rows assembled by the -profiler and emits techniques the rule engine cannot see (timing, -ordering, command-graph shape). Inherits :class:`TolerantTagger` so a -missing ``AttackerBehavior`` join silently returns ``[]`` — sibling -worker absence is the steady state, not an error. +E.3.9 of ``development/TTP_TAGGING.md``. Owns YAML rules R0031–R0040 by +``match.kind`` prefix ``lifter:behavioral_``. Each rule's predicate runs +against the upstream-pre-shaped session aggregate carried in +``TaggerEvent.payload``; the lifter never reaches into the database +directly. Sibling-worker absence (no ``AttackerBehavior`` row, no +session aggregate) yields ``[]`` per the +:class:`~decnet.ttp.base.TolerantTagger` contract. + +The lifter holds its own :class:`~decnet.ttp.impl._rule_index.RuleIndex` +filtered by ``OWNED_PREFIX`` so operator state changes (disable / clip +/ TTL) reach lifter-bound rules through the same atomic-swap path the +engine uses — see TTP_TAGGING.md §"Atomic swap". """ from __future__ import annotations +import re +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag +# A predicate returns the supplemental evidence dict on a fire (may be +# empty), or ``None`` when the rule does not fire on this event. +Predicate = Callable[[dict[str, Any], dict[str, Any]], "dict[str, Any] | None"] + + +# ── Per-rule predicates ───────────────────────────────────────────── + + +def _p_beaconing(spec: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any] | None: + interval = payload.get("beacon_interval_s") + jitter = payload.get("beacon_jitter_pct") + if not isinstance(interval, (int, float)) or not isinstance(jitter, (int, float)): + return None + if interval < float(spec.get("min_interval_s", 0)): + return None + if jitter > float(spec.get("max_jitter_pct", 1.0)): + return None + return {} + + +def _p_data_destruction( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + haystack = payload.get("op_text") or payload.get("command_text") or "" + if not isinstance(haystack, str): + return None + patterns = spec.get("patterns", []) + if not isinstance(patterns, list): + return None + for pat in patterns: + if not isinstance(pat, str): + continue + try: + if re.search(pat, haystack): + return {"matched_op": pat} + except re.error: + continue + return None + + +_BTC_RE = re.compile(r"\b(?:[13][a-km-zA-HJ-NP-Z1-9]{25,34}|bc1[ac-hj-np-z02-9]{11,71})\b") +_XMR_RE = re.compile(r"\b4[0-9AB][1-9A-HJ-NP-Za-km-z]{93}\b") + + +def _p_ransom_note( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + text = payload.get("body_text") or payload.get("note_text") or "" + if not isinstance(text, str) or not text: + return None + keywords = spec.get("payment_keywords", []) + matched_keywords = [ + k for k in keywords + if isinstance(k, str) and k.lower() in text.lower() + ] + if not matched_keywords: + return None + if spec.get("require_btc_or_xmr"): + btc = _BTC_RE.search(text) + xmr = _XMR_RE.search(text) + if not (btc or xmr): + return None + addr = (btc or xmr) + return { + "btc_address": addr.group(0) if addr else "", + "matched_keywords": matched_keywords, + } + return {"matched_keywords": matched_keywords} + + +def _p_exfil_over_web( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + bytes_out = payload.get("bytes_out") + request_count = payload.get("request_count") + min_bytes = float(spec.get("min_payload_bytes", 0)) + min_reqs = int(spec.get("request_threshold", 0)) + bytes_hit = isinstance(bytes_out, (int, float)) and bytes_out >= min_bytes + req_hit = isinstance(request_count, int) and request_count >= min_reqs + if not (bytes_hit or req_hit): + return None + return {} + + +def _p_db_mass_read( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + rows = payload.get("rows_read") + nbytes = payload.get("bytes_read") + min_rows = int(spec.get("min_rows", 0)) + min_bytes = int(spec.get("min_bytes", 0)) + rows_hit = isinstance(rows, int) and rows >= min_rows + bytes_hit = isinstance(nbytes, (int, float)) and nbytes >= min_bytes + if not (rows_hit or bytes_hit): + return None + return {} + + +def _path_match( + spec: dict[str, Any], payload: dict[str, Any], key: str = "paths", +) -> dict[str, Any] | None: + path = ( + payload.get("matched_path") + or payload.get("request_path") + or payload.get("path") + or "" + ) + if not isinstance(path, str) or not path: + return None + patterns = spec.get(key, []) + if not isinstance(patterns, list): + return None + for pat in patterns: + if not isinstance(pat, str): + continue + try: + if re.search(pat, path): + return {"matched_path": path} + except re.error: + continue + return None + + +def _p_credentials_in_files( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + return _path_match(spec, payload, key="paths") + + +def _p_k8s_sa_token( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + return _path_match(spec, payload, key="paths") + + +def _p_docker_escape( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + observed = payload.get("signals") + if not isinstance(observed, list): + return None + wanted = spec.get("signals", []) + if not isinstance(wanted, list): + return None + observed_set = {s for s in observed if isinstance(s, str)} + for sig in wanted: + if isinstance(sig, str) and sig in observed_set: + return {"matched_signal": sig} + return None + + +def _p_llmnr_poisoning( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + if payload.get("llmnr_poisoned") is True: + return {} + if isinstance(payload.get("llmnr_poison_count"), int) and payload["llmnr_poison_count"] >= 1: + return {} + return None + + +def _p_tftp_router_config( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + fname = payload.get("tftp_filename") or "" + if not isinstance(fname, str) or not fname: + return None + patterns = spec.get("filename_patterns", []) + if not isinstance(patterns, list): + return None + for pat in patterns: + if not isinstance(pat, str): + continue + try: + if re.search(pat, fname): + return {} + except re.error: + continue + return None + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:behavioral_beaconing": _p_beaconing, + "lifter:behavioral_data_destruction": _p_data_destruction, + "lifter:behavioral_ransom_note": _p_ransom_note, + "lifter:behavioral_exfil_over_web": _p_exfil_over_web, + "lifter:behavioral_db_mass_read": _p_db_mass_read, + "lifter:behavioral_credentials_in_files": _p_credentials_in_files, + "lifter:behavioral_k8s_sa_token": _p_k8s_sa_token, + "lifter:behavioral_docker_escape": _p_docker_escape, + "lifter:behavioral_llmnr_poisoning": _p_llmnr_poisoning, + "lifter:behavioral_tftp_router_config": _p_tftp_router_config, +} + + +# ── Lifter ────────────────────────────────────────────────────────── + + class BehavioralLifter(TolerantTagger): name = "behavioral" - #: Session-level events triggering a behavior-graph lookup. The - #: lifter reads ``AttackerBehavior`` keyed on the session. - HANDLES = frozenset({"session"}) + #: BehavioralLifter consumes session-rolled events plus a few cross- + #: cutting source kinds (``email`` for R0033 ransom-note pattern, + #: ``http_request`` for R0036/R0037 path-match rules). The set + #: matches the union of ``applies_to`` across R0031–R0040. + HANDLES = frozenset({"session", "email", "http_request"}) + OWNED_PREFIX: Final[str] = "lifter:behavioral_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + """Hydrate + drain rule changes for the rules this lifter owns.""" + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + extra = handler(rule.match_spec, event.payload) + if extra is None: + continue + evidence: dict[str, Any] = { + field: event.payload.get(field) + for field in rule.evidence_fields + if field in event.payload + } + evidence.update(extra) + out.extend(emit_tags(rule, event, evidence)) + return out __all__ = ["BehavioralLifter"] diff --git a/rules/ttp/R0032.yaml b/rules/ttp/R0032.yaml index b9c9d4b1..41ce7aa0 100644 --- a/rules/ttp/R0032.yaml +++ b/rules/ttp/R0032.yaml @@ -11,10 +11,10 @@ match: kind: lifter:behavioral_data_destruction patterns: - 'FLUSHALL' - - 'DROP\\s+DATABASE' - - 'TRUNCATE\\s+TABLE' - - 'dropDatabase\\(\\)' - - 'DELETE\\s+/\\_all' + - 'DROP\s+DATABASE' + - 'TRUNCATE\s+TABLE' + - 'dropDatabase\(\)' + - 'DELETE\s+/\_all' emits: - tactic: TA0040 technique_id: T1485 diff --git a/rules/ttp/R0036.yaml b/rules/ttp/R0036.yaml index a4254e02..0a2b8c89 100644 --- a/rules/ttp/R0036.yaml +++ b/rules/ttp/R0036.yaml @@ -11,11 +11,11 @@ applies_to: match: kind: lifter:behavioral_credentials_in_files paths: - - '\\.env' - - '\\.git/config' - - '\\.aws/credentials' - - '\\.ssh/id_rsa' - - 'wp-config\\.php' + - '\.env' + - '\.git/config' + - '\.aws/credentials' + - '\.ssh/id_rsa' + - 'wp-config\.php' emits: - tactic: TA0006 technique_id: T1552 diff --git a/rules/ttp/R0037.yaml b/rules/ttp/R0037.yaml index df83e282..c5823d29 100644 --- a/rules/ttp/R0037.yaml +++ b/rules/ttp/R0037.yaml @@ -11,7 +11,7 @@ match: kind: lifter:behavioral_k8s_sa_token paths: - '/api/v1/namespaces/[^/]+/secrets' - - '/var/run/secrets/kubernetes\\.io/serviceaccount' + - '/var/run/secrets/kubernetes\.io/serviceaccount' emits: - tactic: TA0006 technique_id: T1552 diff --git a/rules/ttp/R0040.yaml b/rules/ttp/R0040.yaml index 2d85c4a7..07295421 100644 --- a/rules/ttp/R0040.yaml +++ b/rules/ttp/R0040.yaml @@ -10,7 +10,7 @@ match: kind: lifter:behavioral_tftp_router_config filename_patterns: - '.*-confg$' - - '.*\\.cfg$' + - '.*\.cfg$' - 'startup-config' - 'running-config' emits: diff --git a/tests/ttp/_stub_store.py b/tests/ttp/_stub_store.py new file mode 100644 index 00000000..c62000f7 --- /dev/null +++ b/tests/ttp/_stub_store.py @@ -0,0 +1,48 @@ +"""Shared stub :class:`RuleStore` for lifter unit tests. + +Tests that exercise :class:`BehavioralLifter` / :class:`IntelLifter` / +:class:`CanaryFingerprintLifter` / :class:`EmailLifter` need a store +reference at construction. Most don't drive the watch loop — they +inject rules into the lifter's :class:`RuleIndex` directly. This stub +provides just enough of the ABC to satisfy construction. +""" +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleChange, RuleState, RuleStore + + +class StubRuleStore(RuleStore): + """In-memory store with optional preloaded compiled rules.""" + + def __init__( + self, + compiled: list[CompiledRule] | None = None, + changes: list[RuleChange] | None = None, + ) -> None: + self._compiled = list(compiled or []) + self._changes = list(changes or []) + + async def load_compiled(self) -> list[CompiledRule]: + return list(self._compiled) + + async def get_state(self, _rule_id: str) -> RuleState: + return RuleState() + + async def set_state(self, *_a: Any, **_kw: Any) -> None: + return None + + def subscribe_changes(self) -> AsyncIterator[RuleChange]: + changes = list(self._changes) + + async def _gen() -> AsyncIterator[RuleChange]: + for change in changes: + yield change + + return _gen() + + +__all__ = ["StubRuleStore"] diff --git a/tests/ttp/rule_precision/corpus/seed_behavioral.jsonl b/tests/ttp/rule_precision/corpus/seed_behavioral.jsonl index be5eb63a..cff5fc92 100644 --- a/tests/ttp/rule_precision/corpus/seed_behavioral.jsonl +++ b/tests/ttp/rule_precision/corpus/seed_behavioral.jsonl @@ -1,2 +1,11 @@ {"source_kind": "session", "payload": {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}, "expected_rule_ids": ["R0031"], "label": "low_jitter_beacon"} {"source_kind": "session", "payload": {"beacon_interval_s": 0, "beacon_jitter_pct": 0}, "expected_rule_ids": [], "label": "negative_no_beacon"} +{"source_kind": "session", "payload": {"command_text": "FLUSHALL", "op_text": "FLUSHALL"}, "expected_rule_ids": ["R0032"], "label": "redis_flushall"} +{"source_kind": "session", "payload": {"body_text": "send 0.5 BTC to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa to decrypt your files"}, "expected_rule_ids": ["R0033"], "label": "ransom_btc"} +{"source_kind": "session", "payload": {"bytes_out": 5000000, "request_count": 200, "target_host": "exfil.example"}, "expected_rule_ids": ["R0034"], "label": "web_exfil_burst"} +{"source_kind": "session", "payload": {"rows_read": 50000, "bytes_read": 0, "service": "mysql"}, "expected_rule_ids": ["R0035"], "label": "db_mass_read_rows"} +{"source_kind": "http_request", "payload": {"request_path": "/var/www/html/.env"}, "expected_rule_ids": ["R0036"], "label": "creds_env_read"} +{"source_kind": "http_request", "payload": {"request_path": "/api/v1/namespaces/default/secrets"}, "expected_rule_ids": ["R0037"], "label": "k8s_secrets_list"} +{"source_kind": "session", "payload": {"signals": ["privileged:true", "image:nginx"], "container_image": "nginx"}, "expected_rule_ids": ["R0038"], "label": "docker_privileged_create"} +{"source_kind": "session", "payload": {"llmnr_poisoned": true, "victim_host": "client01"}, "expected_rule_ids": ["R0039"], "label": "llmnr_responder"} +{"source_kind": "session", "payload": {"tftp_filename": "router-startup-config", "source_host": "10.0.0.5"}, "expected_rule_ids": ["R0040"], "label": "tftp_router_cfg"} diff --git a/tests/ttp/rule_precision/test_behavioral_rules.py b/tests/ttp/rule_precision/test_behavioral_rules.py index 1ce6cb24..68fcf792 100644 --- a/tests/ttp/rule_precision/test_behavioral_rules.py +++ b/tests/ttp/rule_precision/test_behavioral_rules.py @@ -1,32 +1,37 @@ """R0031-R0040 — behavioral / cross-event cohort. -Every rule here is consumed by the BehavioralLifter (or an -identity-rollup variant) at E.3.9. The v0 :class:`RuleEngine` has no -counter / aggregator — it can only regex over a single event -payload — so these rules cannot fire from the engine alone. Their -``match.kind`` keys (``lifter:beaconing`` etc.) are inert to the -regex matcher by design. +Every rule here is consumed by the :class:`BehavioralLifter` (E.3.9). +The v0 :class:`RuleEngine` has no counter / aggregator — it can only +regex over a single event payload — so these rules cannot fire from +the engine alone. Their ``match.kind`` prefix ``lifter:behavioral_`` +is inert to the regex matcher by design. This file asserts: * every R003N has a YAML on disk that compiles * the v0 engine NEVER fires any of them (regression guard against a YAML drifting into a regex match) -* the precision target test is :pyfunc:`pytest.xfail`-gated until - the BehavioralLifter ships, matching the CDD pattern at - ``development/TTP_TAGGING.md:2450``. +* the lifter achieves the per-rule precision target on the labelled + corpus. """ from __future__ import annotations +import asyncio from collections.abc import Callable from pathlib import Path import pytest +from decnet.ttp.impl.behavioral_lifter import BehavioralLifter from decnet.ttp.impl.rule_engine import RuleEngine from decnet.ttp.store.base import RuleState from decnet.ttp.store.impl.filesystem import _parse_and_compile -from tests.ttp.rule_precision.conftest import CorpusRow, make_event +from tests.ttp._stub_store import StubRuleStore +from tests.ttp.rule_precision.conftest import ( + CorpusRow, + make_event, + precision_for, +) CohortLoader = Callable[[str], list[CorpusRow]] @@ -63,15 +68,44 @@ async def test_lifter_bound_inert_in_v0( ) -@pytest.mark.parametrize("rule_id", _RULE_IDS) -@pytest.mark.xfail(strict=True, reason="impl phase E.3.9 (BehavioralLifter)") -def test_behavioral_rule_precision(rule_id: str) -> None: - """Will live once the BehavioralLifter ships at E.3.9. +def _all_rule_ids() -> list[str]: + return _RULE_IDS - The lifter consumes ``AttackerBehavior`` / session aggregates and - emits one tag per matching rule_id. This test will then load the - behavioral corpus, drive the lifter, and assert the per-rule - precision target. Until that day this xfails strict so the suite - flips green automatically when E.3.9 wires it up. + +def _build_lifter() -> BehavioralLifter: + rules_dir = Path("rules/ttp") + rules = [ + _parse_and_compile(rules_dir / f"{rid}.yaml", RuleState()) + for rid in _all_rule_ids() + ] + lifter = BehavioralLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +@pytest.mark.parametrize("rule_id", _RULE_IDS) +def test_behavioral_rule_precision( + rule_id: str, + corpus_loader: CohortLoader, +) -> None: + """Drive the lifter over the behavioral corpus and assert precision. + + H-band (≥0.85 confidence) → ≥95% precision. v0 ships with a small + synthetic seed corpus; precision_for() returns 1.0 when no rows + match, so the assertion exercises the FP-guard rather than the + recall property (recall is intentionally not a v1 target — see + TTP_TAGGING.md Appendix C). """ - pytest.fail(f"{rule_id}: BehavioralLifter not yet shipped (E.3.9)") + rows = corpus_loader("behavioral") + if not rows: + pytest.skip("no behavioral corpus available") + lifter = _build_lifter() + fired: dict[str, list[str]] = {} + for row in rows: + tags = asyncio.run(lifter.tag(make_event(row))) + fired[row.label] = [tag.rule_id for tag in tags] + precision, _tp, _fp = precision_for(rule_id, rows, fired) + assert precision >= 0.95, ( + f"{rule_id} precision {precision:.2f} < 0.95 on behavioral corpus" + ) diff --git a/tests/ttp/test_behavioral_lifter.py b/tests/ttp/test_behavioral_lifter.py new file mode 100644 index 00000000..97712990 --- /dev/null +++ b/tests/ttp/test_behavioral_lifter.py @@ -0,0 +1,267 @@ +"""Per-rule unit tests for :class:`BehavioralLifter` (E.3.9). + +Each R003N gets a positive payload that fires the predicate and a +negative payload that does not. State modulation is tested once +(disable / clip) since it's funneled through the shared +:func:`is_active` / :func:`apply_ceiling` helpers. +""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.behavioral_lifter import BehavioralLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(source_kind: str, payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind=source_kind, + source_id=f"src-{source_kind}", + attacker_uuid="att1", + identity_uuid=None, + session_id="sess1", + decky_id=None, + payload=payload, + ) + + +def _make_lifter_with(rule_ids: list[str]) -> BehavioralLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = BehavioralLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +# ── Per-rule positive cases ───────────────────────────────────────── + + +@pytest.mark.parametrize( + "rule_id,source_kind,payload,techniques", + [ + ( + "R0031", + "session", + {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}, + {"T1071", "T1029"}, + ), + ( + "R0032", + "session", + {"command_text": "FLUSHALL", "op_text": "FLUSHALL"}, + {"T1485"}, + ), + ( + "R0033", + "session", + {"body_text": "Send 0.5 BTC to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa to decrypt"}, + {"T1486"}, + ), + ( + "R0034", + "session", + {"bytes_out": 5_000_000, "request_count": 100}, + {"T1567"}, + ), + ( + "R0035", + "session", + {"rows_read": 50_000, "bytes_read": 1_000}, + {"T1213"}, + ), + ( + "R0036", + "http_request", + {"request_path": "/var/www/.env"}, + {"T1552"}, + ), + ( + "R0037", + "http_request", + {"request_path": "/api/v1/namespaces/default/secrets"}, + {"T1552"}, + ), + ( + "R0038", + "session", + {"signals": ["privileged:true", "image:nginx"]}, + {"T1611"}, + ), + ( + "R0039", + "session", + {"llmnr_poisoned": True}, + {"T1557"}, + ), + ( + "R0040", + "session", + {"tftp_filename": "router-startup-config"}, + {"T1602"}, + ), + ], +) +def test_rule_fires_on_positive_payload( + rule_id: str, + source_kind: str, + payload: dict[str, Any], + techniques: set[str], +) -> None: + lifter = _make_lifter_with([rule_id]) + out = asyncio.run(lifter.tag(_ev(source_kind, payload))) + assert out, f"{rule_id} did not fire on its positive payload" + fired = {tag.technique_id for tag in out} + assert fired == techniques + for tag in out: + assert tag.rule_id == rule_id + assert tag.attacker_uuid == "att1" + + +# ── Negative cases ────────────────────────────────────────────────── + + +def test_beaconing_rejects_high_jitter() -> None: + lifter = _make_lifter_with(["R0031"]) + out = asyncio.run(lifter.tag( + _ev("session", {"beacon_interval_s": 60, "beacon_jitter_pct": 0.5}), + )) + assert out == [] + + +def test_beaconing_rejects_short_interval() -> None: + lifter = _make_lifter_with(["R0031"]) + out = asyncio.run(lifter.tag( + _ev("session", {"beacon_interval_s": 2, "beacon_jitter_pct": 0.05}), + )) + assert out == [] + + +def test_data_destruction_rejects_unrelated_text() -> None: + lifter = _make_lifter_with(["R0032"]) + out = asyncio.run(lifter.tag( + _ev("session", {"command_text": "SELECT 1"}), + )) + assert out == [] + + +def test_ransom_note_requires_btc_or_xmr_when_flagged() -> None: + lifter = _make_lifter_with(["R0033"]) + # has keyword but no address + out = asyncio.run(lifter.tag( + _ev("session", {"body_text": "send bitcoin to decrypt"}), + )) + assert out == [] + + +def test_exfil_below_thresholds_no_fire() -> None: + lifter = _make_lifter_with(["R0034"]) + out = asyncio.run(lifter.tag( + _ev("session", {"bytes_out": 100, "request_count": 1}), + )) + assert out == [] + + +def test_path_match_rules_skip_unrelated_paths() -> None: + lifter = _make_lifter_with(["R0036", "R0037"]) + out = asyncio.run(lifter.tag( + _ev("http_request", {"request_path": "/index.html"}), + )) + assert out == [] + + +def test_event_source_kind_outside_applies_to_no_fire() -> None: + """A behavioral rule with applies_to=[session] must not fire on + an http_request event even if the predicate would otherwise pass. + """ + lifter = _make_lifter_with(["R0031"]) + out = asyncio.run(lifter.tag( + _ev("http_request", {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}), + )) + assert out == [] + + +# ── State modulation ──────────────────────────────────────────────── + + +def test_disabled_state_skips_emit() -> None: + rule = _compile("R0031", RuleState(state="disabled")) + lifter = BehavioralLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag( + _ev("session", {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}), + )) + assert out == [] + + +def test_clipped_state_caps_confidence() -> None: + rule = _compile("R0031", RuleState(state="clipped", confidence_max=0.5)) + lifter = BehavioralLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag( + _ev("session", {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}), + )) + # Base confidences in YAML are 0.8 and 0.85; clipped to 0.5 ceiling + # → 0.4 and 0.425 respectively. + assert out + for tag in out: + assert tag.confidence < 0.5 + + +def test_expired_state_treated_as_disabled() -> None: + rule = _compile( + "R0031", + RuleState( + state="enabled", + expires_at=datetime.now(timezone.utc) - timedelta(seconds=1), + ), + ) + lifter = BehavioralLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag( + _ev("session", {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}), + )) + assert out == [] + + +# ── Ownership / hot-reload via watch_store hydration ──────────────── + + +def test_owns_only_behavioral_prefix() -> None: + intel = _compile("R0054") # match.kind = lifter:intel_abuseipdb + behavioral = _compile("R0031") + lifter = BehavioralLifter( + StubRuleStore(compiled=[intel, behavioral]), + ) + asyncio.run(lifter._index.hydrate_from( + lifter._store, predicate=lifter._owns, # type: ignore[arg-type] + )) + assert lifter._index.get("R0031") is not None + assert lifter._index.get("R0054") is None + + +def test_tolerates_absent_payload(caplog: pytest.LogCaptureFixture) -> None: + """The empty payload steady-state must not produce ERROR records.""" + caplog.set_level(logging.DEBUG) + lifter = _make_lifter_with(["R0031", "R0032", "R0036"]) + out = asyncio.run(lifter.tag(_ev("session", {}))) + assert out == [] + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] diff --git a/tests/ttp/test_factory.py b/tests/ttp/test_factory.py index 3185a6f0..897509ba 100644 --- a/tests/ttp/test_factory.py +++ b/tests/ttp/test_factory.py @@ -28,12 +28,16 @@ def _ev(source_kind: str) -> TaggerEvent: ) -def test_default_returns_composite_with_empty_lifters(monkeypatch): +def test_default_returns_composite_with_shipped_lifters(monkeypatch): + """E.3.9 onward: the default composite is wired with each shipped + lifter. Empty-lifters was the contract-phase shape; once a lifter + impl lands the composite carries it. + """ monkeypatch.delenv("DECNET_TTP_TAGGER_TYPE", raising=False) t = get_tagger() assert isinstance(t, CompositeTagger) assert t.name == "composite" - assert t._lifters == [] + assert len(t._lifters) >= 1 def test_explicit_composite(monkeypatch): diff --git a/tests/ttp/test_lifter_absence.py b/tests/ttp/test_lifter_absence.py index caf7f297..8653b52a 100644 --- a/tests/ttp/test_lifter_absence.py +++ b/tests/ttp/test_lifter_absence.py @@ -33,6 +33,18 @@ from decnet.ttp.impl.credential_lifter import CredentialLifter from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.identity_lifter import IdentityLifter from decnet.ttp.impl.intel_lifter import IntelLifter +from tests.ttp._stub_store import StubRuleStore + + +def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger: + """Construct a lifter with whatever its current signature wants. + + Implemented lifters (E.3.9–E.3.12) take a :class:`RuleStore`; the + still-empty IdentityLifter / CredentialLifter (E.3.13) take no args. + """ + if cls is BehavioralLifter: + return cls(StubRuleStore()) # type: ignore[call-arg] + return cls() def _ev(source_kind: str, payload: dict[str, Any] | None = None) -> TaggerEvent: @@ -77,7 +89,7 @@ def test_lifter_tolerates_absence( ) -> None: caplog.clear() caplog.set_level(logging.DEBUG) - lifter = lifter_cls() + lifter = _make_lifter(lifter_cls) out = asyncio.run(lifter.tag(_ev(source_kind, payload))) assert out == [] # The load-bearing property: no ERROR-or-above records. WARNING diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py index a97e525d..d5e2c4a4 100644 --- a/tests/ttp/test_lifters.py +++ b/tests/ttp/test_lifters.py @@ -20,6 +20,13 @@ from decnet.ttp.impl.credential_lifter import CredentialLifter from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.identity_lifter import IdentityLifter from decnet.ttp.impl.intel_lifter import IntelLifter +from tests.ttp._stub_store import StubRuleStore + + +def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger: + if cls is BehavioralLifter: + return cls(StubRuleStore()) # type: ignore[call-arg] + return cls() ALL_LIFTERS = [ BehavioralLifter, @@ -65,7 +72,7 @@ def test_lifter_names_are_unique_and_non_empty(): @pytest.mark.parametrize("cls", ALL_LIFTERS) def test_lifter_tag_returns_empty_list_for_handled_event(cls): - lifter = cls() + lifter = _instantiate(cls) kind = next(iter(cls.HANDLES)) out = asyncio.run(lifter.tag(_ev(kind))) assert out == [] @@ -74,7 +81,7 @@ def test_lifter_tag_returns_empty_list_for_handled_event(cls): @pytest.mark.parametrize("cls", ALL_LIFTERS) def test_lifter_instantiable(cls): # No abstract methods left — concrete subclass must be constructible. - cls() + _instantiate(cls) # ── E.2.6 deferred absence-tolerance behavior ────────────────────── @@ -85,6 +92,10 @@ def test_e26_intel_lifter_partial_provider_nulls(): raise AssertionError("not yet implemented") -@pytest.mark.xfail(strict=True, reason="impl phase E.3 — BehavioralLifter empty join") def test_e26_behavioral_lifter_no_attacker_behavior_row(): - raise AssertionError("not yet implemented") + """E.3.9: a session event with no AttackerBehavior fields populated + must produce zero tags and zero errors. Was xfail-strict before + BehavioralLifter shipped; now a real assertion.""" + lifter = BehavioralLifter(StubRuleStore()) + out = asyncio.run(lifter.tag(_ev("session"))) + assert out == []