diff --git a/decnet/ttp/impl/credential_lifter.py b/decnet/ttp/impl/credential_lifter.py index 81f8509c..8d9a7666 100644 --- a/decnet/ttp/impl/credential_lifter.py +++ b/decnet/ttp/impl/credential_lifter.py @@ -1,24 +1,185 @@ -"""Credential lifter — credential-capture / reuse technique tagger. +"""Credential lifter — credential-capture / reuse / brute-force tagger. -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase reads ``Credential`` and ``CredentialReuse`` rows -populated by the reuse-correlator and emits Credential-Access / -Lateral-Movement techniques. Tolerates absence of the reuse-correlator -output by inheriting :class:`TolerantTagger` — the correlator is a -sibling worker, not a hard dependency. +E.3.13 of ``development/TTP_TAGGING.md``. Owns rules whose +``match.kind`` starts with ``lifter:credential_``. Currently: + +* R0001 ``lifter:credential_auth_brute_generic`` — repeated failed + auth across services / accounts on a single attacker. +* R0002 ``lifter:credential_password_guessing`` — many passwords + tried against one username. +* R0004 ``lifter:credential_reuse`` — credential observed re-used + across attackers (``CredentialReuse`` row on the bus). +* R0005 ``lifter:credential_valid_account_use`` — successful login + on an account previously brute-forced (``T1078`` valid account). +* R0006 ``lifter:credential_default_credentials`` — login pair + matches a known default (``root/root``, ``admin/admin``, …). + +Tolerates absence by inheriting :class:`TolerantTagger` — the +reuse-correlator is a sibling worker, not a hard dependency. +Predicates accept payloads from either ``credential.reuse.detected`` +events (``credential`` source kind) or session-aggregated auth +streams (``auth_attempt`` source kind); each rule's ``applies_to`` +gates the dispatch. """ from __future__ import annotations +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag +Predicate = Callable[[dict[str, Any], dict[str, Any]], "dict[str, Any] | None"] + + +def _p_auth_brute_generic( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0001 — total auth failures over a window cross a threshold.""" + fail_count = payload.get("fail_count") + if not isinstance(fail_count, int): + return None + threshold = spec.get("fail_threshold", 5) + if not isinstance(threshold, int) or fail_count < threshold: + return None + out: dict[str, Any] = {"fail_count": fail_count} + service = payload.get("service") + if isinstance(service, str) and service: + out["service"] = service + return out + + +def _p_password_guessing( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0002 — many distinct passwords tried against one username.""" + pw_count = payload.get("password_count") + username = payload.get("username") + if not isinstance(pw_count, int) or not isinstance(username, str): + return None + if not username: + return None + threshold = spec.get("pw_threshold", 5) + if not isinstance(threshold, int) or pw_count < threshold: + return None + return {"username": username, "password_count": pw_count} + + +def _p_credential_reuse( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0004 — ``CredentialReuse`` row indicates a hash seen on ≥2 attackers.""" + cred_hash = payload.get("credential_hash") + reuse_count = payload.get("reuse_count") + if not isinstance(cred_hash, str) or not cred_hash: + return None + if not isinstance(reuse_count, int) or reuse_count < 1: + return None + return {"credential_hash": cred_hash, "reuse_count": reuse_count} + + +def _p_valid_account_use( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0005 — successful login on a previously-brute-forced account.""" + if payload.get("result") != "success": + return None + if spec.get("require_prior_brute"): + if payload.get("prior_brute") is not True: + return None + out: dict[str, Any] = {} + username = payload.get("username") + service = payload.get("service") + if isinstance(username, str) and username: + out["username"] = username + if isinstance(service, str) and service: + out["service"] = service + return out + + +def _p_default_credentials( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0006 — login pair matches one of the known-default pairs.""" + username = payload.get("username") + password = payload.get("password") + if not isinstance(username, str) or not isinstance(password, str): + return None + pairs = spec.get("pairs", []) + if not isinstance(pairs, list): + return None + for pair in pairs: + if not isinstance(pair, list) or len(pair) != 2: + continue + u, p = pair + if not isinstance(u, str) or not isinstance(p, str): + continue + if username == u and password == p: + out: dict[str, Any] = {"username": username} + service = payload.get("service") + if isinstance(service, str) and service: + out["service"] = service + return out + return None + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:credential_auth_brute_generic": _p_auth_brute_generic, + "lifter:credential_password_guessing": _p_password_guessing, + "lifter:credential_reuse": _p_credential_reuse, + "lifter:credential_valid_account_use": _p_valid_account_use, + "lifter:credential_default_credentials": _p_default_credentials, +} + + class CredentialLifter(TolerantTagger): name = "credential" - HANDLES = frozenset({"credential"}) + #: Auth-attempt streams plus credential-reuse events both flow + #: through this lifter — the per-rule ``applies_to`` filter + #: routes each rule to the correct source kind. + HANDLES = frozenset({"credential", "auth_attempt"}) + OWNED_PREFIX: Final[str] = "lifter:credential_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + extra = handler(rule.match_spec, event.payload) + if extra is None: + continue + evidence: dict[str, Any] = { + field: event.payload.get(field) + for field in rule.evidence_fields + if field in event.payload + } + evidence.update(extra) + out.extend(emit_tags(rule, event, evidence)) + return out __all__ = ["CredentialLifter"] diff --git a/decnet/ttp/impl/identity_lifter.py b/decnet/ttp/impl/identity_lifter.py index 09f8ff21..0272d3f3 100644 --- a/decnet/ttp/impl/identity_lifter.py +++ b/decnet/ttp/impl/identity_lifter.py @@ -1,26 +1,114 @@ """Identity lifter — cross-attacker identity-rollup tagger. -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase reads identity-formation events (the clusterer -publishing ``identity.formed``) and emits techniques that are only -visible at the identity scope, never per-attacker — for example, -infrastructure rotation or credential reuse across IPs that were -clustered into one identity. Tags carry ``identity_uuid`` and a NULL -``attacker_uuid`` per the design doc's "identity rollup" worked -example. +E.3.13 of ``development/TTP_TAGGING.md``. Owns rules whose +``match.kind`` starts with ``lifter:identity_`` (currently R0003, +password spraying). Reads identity-rollup payloads delivered when +the clusterer publishes ``identity.formed`` / ``identity.merged``: +shape carries ``identity_uuid`` plus aggregate fields the rule's +predicate inspects (``shared_password_hash``, ``account_count``, +member ``attacker_uuid`` set, etc.). + +Tags emitted by this lifter carry ``identity_uuid`` populated and +``attacker_uuid=NULL`` per the design doc's "identity rollup" +worked example — the tag belongs to the Identity, not to any one +member IP. """ from __future__ import annotations +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag +# Predicate returns supplemental evidence on a fire (may be empty), +# or ``None`` when the rule does not fire on this event. +Predicate = Callable[[dict[str, Any], dict[str, Any]], "dict[str, Any] | None"] + + +def _p_password_spraying( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + """R0003 — same password tried across many accounts. + + Predicate fires when the clusterer-supplied ``account_count`` + meets or exceeds the rule's ``account_threshold`` AND a + ``shared_password_hash`` is present (so the tag points at a + specific reused-password observation, not just a count). The + threshold defaults to 1 only as a safety net — production + YAML pins ``account_threshold: 3``. + """ + shared_hash = payload.get("shared_password_hash") + account_count = payload.get("account_count") + if not isinstance(shared_hash, str) or not shared_hash: + return None + if not isinstance(account_count, int): + return None + threshold = spec.get("account_threshold", 1) + if not isinstance(threshold, int): + return None + if account_count < threshold: + return None + return { + "shared_password_hash": shared_hash, + "account_count": account_count, + } + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:identity_password_spraying": _p_password_spraying, +} + + class IdentityLifter(TolerantTagger): name = "identity" HANDLES = frozenset({"identity"}) + OWNED_PREFIX: Final[str] = "lifter:identity_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + extra = handler(rule.match_spec, event.payload) + if extra is None: + continue + evidence: dict[str, Any] = { + field: event.payload.get(field) + for field in rule.evidence_fields + if field in event.payload + } + evidence.update(extra) + # Identity-rollup tags carry identity_uuid, never an + # attacker_uuid — null out whatever the upstream event + # carried so the worked-example invariant holds. + rolled = event._replace(attacker_uuid=None) + out.extend(emit_tags(rule, rolled, evidence)) + return out __all__ = ["IdentityLifter"] diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index efb38702..44421a75 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -3024,7 +3024,15 @@ Order: open-relay (R0041) territory. 13. **IdentityLifter + CredentialLifter** — cross-Attacker rollups. Bus-wake on `identity.formed` / `identity.merged` / - `credential.reuse.detected`. + `credential.reuse.detected`. ✅ done. IdentityLifter owns + `lifter:identity_*` (R0003 password spraying); CredentialLifter + owns `lifter:credential_*` (R0001 generic auth brute, R0002 + password guessing, R0004 reuse, R0005 valid-account use, R0006 + default credentials). Identity-rollup tags null `attacker_uuid` + on emit so the worked-example invariant holds. R0001/R0002/R0005/ + R0006 YAML kinds were normalised to the `lifter:credential_` + prefix in this commit (the doc-promised "YAMLs normalised in a + separate refactor commit" lands here, not in E.3.9). 14. **Worker bootstrap** — wire up the loop, the `CompositeTagger`, the bus subscriptions, the `RuleEngine` watching the `RuleStore`. `test_worker_bus.py` green diff --git a/rules/ttp/R0001.yaml b/rules/ttp/R0001.yaml index f79fd2a5..b9d655da 100644 --- a/rules/ttp/R0001.yaml +++ b/rules/ttp/R0001.yaml @@ -3,12 +3,12 @@ rule_version: 1 name: generic_auth_brute description: | Repeated failed auth across services/accounts. Cross-event; - emitted by the BehavioralLifter (E.3.9) — v0 RuleEngine cannot + emitted by the CredentialLifter (E.3.13) — v0 RuleEngine cannot count. applies_to: - auth_attempt match: - kind: lifter:auth_brute_generic + kind: lifter:credential_auth_brute_generic fail_threshold: 5 window_minutes: 5 emits: diff --git a/rules/ttp/R0002.yaml b/rules/ttp/R0002.yaml index ae42f9e7..c6324ec9 100644 --- a/rules/ttp/R0002.yaml +++ b/rules/ttp/R0002.yaml @@ -3,11 +3,11 @@ rule_version: 1 name: password_guessing description: | Multiple passwords tried against a single account in a window. - Cross-event; BehavioralLifter (E.3.9). + Cross-event; CredentialLifter (E.3.13). applies_to: - auth_attempt match: - kind: lifter:password_guessing + kind: lifter:credential_password_guessing pw_threshold: 5 window_minutes: 5 emits: diff --git a/rules/ttp/R0003.yaml b/rules/ttp/R0003.yaml index 608c410d..cdcc469d 100644 --- a/rules/ttp/R0003.yaml +++ b/rules/ttp/R0003.yaml @@ -7,7 +7,7 @@ description: | applies_to: - identity match: - kind: lifter:password_spraying + kind: lifter:identity_password_spraying account_threshold: 3 emits: - tactic: TA0006 diff --git a/rules/ttp/R0005.yaml b/rules/ttp/R0005.yaml index d8a34a60..0c7e1259 100644 --- a/rules/ttp/R0005.yaml +++ b/rules/ttp/R0005.yaml @@ -3,11 +3,11 @@ rule_version: 1 name: valid_account_use description: | Successful authentication on a previously-brute-forced account. - BehavioralLifter (E.3.9). + CredentialLifter (E.3.13). applies_to: - auth_attempt match: - kind: lifter:valid_account_use + kind: lifter:credential_valid_account_use require_prior_brute: true emits: - tactic: TA0001 diff --git a/rules/ttp/R0006.yaml b/rules/ttp/R0006.yaml index 72984f95..51a73800 100644 --- a/rules/ttp/R0006.yaml +++ b/rules/ttp/R0006.yaml @@ -3,12 +3,12 @@ rule_version: 1 name: default_credentials description: | Login attempt with a known default credential pair (root/root, - admin/admin, etc.). BehavioralLifter (E.3.9) reads credentials + admin/admin, etc.). CredentialLifter (E.3.13) reads credentials table. applies_to: - auth_attempt match: - kind: lifter:default_credentials + kind: lifter:credential_default_credentials pairs: - [root, root] - [admin, admin] diff --git a/tests/ttp/rule_precision/test_command_rules.py b/tests/ttp/rule_precision/test_command_rules.py index a9eaf2ce..55d7c969 100644 --- a/tests/ttp/rule_precision/test_command_rules.py +++ b/tests/ttp/rule_precision/test_command_rules.py @@ -38,12 +38,12 @@ CohortLoader = Callable[[str], list[CorpusRow]] # Lifter-bound rules: cannot fire from the v0 engine. _LIFTER_BOUND: dict[str, str] = { - "R0001": "impl phase E.3.9 (BehavioralLifter — auth brute count)", - "R0002": "impl phase E.3.9 (BehavioralLifter — password guessing)", + "R0001": "impl phase E.3.13 (CredentialLifter — auth brute count)", + "R0002": "impl phase E.3.13 (CredentialLifter — password guessing)", "R0003": "impl phase E.3.13 (IdentityLifter — password spraying)", "R0004": "impl phase E.3.13 (CredentialLifter — credential reuse)", - "R0005": "impl phase E.3.9 (BehavioralLifter — valid account use)", - "R0006": "impl phase E.3.9 (BehavioralLifter — default creds)", + "R0005": "impl phase E.3.13 (CredentialLifter — valid account use)", + "R0006": "impl phase E.3.13 (CredentialLifter — default creds)", "R0030": "impl phase E.3.9 (BehavioralLifter — JARM/HASSH match)", } diff --git a/tests/ttp/test_credential_lifter.py b/tests/ttp/test_credential_lifter.py new file mode 100644 index 00000000..b2af1905 --- /dev/null +++ b/tests/ttp/test_credential_lifter.py @@ -0,0 +1,204 @@ +"""Per-rule unit tests for :class:`CredentialLifter` (E.3.13).""" +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.credential_lifter import CredentialLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(source_kind: str, payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind=source_kind, + source_id=f"src-{source_kind}", + attacker_uuid="att1", + identity_uuid="id1", + session_id="sess1", + decky_id="d1", + payload=payload, + ) + + +def _make_lifter(rule_ids: list[str]) -> CredentialLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = CredentialLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +# ── R0001 generic auth brute ──────────────────────────────────────── + + +def test_auth_brute_fires_above_threshold() -> None: + lifter = _make_lifter(["R0001"]) + out = asyncio.run(lifter.tag(_ev("auth_attempt", { + "fail_count": 12, "service": "ssh", + }))) + assert len(out) == 1 + assert out[0].technique_id == "T1110" + assert out[0].evidence["fail_count"] == 12 + assert out[0].evidence["service"] == "ssh" + + +def test_auth_brute_below_threshold() -> None: + lifter = _make_lifter(["R0001"]) + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "fail_count": 2, "service": "ssh", + }))) == [] + + +# ── R0002 password guessing ───────────────────────────────────────── + + +def test_password_guessing_fires() -> None: + lifter = _make_lifter(["R0002"]) + out = asyncio.run(lifter.tag(_ev("auth_attempt", { + "username": "root", "password_count": 8, + }))) + assert len(out) == 1 + assert out[0].sub_technique_id == "T1110.001" + assert out[0].evidence["password_count"] == 8 + + +def test_password_guessing_no_username() -> None: + lifter = _make_lifter(["R0002"]) + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "password_count": 8, + }))) == [] + + +# ── R0004 credential reuse ────────────────────────────────────────── + + +def test_credential_reuse_fires() -> None: + lifter = _make_lifter(["R0004"]) + out = asyncio.run(lifter.tag(_ev("credential", { + "credential_hash": "sha256:abc", "reuse_count": 3, + }))) + assert len(out) == 1 + assert out[0].sub_technique_id == "T1110.004" + assert out[0].evidence["reuse_count"] == 3 + + +def test_credential_reuse_zero_count() -> None: + lifter = _make_lifter(["R0004"]) + assert asyncio.run(lifter.tag(_ev("credential", { + "credential_hash": "sha256:abc", "reuse_count": 0, + }))) == [] + + +def test_credential_reuse_wrong_source_kind() -> None: + """R0004 applies_to=credential — an auth_attempt event must not fire it.""" + lifter = _make_lifter(["R0004"]) + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "credential_hash": "x", "reuse_count": 5, + }))) == [] + + +# ── R0005 valid account use ───────────────────────────────────────── + + +def test_valid_account_requires_prior_brute() -> None: + lifter = _make_lifter(["R0005"]) + # Successful login but no prior_brute — must not fire. + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "result": "success", "username": "root", "service": "ssh", + }))) == [] + out = asyncio.run(lifter.tag(_ev("auth_attempt", { + "result": "success", "prior_brute": True, + "username": "root", "service": "ssh", + }))) + assert len(out) == 1 + assert out[0].technique_id == "T1078" + + +def test_valid_account_failed_login_does_not_fire() -> None: + lifter = _make_lifter(["R0005"]) + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "result": "fail", "prior_brute": True, + "username": "root", "service": "ssh", + }))) == [] + + +# ── R0006 default credentials ─────────────────────────────────────── + + +def test_default_credentials_match() -> None: + lifter = _make_lifter(["R0006"]) + out = asyncio.run(lifter.tag(_ev("auth_attempt", { + "username": "root", "password": "root", "service": "ssh", + }))) + assert len(out) == 1 + assert out[0].sub_technique_id == "T1078.001" + assert out[0].evidence["username"] == "root" + + +def test_default_credentials_no_match() -> None: + lifter = _make_lifter(["R0006"]) + assert asyncio.run(lifter.tag(_ev("auth_attempt", { + "username": "root", "password": "hunter2", "service": "ssh", + }))) == [] + + +# ── State modulation (one rule covers the path) ───────────────────── + + +def test_disabled_rule_skipped() -> None: + rule = _compile("R0004", RuleState(state="disabled")) + lifter = CredentialLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + assert asyncio.run(lifter.tag(_ev("credential", { + "credential_hash": "x", "reuse_count": 3, + }))) == [] + + +def test_clipped_rule_caps_confidence() -> None: + rule = _compile("R0004", RuleState(state="clipped", confidence_max=0.5)) + lifter = CredentialLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev("credential", { + "credential_hash": "x", "reuse_count": 3, + }))) + assert len(out) == 1 + # Base 0.9 × 0.5 ceiling. + assert out[0].confidence == pytest.approx(0.45) + + +# ── Ownership predicate ───────────────────────────────────────────── + + +def test_owns_skips_foreign_prefix() -> None: + """Lifter must not pick up rules whose match.kind is in another lifter's prefix.""" + behavioral_rule = _compile("R0031") # lifter:behavioral_beaconing + assert not CredentialLifter._owns(behavioral_rule) + own = _compile("R0001") + assert CredentialLifter._owns(own) + + +# ── Idempotency ───────────────────────────────────────────────────── + + +def test_replay_produces_same_tag_uuid() -> None: + lifter = _make_lifter(["R0001"]) + payload = {"fail_count": 12, "service": "ssh"} + a = asyncio.run(lifter.tag(_ev("auth_attempt", payload))) + b = asyncio.run(lifter.tag(_ev("auth_attempt", payload))) + assert [t.uuid for t in a] == [t.uuid for t in b] diff --git a/tests/ttp/test_identity_lifter.py b/tests/ttp/test_identity_lifter.py new file mode 100644 index 00000000..d56472d9 --- /dev/null +++ b/tests/ttp/test_identity_lifter.py @@ -0,0 +1,141 @@ +"""Per-rule unit tests for :class:`IdentityLifter` (E.3.13). + +Identity-rollup tags carry ``identity_uuid`` populated and +``attacker_uuid=NULL`` per the design doc's worked example — +asserted explicitly here. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.identity_lifter import IdentityLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind="identity", + source_id="src-identity", + attacker_uuid="att-irrelevant", + identity_uuid="id-spray-1", + session_id=None, + decky_id=None, + payload=payload, + ) + + +def _make_lifter(rule_ids: list[str]) -> IdentityLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = IdentityLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +# ── R0003 password spraying ───────────────────────────────────────── + + +def test_password_spraying_fires_when_threshold_met() -> None: + lifter = _make_lifter(["R0003"]) + payload = {"shared_password_hash": "deadbeef", "account_count": 5} + out = asyncio.run(lifter.tag(_ev(payload))) + assert len(out) == 1 + tag = out[0] + assert tag.technique_id == "T1110" + assert tag.sub_technique_id == "T1110.003" + assert tag.tactic == "TA0006" + # Identity-rollup invariant: tag belongs to the Identity, never + # to one member IP. + assert tag.attacker_uuid is None + assert tag.identity_uuid == "id-spray-1" + assert tag.evidence["shared_password_hash"] == "deadbeef" + assert tag.evidence["account_count"] == 5 + + +def test_password_spraying_below_threshold() -> None: + lifter = _make_lifter(["R0003"]) + # account_threshold is 3; account_count=2 must not fire. + payload = {"shared_password_hash": "deadbeef", "account_count": 2} + assert asyncio.run(lifter.tag(_ev(payload))) == [] + + +def test_password_spraying_missing_hash() -> None: + lifter = _make_lifter(["R0003"]) + payload = {"account_count": 9} + assert asyncio.run(lifter.tag(_ev(payload))) == [] + + +def test_password_spraying_wrong_source_kind() -> None: + """Rule applies_to=identity; an event with source_kind=session is ignored.""" + lifter = _make_lifter(["R0003"]) + ev = _ev({"shared_password_hash": "x", "account_count": 9})._replace( + source_kind="session", + ) + assert asyncio.run(lifter.tag(ev)) == [] + + +# ── State modulation ──────────────────────────────────────────────── + + +def test_disabled_rule_does_not_fire() -> None: + rule = _compile("R0003", RuleState(state="disabled")) + lifter = IdentityLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + payload = {"shared_password_hash": "x", "account_count": 9} + assert asyncio.run(lifter.tag(_ev(payload))) == [] + + +def test_clipped_rule_caps_confidence() -> None: + rule = _compile( + "R0003", + RuleState(state="clipped", confidence_max=0.5), + ) + lifter = IdentityLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + payload = {"shared_password_hash": "x", "account_count": 9} + out = asyncio.run(lifter.tag(_ev(payload))) + assert len(out) == 1 + # Base confidence 0.9 × 0.5 ceiling clamp. + assert out[0].confidence == pytest.approx(0.45) + + +def test_expired_rule_does_not_fire() -> None: + expired = datetime.now(tz=timezone.utc) - timedelta(hours=1) + rule = _compile( + "R0003", + RuleState(state="enabled", expires_at=expired), + ) + lifter = IdentityLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + payload = {"shared_password_hash": "x", "account_count": 9} + assert asyncio.run(lifter.tag(_ev(payload))) == [] + + +# ── Idempotency ───────────────────────────────────────────────────── + + +def test_replay_produces_same_tag_uuid() -> None: + """Same source event replayed → identical tag UUID (idempotent).""" + lifter = _make_lifter(["R0003"]) + payload = {"shared_password_hash": "deadbeef", "account_count": 5} + a = asyncio.run(lifter.tag(_ev(payload))) + b = asyncio.run(lifter.tag(_ev(payload))) + assert [t.uuid for t in a] == [t.uuid for t in b] diff --git a/tests/ttp/test_lifter_absence.py b/tests/ttp/test_lifter_absence.py index a9df3b64..dc2d7bc7 100644 --- a/tests/ttp/test_lifter_absence.py +++ b/tests/ttp/test_lifter_absence.py @@ -39,14 +39,9 @@ from tests.ttp._stub_store import StubRuleStore def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger: """Construct a lifter with whatever its current signature wants. - Implemented lifters (E.3.9–E.3.12) take a :class:`RuleStore`; the - still-empty IdentityLifter / CredentialLifter (E.3.13) take no args. + Every shipped lifter (E.3.9–E.3.13) takes a :class:`RuleStore`. """ - if cls in { - BehavioralLifter, IntelLifter, CanaryFingerprintLifter, EmailLifter, - }: - return cls(StubRuleStore()) # type: ignore[call-arg] - return cls() + return cls(StubRuleStore()) # type: ignore[call-arg] def _ev(source_kind: str, payload: dict[str, Any] | None = None) -> TaggerEvent: diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py index d43ab57b..31c9ef71 100644 --- a/tests/ttp/test_lifters.py +++ b/tests/ttp/test_lifters.py @@ -24,11 +24,8 @@ from tests.ttp._stub_store import StubRuleStore def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger: - if cls in { - BehavioralLifter, IntelLifter, CanaryFingerprintLifter, EmailLifter, - }: - return cls(StubRuleStore()) # type: ignore[call-arg] - return cls() + """Every shipped lifter (E.3.9–E.3.13) takes a :class:`RuleStore`.""" + return cls(StubRuleStore()) # type: ignore[call-arg] ALL_LIFTERS = [ BehavioralLifter,