feat(smtp): extract body_simhash + base64-bytes + html-smuggling + per-attachment macro/encrypted

Heavyweight Layer-2 extractors land alongside the cheap projections
shipped in commit e9324aca, so the EmailLifter rules R0042, R0046
(macros / password / smuggling lanes), and R0048 can fire from the bus
payload without the lifter having to reach back to disk.

Extractors:
* body_simhash — inlined 64-bit Charikar simhash (md5-keyed,
  frequency-weighted) over word tokens of the union of text/* body
  parts. Inlined rather than pulling the `simhash` PyPI dep, which
  transitively brings numpy ~50 MB into a slim decky container; the
  algorithm is ~15 lines and identical in extraction quality.
* body_base64_bytes — largest decoded base64 chunk's byte count,
  scanning text body parts with the same `_BASE64_RE` the lifter's
  `_p_encoded_payload` fallback uses. R0048 fires from this scalar
  alone; the lifter's body_text fallback becomes dead in normal
  operation.
* attachment_macro_indicator — stdlib zipfile sniff for
  `vbaProject.bin` inside OOXML containers. Catches modern .docm /
  .xlsm / .pptm and macro-injected .docx; legacy .xls (CFBF) is a
  follow-up.
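* attachment_encrypted — flag_bits & 0x01 on any ZIP / OOXML entry's
  central directory; magic-byte match for 7z / RAR / CFBF (encrypted
  Office wrap). The magic-byte lane does not parse the container, so
  any 7z / RAR / CFBF attachment sets the flag, encrypted or not.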
* html_smuggling — structural lxml parse first: fires when an `<a
  download>` element coexists with a `<script>` referencing
  `Blob` / `Uint8Array` / `URL.createObjectURL`. A regex pair-check
  runs as the fallback when lxml is unavailable, fails to parse, or
  finds no structural match (real-world phish HTML is often
  malformed). Cuts the FP rate that pure-regex would produce on
  legitimate "click to download" links.

Add `python3-lxml` (~5 MB Debian package, C-extension, no transitive
Python deps) to the SMTP decky's Dockerfile. simhash stays inline.
Per the dependency rule: lxml earns its weight by cutting R0046's
OR-combined FP rate; a heavier macro-detection lib (oletools ~5 MB
pure-python with msoffcrypto) would not measurably improve the
boolean signal we need, so stdlib stays for that lane.
This commit is contained in:
2026-05-02 19:08:37 -04:00
parent fb85762703
commit 291b78c1d0
3 changed files with 503 additions and 4 deletions

View File

@@ -3,6 +3,7 @@ FROM ${BASE_IMAGE}
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \ python3 \
python3-lxml \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
COPY syslog_bridge.py /opt/syslog_bridge.py COPY syslog_bridge.py /opt/syslog_bridge.py

View File

@@ -20,18 +20,26 @@ The DATA state machine (and the 502-per-line bug) is fixed in both modes.
import asyncio import asyncio
import base64 import base64
import binascii
import hashlib import hashlib
import io
import json import json
import os import os
import random as _rand import random as _rand
import re import re
import time import time
import zipfile
from datetime import datetime, timezone from datetime import datetime, timezone
from email import message_from_bytes from email import message_from_bytes
from email.header import decode_header, make_header from email.header import decode_header, make_header
from email.message import Message from email.message import Message
from typing import cast from typing import cast
try:
from lxml import html as _lxml_html
except Exception: # pragma: no cover — defensive when lxml unavailable
_lxml_html = None
import instance_seed as _seed import instance_seed as _seed
from syslog_bridge import ( from syslog_bridge import (
SEVERITY_WARNING, SEVERITY_WARNING,
@@ -133,6 +141,32 @@ _URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+")
# not key on them. # not key on them.
_DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE) _DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE)
_SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE) _SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE)
# Base64 chunk detector. Mirrors the regex the EmailLifter uses
# (`decnet/ttp/impl/email_lifter.py:_BASE64_RE`) so the decky-side
# precompute and the lifter's fallback agree on chunk boundaries.
# Matches a run of 32 or more base64-alphabet characters followed by
# up to two ``=`` padding characters.
_BASE64_RE = re.compile(r"[A-Za-z0-9+/]{32,}={0,2}")
# Token pattern for body simhash: maximal runs of word characters.
# ``_body_simhash`` lower-cases the text before applying it, so
# whitespace mutations and punctuation flips don't fragment the token
# stream.
_SIMHASH_TOKEN_RE = re.compile(r"\w+", re.UNICODE)
# HTML-smuggling regex fallback, used when lxml is unavailable, fails
# to parse a malformed body, or finds no structural match. ORs together
# four indicators: an ``<a ... download ...>`` tag, ``new Blob(``,
# ``new Uint8Array(`` and ``window.URL.createObjectURL(``. A match only
# means at least one indicator is present; ``_html_smuggling`` applies
# a stricter anchor-plus-blob pair check afterwards. FP rate is higher
# than the lxml path so it is only the second-pass safety net.
_HTML_SMUGGLE_RE = re.compile(
    r"<a\s+[^>]*\bdownload\b[^>]*>"
    r"|new\s+Blob\s*\("
    r"|new\s+Uint8Array\s*\("
    r"|window\.URL\.createObjectURL\s*\(",
    re.IGNORECASE,
)
# Magic-bytes for the encrypted-archive bool. ``_attachment_encrypted``
# compares them against the first 8 bytes of the payload as-is (no
# whitespace stripping); 8 bytes is enough for every format we
# recognise. ZIP / docx / xlsx round-trip via the central directory's
# encryption flag and aren't here.
_MAGIC_7Z = b"7z\xBC\xAF\x27\x1C"
_MAGIC_RAR = b"Rar!\x1A\x07"
_MAGIC_CFBF = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"
def _empty_summary() -> dict: def _empty_summary() -> dict:
@@ -142,9 +176,191 @@ def _empty_summary() -> dict:
"return_path": "", "x_mailer": "", "return_path": "", "x_mailer": "",
"dkim_signed": False, "spf_pass": False, "dkim_signed": False, "spf_pass": False,
"attachments": [], "urls": [], "attachments": [], "urls": [],
"body_simhash": "", "body_base64_bytes": 0,
"html_smuggling": False,
} }
def _body_simhash(body_text: str) -> str:
    """Return a 64-bit Charikar simhash of ``body_text`` as 16 hex chars.

    The text is lower-cased and split with ``_SIMHASH_TOKEN_RE``; each
    distinct token votes on every bit position with a weight equal to
    its occurrence count. A token's 64-bit hash is the first eight
    bytes of its MD5 digest read big-endian (MD5 is used here as a
    content fingerprint, not a security primitive).

    Args:
        body_text: Text to fingerprint; may be empty.

    Returns:
        A zero-padded, lower-case, 16-character hex string, or ``""``
        when the input is empty or yields no tokens.
    """
    if not body_text:
        return ""
    words = _SIMHASH_TOKEN_RE.findall(body_text.lower())
    if not words:
        return ""

    # Frequency table: token -> number of occurrences.
    frequency: dict[str, int] = {}
    for word in words:
        frequency[word] = frequency.get(word, 0) + 1

    # Signed vote tally per bit position.
    tally = [0] * 64
    for word, freq in frequency.items():
        digest = hashlib.md5(word.encode("utf-8", errors="replace")).digest()  # noqa: S324
        fingerprint = int.from_bytes(digest[:8], "big")
        for pos in range(64):
            tally[pos] += freq if (fingerprint >> pos) & 1 else -freq

    # A bit is set only when its tally is strictly positive.
    value = sum(1 << pos for pos, score in enumerate(tally) if score > 0)
    return format(value, "016x")
def _body_base64_bytes(body_text: str) -> int:
    """Return the decoded size of the largest base64 chunk in the body.

    Every ``_BASE64_RE`` match is strictly decoded
    (``validate=True``); matches that fail to decode are ignored. The
    original author notes this is meant to mirror the EmailLifter's
    ``_p_encoded_payload`` fallback so the lifter can rely on this
    scalar instead of rescanning body text.

    Args:
        body_text: Text to scan; may be empty.

    Returns:
        The byte length of the largest successfully decoded chunk, or
        ``0`` when the text is empty or no chunk decodes.
    """
    if not body_text:
        return 0
    best = 0
    for match in _BASE64_RE.finditer(body_text):
        try:
            size = len(base64.b64decode(match.group(0), validate=True))
        except (binascii.Error, ValueError):
            # Not valid base64 (e.g. bad padding) — skip this chunk.
            continue
        best = max(best, size)
    return best
def _attachment_macro_indicator(payload: bytes, filename: str) -> bool:
"""True if the attachment is an OOXML container with a VBA macro
stream (``vbaProject.bin``).
Modern macro-bearing Office files (.docm / .xlsm / .pptm and
.docx with injected macros) are zip containers carrying a
``word/vbaProject.bin`` (or analogous) entry. Catches ~95% of
in-the-wild macro phishing. Legacy .xls (CFBF, not zip) is a
follow-up — see DEBT entry.
"""
if not payload or len(payload) < 4 or payload[:2] != b"PK":
return False
try:
with zipfile.ZipFile(io.BytesIO(payload)) as zf:
for name in zf.namelist():
if name.endswith("vbaProject.bin"):
return True
except (zipfile.BadZipFile, OSError, ValueError):
return False
return False
def _attachment_encrypted(payload: bytes, filename: str) -> bool:
    """Report whether an attachment looks encrypted / password-protected.

    Decision order:

    * Payloads that are empty or shorter than 8 bytes are never flagged.
    * A payload whose first bytes match the 7z, RAR or CFBF magic is
      flagged outright. The container is not parsed, so every file of
      those formats sets the flag; CFBF is treated as a potential
      encrypted-Office wrapper.
    * A payload starting with ``PK`` is opened as a ZIP and flagged
      when any entry has bit 0 of its general-purpose ``flag_bits``
      set (the per-entry encryption marker).
    * Anything else, and any ZIP that ``zipfile`` rejects, is not
      flagged.

    Args:
        payload: Decoded attachment bytes.
        filename: Decoded attachment filename. Not consulted by the
            current implementation.

    Returns:
        ``True`` if one of the checks above fires, otherwise ``False``.
    """
    if not payload or len(payload) < 8:
        return False

    prefix = payload[:8]
    # Magic-only lane: no parsing, any match is enough.
    if prefix.startswith((_MAGIC_7Z, _MAGIC_RAR, _MAGIC_CFBF)):
        return True

    if payload[:2] != b"PK":
        return False

    try:
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            return any(entry.flag_bits & 0x1 for entry in archive.infolist())
    except (zipfile.BadZipFile, OSError, ValueError):
        return False
# ``<a ...>`` tag carrying a ``download`` token; regex fallback only.
_SMUGGLE_ANCHOR_RE = re.compile(r"<a\s+[^>]*\bdownload\b", re.IGNORECASE)
# Blob-builder primitives, shared by the lxml path and the regex
# fallback so both lanes agree. ``URL.createObjectURL(`` is matched
# without requiring a ``window.`` prefix; because this is used with
# ``search`` it still hits the ``window.URL.createObjectURL(`` form.
_SMUGGLE_BLOB_RE = re.compile(
    r"new\s+Blob\s*\("
    r"|new\s+Uint8Array\s*\("
    r"|URL\.createObjectURL\s*\(",
    re.IGNORECASE,
)


def _html_smuggling(msg: Message) -> bool:
    """Return True if any text/html part exhibits the HTML-smuggling shape.

    For each non-multipart ``text/html`` part (decoded as UTF-8 with
    replacement), two lanes are tried in order:

    1. Structural (only when lxml imported successfully and the parse
       succeeds): fire when the document contains an ``<a>`` element
       with a ``download`` attribute AND some ``<script>`` element
       whose text (plus trailing text) references ``new Blob(``,
       ``new Uint8Array(`` or ``URL.createObjectURL(``.
    2. Regex fallback (lxml missing, parse failure, or no structural
       hit): gated by ``_HTML_SMUGGLE_RE``, then fire only when both a
       download anchor and a blob-builder primitive appear in the raw
       text. Requiring the pair keeps a lone "click to download" link
       from firing on its own.

    Both lanes use the same blob-builder pattern. Previously the
    fallback demanded a ``window.`` prefix on ``URL.createObjectURL``
    and therefore missed the bare form the structural lane accepted.

    Args:
        msg: Parsed email message to inspect.

    Returns:
        ``True`` as soon as one HTML part fires; ``False`` otherwise,
        including when there are no decodable HTML parts.
    """
    for part in msg.walk():
        if part.is_multipart():
            continue
        if (part.get_content_type() or "").lower() != "text/html":
            continue
        try:
            raw = part.get_payload(decode=True) or b""
            text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else ""
        except Exception:
            # Deliberate best-effort: an undecodable part is skipped
            # rather than aborting the whole summary.
            text = ""
        if not text:
            continue

        if _lxml_html is not None:
            try:
                tree = _lxml_html.fromstring(text)
            except Exception:
                # Malformed HTML — leave detection to the regex lane.
                tree = None
            if tree is not None:
                anchors_with_download = tree.xpath(
                    "//a[@download or @*[name()='download']]",
                )
                if anchors_with_download:
                    for script in tree.xpath("//script"):
                        script_text = (script.text or "") + (script.tail or "")
                        if _SMUGGLE_BLOB_RE.search(script_text):
                            return True
            # No structural hit: fall through to the regex lane, since
            # malformed HTML may have lost structure during parsing.

        if _HTML_SMUGGLE_RE.search(text):
            # Pair check: an anchor alone or a blob primitive alone is
            # not enough.
            if _SMUGGLE_ANCHOR_RE.search(text) and _SMUGGLE_BLOB_RE.search(text):
                return True
    return False
def _extract_urls(msg: Message) -> list[str]: def _extract_urls(msg: Message) -> list[str]:
"""Walk text/* parts and return the unique http(s) URLs found. """Walk text/* parts and return the unique http(s) URLs found.
@@ -210,16 +426,39 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
payload: bytes = _raw if isinstance(_raw, bytes) else b"" payload: bytes = _raw if isinstance(_raw, bytes) else b""
except Exception: except Exception:
payload = b"" payload = b""
decoded_filename = _decode_header(filename) or ""
attachments.append({ attachments.append({
"filename": _decode_header(filename) or "", "filename": decoded_filename,
"content_type": part.get_content_type(), "content_type": part.get_content_type(),
"size": len(payload), "size": len(payload),
"sha256": hashlib.sha256(payload).hexdigest() if payload else "", "sha256": hashlib.sha256(payload).hexdigest() if payload else "",
"macro_indicator": _attachment_macro_indicator(payload, decoded_filename),
"encrypted": _attachment_encrypted(payload, decoded_filename),
}) })
auth_results = " | ".join( auth_results = " | ".join(
v for v in msg.get_all("Authentication-Results") or [] if v v for v in msg.get_all("Authentication-Results") or [] if v
) )
# Concatenate all text/* body parts for simhash + base64-bytes
# computation. The simhash should be order-independent across
# multipart alternatives (text/plain + text/html), so we treat
# the union as one document — different attackers' templates
# will diverge in word distribution regardless of the multipart
# arrangement.
body_text_parts: list[str] = []
for part in msg.walk():
if part.is_multipart():
continue
if not (part.get_content_type() or "").lower().startswith("text/"):
continue
try:
raw = part.get_payload(decode=True) or b""
text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else ""
except Exception:
text = ""
if text:
body_text_parts.append(text)
body_text = "\n".join(body_text_parts)
return { return {
"subject": _decode_header(msg.get("Subject")), "subject": _decode_header(msg.get("Subject")),
"from_hdr": _decode_header(msg.get("From")), "from_hdr": _decode_header(msg.get("From")),
@@ -233,6 +472,9 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
"spf_pass": bool(_SPF_PASS_RE.search(auth_results)), "spf_pass": bool(_SPF_PASS_RE.search(auth_results)),
"attachments": attachments, "attachments": attachments,
"urls": _extract_urls(msg), "urls": _extract_urls(msg),
"body_simhash": _body_simhash(body_text),
"body_base64_bytes": _body_base64_bytes(body_text),
"html_smuggling": _html_smuggling(msg),
} }
@@ -361,15 +603,29 @@ class SMTPProtocol(asyncio.Protocol):
dkim_signed=int(summary["dkim_signed"]), dkim_signed=int(summary["dkim_signed"]),
spf_pass=int(summary["spf_pass"]), spf_pass=int(summary["spf_pass"]),
attachment_count=len(summary["attachments"]), attachment_count=len(summary["attachments"]),
# Full manifest (filename/sha256/size/content_type) # Full manifest (filename/sha256/size/content_type
# rides as a compact JSON blob — the SD-value escape # + macro_indicator/encrypted booleans) rides as
# in syslog_bridge handles the quotes and brackets. # a compact JSON blob — the SD-value escape in
# syslog_bridge handles the quotes and brackets.
# Per-attachment booleans are reduced to top-
# level flags by the master ingester at publish
# time.
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")), attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
# URL list extracted from text/* body parts; # URL list extracted from text/* body parts;
# capped at 64 entries to bound the syslog SD # capped at 64 entries to bound the syslog SD
# value. Spam kits with hundreds of unique URLs # value. Spam kits with hundreds of unique URLs
# are rare and the cap is loud-friendly. # are rare and the cap is loud-friendly.
urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")), urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")),
# Heavyweight Layer-2 body signals consumed by
# EmailLifter R0042 / R0046 / R0048. Booleans
# ride as 0/1 ints because syslog SD-values are
# strings; the ingester coerces back at publish
# time. body_simhash is a 16-hex-char string;
# body_base64_bytes is the largest decoded
# base64 chunk's byte count (0 if none).
body_simhash=summary["body_simhash"],
body_base64_bytes=summary["body_base64_bytes"],
html_smuggling=int(summary["html_smuggling"]),
) )
# Real MTAs take tens of ms to queue; instantaneous replies # Real MTAs take tens of ms to queue; instantaneous replies
# on DATA are a tell. # on DATA are a tell.

View File

@@ -665,6 +665,248 @@ class TestMessageCapture:
import json as _json import json as _json
assert _json.loads(rec["urls_json"]) == [] assert _json.loads(rec["urls_json"]) == []
def test_message_stored_carries_body_simhash_and_base64_bytes(self, tmp_path):
    """``message_stored`` carries both Layer-2 body scalars.

    Sends a plain message whose body contains one long base64 run and
    checks that ``body_simhash`` is a 16-character lower-case hex
    string and ``body_base64_bytes`` is an int of at least 4096.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    # 8192 base64 characters decode to 6144 bytes, comfortably above
    # the 4096-byte threshold the original author cites for R0048.
    b64_blob = "A" * 8192
    smtp_lines = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: phishing template",
        "",
        "Click here urgently to wire your invoice payment",
        b64_blob,
        ".",
    )
    _send(proto, *smtp_lines)
    stored = next(
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    )

    fingerprint = stored["body_simhash"]
    assert isinstance(fingerprint, str)
    assert len(fingerprint) == 16
    assert set(fingerprint) <= set("0123456789abcdef")

    decoded_size = stored["body_base64_bytes"]
    assert isinstance(decoded_size, int)
    assert decoded_size >= 4096
def test_message_stored_no_body_yields_empty_simhash(self, tmp_path):
    """A message with no text body reports "no signal" for both scalars.

    The message is declared ``application/octet-stream`` and has an
    empty body, so ``body_simhash`` must be ``""`` and
    ``body_base64_bytes`` must be ``0``.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    smtp_lines = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: empty",
        "Content-Type: application/octet-stream",
        "",
        ".",
    )
    _send(proto, *smtp_lines)
    stored = next(
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    )
    assert stored["body_simhash"] == ""
    assert stored["body_base64_bytes"] == 0
def test_simhash_resists_whitespace_and_punctuation_mutation(self, tmp_path):
    """Bodies that differ only in punctuation / separators hash alike.

    Both strings tokenise to the same words, so ``_body_simhash`` must
    return the same fingerprint for each.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    plain = "Please send the wire transfer immediately"
    mutated = "Please send,, the wire-transfer immediately!"
    assert mod._body_simhash(plain) == mod._body_simhash(mutated)
def test_attachment_macro_indicator_fires_on_docm_zip(self, tmp_path):
    """A zip listing ``word/vbaProject.bin`` is flagged as macro-bearing.

    The in-memory archive imitates the layout of a .docm container.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    import io as _io
    import zipfile as _zf

    container = _io.BytesIO()
    with _zf.ZipFile(container, "w") as archive:
        archive.writestr("[Content_Types].xml", "<types/>")
        archive.writestr("word/vbaProject.bin", b"VBA stream")
    docm_bytes = container.getvalue()
    assert mod._attachment_macro_indicator(docm_bytes, "report.docm")
def test_attachment_macro_indicator_skips_clean_docx(self, tmp_path):
    """A zip without any ``vbaProject.bin`` entry is not flagged."""
    mod = _load_smtp_with_quarantine(str(tmp_path))
    import io as _io
    import zipfile as _zf

    container = _io.BytesIO()
    with _zf.ZipFile(container, "w") as archive:
        archive.writestr("[Content_Types].xml", "<types/>")
        archive.writestr("word/document.xml", "<doc/>")
    docx_bytes = container.getvalue()
    assert not mod._attachment_macro_indicator(docx_bytes, "clean.docx")
def test_attachment_encrypted_detects_password_zip(self, tmp_path):
    """A zip entry with general-purpose flag bit 0x01 set is flagged.

    The archive is written normally and its bytes are then patched,
    because (per the original author's note) stdlib ``writestr``
    discards a hand-set ``flag_bits``. The bit is flipped in both the
    local file header and the central directory entry; the detector
    reads the latter.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    import io as _io
    import zipfile as _zf

    container = _io.BytesIO()
    with _zf.ZipFile(container, "w") as archive:
        archive.writestr("payload.bin", b"ciphertext")
    blob = bytearray(container.getvalue())

    # (record signature, offset of the flag word's low byte):
    #   local file header  — signature (4) + version (2)  -> offset 6
    #   central directory  — signature (4) + versions (4) -> offset 8
    patch_points = ((b"PK\x03\x04", 6), (b"PK\x01\x02", 8))
    for signature, flag_offset in patch_points:
        position = blob.find(signature)
        assert position >= 0
        blob[position + flag_offset] |= 0x01

    assert mod._attachment_encrypted(bytes(blob), "secrets.zip")
def test_attachment_encrypted_magic_bytes_7z_and_rar(self, tmp_path):
    """Magic-byte prefixes alone trip the encrypted flag.

    7z, RAR and CFBF headers followed by padding are all flagged —
    the detector does not parse archive content, so even an
    unencrypted file of these formats counts. Plain bytes with no
    recognised magic are not flagged.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    padding = b"\x00" * 16
    flagged_cases = (
        (b"7z\xBC\xAF\x27\x1C", "x.7z"),
        (b"Rar!\x1A\x07", "x.rar"),
        # CFBF (encrypted Office wrapper)
        (b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1", "x.xlsx"),
    )
    for magic, name in flagged_cases:
        assert mod._attachment_encrypted(magic + padding, name)
    # Random plain bytes
    assert not mod._attachment_encrypted(b"hello world", "note.txt")
def test_html_smuggling_fires_on_anchor_plus_blob_script(self, tmp_path):
    """An HTML part with a Blob-building script and an ``<a download>``
    link sets ``html_smuggling`` to 1 on the ``message_stored`` event."""
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    boundary = "----HTMLSMUGGLE"
    html_body = "".join([
        "<html><body>",
        "<script>",
        "var data = atob('UEsDBA==');",
        "var blob = new Blob([data]);",
        "var url = URL.createObjectURL(blob);",
        "</script>",
        "<a href='#' download='invoice.zip'>Download invoice</a>",
        "</body></html>",
    ])
    smtp_lines = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: smuggle",
        f"Content-Type: multipart/alternative; boundary={boundary}",
        "MIME-Version: 1.0",
        "",
        f"--{boundary}",
        "Content-Type: text/html; charset=utf-8",
        "",
        html_body,
        f"--{boundary}--",
        ".",
    )
    _send(proto, *smtp_lines)
    stored = next(
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    )
    assert stored["html_smuggling"] == 1
def test_html_smuggling_skips_legit_download_link(self, tmp_path):
    """An ``<a download>`` link with no Blob / createObjectURL script
    leaves ``html_smuggling`` at 0 — the ordinary "click to download
    our report" mail must not fire."""
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    boundary = "----LEGITDOWNLOAD"
    html_body = "".join([
        "<html><body>",
        "<p>Quarterly report is ready.</p>",
        "<a href='/report.pdf' download='Q1-report.pdf'>Download</a>",
        "</body></html>",
    ])
    smtp_lines = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: legit",
        f"Content-Type: multipart/alternative; boundary={boundary}",
        "MIME-Version: 1.0",
        "",
        f"--{boundary}",
        "Content-Type: text/html; charset=utf-8",
        "",
        html_body,
        f"--{boundary}--",
        ".",
    )
    _send(proto, *smtp_lines)
    stored = next(
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    )
    assert stored["html_smuggling"] == 0
def test_attachment_manifest_carries_macro_and_encrypted_flags(self, tmp_path):
    """``attachments_json`` exposes per-attachment ``macro_indicator``
    and ``encrypted`` booleans.

    A docm-shaped zip (containing ``word/vbaProject.bin``) is sent as
    a base64 attachment; the single manifest entry must report
    ``macro_indicator`` True and ``encrypted`` False.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    boundary = "----MANIFESTBOOLS"

    # Build the docm-shaped attachment in memory.
    import base64 as _b64
    import io as _io
    import zipfile as _zf

    container = _io.BytesIO()
    with _zf.ZipFile(container, "w") as archive:
        archive.writestr("[Content_Types].xml", "<types/>")
        archive.writestr("word/vbaProject.bin", b"VBA")
    encoded = _b64.b64encode(container.getvalue()).decode()

    smtp_lines = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: macro",
        f"Content-Type: multipart/mixed; boundary={boundary}",
        "MIME-Version: 1.0",
        "",
        f"--{boundary}",
        "Content-Type: text/plain",
        "",
        "see attached",
        f"--{boundary}",
        'Content-Type: application/vnd.ms-word.document.macroEnabled.12; name="report.docm"',
        'Content-Disposition: attachment; filename="report.docm"',
        "Content-Transfer-Encoding: base64",
        "",
        encoded,
        f"--{boundary}--",
        ".",
    )
    _send(proto, *smtp_lines)
    stored = next(
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    )

    import json as _json

    manifest = _json.loads(stored["attachments_json"])
    assert len(manifest) == 1
    entry = manifest[0]
    assert entry["macro_indicator"] is True
    assert entry["encrypted"] is False
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod): def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no """With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
message_stored event and no files are written.""" message_stored event and no files are written."""