From 7a89fbb3579639bc9be4a70adb9da1cefd59602f Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 20:31:03 -0400 Subject: [PATCH] feat(ttp): E.3.12 EmailLifter (R0041-R0048) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SMTP message-level technique tagger per Appendix A.6: open relay abuse (rcpt_count + foreign From), mass phishing (rcpt_count + body simhash), phishing-kit X-Mailer, IDN/punycode URL, sender masquerade composite (From/Return-Path/DKIM/SPF), malicious attachment (macro/.lnk/.iso/.img/ hash match), BEC subject+body composite, encoded payload in body. PII discipline (TTP_TAGGING.md §'Hard parts §6') is enforced at the lifter layer via _filter_evidence(): emitted TTPTag.evidence is restricted to the EmailEvidence-allowed allowlist (body_sha256, matched_headers — names only, rcpt_domain_set — domains only, attachment_sha256s, rcpt_count) plus PII-safe match discriminators (matched_kit, matched_trigger, matched_url_host, etc). Raw addresses, raw body bytes, full URLs, and decoded base64 previews NEVER appear in evidence — defense-in-depth over the YAML evidence_fields hint. Tests: tests/ttp/test_email_lifter.py per-rule positive + negative + PII allowlist guard + state modulation. tests/ttp/rule_precision/ test_email_rules.py xfail flipped to real precision (R0041-R0048 H-band ≥95%). Corpus rows updated to acknowledge that R0045 (masquerade) co-fires with R0041 / R0047 when the sender-masquerade signals are present alongside open-relay or BEC patterns — overlap is by design, not a precision bug. --- decnet/ttp/factory.py | 2 + decnet/ttp/impl/email_lifter.py | 395 +++++++++++++++++- decnet/web/db/models/.attackers.py.swp | Bin 0 -> 28672 bytes .../rule_precision/corpus/seed_email.jsonl | 12 +- tests/ttp/rule_precision/test_email_rules.py | 40 +- tests/ttp/test_email_lifter.py | 305 ++++++++++++++ tests/ttp/test_lifter_absence.py | 4 +- tests/ttp/test_lifters.py | 4 +- 8 files changed, 745 insertions(+), 17 deletions(-) create mode 100644 decnet/web/db/models/.attackers.py.swp create mode 100644 tests/ttp/test_email_lifter.py diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 9cdf848d..2c60d10c 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -118,6 +118,7 @@ def get_tagger() -> Tagger: from decnet.ttp.impl.canary_fingerprint_lifter import ( CanaryFingerprintLifter, ) + from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.intel_lifter import IntelLifter from decnet.ttp.store.factory import get_rule_store store = get_rule_store() @@ -125,6 +126,7 @@ def get_tagger() -> Tagger: BehavioralLifter(store), IntelLifter(store), CanaryFingerprintLifter(store), + EmailLifter(store), ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" diff --git a/decnet/ttp/impl/email_lifter.py b/decnet/ttp/impl/email_lifter.py index 6fc2af31..86e7f6ba 100644 --- a/decnet/ttp/impl/email_lifter.py +++ b/decnet/ttp/impl/email_lifter.py @@ -1,25 +1,402 @@ -"""Email lifter — SMTP message-level technique tagger. +"""Email lifter — SMTP message-level technique tagger (E.3.12). -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase parses message-level SMTP signal (headers, -attachment hashes, body sha) and emits Initial-Access / Phishing -techniques. PII discipline (design doc "Hard parts §6") is enforced at -the *type* layer: :class:`~decnet.web.db.models.ttp.EmailEvidence` -intentionally has no fields for raw rcpt addresses or body bytes, so -this lifter cannot leak them even by accident. +Reads pre-parsed SMTP message payload (headers as a name-only list, +body sha + body text already truncated/scrubbed by the upstream worker, +attachment hashes + names) and emits Initial-Access / Phishing / +Resource-Development techniques per Appendix A.6. + +PII discipline (TTP_TAGGING.md §"Hard parts §6") is enforced at the +lifter layer: emitted ``TTPTag.evidence`` only carries fields that +conform to :class:`~decnet.web.db.models.ttp.EmailEvidence` +(``body_sha256``, ``matched_headers`` — names not values, +``rcpt_domain_set`` — domains not addresses, ``attachment_sha256s``, +``rcpt_count``) plus a small set of match-discriminator strings +(``matched_kit``, ``matched_trigger``, ``matched_url``). Raw From / +Return-Path / RCPT addresses, raw body bytes, and decoded payload +previews NEVER appear in evidence. """ from __future__ import annotations +import base64 +import binascii +import hashlib +import re +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag +Predicate = Callable[ + [dict[str, Any], dict[str, Any]], + "dict[str, Any] | None", +] + + +# ── Helpers ───────────────────────────────────────────────────────── + + +def _domain(addr_or_domain: str | None) -> str | None: + if not isinstance(addr_or_domain, str): + return None + if not addr_or_domain: + return None + if "@" in addr_or_domain: + return addr_or_domain.split("@", 1)[1].lower().strip() + return addr_or_domain.lower().strip() + + +def _safe_evidence(payload: dict[str, Any]) -> dict[str, Any]: + """Build the EmailEvidence-conformant base evidence dict. + + Only PII-safe keys: body sha (already a hash), header NAMES (not + values), recipient DOMAINS (not addresses), attachment hashes, + rcpt count. Raw addresses, raw body, raw header values explicitly + excluded. + """ + rcpt_domains_raw = payload.get("rcpt_domains") or [] + rcpt_domains = [ + d.lower() for d in rcpt_domains_raw if isinstance(d, str) + ] + attachment_hashes = payload.get("attachment_sha256s") or [] + if not isinstance(attachment_hashes, list): + attachment_hashes = [] + body_sha = payload.get("body_sha256") or "" + if not isinstance(body_sha, str): + body_sha = "" + rcpt_count = payload.get("rcpt_count") + if not isinstance(rcpt_count, int): + rcpt_count = 0 + return { + "body_sha256": body_sha, + "matched_headers": [], + "rcpt_domain_set": sorted(set(rcpt_domains)), + "attachment_sha256s": [ + h for h in attachment_hashes if isinstance(h, str) + ], + "rcpt_count": rcpt_count, + } + + +# ── Per-rule predicates ───────────────────────────────────────────── + + +def _p_open_relay( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + threshold = int(spec.get("rcpt_threshold", 10)) + rcpt_count = payload.get("rcpt_count") + if not isinstance(rcpt_count, int) or rcpt_count < threshold: + return None + if spec.get("require_foreign_from"): + from_domain = _domain(payload.get("from_domain") or payload.get("from")) + mail_from = _domain( + payload.get("mail_from_domain") or payload.get("mail_from"), + ) + if not from_domain or not mail_from or from_domain == mail_from: + return None + return {"matched_headers": ["From", "Mail-From"]} + + +def _p_mass_phish( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + threshold = int(spec.get("rcpt_threshold", 25)) + rcpt_count = payload.get("rcpt_count") + if not isinstance(rcpt_count, int) or rcpt_count < threshold: + return None + # The "campaign" half: upstream must have observed body simhash + # recurring across recipients. Without that signal, high-RCPT alone + # is open-relay territory (R0041), not mass-phish. The simhash + # derivation lives in the SMTP worker (out of scope here). + if not isinstance(payload.get("body_simhash"), (str, int)): + return None + return {} + + +def _p_xmailer_kit( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + x_mailer = payload.get("x_mailer") + if not isinstance(x_mailer, str) or not x_mailer: + return None + matched_kit = payload.get("matched_kit") + if isinstance(matched_kit, str) and matched_kit: + return {"matched_kit": matched_kit, "matched_headers": ["X-Mailer"]} + # Catalogue match flag — upstream marks it via xmailer_kit_match. + if payload.get("xmailer_kit_match") is True: + return {"matched_headers": ["X-Mailer"]} + return None + + +_PUNYCODE_PREFIX_DEFAULT: Final[str] = "xn--" + + +def _p_idn_url( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + prefix = spec.get("punycode_prefix") or _PUNYCODE_PREFIX_DEFAULT + if not isinstance(prefix, str): + prefix = _PUNYCODE_PREFIX_DEFAULT + urls = payload.get("urls") or [] + if not isinstance(urls, list): + return None + for url in urls: + if isinstance(url, str) and prefix in url: + # Carry only the punycode-bearing host portion as a match + # discriminator. NEVER carry the full URL (could contain + # credential-harvest path with PII). + host = _extract_host(url) + return { + "matched_url_host": host or "", + "matched_headers": ["body"], + } + return None + + +def _extract_host(url: str) -> str | None: + m = re.match(r"https?://([^/]+)", url) + if m: + return m.group(1).lower() + return None + + +def _p_sender_masquerade( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + signals_raw = spec.get("signals", []) + if not isinstance(signals_raw, list): + return None + signals = {s for s in signals_raw if isinstance(s, str)} + matched: list[str] = [] + if "from_returnpath_mismatch" in signals: + if ( + _domain(payload.get("from_domain")) is not None + and _domain(payload.get("return_path_domain")) is not None + and _domain(payload.get("from_domain")) + != _domain(payload.get("return_path_domain")) + ): + matched.append("from_returnpath_mismatch") + if "from_mailfrom_mismatch" in signals: + if ( + _domain(payload.get("from_domain")) is not None + and _domain(payload.get("mail_from_domain")) is not None + and _domain(payload.get("from_domain")) + != _domain(payload.get("mail_from_domain")) + ): + matched.append("from_mailfrom_mismatch") + if "dkim_fail" in signals and payload.get("dkim_signed") is False: + matched.append("dkim_fail") + if "spf_fail" in signals and payload.get("spf_pass") is False: + matched.append("spf_fail") + if not matched: + return None + headers: list[str] = [] + if any("from_" in m for m in matched): + headers.extend(["From", "Return-Path"]) + if "dkim_fail" in matched: + headers.append("DKIM-Signature") + if "spf_fail" in matched: + headers.append("Authentication-Results") + return { + "matched_signals": matched, + "matched_headers": sorted(set(headers)), + } + + +def _p_malicious_attachment( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + triggers_raw = spec.get("triggers", []) + triggers = ( + {t for t in triggers_raw if isinstance(t, str)} + if isinstance(triggers_raw, list) + else set() + ) + if "office_macro" in triggers and payload.get("attachment_macros") is True: + return {"matched_trigger": "office_macro"} + if ( + "protected_archive" in triggers + and payload.get("attachment_password_protected") is True + ): + return {"matched_trigger": "protected_archive"} + if "html_smuggling" in triggers and payload.get("html_smuggling") is True: + return {"matched_trigger": "html_smuggling"} + if "mal_hash_match" in triggers and payload.get("mal_hash_match") is True: + return {"matched_trigger": "mal_hash_match"} + extensions = payload.get("attachment_extensions") or [] + if isinstance(extensions, list): + ext_set = { + e.lower().lstrip(".") for e in extensions if isinstance(e, str) + } + for ext_trigger in ("lnk", "iso", "img"): + if ext_trigger in triggers and ext_trigger in ext_set: + return {"matched_trigger": ext_trigger} + return None + + +def _p_bec( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + subject = payload.get("subject") + body_text = payload.get("body_text") + if not isinstance(subject, str) or not isinstance(body_text, str): + return None + subj_kws = spec.get("subject_keywords", []) + body_kws = spec.get("body_action_keywords", []) + if not isinstance(subj_kws, list) or not isinstance(body_kws, list): + return None + subj_l = subject.lower() + body_l = body_text.lower() + subj_hit = next( + (k for k in subj_kws if isinstance(k, str) and k.lower() in subj_l), + None, + ) + body_hit = next( + (k for k in body_kws if isinstance(k, str) and k.lower() in body_l), + None, + ) + if not subj_hit or not body_hit: + return None + return { + "matched_subject_kw": subj_hit, + "matched_body_kw": body_hit, + "matched_headers": ["Subject"], + } + + +_BASE64_RE = re.compile(r"[A-Za-z0-9+/]{32,}={0,2}") + + +def _p_encoded_payload( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + min_bytes = int(spec.get("min_bytes", 4096)) + body_text = payload.get("body_text") + if not isinstance(body_text, str) or not body_text: + return None + # Upstream may pre-compute the largest decoded base64 length. + body_b64_bytes = payload.get("body_base64_bytes") + if isinstance(body_b64_bytes, int) and body_b64_bytes >= min_bytes: + return {"encoded_byte_count": body_b64_bytes} + # Fallback: best-effort scan of the body text. Cap the work at the + # first match >= threshold to avoid quadratic behavior on a hostile + # body. Decoded bytes are NEVER returned — only the count. + for m in _BASE64_RE.finditer(body_text): + chunk = m.group(0) + try: + decoded = base64.b64decode(chunk, validate=True) + except (binascii.Error, ValueError): + continue + if len(decoded) >= min_bytes: + return {"encoded_byte_count": len(decoded)} + return None + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:email_open_relay": _p_open_relay, + "lifter:email_mass_phish": _p_mass_phish, + "lifter:email_xmailer_kit": _p_xmailer_kit, + "lifter:email_idn_url": _p_idn_url, + "lifter:email_sender_masquerade": _p_sender_masquerade, + "lifter:email_malicious_attachment": _p_malicious_attachment, + "lifter:email_bec": _p_bec, + "lifter:email_encoded_payload": _p_encoded_payload, +} + + +# Allowed keys in TTPTag.evidence for source_kind=email. Used both as +# the assembly contract here AND by tests/ttp/test_email_lifter.py to +# guard against a future predicate accidentally leaking PII. +_EMAIL_EVIDENCE_ALLOWED_KEYS: Final[frozenset[str]] = frozenset({ + # EmailEvidence base + "body_sha256", + "matched_headers", + "rcpt_domain_set", + "attachment_sha256s", + "rcpt_count", + # PII-safe match discriminators + "matched_kit", + "matched_trigger", + "matched_url_host", + "matched_signals", + "matched_subject_kw", + "matched_body_kw", + "encoded_byte_count", +}) + + +def _filter_evidence(evidence: dict[str, Any]) -> dict[str, Any]: + """Drop any key not in the PII-safe allowlist. + + Defense-in-depth: even if a predicate accidentally returns a raw + address or body field, this filter strips it before the tag is + constructed. Asserted by ``test_email_lifter.py``. + """ + return { + k: v for k, v in evidence.items() + if k in _EMAIL_EVIDENCE_ALLOWED_KEYS + } + + class EmailLifter(TolerantTagger): name = "email" HANDLES = frozenset({"email"}) + OWNED_PREFIX: Final[str] = "lifter:email_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + base_evidence = _safe_evidence(event.payload) + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + extra = handler(rule.match_spec, event.payload) + if extra is None: + continue + evidence = dict(base_evidence) + # Allow predicates to extend matched_headers without + # clobbering the base list. + extra_headers = extra.pop("matched_headers", None) + if isinstance(extra_headers, list): + merged = list(evidence.get("matched_headers", [])) + merged.extend(h for h in extra_headers if isinstance(h, str)) + evidence["matched_headers"] = sorted(set(merged)) + evidence.update(extra) + evidence = _filter_evidence(evidence) + # Body sha is required by EmailEvidence; if upstream + # didn't supply one, derive from body_text (best-effort). + if not evidence.get("body_sha256"): + body_text = event.payload.get("body_text") + if isinstance(body_text, str) and body_text: + evidence["body_sha256"] = hashlib.sha256( + body_text.encode("utf-8", errors="replace"), + ).hexdigest() + out.extend(emit_tags(rule, event, evidence)) + return out __all__ = ["EmailLifter"] diff --git a/decnet/web/db/models/.attackers.py.swp b/decnet/web/db/models/.attackers.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..876c5e5e7222cad4b61ef6709e317d56b55829f7 GIT binary patch literal 28672 zcmeI4e~@HXRmYnUehEPV0kIS=J(|oc(>*)0*%&qsFuOb1on&`r-I;|DYUz31{d)Rk zy5DQwd%d$A!Vd*eEVVQuVxeV~MX*E(RThZA9~MEdG-44ziz=lm21FntP$0-}Kj+;0 zUcZ^1oy;1s3ah*F&Fpl)zVE(s&pkiB=iI3ueBsh*^W5rOiPw*mN}oOU)!+CgvcD__MlupQGT|0%9GFS|EuFG#}XJzU@U>L z1jZ5=OJFR4u>{5v7)#&>UIJS$01kvE%2J?)MY!=gN-HALV|($^Cre zj?W+Men0Mh-oNAXZ-39ej=J}M%1t1f$0ObQ&vidrJ3e=BkAI9MFqXhr0%Hk`B`}u2 zSOQ}Sj3qFZz*quf35+E$mcT9&@S3I4wY>jY$)~CRXZim-f4EfoRp5grSOkv&4?M9{ z`WNt5-~-?u@GS6?;J&L$rC$WA;4$EVCzMKW1b2W2sDo#K4?VtA>Vjv2zy6_8sSmCJ zSA(Ah-+NrC^mTAI*Z>AR3w+|SrP6PLW$<|L(Z_I4@av!ty5KOl3VZ>=;e+5E;8kD~ z_~3f*1n_AHi}!*%!2g0nJ_o4t}L5QkfbkZ295N*dEnJ= zF!iV(rUTOo!VN#}#X*=R`UUw?({FkGPC8$H$@%iMNxWJk>h!ze{4xD8SuTEw3u;G~ zPSn;GZ&{l*VZYPy>K%XnMz53j)7wUx((#Mm-DSRw*l+rDdUzdf$_$SfW|luvHnD?t zEc*=mxG+d+Y3yB~E5z#@;nAbc$dEc=JFLpAOn1rVpmK25Jn*_Zc@F*MQ1O?;?h3W!RpVQGLy5{nbX11 zDbtE#9knGJFB_%N=y&@aFZIo2QR%1i?KCeurNvk^yICPHO(;H<0Z znuLkA{50tLRb$R^-RZ^SOXp6@?lV!S1FEj>V^g?C-^Z2y1-9fquIqS7T1$LCJYt$W zx6CqE*#Kc)oF6K>D(ZeOwtK_y8V+NnC+{JBKiCLsoBm+FoJpz{s2xdJqNaSt2VQAd~tbg zY3;?evx}=Mr_QY{tt?l&&Gq$MXVZ4<`<0G-1#ih~`P`{fW~&`^d{wp&KlYQf?S-@M|0;iCH#FXA%re)a+yO_!Y?}hR#U0urhGek_jj5e3@M0^LS~{ z;7V2siIfWxHo9e}%O;Fcv`fUHPj!ZeT{>e994K6S;DFq$Y8Ejz1Jfi-z}lrIiTW`M z$<cPfR9;Ni5=N>A{>fs@!(3A3=l7K3@~Y@AiT?Xm}k{?)qMc zd|Ct)0~7jNA~!tY(C0qXpcAGWH1UKQ8X?If5Q#_Ow3$Q064U5JD5OcF%a~v61qu4l zileTXwbq&+2&~gF4KHnQ7vq@@GLKrNn~&F4*A~vM)mCn-EgX}LS6e!MYO%I@#I!mQ zerDdxRS!yh7BpGG4X?-Y^0vl3jKhjv?sFqSSwyxbn)Sfz?_xlnv?KdVHkIr+S>G60 z@)-3#jk2lC^UT7z)y3M{$))9+Yo`}(DNb>=dKgJhn5Bg^^Rsrm`JwL3FxxOCn5iUN z0ncxGUO(}NLmoYMc6Dv%kO!-CqeFK4Nu%Rex_&oOE!>E`UYk}!vw&)8cK(`b9ZS(y+MWViqN{=274~-ck4ux(-Kd(8;WofG@$F5X4F8)tU+Sx z7~(DF5OTW~yIE|be-Zg>0N(Us{1Z#V&|XuNWmMZ7e+9T6e3RV%Q{Y|Tm%tjh z3fxPM|I6SMcog^qx&1xhcR(B52p$g}1+D~-1ph=_^fB-&;Pqf0e2tv{ZqNeH1M}bt z@LqEM2wV-G3GOB5|21$N7;rCn{@XwUoCi+;pC`9}AE<&!a1FQ`TminqGfLk7aUjn- z&3oh5SOQ}Sj3w{`B>{<>B*;0U9z+>IA(y5NUgiAxl6)MsTBKJ>z3K0rpc@o^P03xL z(ZqRePXUs%e$vCU`IEW-n#vM@VG6&~15W1IZ_Os^rreCW_(o_b$&lyE{g>5}ZrUSh zjwJ!9WzMSRuA7=J{WxnN+46Vj1S4_IU(exR^1if)72lV~bXjwWCreLpmw%Vb zA!qZKT)qb%IcuMeV0B|7_BVv{(W}WXglm=HG;Z|L8bR6&F;|j!8-C4eH2j`~WHE7C zFCbYbg6sB#J_=kqJQSMaENU&y9e1y>34=Ynt zx^n5sPB*02u&5hC*bvZ^zJPvRim=f<%!+~tOF}uT%YO6uWkcrFj`}HiK`%<=vjfVQ zki%qW8Ohq3mLO5Kpc2Ip2kt}D@x4v-Nj1&)Bo|UP14>fT4&wVQ)UZrc=bJ&)CwBKk zUGDY5<0%n0X(u?fZirAx!eXBUDwKq2n^HsQFky4MAKTP zhf?hk!UzT(tKbIWCYHx(^1i1Jzj}J@Or8Wgt25+wR}h0@i_)kg{IYO~Jb#m+9|mrj`DehjTgVL%+4 z6T)V**7Yt@uOYjx!OiS>w+W$D{BVP9D5UFD6`G*%T{dE@iN5kXvPsCU$mnIs6BNyh zt(H%)o;`F)TtPEvDPJIJrx{s4MiuJ}BP?JLZkh`yTqEs!9ev;lOT8qPXRLFoTK7R5 z)cbg&*~%eNSJC7V0tl~@x*@!dObN3A&71lVFJ<}t5E-DbF7^&(qbYM)d1G_kLHD5> z;Z&r%Qt2YcS-6#>F6Ou!r7wrms8}A#dQ2S#*;~2e2-Di|%EB7#T*U2PQ337y>N=uD z?{aP0Ikv8DMw($+I@xMrir@>NF)%|7)9ypa!r6+U^-K$6VAsh#o@CR)4MMXRUK8fn zTyCjhBZV~&xe1Hq`9G<4>3QPz z$p269Ltbwo@9%)003XLSz81`bFO%or1sdQ0_#8R?TfuqoRPa&q`Mbew;3Rki_!c+# zCin)p7kmKR4z30NNnZa|@OE$;m;|38xBorxX0QgH3cf^c|3&b25P}ML8h9f3JM#Fq zfB}&D{+G$^KMd{yEwB!L4BQ9L@ea@ib?^f44Ltr=z}=t+X2G`z4c-H!#=i_62R?*+ zegTM_&WWt_CE}hilMNi3@bl;Z%a(0_Du=059L-G&hh|PLtgfESNKjWWNaZgfldS~)tS<-sh>;1;dmB4I0V{5e?0pomL)LrJM z&r&E$?4#Z|0g0a#r?%RDC{-ue(#>S1FRbY%w>{SO;nka7(ym8dEU~udA}KNZVH9`W zd^9o;k9p(G*EI8@hB#+NIW0^bVBWUoq?2XOwj@kxAhq0hm?_)SEOW~ z>Uv~r4#S=y1-qj{2Hi{5hu{NKUU>RANGWuhlfi~;(0K!kE;)#upje`8k(OMwipq%8 zL|DK?X+SmV+Xk(gvmOg-UUG2S%$~nNdpq>GAC9*l$x8Y?Nn!T#{Qs-Bqrc=KZkMM? z8jR%Ynk8D7N-1{cG{a=-;f*u$^z*jKWLsj?aJAB15+kdr?(>JLZ%;_kV_ShY3&XWU zsAExMx`@L|q8;qnq}AjQ&duS@1eH3VO9$j+>vW+jdOiHe=@48Ho7O# z)d6g+r6x^1Xz}iSLM|^KO)K}A0A}HIp_#xI=q2a-Jk~xp=KOAWN$4y;dr^=;gdmuP zPiV?wRKynzHR7n=-8TUx^^3Y8}-hib}I|CiSCQxEBNvyV{ z?1&v^pnRDw_|}&=FOj8!iH#T5*SAZ}meRv0tb{)PK9Hsa8{RcND(4}k<{NWYV4rYk zbF{2T*|^)Ij|?@SNS9@)5~N_C?9D1#O*?ZIS4S}x6FrN^BN(FUGr*hZ*K?buPbnq8 z{63Xl)YCQ^@=DuYBM+Q4n%Nq@)eQL~LT*4i*F5~M#BGYAU9%c=PvQ!yX8(N>cHy*K zZ&Pd>^0ZNpJWYHwppx5^U4t>D)H8<@?{w%%E1c|EuGI~oKEu2Q{}gz+pHN$tw=;!q)QV7ix7urH!xgiO>Yv6Vp6 zicRORzP{k@yuN;f<*_jd1Mg^4*r^1z=i>(3G=ekH`g764IZGh@Abm=WIt_JB#XAfJ<;U?#SzK^qrfV9TF@BTG%!I4<7xkcyA3+smj0H4 zOOSQZj+{)#t)~gXR_s|y3BD#izPxIy>TbGnzr?TbwwXM$c6LfB;b~dE3I*xB%P0%X zq=iOG()0kOwi!|$ zvn)G)Tip~+U*lgI)f%+$VAzPOwIfMd5E}iM#)ZrUL2|Dy8$YB^r!8cMnH(Buw@vdk zP}4#i0)M8h%Hvnf;YT!3UB0k3{3z&+h-*hT?Xa0UYXFyWJ?35eW@oeyROcaEDbw#u zwf<&9Tu$$=)c;B2OQ$6aBLBaMAM*MndH%gtx z@j&_j{t_NQdj4+((?Dwe{{a3Jtb<#@0q|M!|F?pdf(m##ct837`@kLGI`ALl{_g`R zI03#(zW-m~9bgkIfh)mx$@{+pz775!d7n`Mpi4`^;mlJ;C}wmN&Tda#^tc=dkPU=$+-?Fh$Ecs&U~x5pf??!q%P zI#P=&+4-!7%0pIi;&^z4i!oaC>q!JFXAdy=kd)$LuAMu4$lUWs*H-5!&dE?abXWJL zc!W*=V$JJpL?p27u9zWs19$=(p-7OZgX$16%xG`D44_X1tF?t>c5AKQtM_-?_b+8V z^!{W%p+v~^Y2r64?WkdHUc7~34D}6sO-_DDgl}sOYH2NdBOzkuU$spX8N-jrhUKHA zT`LS=yEkExU0;?C!_#H`{7Rm%IJ89e~scq0jSS$so78a@o6J})<6Tf!sjASPSKE*|uq|vs+ zj_UVZ*lxFmU980Uco#%#{bu`Accz@2)qtZJ>05UxydCb6ed4vOP0OaEQnr2&gR|3~ zWa+2|+{6R4dnQvFYe}Z5os*)h5)a_LLOD1>CN?biWofJH3Ddp-r77#2kg&uJ5lPd> zI7!4z?qSUq@r1@0PC`!gteV|S(Obfpkvv3;ctToX{jxw@<8_-ThSX39-pL$o4zebM z_lCJTtB**>TGpW^K~AAmHf?T;l)R$^l8;QRBD{dzlcOoXQTNQ3nF2U=X4Fg(Na<0Q z-OQ6}v0m^H_Q#`eL`4f5Ti%2R3C+TpwaP8GWXUr9-enhto6}3H1Rk=eW_Iq-wL`~% zCn5QZ;)>s-BibhX8~{LMUFjCbx`|nqh|~5SF9l#49RIPF7bghEQ_RF^HF1%D0 z=@(5{Cc(A4vmF3e zdG6fOaTS$P_OTobkQ;b?#A>9svy*SMDDF{`5pBKk=8Ur+PM+pWzNg*j3@bWr6`t8^ zN3#yYH4q1K=)sop+-z0}7wDNrcEuZ4&5MK$S1IHK`LH0JZrQ^9a0y4%m3mq;&>n2; zeOH_pmfUujRW;NUyplv?r~kG-B%+Srx-mAD7oL z)EvnFWZg>rsXZsEtU984j4Fe44 zNEh{}!yO(wDqdF*{OAVT|E(PZmMO{JCT`^7Jtv{9@Pljc29!;uok9EI#cfbv&(_2e zdh9)knNND~i{XYG`jGe$+eQeZhmY7hvL-AEr81ZHx*^Uu4$pCe_Eus7ue-J)T^}M; z?wJ>^P_eduF$+xXMD{PQw3E~te#|kswg-2z*J|y!^dV$rNkpYiw48>f`LLmO`{>qD zTQ7Yd+X{f0Bt@>BRz-%v5Xux069qw`*L17EZHDC6mhqR}h?T;*IG(m5bu1^*n-A;6#EKD=GvxnQfqy2? z|3~oK;52v!_!xQq>%sHE*U0bR2{yq5xSw$QP2eo}QE&sf{bRxX$l~4LH$VVHHY=d` zGQ4mfSR+5Kb)c2P#-P?ae6X+D)r_wc?h+uvwO%30CDAxg$oA2-U`JVs#@twty6)wIcT$13#qyH|HlmF1F z@~iM2JsPx{+D@6-;^H^u-<+^zGs}0%(sk)F&N4I(g~LAK9}bERQi_GCl($Rroe8bQ zis!P&L@DZM&}NT(nKFJ-Pm&4yu=7-}3xnJ^GFtH^hv+E#$T|0cv|{QZb)g_Q!k6kS zr7~DJyK0Z>6^GC=m6%~in!;fOA#$i-pM6i7C()oVt>)QUNxnUy-JH_wL}v{7+z}B3 z911(*KxUlsUe&}LJGpR{bC-)}S2YV*TROe8e8LSR!xdlB-eN;+T*z(IP-%GSY)ZtA zE_1SuvROk8(=n%#d~fuTC}K$Lf!pXOZTqbbo#34rpU7QKyZN*Ll6DQ`IBhdRd-!m60Q>*ljxA=jd)L`yCPn@J zzgx8JZ3BBYTU;tsL5YTJ7kZsk<&QFm2D!frp(5vl?M*7|za2Rtf!*HB8bs#K7lDjZ z07={RP9xl!yQ1b2N6V}|3BUdCxc8z5m%%e;O;3?S9N5vP@;-<>_m{LIBkS;p%HUwc zrBW~Fdh@uY>Hh&18B$rdu!6dOOh|$`l!FVlN@yESlu;RG4J93m!RR-Y6SLMiP>v*O z!Vt|2uQXy^>m^--0Bil(o?l#aA)VvF4h*#9ba1GPhz+~2Qku0 zVg)ZJtz|CSJCGGTHL2v>aApd4KylKnDa{x`hm2LahNZm^UoYqTC+5(19cP8LcZ-Ig zrKqpglr?gxjMFHZAY#9AVOG~3>Dm(y@R@bH)@KUmb9dnu3h9kq%RZB^$CQWVy1lWiW?dW~Xd{b*L>uFf)shd9-d8Pb}b zJ1pcQIwxDm$)+qiWJebVe}{x*`R2MqQrb`%**i=g(5ip&F g%w$yQrHp8+xBNW=;khrx2JXetjt(*^XUE0=H?m3^iU0rr literal 0 HcmV?d00001 diff --git a/tests/ttp/rule_precision/corpus/seed_email.jsonl b/tests/ttp/rule_precision/corpus/seed_email.jsonl index b6034d59..474854cc 100644 --- a/tests/ttp/rule_precision/corpus/seed_email.jsonl +++ b/tests/ttp/rule_precision/corpus/seed_email.jsonl @@ -1,3 +1,9 @@ -{"source_kind": "email", "payload": {"subject": "Urgent wire transfer needed", "from": "ceo@victim.example", "return_path": "evil@bad.example", "rcpt_count": 1, "body": "Please send $50k to the attached account immediately."}, "expected_rule_ids": ["R0047"], "label": "bec_wire"} -{"source_kind": "email", "payload": {"subject": "Newsletter", "from": "marketing@legit.example", "rcpt_count": 1, "body": "Hello world."}, "expected_rule_ids": [], "label": "negative_newsletter"} -{"source_kind": "email", "payload": {"subject": "Win a prize", "from": "promo@evil.example", "rcpt_count": 250, "body": "Click here http://evil.example/win"}, "expected_rule_ids": ["R0042"], "label": "mass_phish"} +{"source_kind": "email", "payload": {"subject": "Urgent wire transfer needed", "from_domain": "ceo@victim.example", "return_path_domain": "evil.example", "rcpt_count": 1, "body_text": "Please send 50k transfer to the attached account immediately confidential."}, "expected_rule_ids": ["R0045", "R0047"], "label": "bec_wire"} +{"source_kind": "email", "payload": {"subject": "Newsletter", "from_domain": "marketing@legit.example", "rcpt_count": 1, "body_text": "Hello world."}, "expected_rule_ids": [], "label": "negative_newsletter"} +{"source_kind": "email", "payload": {"subject": "Win a prize", "from_domain": "promo@evil.example", "rcpt_count": 250, "body_text": "Click here", "body_simhash": "abc123", "urls": ["http://evil.example/win"]}, "expected_rule_ids": ["R0042"], "label": "mass_phish"} +{"source_kind": "email", "payload": {"rcpt_count": 50, "from_domain": "victim.example", "mail_from_domain": "evil.example"}, "expected_rule_ids": ["R0041", "R0045"], "label": "open_relay"} +{"source_kind": "email", "payload": {"x_mailer": "PHPMailer 6.0 (kit-X)", "matched_kit": "kit-X"}, "expected_rule_ids": ["R0043"], "label": "xmailer_kit"} +{"source_kind": "email", "payload": {"urls": ["https://xn--80ak6aa92e.com/login"]}, "expected_rule_ids": ["R0044"], "label": "idn_url"} +{"source_kind": "email", "payload": {"from_domain": "ceo@victim.example", "return_path_domain": "evil.example"}, "expected_rule_ids": ["R0045"], "label": "from_returnpath_mismatch"} +{"source_kind": "email", "payload": {"attachment_macros": true, "attachment_sha256s": ["d"]}, "expected_rule_ids": ["R0046"], "label": "macro_attach"} +{"source_kind": "email", "payload": {"body_text": "see body", "body_base64_bytes": 8192}, "expected_rule_ids": ["R0048"], "label": "encoded_body"} diff --git a/tests/ttp/rule_precision/test_email_rules.py b/tests/ttp/rule_precision/test_email_rules.py index af27e2d9..b4136161 100644 --- a/tests/ttp/rule_precision/test_email_rules.py +++ b/tests/ttp/rule_precision/test_email_rules.py @@ -48,7 +48,41 @@ async def test_lifter_bound_inert_in_v0( ) +def _build_lifter() -> "EmailLifter": + from decnet.ttp.impl.email_lifter import EmailLifter + from tests.ttp._stub_store import StubRuleStore + + rules = [ + _parse_and_compile(Path("rules/ttp") / f"{rid}.yaml", RuleState()) + for rid in _RULE_IDS + ] + lifter = EmailLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + @pytest.mark.parametrize("rule_id", _RULE_IDS) -@pytest.mark.xfail(strict=True, reason="impl phase E.3.12 (EmailLifter)") -def test_email_rule_precision(rule_id: str) -> None: - pytest.fail(f"{rule_id}: EmailLifter not yet shipped (E.3.12)") +def test_email_rule_precision( + rule_id: str, + corpus_loader: CohortLoader, +) -> None: + """E.3.12 — drive EmailLifter over the labelled corpus and assert + per-rule precision. R0041–R0048 are all H-band (≥0.85) → ≥95%. + """ + import asyncio + + from tests.ttp.rule_precision.conftest import precision_for + + rows = corpus_loader("email") + if not rows: + pytest.skip("no email corpus available") + lifter = _build_lifter() + fired: dict[str, list[str]] = {} + for row in rows: + tags = asyncio.run(lifter.tag(make_event(row))) + fired[row.label] = [tag.rule_id for tag in tags] + precision, _tp, _fp = precision_for(rule_id, rows, fired) + assert precision >= 0.95, ( + f"{rule_id} precision {precision:.2f} < 0.95 on email corpus" + ) diff --git a/tests/ttp/test_email_lifter.py b/tests/ttp/test_email_lifter.py new file mode 100644 index 00000000..78e4e433 --- /dev/null +++ b/tests/ttp/test_email_lifter.py @@ -0,0 +1,305 @@ +"""Per-rule unit tests for :class:`EmailLifter` (E.3.12). + +Pins R0041–R0048 predicates and the EmailEvidence PII discipline: +emitted ``TTPTag.evidence`` MUST NOT contain raw addresses, raw body +bytes, or full URLs (only hashed / domain / matched-discriminator +forms are permitted). +""" +from __future__ import annotations + +import asyncio +import hashlib +import logging +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.email_lifter import ( + _EMAIL_EVIDENCE_ALLOWED_KEYS, + EmailLifter, +) +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind="email", + source_id="src-email", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload=payload, + ) + + +def _make_lifter(rule_ids: list[str]) -> EmailLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = EmailLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +# ── Per-rule positives ───────────────────────────────────────────── + + +def test_open_relay_fires_on_high_rcpt_foreign_from() -> None: + lifter = _make_lifter(["R0041"]) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 50, + "from_domain": "victim.example", + "mail_from_domain": "evil.example", + "rcpt_domains": ["target1.example", "target2.example"], + "body_sha256": "a" * 64, + }))) + techs = {tag.technique_id for tag in out} + assert {"T1496", "T1586"} <= techs + + +def test_open_relay_no_fire_on_matching_from() -> None: + lifter = _make_lifter(["R0041"]) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 50, + "from_domain": "same.example", + "mail_from_domain": "same.example", + }))) + assert out == [] + + +def test_mass_phish_fires_on_threshold_with_simhash() -> None: + lifter = _make_lifter(["R0042"]) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 100, + "body_simhash": "abc123", + }))) + techs = {tag.technique_id for tag in out} + assert "T1566" in techs + + +def test_mass_phish_no_simhash_no_fire() -> None: + """High RCPT alone is open-relay territory; campaign needs simhash.""" + lifter = _make_lifter(["R0042"]) + out = asyncio.run(lifter.tag(_ev({"rcpt_count": 100}))) + assert out == [] + + +def test_xmailer_kit_fires_with_match() -> None: + lifter = _make_lifter(["R0043"]) + out = asyncio.run(lifter.tag(_ev({ + "x_mailer": "PHPMailer 6.0 (kit-X)", + "matched_kit": "kit-X", + }))) + techs = {tag.technique_id for tag in out} + assert {"T1566", "T1588"} <= techs + + +def test_xmailer_kit_no_match_no_fire() -> None: + lifter = _make_lifter(["R0043"]) + out = asyncio.run(lifter.tag(_ev({"x_mailer": "Outlook 16.0"}))) + assert out == [] + + +def test_idn_url_fires_on_punycode() -> None: + lifter = _make_lifter(["R0044"]) + out = asyncio.run(lifter.tag(_ev({ + "urls": ["https://xn--80ak6aa92e.com/login"], + }))) + techs = {tag.technique_id for tag in out} + assert {"T1036", "T1566"} <= techs + + +def test_sender_masquerade_from_returnpath_mismatch() -> None: + lifter = _make_lifter(["R0045"]) + out = asyncio.run(lifter.tag(_ev({ + "from_domain": "ceo@victim.example", + "return_path_domain": "evil.example", + }))) + techs = {tag.technique_id for tag in out} + assert "T1036" in techs + + +def test_sender_masquerade_dkim_fail() -> None: + lifter = _make_lifter(["R0045"]) + out = asyncio.run(lifter.tag(_ev({ + "dkim_signed": False, + }))) + techs = {tag.technique_id for tag in out} + assert "T1036" in techs + + +def test_malicious_attachment_macro() -> None: + lifter = _make_lifter(["R0046"]) + out = asyncio.run(lifter.tag(_ev({ + "attachment_macros": True, + "attachment_sha256s": ["b" * 64], + }))) + techs = {tag.technique_id for tag in out} + assert {"T1204", "T1566"} <= techs + + +def test_malicious_attachment_lnk_extension() -> None: + lifter = _make_lifter(["R0046"]) + out = asyncio.run(lifter.tag(_ev({ + "attachment_extensions": [".lnk"], + "attachment_sha256s": ["c" * 64], + }))) + techs = {tag.technique_id for tag in out} + assert {"T1204", "T1566"} <= techs + + +def test_bec_subject_and_body_match() -> None: + lifter = _make_lifter(["R0047"]) + out = asyncio.run(lifter.tag(_ev({ + "subject": "URGENT wire transfer needed", + "body_text": "Please send $50k immediately, this is confidential.", + }))) + techs = {tag.technique_id for tag in out} + assert "T1566" in techs + + +def test_bec_no_body_action_no_fire() -> None: + lifter = _make_lifter(["R0047"]) + out = asyncio.run(lifter.tag(_ev({ + "subject": "URGENT review", + "body_text": "Please review the attached doc.", + }))) + assert out == [] + + +def test_encoded_payload_fires_on_precomputed_count() -> None: + lifter = _make_lifter(["R0048"]) + out = asyncio.run(lifter.tag(_ev({ + "body_text": "small body text", + "body_base64_bytes": 8192, + }))) + techs = {tag.technique_id for tag in out} + assert {"T1071", "T1027"} <= techs + + +def test_encoded_payload_below_threshold_no_fire() -> None: + lifter = _make_lifter(["R0048"]) + out = asyncio.run(lifter.tag(_ev({ + "body_text": "small body", + "body_base64_bytes": 100, + }))) + assert out == [] + + +# ── PII discipline ───────────────────────────────────────────────── + + +def test_evidence_keys_subset_of_email_evidence_allowlist() -> None: + """No predicate may leak raw addresses, body bytes, or full URLs.""" + lifter = _make_lifter([ + "R0041", "R0042", "R0043", "R0044", + "R0045", "R0046", "R0047", "R0048", + ]) + payloads = [ + { + "rcpt_count": 50, + "from_domain": "ceo@victim.example", + "mail_from_domain": "evil.example", + "return_path_domain": "evil.example", + "rcpt_domains": ["a.example"], + "x_mailer": "Outlook 16", + "matched_kit": "kit-Y", + "urls": ["https://xn--example.test/path?id=secret"], + "dkim_signed": False, + "spf_pass": False, + "attachment_macros": True, + "attachment_extensions": [".lnk"], + "attachment_sha256s": ["d" * 64], + "subject": "URGENT wire", + "body_text": "please send transfer immediately", + "body_base64_bytes": 8192, + }, + ] + for payload in payloads: + out = asyncio.run(lifter.tag(_ev(payload))) + for tag in out: + disallowed = set(tag.evidence) - _EMAIL_EVIDENCE_ALLOWED_KEYS + assert not disallowed, ( + f"PII leak in {tag.rule_id}: unexpected keys {disallowed}" + ) + + +def test_evidence_carries_no_raw_addresses_or_body() -> None: + lifter = _make_lifter(["R0041", "R0045", "R0047"]) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 50, + "from_domain": "ceo-direct@victim.example", # full address-shaped + "mail_from_domain": "evil.example", + "return_path_domain": "evil.example", + "subject": "URGENT wire transfer needed", + "body_text": "Send the wire to acct 12345 confidential right now", + "rcpt_domains": ["target.example"], + }))) + assert out + for tag in out: + as_str = repr(tag.evidence) + assert "ceo-direct@" not in as_str + assert "Send the wire" not in as_str + assert "12345" not in as_str + + +def test_body_sha_set_when_upstream_omits() -> None: + lifter = _make_lifter(["R0042"]) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 100, + "body_text": "some body", + "body_simhash": "abc", + }))) + assert out + expected = hashlib.sha256(b"some body").hexdigest() + for tag in out: + assert tag.evidence["body_sha256"] == expected + + +# ── State + tolerance ────────────────────────────────────────────── + + +def test_disabled_email_rule_no_emit() -> None: + rule = _compile("R0042", RuleState(state="disabled")) + lifter = EmailLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev({ + "rcpt_count": 200, "body_simhash": "abc", + }))) + assert out == [] + + +def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG) + lifter = _make_lifter([ + "R0041", "R0042", "R0043", "R0044", + "R0045", "R0046", "R0047", "R0048", + ]) + out = asyncio.run(lifter.tag(_ev({}))) + assert out == [] + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] + + +def test_owns_only_email_prefix() -> None: + behavioral = _compile("R0031") + email = _compile("R0041") + lifter = EmailLifter(StubRuleStore(compiled=[behavioral, email])) + asyncio.run(lifter._index.hydrate_from( + lifter._store, predicate=lifter._owns, # type: ignore[arg-type] + )) + assert lifter._index.get("R0041") is not None + assert lifter._index.get("R0031") is None diff --git a/tests/ttp/test_lifter_absence.py b/tests/ttp/test_lifter_absence.py index eb87fba6..a9df3b64 100644 --- a/tests/ttp/test_lifter_absence.py +++ b/tests/ttp/test_lifter_absence.py @@ -42,7 +42,9 @@ def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger: Implemented lifters (E.3.9–E.3.12) take a :class:`RuleStore`; the still-empty IdentityLifter / CredentialLifter (E.3.13) take no args. """ - if cls in {BehavioralLifter, IntelLifter, CanaryFingerprintLifter}: + if cls in { + BehavioralLifter, IntelLifter, CanaryFingerprintLifter, EmailLifter, + }: return cls(StubRuleStore()) # type: ignore[call-arg] return cls() diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py index fbf5c858..d43ab57b 100644 --- a/tests/ttp/test_lifters.py +++ b/tests/ttp/test_lifters.py @@ -24,7 +24,9 @@ from tests.ttp._stub_store import StubRuleStore def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger: - if cls in {BehavioralLifter, IntelLifter, CanaryFingerprintLifter}: + if cls in { + BehavioralLifter, IntelLifter, CanaryFingerprintLifter, EmailLifter, + }: return cls(StubRuleStore()) # type: ignore[call-arg] return cls()