diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 9cdf848d..2c60d10c 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -118,6 +118,7 @@ def get_tagger() -> Tagger: from decnet.ttp.impl.canary_fingerprint_lifter import ( CanaryFingerprintLifter, ) + from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.intel_lifter import IntelLifter from decnet.ttp.store.factory import get_rule_store store = get_rule_store() @@ -125,6 +126,7 @@ def get_tagger() -> Tagger: BehavioralLifter(store), IntelLifter(store), CanaryFingerprintLifter(store), + EmailLifter(store), ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" diff --git a/decnet/ttp/impl/email_lifter.py b/decnet/ttp/impl/email_lifter.py index 6fc2af31..86e7f6ba 100644 --- a/decnet/ttp/impl/email_lifter.py +++ b/decnet/ttp/impl/email_lifter.py @@ -1,25 +1,402 @@ -"""Email lifter — SMTP message-level technique tagger. +"""Email lifter — SMTP message-level technique tagger (E.3.12). -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase parses message-level SMTP signal (headers, -attachment hashes, body sha) and emits Initial-Access / Phishing -techniques. PII discipline (design doc "Hard parts §6") is enforced at -the *type* layer: :class:`~decnet.web.db.models.ttp.EmailEvidence` -intentionally has no fields for raw rcpt addresses or body bytes, so -this lifter cannot leak them even by accident. +Reads pre-parsed SMTP message payload (headers as a name-only list, +body sha + body text already truncated/scrubbed by the upstream worker, +attachment hashes + names) and emits Initial-Access / Phishing / +Resource-Development techniques per Appendix A.6. 
# Signature shared by every per-rule predicate: (rule match_spec, event
# payload) -> extra evidence dict on a match, or None when the rule does
# not fire.
Predicate = Callable[
    [dict[str, Any], dict[str, Any]],
    "dict[str, Any] | None",
]


# ── Helpers ─────────────────────────────────────────────────────────


def _domain(addr_or_domain: str | None) -> str | None:
    """Return the lower-cased domain of an address or bare domain.

    Accepts ``local@domain``, an angle-bracketed ``<local@domain>`` or a
    bare domain. Returns ``None`` for non-string input and whenever no
    domain text is left after normalisation, so callers never have to
    tell ``""`` apart from ``None``.
    """
    if not isinstance(addr_or_domain, str):
        return None
    # Tolerate "<user@host>" forms; "<>" collapses to nothing.
    candidate = addr_or_domain.strip().strip("<>")
    # Split on the LAST "@" so no local-part fragment survives in the
    # returned value when the input holds more than one "@".
    host = candidate.rpartition("@")[2]
    host = host.strip().rstrip(".").lower()
    return host or None


def _rcpt_count(payload: dict[str, Any]) -> int | None:
    """Return ``payload["rcpt_count"]`` when it is a real int, else None.

    ``bool`` is rejected explicitly: it is an ``int`` subclass, so a
    stray ``True`` would otherwise be counted as one recipient.
    """
    count = payload.get("rcpt_count")
    if isinstance(count, bool) or not isinstance(count, int):
        return None
    return count


def _rcpt_threshold(spec: dict[str, Any], default: int) -> int:
    """Read ``rcpt_threshold`` from a rule spec, falling back to *default*.

    A missing, null or non-numeric value yields *default* instead of
    raising, so one malformed rule spec cannot raise out of a predicate.
    """
    raw = spec.get("rcpt_threshold", default)
    if isinstance(raw, bool):
        return default
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        return default


def _safe_evidence(payload: dict[str, Any]) -> dict[str, Any]:
    """Build the base evidence dict from PII-safe payload fields only.

    Returned keys: ``body_sha256`` (str, ``""`` when absent or not a
    string), ``matched_headers`` (always an empty list here),
    ``rcpt_domain_set`` (sorted, de-duplicated, lower-cased domains),
    ``attachment_sha256s`` (string entries only) and ``rcpt_count``
    (``0`` when absent or not an int).

    Entries of ``rcpt_domains`` are passed through :func:`_domain`, so an
    address-shaped entry contributes only its domain part.
    """
    raw_domains = payload.get("rcpt_domains")
    # A bare string would otherwise be iterated character by character.
    if not isinstance(raw_domains, (list, tuple, set, frozenset)):
        raw_domains = ()
    domains = {dom for dom in map(_domain, raw_domains) if dom}

    hashes = payload.get("attachment_sha256s")
    if not isinstance(hashes, list):
        hashes = []

    body_sha = payload.get("body_sha256")
    if not isinstance(body_sha, str):
        body_sha = ""

    return {
        "body_sha256": body_sha,
        "matched_headers": [],
        "rcpt_domain_set": sorted(domains),
        "attachment_sha256s": [h for h in hashes if isinstance(h, str)],
        "rcpt_count": _rcpt_count(payload) or 0,
    }


# ── Per-rule predicates ─────────────────────────────────────────────


def _p_open_relay(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a message whose recipient count reaches the relay threshold.

    Spec keys: ``rcpt_threshold`` (default 10) and the optional flag
    ``require_foreign_from``. With the flag set, the header-From domain
    (``from_domain``, falling back to ``from``) and the envelope sender
    domain (``mail_from_domain``, falling back to ``mail_from``) must
    both be present and differ.

    Returns the header names to record on a match, otherwise ``None``.
    """
    count = _rcpt_count(payload)
    if count is None or count < _rcpt_threshold(spec, 10):
        return None
    if spec.get("require_foreign_from"):
        header_from = _domain(
            payload.get("from_domain") or payload.get("from"),
        )
        envelope_from = _domain(
            payload.get("mail_from_domain") or payload.get("mail_from"),
        )
        if not header_from or not envelope_from:
            return None
        if header_from == envelope_from:
            return None
    return {"matched_headers": ["From", "Mail-From"]}


