From 291b78c1d0ac10863ca8ada7b6f923c6b474ba4a Mon Sep 17 00:00:00 2001
From: anti
Date: Sat, 2 May 2026 19:08:37 -0400
Subject: [PATCH] feat(smtp): extract body_simhash + base64-bytes + html-smuggling + per-attachment macro/encrypted
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Heavyweight Layer-2 extractors land alongside the cheap projections
shipped in commit e9324aca, so the EmailLifter R0042 / R0046 (macros /
password / smuggling lanes) / R0048 fire from the bus payload without
the lifter having to reach back to disk.

Extractors:

* body_simhash — inlined 64-bit Charikar simhash (md5-keyed,
  frequency-weighted) over word tokens of the union of text/* body
  parts. Inlined rather than pulling the `simhash` PyPI dep, which
  transitively brings numpy ~50 MB into a slim decky container; the
  algorithm is ~15 lines and identical in extraction quality.
* body_base64_bytes — largest decoded base64 chunk's byte count,
  scanning text body parts with the same `_BASE64_RE` the lifter's
  `_p_encoded_payload` fallback uses. R0048 fires from this scalar
  alone; the lifter's body_text fallback becomes dead in normal
  operation.
* attachment_macro_indicator — stdlib zipfile sniff for
  `vbaProject.bin` inside OOXML containers. Catches modern .docm /
  .xlsm / .pptm and macro-injected .docx; legacy .xls (CFBF) is a
  follow-up.
* attachment_encrypted — flag_bits & 0x01 on any ZIP / OOXML entry's
  central directory; magic-byte match for 7z / RAR / CFBF (encrypted
  Office wrap).
* html_smuggling — structural lxml parse first: fires when an `` element coexists with a `" + "Download invoice" + "" + ) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: smuggle", + f"Content-Type: multipart/alternative; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/html; charset=utf-8", + "", + html_body, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + assert rec["html_smuggling"] == 1 + + def test_html_smuggling_skips_legit_download_link(self, tmp_path): + """A page with `` but no Blob/createObjectURL + script does NOT fire — the "click to download our report" + FP class is precisely what the structural check excludes.""" + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + boundary = "----LEGITDOWNLOAD" + html_body = ( + "" + "

Quarterly report is ready.

" + "
Download" + "" + ) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: legit", + f"Content-Type: multipart/alternative; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/html; charset=utf-8", + "", + html_body, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + assert rec["html_smuggling"] == 0 + + def test_attachment_manifest_carries_macro_and_encrypted_flags(self, tmp_path): + """The attachments JSON manifest now includes per-attachment + macro_indicator + encrypted booleans — the ingester reduces + these to top-level flags at publish time.""" + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + boundary = "----MANIFESTBOOLS" + # Build a docm-shaped attachment in-line. + import zipfile as _zf + import io as _io + import base64 as _b64 + zbuf = _io.BytesIO() + with _zf.ZipFile(zbuf, "w") as zf: + zf.writestr("[Content_Types].xml", "") + zf.writestr("word/vbaProject.bin", b"VBA") + encoded = _b64.b64encode(zbuf.getvalue()).decode() + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: macro", + f"Content-Type: multipart/mixed; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/plain", + "", + "see attached", + f"--{boundary}", + 'Content-Type: application/vnd.ms-word.document.macroEnabled.12; name="report.docm"', + 'Content-Disposition: attachment; filename="report.docm"', + "Content-Transfer-Encoding: base64", + "", + encoded, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + import json as _json + manifest = _json.loads(rec["attachments_json"]) + assert len(manifest) == 1 + assert manifest[0]["macro_indicator"] is True + assert manifest[0]["encrypted"] is False + def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod): 
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no message_stored event and no files are written."""