feat(smtp): extract body_simhash + base64-bytes + html-smuggling + per-attachment macro/encrypted
Heavyweight Layer-2 extractors land alongside the cheap projections
shipped in commit e9324aca, so the EmailLifter R0042 / R0046 (macros
/ password / smuggling lanes) / R0048 fire from the bus payload
without the lifter having to reach back to disk.
Extractors:
* body_simhash — inlined 64-bit Charikar simhash (md5-keyed,
frequency-weighted) over word tokens of the union of text/* body
parts. Inlined rather than pulling the `simhash` PyPI dep, which
transitively brings numpy ~50 MB into a slim decky container; the
algorithm is ~15 lines and identical in extraction quality.
* body_base64_bytes — largest decoded base64 chunk's byte count,
scanning text body parts with the same `_BASE64_RE` the lifter's
`_p_encoded_payload` fallback uses. R0048 fires from this scalar
alone; the lifter's body_text fallback becomes dead in normal
operation.
* attachment_macro_indicator — stdlib zipfile sniff for
`vbaProject.bin` inside OOXML containers. Catches modern .docm /
.xlsm / .pptm and macro-injected .docx; legacy .xls (CFBF) is a
follow-up.
* attachment_encrypted — flag_bits & 0x01 on any ZIP / OOXML entry's
central directory; magic-byte match for 7z / RAR / CFBF (encrypted
Office wrap).
* html_smuggling — structural lxml parse first: fires when an `<a
download>` element coexists with a `<script>` referencing
`Blob` / `Uint8Array` / `URL.createObjectURL`. Regex pair-check
fallback on lxml parse failure (real-world phish HTML is often
malformed). Cuts the FP rate that pure-regex would produce on
legitimate "click to download" links.
Add `python3-lxml` (~5 MB Debian package, C-extension, no transitive
Python deps) to the SMTP decky's Dockerfile. simhash stays inline.
Per the dependency rule: lxml earns its weight by cutting R0046's
OR-combined FP rate; a heavier macro-detection lib (oletools ~5 MB
pure-python with msoffcrypto) would not measurably improve the
boolean signal we need, so stdlib stays for that lane.
This commit is contained in:
@@ -665,6 +665,248 @@ class TestMessageCapture:
|
||||
import json as _json
|
||||
assert _json.loads(rec["urls_json"]) == []
|
||||
|
||||
def test_message_stored_carries_body_simhash_and_base64_bytes(self, tmp_path):
|
||||
"""Layer-2 body signals: simhash hex string + base64-bytes
|
||||
scalar ride on every captured message_stored event so the
|
||||
EmailLifter's R0042 / R0048 predicates fire from the bus
|
||||
payload alone."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
proto, _, _ = _make_protocol(mod)
|
||||
# Body with a >=4 KB base64 chunk so R0048's threshold
|
||||
# (min_bytes=4096) hits.
|
||||
big_chunk = ("A" * 8192)
|
||||
_send(
|
||||
proto,
|
||||
"EHLO x.com",
|
||||
"MAIL FROM:<a@b.com>",
|
||||
"RCPT TO:<c@d.com>",
|
||||
"DATA",
|
||||
"Subject: phishing template",
|
||||
"",
|
||||
"Click here urgently to wire your invoice payment",
|
||||
big_chunk,
|
||||
".",
|
||||
)
|
||||
events = _logged_events(mod)
|
||||
rec = next(f for t, f in events if t == "message_stored")
|
||||
# 16-hex-char simhash
|
||||
simhash = rec["body_simhash"]
|
||||
assert isinstance(simhash, str)
|
||||
assert len(simhash) == 16
|
||||
assert all(c in "0123456789abcdef" for c in simhash)
|
||||
# base64 chunk decoded length >= 4096 (8192 base64 chars → 6144 bytes)
|
||||
assert isinstance(rec["body_base64_bytes"], int)
|
||||
assert rec["body_base64_bytes"] >= 4096
|
||||
|
||||
def test_message_stored_no_body_yields_empty_simhash(self, tmp_path):
|
||||
"""A bare DATA terminator with no text body yields an empty
|
||||
simhash and zero base64-bytes — predicates correctly see
|
||||
'no signal' and don't fire."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
proto, _, _ = _make_protocol(mod)
|
||||
_send(
|
||||
proto,
|
||||
"EHLO x.com",
|
||||
"MAIL FROM:<a@b.com>",
|
||||
"RCPT TO:<c@d.com>",
|
||||
"DATA",
|
||||
"Subject: empty",
|
||||
"Content-Type: application/octet-stream",
|
||||
"",
|
||||
".",
|
||||
)
|
||||
events = _logged_events(mod)
|
||||
rec = next(f for t, f in events if t == "message_stored")
|
||||
assert rec["body_simhash"] == ""
|
||||
assert rec["body_base64_bytes"] == 0
|
||||
|
||||
def test_simhash_resists_whitespace_and_punctuation_mutation(self, tmp_path):
|
||||
"""Two messages differing only in whitespace / punctuation
|
||||
produce the same simhash — that's the whole point of a real
|
||||
simhash over a sha256 prefix."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
body_a = "Please send the wire transfer immediately"
|
||||
body_b = "Please send,, the wire-transfer immediately!"
|
||||
sh_a = mod._body_simhash(body_a)
|
||||
sh_b = mod._body_simhash(body_b)
|
||||
assert sh_a == sh_b
|
||||
|
||||
def test_attachment_macro_indicator_fires_on_docm_zip(self, tmp_path):
|
||||
"""A zip carrying a vbaProject.bin entry (the OOXML macro
|
||||
marker) is flagged. Mirrors a real .docm container."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
import zipfile as _zf
|
||||
import io as _io
|
||||
buf = _io.BytesIO()
|
||||
with _zf.ZipFile(buf, "w") as zf:
|
||||
zf.writestr("[Content_Types].xml", "<types/>")
|
||||
zf.writestr("word/vbaProject.bin", b"VBA stream")
|
||||
assert mod._attachment_macro_indicator(buf.getvalue(), "report.docm")
|
||||
|
||||
def test_attachment_macro_indicator_skips_clean_docx(self, tmp_path):
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
import zipfile as _zf
|
||||
import io as _io
|
||||
buf = _io.BytesIO()
|
||||
with _zf.ZipFile(buf, "w") as zf:
|
||||
zf.writestr("[Content_Types].xml", "<types/>")
|
||||
zf.writestr("word/document.xml", "<doc/>")
|
||||
assert not mod._attachment_macro_indicator(buf.getvalue(), "clean.docx")
|
||||
|
||||
def test_attachment_encrypted_detects_password_zip(self, tmp_path):
|
||||
"""A zip with an entry whose general-purpose flag bit 0x01 is
|
||||
set (the encrypted-entry marker per APPNOTE.txt §4.4.4) trips
|
||||
the bool. Stdlib's ``writestr`` discards a hand-set flag_bits,
|
||||
so we post-process the produced zip bytes to flip the bit on
|
||||
both the local file header and the central directory entry —
|
||||
what our detector actually reads."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
import zipfile as _zf
|
||||
import io as _io
|
||||
buf = _io.BytesIO()
|
||||
with _zf.ZipFile(buf, "w") as zf:
|
||||
zf.writestr("payload.bin", b"ciphertext")
|
||||
raw = bytearray(buf.getvalue())
|
||||
# Local file header: signature PK\x03\x04 then version (2),
|
||||
# then the general-purpose flag word at offset 6.
|
||||
lfh = raw.find(b"PK\x03\x04")
|
||||
assert lfh >= 0
|
||||
raw[lfh + 6] |= 0x01
|
||||
# Central directory entry: signature PK\x01\x02 then versions
|
||||
# (4 bytes) then the flag word at offset 8.
|
||||
cd = raw.find(b"PK\x01\x02")
|
||||
assert cd >= 0
|
||||
raw[cd + 8] |= 0x01
|
||||
assert mod._attachment_encrypted(bytes(raw), "secrets.zip")
|
||||
|
||||
def test_attachment_encrypted_magic_bytes_7z_and_rar(self, tmp_path):
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
# 7z header — even unencrypted .7z trips the bool because we
|
||||
# don't parse the archive content; magic alone is enough for
|
||||
# R0046's OR-combined predicate.
|
||||
assert mod._attachment_encrypted(b"7z\xBC\xAF\x27\x1C" + b"\x00" * 16, "x.7z")
|
||||
assert mod._attachment_encrypted(b"Rar!\x1A\x07" + b"\x00" * 16, "x.rar")
|
||||
# CFBF (encrypted Office)
|
||||
assert mod._attachment_encrypted(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1" + b"\x00" * 16, "x.xlsx")
|
||||
# Random plain bytes
|
||||
assert not mod._attachment_encrypted(b"hello world", "note.txt")
|
||||
|
||||
def test_html_smuggling_fires_on_anchor_plus_blob_script(self, tmp_path):
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
proto, _, _ = _make_protocol(mod)
|
||||
boundary = "----HTMLSMUGGLE"
|
||||
html_body = (
|
||||
"<html><body>"
|
||||
"<script>"
|
||||
"var data = atob('UEsDBA==');"
|
||||
"var blob = new Blob([data]);"
|
||||
"var url = URL.createObjectURL(blob);"
|
||||
"</script>"
|
||||
"<a href='#' download='invoice.zip'>Download invoice</a>"
|
||||
"</body></html>"
|
||||
)
|
||||
_send(
|
||||
proto,
|
||||
"EHLO x.com",
|
||||
"MAIL FROM:<a@b.com>",
|
||||
"RCPT TO:<c@d.com>",
|
||||
"DATA",
|
||||
"Subject: smuggle",
|
||||
f"Content-Type: multipart/alternative; boundary={boundary}",
|
||||
"MIME-Version: 1.0",
|
||||
"",
|
||||
f"--{boundary}",
|
||||
"Content-Type: text/html; charset=utf-8",
|
||||
"",
|
||||
html_body,
|
||||
f"--{boundary}--",
|
||||
".",
|
||||
)
|
||||
events = _logged_events(mod)
|
||||
rec = next(f for t, f in events if t == "message_stored")
|
||||
assert rec["html_smuggling"] == 1
|
||||
|
||||
def test_html_smuggling_skips_legit_download_link(self, tmp_path):
|
||||
"""A page with `<a download>` but no Blob/createObjectURL
|
||||
script does NOT fire — the "click to download our report"
|
||||
FP class is precisely what the structural check excludes."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
proto, _, _ = _make_protocol(mod)
|
||||
boundary = "----LEGITDOWNLOAD"
|
||||
html_body = (
|
||||
"<html><body>"
|
||||
"<p>Quarterly report is ready.</p>"
|
||||
"<a href='/report.pdf' download='Q1-report.pdf'>Download</a>"
|
||||
"</body></html>"
|
||||
)
|
||||
_send(
|
||||
proto,
|
||||
"EHLO x.com",
|
||||
"MAIL FROM:<a@b.com>",
|
||||
"RCPT TO:<c@d.com>",
|
||||
"DATA",
|
||||
"Subject: legit",
|
||||
f"Content-Type: multipart/alternative; boundary={boundary}",
|
||||
"MIME-Version: 1.0",
|
||||
"",
|
||||
f"--{boundary}",
|
||||
"Content-Type: text/html; charset=utf-8",
|
||||
"",
|
||||
html_body,
|
||||
f"--{boundary}--",
|
||||
".",
|
||||
)
|
||||
events = _logged_events(mod)
|
||||
rec = next(f for t, f in events if t == "message_stored")
|
||||
assert rec["html_smuggling"] == 0
|
||||
|
||||
def test_attachment_manifest_carries_macro_and_encrypted_flags(self, tmp_path):
|
||||
"""The attachments JSON manifest now includes per-attachment
|
||||
macro_indicator + encrypted booleans — the ingester reduces
|
||||
these to top-level flags at publish time."""
|
||||
mod = _load_smtp_with_quarantine(str(tmp_path))
|
||||
proto, _, _ = _make_protocol(mod)
|
||||
boundary = "----MANIFESTBOOLS"
|
||||
# Build a docm-shaped attachment in-line.
|
||||
import zipfile as _zf
|
||||
import io as _io
|
||||
import base64 as _b64
|
||||
zbuf = _io.BytesIO()
|
||||
with _zf.ZipFile(zbuf, "w") as zf:
|
||||
zf.writestr("[Content_Types].xml", "<types/>")
|
||||
zf.writestr("word/vbaProject.bin", b"VBA")
|
||||
encoded = _b64.b64encode(zbuf.getvalue()).decode()
|
||||
_send(
|
||||
proto,
|
||||
"EHLO x.com",
|
||||
"MAIL FROM:<a@b.com>",
|
||||
"RCPT TO:<c@d.com>",
|
||||
"DATA",
|
||||
"Subject: macro",
|
||||
f"Content-Type: multipart/mixed; boundary={boundary}",
|
||||
"MIME-Version: 1.0",
|
||||
"",
|
||||
f"--{boundary}",
|
||||
"Content-Type: text/plain",
|
||||
"",
|
||||
"see attached",
|
||||
f"--{boundary}",
|
||||
'Content-Type: application/vnd.ms-word.document.macroEnabled.12; name="report.docm"',
|
||||
'Content-Disposition: attachment; filename="report.docm"',
|
||||
"Content-Transfer-Encoding: base64",
|
||||
"",
|
||||
encoded,
|
||||
f"--{boundary}--",
|
||||
".",
|
||||
)
|
||||
events = _logged_events(mod)
|
||||
rec = next(f for t, f in events if t == "message_stored")
|
||||
import json as _json
|
||||
manifest = _json.loads(rec["attachments_json"])
|
||||
assert len(manifest) == 1
|
||||
assert manifest[0]["macro_indicator"] is True
|
||||
assert manifest[0]["encrypted"] is False
|
||||
|
||||
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
|
||||
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
|
||||
message_stored event and no files are written."""
|
||||
|
||||
Reference in New Issue
Block a user