def _p_mass_phish(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a high-recipient message that also carries a body simhash.

    Spec key: ``rcpt_threshold`` (default 25). Returns an empty dict on
    a match (no extra evidence), otherwise ``None``.
    """
    count = _rcpt_count(payload)
    if count is None or count < _rcpt_threshold(spec, 25):
        return None
    # The "campaign" half: upstream must have observed body simhash
    # recurring across recipients. Without that signal, high-RCPT alone
    # is open-relay territory (R0041), not mass-phish. The simhash
    # derivation lives in the SMTP worker (out of scope here).
    simhash = payload.get("body_simhash")
    if isinstance(simhash, bool):
        return None
    if isinstance(simhash, str):
        # An empty string carries no campaign signal.
        return {} if simhash else None
    return {} if isinstance(simhash, int) else None
def _p_xmailer_kit(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a message whose X-Mailer was flagged upstream as a kit.

    Needs a non-empty ``x_mailer`` string plus either a non-empty
    ``matched_kit`` name or ``xmailer_kit_match is True``. The raw
    X-Mailer value is never returned; only the kit name (when given)
    and the header name are.
    """
    x_mailer = payload.get("x_mailer")
    if not (isinstance(x_mailer, str) and x_mailer):
        return None
    kit = payload.get("matched_kit")
    if isinstance(kit, str) and kit:
        return {"matched_kit": kit, "matched_headers": ["X-Mailer"]}
    # Catalogue match flag — upstream marks it via xmailer_kit_match.
    if payload.get("xmailer_kit_match") is True:
        return {"matched_headers": ["X-Mailer"]}
    return None


_PUNYCODE_PREFIX_DEFAULT: Final[str] = "xn--"


def _p_idn_url(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match the first URL whose *hostname* has a punycode label.

    Spec key: ``punycode_prefix`` (defaults to ``"xn--"``). The prefix is
    compared against each dot-separated label of the parsed hostname, so
    an ``xn--`` that only appears in a path or query string does not
    fire the rule.

    Returns ``matched_url_host`` (hostname only — no userinfo, port,
    path or query) and the pseudo-header ``"body"``, otherwise ``None``.
    """
    prefix = spec.get("punycode_prefix")
    if not isinstance(prefix, str) or not prefix:
        prefix = _PUNYCODE_PREFIX_DEFAULT
    # _extract_host lower-cases the host, so compare in lower case.
    prefix = prefix.lower()

    urls = payload.get("urls") or []
    if not isinstance(urls, list):
        return None
    for url in urls:
        if not isinstance(url, str):
            continue
        host = _extract_host(url)
        if host is None:
            continue
        if any(label.startswith(prefix) for label in host.split(".")):
            # Carry only the host as a match discriminator. NEVER carry
            # the full URL (could contain credential-harvest path with
            # PII).
            return {
                "matched_url_host": host,
                "matched_headers": ["body"],
            }
    return None


def _extract_host(url: str) -> str | None:
    """Return the lower-cased hostname of *url*, or ``None``.

    Userinfo (``user:pass@``) and port are dropped. A value without a
    ``scheme://`` part is parsed as if it started with ``//`` so bare
    ``host/path`` strings still yield their host.
    """
    # Local import: this helper is the only user of urllib.parse here.
    from urllib.parse import urlsplit

    candidate = url.strip()
    if "://" not in candidate:
        candidate = "//" + candidate
    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        # urlsplit rejects some malformed netlocs (e.g. bad IPv6 brackets).
        return None
    if not host:
        return None
    return host.rstrip(".") or None


def _p_sender_masquerade(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match sender-identity inconsistencies enabled by ``spec["signals"]``.

    Supported signal names: ``from_returnpath_mismatch``,
    ``from_mailfrom_mismatch``, ``dkim_fail`` (``dkim_signed is False``)
    and ``spf_fail`` (``spf_pass is False``). A domain mismatch needs
    both domains to be present and non-empty.

    Returns ``matched_signals`` (in the order listed above) and the
    sorted header names involved, or ``None`` when nothing matched.
    """
    signals_raw = spec.get("signals", [])
    if not isinstance(signals_raw, list):
        return None
    enabled = {s for s in signals_raw if isinstance(s, str)}

    def _differs_from_header_from(other_key: str) -> bool:
        # Evaluated lazily so domains are only parsed for enabled signals.
        header_from = _domain(payload.get("from_domain"))
        other = _domain(payload.get(other_key))
        # Truthiness (not `is not None`): a blank domain is "unknown",
        # not "different".
        return bool(header_from and other and header_from != other)

    matched: list[str] = []
    if (
        "from_returnpath_mismatch" in enabled
        and _differs_from_header_from("return_path_domain")
    ):
        matched.append("from_returnpath_mismatch")
    if (
        "from_mailfrom_mismatch" in enabled
        and _differs_from_header_from("mail_from_domain")
    ):
        matched.append("from_mailfrom_mismatch")
    if "dkim_fail" in enabled and payload.get("dkim_signed") is False:
        matched.append("dkim_fail")
    if "spf_fail" in enabled and payload.get("spf_pass") is False:
        matched.append("spf_fail")
    if not matched:
        return None

    headers: set[str] = set()
    if any("from_" in name for name in matched):
        headers.update(("From", "Return-Path"))
    if "dkim_fail" in matched:
        headers.add("DKIM-Signature")
    if "spf_fail" in matched:
        headers.add("Authentication-Results")
    return {
        "matched_signals": matched,
        "matched_headers": sorted(headers),
    }


def _p_malicious_attachment(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match the first enabled attachment trigger present in *payload*.

    ``spec["triggers"]`` lists the enabled trigger names. Flag triggers
    are checked first, in a fixed order, each against a payload key that
    must be exactly ``True``; then the extension triggers ``lnk``,
    ``iso``, ``img`` against ``attachment_extensions`` (case-insensitive,
    leading dots ignored).

    Returns ``{"matched_trigger": name}`` for the first hit, else ``None``.
    """
    triggers_raw = spec.get("triggers", [])
    triggers = (
        {t for t in triggers_raw if isinstance(t, str)}
        if isinstance(triggers_raw, list)
        else set()
    )

    # (trigger name, payload flag) pairs, in evaluation order.
    flag_triggers = (
        ("office_macro", "attachment_macros"),
        ("protected_archive", "attachment_password_protected"),
        ("html_smuggling", "html_smuggling"),
        ("mal_hash_match", "mal_hash_match"),
    )
    for trigger, flag_key in flag_triggers:
        if trigger in triggers and payload.get(flag_key) is True:
            return {"matched_trigger": trigger}

    extensions = payload.get("attachment_extensions") or []
    if isinstance(extensions, list):
        seen = {
            ext.lower().lstrip(".")
            for ext in extensions
            if isinstance(ext, str)
        }
        for ext_trigger in ("lnk", "iso", "img"):
            if ext_trigger in triggers and ext_trigger in seen:
                return {"matched_trigger": ext_trigger}
    return None
def _first_keyword_hit(keywords: list[Any], haystack: str) -> str | None:
    """Return the first non-empty keyword found in *haystack*, else None.

    *haystack* must already be lower-cased. Empty keywords are skipped:
    ``"" in text`` is always true, so one would otherwise be picked first
    and then read as "no hit" by the caller's truthiness check.
    """
    for keyword in keywords:
        if isinstance(keyword, str) and keyword and keyword.lower() in haystack:
            return keyword
    return None


def _p_bec(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a subject keyword AND a body action keyword together.

    Spec keys: ``subject_keywords`` and ``body_action_keywords`` (lists).
    Matching is case-insensitive substring search over ``subject`` and
    ``body_text``. Only the matched keywords (taken from the rule spec)
    and the header name are returned — never subject or body text.
    """
    subject = payload.get("subject")
    body_text = payload.get("body_text")
    if not isinstance(subject, str) or not isinstance(body_text, str):
        return None
    subject_keywords = spec.get("subject_keywords", [])
    body_keywords = spec.get("body_action_keywords", [])
    if not isinstance(subject_keywords, list):
        return None
    if not isinstance(body_keywords, list):
        return None
    subject_hit = _first_keyword_hit(subject_keywords, subject.lower())
    body_hit = _first_keyword_hit(body_keywords, body_text.lower())
    if subject_hit is None or body_hit is None:
        return None
    return {
        "matched_subject_kw": subject_hit,
        "matched_body_kw": body_hit,
        "matched_headers": ["Subject"],
    }


# A run of at least 32 base64-alphabet characters, optionally padded.
_BASE64_RE = re.compile(r"[A-Za-z0-9+/]{32,}={0,2}")


def _p_encoded_payload(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a body carrying at least ``min_bytes`` of base64 data.

    Spec key: ``min_bytes`` (default 4096; a null or non-numeric value
    falls back to the default). An upstream ``body_base64_bytes`` count
    is honoured first and does not need ``body_text`` to be present.
    Otherwise ``body_text`` is scanned for base64 runs.

    Returns ``{"encoded_byte_count": n}``; decoded bytes are never
    returned.
    """
    raw_min = spec.get("min_bytes", 4096)
    if isinstance(raw_min, bool):
        min_bytes = 4096
    else:
        try:
            min_bytes = int(raw_min)
        except (TypeError, ValueError, OverflowError):
            min_bytes = 4096

    # Upstream may pre-compute the largest decoded base64 length. Check
    # it before looking at body_text: the count is usable on its own.
    precomputed = payload.get("body_base64_bytes")
    if (
        isinstance(precomputed, int)
        and not isinstance(precomputed, bool)
        and precomputed >= min_bytes
    ):
        return {"encoded_byte_count": precomputed}

    body_text = payload.get("body_text")
    if not isinstance(body_text, str) or not body_text:
        return None
    # Fallback: best-effort scan of the body text, stopping at the first
    # run that reaches the threshold.
    for match in _BASE64_RE.finditer(body_text):
        chunk = match.group(0)
        # Decoded size is at most 3/4 of the encoded size; skip runs that
        # cannot reach the threshold without decoding them.
        if len(chunk) * 3 // 4 < min_bytes:
            continue
        try:
            decoded = base64.b64decode(chunk, validate=True)
        except (binascii.Error, ValueError):
            continue
        if len(decoded) >= min_bytes:
            return {"encoded_byte_count": len(decoded)}
    return None


# Allowed keys in TTPTag.evidence for source_kind=email. Used both as
# the assembly contract here AND by tests/ttp/test_email_lifter.py to
# guard against a future predicate accidentally leaking PII.
_EMAIL_EVIDENCE_ALLOWED_KEYS: Final[frozenset[str]] = frozenset({
    # EmailEvidence base
    "body_sha256",
    "matched_headers",
    "rcpt_domain_set",
    "attachment_sha256s",
    "rcpt_count",
    # PII-safe match discriminators
    "matched_kit",
    "matched_trigger",
    "matched_url_host",
    "matched_signals",
    "matched_subject_kw",
    "matched_body_kw",
    "encoded_byte_count",
})


def _filter_evidence(evidence: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *evidence* restricted to the allowlisted keys.

    Defense-in-depth: a key that a predicate returns by mistake (for
    example a raw address or body field) is dropped here before the tag
    is built. Values are passed through unchanged.
    """
    return {
        key: evidence[key]
        for key in evidence
        if key in _EMAIL_EVIDENCE_ALLOWED_KEYS
    }
# Dispatch table: rule ``match_spec["kind"]`` -> predicate implementing it.
_PREDICATES: Final[dict[str, Predicate]] = {
    "lifter:email_open_relay": _p_open_relay,
    "lifter:email_mass_phish": _p_mass_phish,
    "lifter:email_xmailer_kit": _p_xmailer_kit,
    "lifter:email_idn_url": _p_idn_url,
    "lifter:email_sender_masquerade": _p_sender_masquerade,
    "lifter:email_malicious_attachment": _p_malicious_attachment,
    "lifter:email_bec": _p_bec,
    "lifter:email_encoded_payload": _p_encoded_payload,
}


class EmailLifter(TolerantTagger):
    """Tagger for ``email`` events, driven by ``lifter:email_*`` rules.

    Rules come from the injected :class:`RuleStore` through a
    :class:`RuleIndex`; each rule's ``match_spec["kind"]`` selects one
    predicate from ``_PREDICATES``.
    """

    name = "email"
    HANDLES = frozenset({"email"})
    # Only rules whose kind starts with this prefix belong to this lifter.
    OWNED_PREFIX: Final[str] = "lifter:email_"

    def __init__(self, store: RuleStore) -> None:
        """Keep *store* and start with an empty rule index."""
        self._store = store
        self._index = RuleIndex()

    @classmethod
    def _owns(cls, rule: CompiledRule) -> bool:
        """Return True when *rule*'s kind is a string with the owned prefix."""
        kind = rule.match_spec.get("kind", "")
        return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX)

    async def watch_store(self) -> None:
        """Have the index watch the store, restricted to owned rules."""
        await self._index.watch(self._store, predicate=self._owns)

    async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
        """Run every indexed, active, applicable rule against *event*.

        For each rule whose predicate returns a dict, the base evidence
        is copied, the predicate's ``matched_headers`` are merged in
        (sorted, de-duplicated), the remaining keys are added, and the
        result is reduced to the allowlist before being handed to
        ``emit_tags``.

        Returns the concatenation of all ``emit_tags`` results.
        """
        payload = event.payload
        base_evidence = _safe_evidence(payload)
        # Body sha is required by EmailEvidence; if upstream didn't
        # supply one, derive from body_text (best-effort). Computed once
        # here instead of once per matching rule.
        if not base_evidence.get("body_sha256"):
            body_text = payload.get("body_text")
            if isinstance(body_text, str) and body_text:
                base_evidence["body_sha256"] = hashlib.sha256(
                    body_text.encode("utf-8", errors="replace"),
                ).hexdigest()

        tags: list[TTPTag] = []
        for rule in self._index.values():
            if event.source_kind not in rule.applies_to:
                continue
            if not is_active(rule.state):
                continue
            kind = rule.match_spec.get("kind", "")
            # A non-string kind (e.g. a YAML list) is unhashable and
            # would make the dict lookup raise; treat it as unknown.
            if not isinstance(kind, str):
                continue
            handler = _PREDICATES.get(kind)
            if handler is None:
                continue
            extra = handler(rule.match_spec, payload)
            if extra is None:
                continue

            evidence = dict(base_evidence)
            # Allow predicates to extend matched_headers without
            # clobbering the base list.
            extra_headers = extra.pop("matched_headers", None)
            if isinstance(extra_headers, list):
                merged = set(evidence.get("matched_headers", []))
                merged.update(h for h in extra_headers if isinstance(h, str))
                evidence["matched_headers"] = sorted(merged)
            evidence.update(extra)
            tags.extend(emit_tags(rule, event, _filter_evidence(evidence)))
        return tags


__all__ = ["EmailLifter"]
def _build_lifter() -> "EmailLifter":
    """Return an ``EmailLifter`` with every rule in ``_RULE_IDS`` installed.

    Each rule is compiled from ``rules/ttp/<id>.yaml`` with a default
    ``RuleState`` and installed into the lifter's index directly.
    """
    from decnet.ttp.impl.email_lifter import EmailLifter
    from tests.ttp._stub_store import StubRuleStore

    rules_dir = Path("rules/ttp")
    compiled = []
    for rid in _RULE_IDS:
        compiled.append(
            _parse_and_compile(rules_dir / f"{rid}.yaml", RuleState()),
        )
    email_lifter = EmailLifter(StubRuleStore(compiled=compiled))
    for entry in compiled:
        email_lifter._index.install(entry)
    return email_lifter


@pytest.mark.parametrize("rule_id", _RULE_IDS)
def test_email_rule_precision(
    rule_id: str,
    corpus_loader: CohortLoader,
) -> None:
    """E.3.12 — per-rule precision of EmailLifter on the labelled corpus.

    Tags every ``email`` corpus row, records the fired rule ids per row
    label, and requires a precision of at least 0.95 for ``rule_id``.
    Skips when the corpus is empty.
    """
    import asyncio

    from tests.ttp.rule_precision.conftest import precision_for

    rows = corpus_loader("email")
    if not rows:
        pytest.skip("no email corpus available")
    lifter = _build_lifter()
    fired: dict[str, list[str]] = {}
    for row in rows:
        row_tags = asyncio.run(lifter.tag(make_event(row)))
        fired[row.label] = [tag.rule_id for tag in row_tags]
    precision, _tp, _fp = precision_for(rule_id, rows, fired)
    assert precision >= 0.95, (
        f"{rule_id} precision {precision:.2f} < 0.95 on email corpus"
    )
"""Per-rule unit tests for :class:`EmailLifter` (E.3.12).

Pins R0041–R0048 predicates and the EmailEvidence PII discipline:
emitted ``TTPTag.evidence`` MUST NOT contain raw addresses, raw body
bytes, or full URLs (only hashed / domain / matched-discriminator
forms are permitted).
"""
from __future__ import annotations

import asyncio
import hashlib
import logging
from pathlib import Path
from typing import Any

import pytest

from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.email_lifter import (
    _EMAIL_EVIDENCE_ALLOWED_KEYS,
    EmailLifter,
)
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore


# Rule YAML directory, resolved two levels above this file's directory
# so the tests do not depend on the current working directory.
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"


def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
    """Compile ``<rule_id>.yaml`` from ``_RULES_DIR`` with *state*.

    A default ``RuleState()`` is used when *state* is omitted.
    """
    return _parse_and_compile(
        _RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
    )


def _ev(payload: dict[str, Any]) -> TaggerEvent:
    """Wrap *payload* in an ``email`` TaggerEvent with fixed dummy ids."""
    return TaggerEvent(
        source_kind="email",
        source_id="src-email",
        attacker_uuid="att1",
        identity_uuid=None,
        session_id=None,
        decky_id=None,
        payload=payload,
    )


def _make_lifter(rule_ids: list[str]) -> EmailLifter:
    """Build an EmailLifter with the given rules installed in its index."""
    rules = [_compile(rid) for rid in rule_ids]
    lifter = EmailLifter(StubRuleStore(compiled=rules))
    # Install directly into the index instead of going through
    # watch_store().
    for rule in rules:
        lifter._index.install(rule)
    return lifter


# ── Per-rule positives ─────────────────────────────────────────────
#
# The technique ids asserted below come from the rule YAML files, which
# are not part of this module.


def test_open_relay_fires_on_high_rcpt_foreign_from() -> None:
    # 50 recipients with differing From / MAIL FROM domains.
    lifter = _make_lifter(["R0041"])
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 50,
        "from_domain": "victim.example",
        "mail_from_domain": "evil.example",
        "rcpt_domains": ["target1.example", "target2.example"],
        "body_sha256": "a" * 64,
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1496", "T1586"} <= techs


def test_open_relay_no_fire_on_matching_from() -> None:
    # Same recipient count, but identical From / MAIL FROM domains.
    lifter = _make_lifter(["R0041"])
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 50,
        "from_domain": "same.example",
        "mail_from_domain": "same.example",
    })))
    assert out == []


def test_mass_phish_fires_on_threshold_with_simhash() -> None:
    lifter = _make_lifter(["R0042"])
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 100,
        "body_simhash": "abc123",
    })))
    techs = {tag.technique_id for tag in out}
    assert "T1566" in techs


def test_mass_phish_no_simhash_no_fire() -> None:
    """High RCPT alone is open-relay territory; campaign needs simhash."""
    lifter = _make_lifter(["R0042"])
    out = asyncio.run(lifter.tag(_ev({"rcpt_count": 100})))
    assert out == []


def test_xmailer_kit_fires_with_match() -> None:
    # Upstream supplies both the X-Mailer value and the matched kit name.
    lifter = _make_lifter(["R0043"])
    out = asyncio.run(lifter.tag(_ev({
        "x_mailer": "PHPMailer 6.0 (kit-X)",
        "matched_kit": "kit-X",
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1566", "T1588"} <= techs


def test_xmailer_kit_no_match_no_fire() -> None:
    # An X-Mailer value without any kit-match marker must not fire.
    lifter = _make_lifter(["R0043"])
    out = asyncio.run(lifter.tag(_ev({"x_mailer": "Outlook 16.0"})))
    assert out == []


def test_idn_url_fires_on_punycode() -> None:
    # The URL's hostname carries an "xn--" (punycode) label.
    lifter = _make_lifter(["R0044"])
    out = asyncio.run(lifter.tag(_ev({
        "urls": ["https://xn--80ak6aa92e.com/login"],
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1036", "T1566"} <= techs


def test_sender_masquerade_from_returnpath_mismatch() -> None:
    # from_domain is deliberately address-shaped here.
    lifter = _make_lifter(["R0045"])
    out = asyncio.run(lifter.tag(_ev({
        "from_domain": "ceo@victim.example",
        "return_path_domain": "evil.example",
    })))
    techs = {tag.technique_id for tag in out}
    assert "T1036" in techs


def test_sender_masquerade_dkim_fail() -> None:
    # dkim_signed is explicitly False (not merely absent).
    lifter = _make_lifter(["R0045"])
    out = asyncio.run(lifter.tag(_ev({
        "dkim_signed": False,
    })))
    techs = {tag.technique_id for tag in out}
    assert "T1036" in techs


def test_malicious_attachment_macro() -> None:
    lifter = _make_lifter(["R0046"])
    out = asyncio.run(lifter.tag(_ev({
        "attachment_macros": True,
        "attachment_sha256s": ["b" * 64],
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1204", "T1566"} <= techs


def test_malicious_attachment_lnk_extension() -> None:
    # Extension is given with a leading dot.
    lifter = _make_lifter(["R0046"])
    out = asyncio.run(lifter.tag(_ev({
        "attachment_extensions": [".lnk"],
        "attachment_sha256s": ["c" * 64],
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1204", "T1566"} <= techs


def test_bec_subject_and_body_match() -> None:
    lifter = _make_lifter(["R0047"])
    out = asyncio.run(lifter.tag(_ev({
        "subject": "URGENT wire transfer needed",
        "body_text": "Please send $50k immediately, this is confidential.",
    })))
    techs = {tag.technique_id for tag in out}
    assert "T1566" in techs


def test_bec_no_body_action_no_fire() -> None:
    # Subject presumably hits a keyword, body has no action keyword —
    # depends on the keyword lists in R0047.yaml.
    lifter = _make_lifter(["R0047"])
    out = asyncio.run(lifter.tag(_ev({
        "subject": "URGENT review",
        "body_text": "Please review the attached doc.",
    })))
    assert out == []


def test_encoded_payload_fires_on_precomputed_count() -> None:
    # The upstream-provided decoded-size count alone triggers the rule;
    # body_text itself contains no base64.
    lifter = _make_lifter(["R0048"])
    out = asyncio.run(lifter.tag(_ev({
        "body_text": "small body text",
        "body_base64_bytes": 8192,
    })))
    techs = {tag.technique_id for tag in out}
    assert {"T1071", "T1027"} <= techs


def test_encoded_payload_below_threshold_no_fire() -> None:
    lifter = _make_lifter(["R0048"])
    out = asyncio.run(lifter.tag(_ev({
        "body_text": "small body",
        "body_base64_bytes": 100,
    })))
    assert out == []


# ── PII discipline ─────────────────────────────────────────────────


def test_evidence_keys_subset_of_email_evidence_allowlist() -> None:
    """No predicate may leak raw addresses, body bytes, or full URLs."""
    lifter = _make_lifter([
        "R0041", "R0042", "R0043", "R0044",
        "R0045", "R0046", "R0047", "R0048",
    ])
    # One payload carrying inputs for many predicates at once.
    payloads = [
        {
            "rcpt_count": 50,
            "from_domain": "ceo@victim.example",
            "mail_from_domain": "evil.example",
            "return_path_domain": "evil.example",
            "rcpt_domains": ["a.example"],
            "x_mailer": "Outlook 16",
            "matched_kit": "kit-Y",
            "urls": ["https://xn--example.test/path?id=secret"],
            "dkim_signed": False,
            "spf_pass": False,
            "attachment_macros": True,
            "attachment_extensions": [".lnk"],
            "attachment_sha256s": ["d" * 64],
            "subject": "URGENT wire",
            "body_text": "please send transfer immediately",
            "body_base64_bytes": 8192,
        },
    ]
    # NOTE(review): only evidence KEYS are checked, and nothing asserts
    # that ``out`` is non-empty, so this passes vacuously if no rule fires.
    for payload in payloads:
        out = asyncio.run(lifter.tag(_ev(payload)))
        for tag in out:
            disallowed = set(tag.evidence) - _EMAIL_EVIDENCE_ALLOWED_KEYS
            assert not disallowed, (
                f"PII leak in {tag.rule_id}: unexpected keys {disallowed}"
            )


def test_evidence_carries_no_raw_addresses_or_body() -> None:
    # Checks evidence VALUES: the local part of the address and
    # fragments of the body text must not appear in repr(evidence).
    lifter = _make_lifter(["R0041", "R0045", "R0047"])
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 50,
        "from_domain": "ceo-direct@victim.example",  # full address-shaped
        "mail_from_domain": "evil.example",
        "return_path_domain": "evil.example",
        "subject": "URGENT wire transfer needed",
        "body_text": "Send the wire to acct 12345 confidential right now",
        "rcpt_domains": ["target.example"],
    })))
    assert out
    for tag in out:
        as_str = repr(tag.evidence)
        assert "ceo-direct@" not in as_str
        assert "Send the wire" not in as_str
        assert "12345" not in as_str


def test_body_sha_set_when_upstream_omits() -> None:
    # No body_sha256 in the payload: the lifter must fall back to the
    # SHA-256 of the UTF-8 encoded body_text.
    lifter = _make_lifter(["R0042"])
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 100,
        "body_text": "some body",
        "body_simhash": "abc",
    })))
    assert out
    expected = hashlib.sha256(b"some body").hexdigest()
    for tag in out:
        assert tag.evidence["body_sha256"] == expected


# ── State + tolerance ──────────────────────────────────────────────


def test_disabled_email_rule_no_emit() -> None:
    # The rule is compiled with state="disabled"; a payload that would
    # otherwise match must produce no tags.
    rule = _compile("R0042", RuleState(state="disabled"))
    lifter = EmailLifter(StubRuleStore())
    lifter._index.install(rule)
    out = asyncio.run(lifter.tag(_ev({
        "rcpt_count": 200, "body_simhash": "abc",
    })))
    assert out == []


def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None:
    # An empty payload must yield no tags and no ERROR-or-higher log
    # records from any of the eight rules.
    caplog.set_level(logging.DEBUG)
    lifter = _make_lifter([
        "R0041", "R0042", "R0043", "R0044",
        "R0045", "R0046", "R0047", "R0048",
    ])
    out = asyncio.run(lifter.tag(_ev({})))
    assert out == []
    assert not [r for r in caplog.records if r.levelno >= logging.ERROR]


def test_owns_only_email_prefix() -> None:
    # Hydrating through the _owns predicate must index the email rule
    # (R0041) and leave out R0031, which is presumably a non-email
    # (behavioral) rule — confirm against R0031.yaml.
    behavioral = _compile("R0031")
    email = _compile("R0041")
    lifter = EmailLifter(StubRuleStore(compiled=[behavioral, email]))
    asyncio.run(lifter._index.hydrate_from(
        lifter._store, predicate=lifter._owns,  # type: ignore[arg-type]
    ))
    assert lifter._index.get("R0041") is not None
    assert lifter._index.get("R0031") is None