diff --git a/decnet/templates/smtp/Dockerfile b/decnet/templates/smtp/Dockerfile index d8a695c0..3247774a 100644 --- a/decnet/templates/smtp/Dockerfile +++ b/decnet/templates/smtp/Dockerfile @@ -3,6 +3,7 @@ FROM ${BASE_IMAGE} RUN apt-get update && apt-get install -y --no-install-recommends \ python3 \ + python3-lxml \ && rm -rf /var/lib/apt/lists/* COPY syslog_bridge.py /opt/syslog_bridge.py diff --git a/decnet/templates/smtp/server.py b/decnet/templates/smtp/server.py index d3b0d422..52ba1015 100644 --- a/decnet/templates/smtp/server.py +++ b/decnet/templates/smtp/server.py @@ -20,18 +20,26 @@ The DATA state machine (and the 502-per-line bug) is fixed in both modes. import asyncio import base64 +import binascii import hashlib +import io import json import os import random as _rand import re import time +import zipfile from datetime import datetime, timezone from email import message_from_bytes from email.header import decode_header, make_header from email.message import Message from typing import cast +try: + from lxml import html as _lxml_html +except Exception: # pragma: no cover — defensive when lxml unavailable + _lxml_html = None + import instance_seed as _seed from syslog_bridge import ( SEVERITY_WARNING, @@ -133,6 +141,32 @@ _URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+") # not key on them. _DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE) _SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE) +# Base64 chunk detector. Mirrors the regex the EmailLifter uses +# (`decnet/ttp/impl/email_lifter.py:_BASE64_RE`) so the decky-side +# precompute and the lifter's fallback agree on chunk boundaries. +_BASE64_RE = re.compile(r"[A-Za-z0-9+/]{32,}={0,2}") +# Token boundary for body simhash. Lower-cased and word-class only so +# whitespace mutations and punctuation flips don't fragment the token +# stream. 
_SIMHASH_TOKEN_RE = re.compile(r"\w+", re.UNICODE)
# HTML-smuggling regex fallback, used when lxml is unavailable or fails
# to parse a malformed body. OR-combines the structural signals (an
# anchor carrying a ``download`` attribute, in-script binary construction
# via ``new Blob(`` / ``new Uint8Array(``, and ``createObjectURL``) into
# one pattern; FP rate is higher than the lxml path so it is only the
# second-pass safety net.
_HTML_SMUGGLE_RE = re.compile(
    # The anchor alternative must be pinned to an opening ``<a`` tag;
    # without that prefix it matches any prose containing
    # "download ... >".
    r"<a\b[^>]*\bdownload\b[^>]*>"
    r"|new\s+Blob\s*\("
    r"|new\s+Uint8Array\s*\("
    r"|window\.URL\.createObjectURL\s*\(",
    re.IGNORECASE,
)
# Magic-bytes for the encrypted-archive bool. Compared against the first
# 8 bytes of the raw payload exactly as received (no whitespace is
# stripped before the comparison); 8 bytes is enough for every format we
# recognise. ZIP / docx / xlsx round-trip via the central directory's
# encryption flag and aren't here.
_MAGIC_7Z = b"7z\xBC\xAF\x27\x1C"
_MAGIC_RAR = b"Rar!\x1A\x07"
_MAGIC_CFBF = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"

# NOTE(review): at this point the patch also extends ``_empty_summary()``
# (defined outside this view) with three new default keys:
#     "body_simhash": "", "body_base64_bytes": 0, "html_smuggling": False


def _body_simhash(body_text: str) -> str:
    """Charikar 64-bit simhash over word tokens, hex-encoded.

    Inlined rather than pulling the ``simhash`` PyPI dependency into the
    container. Tokens are the ``\\w+`` runs of the lower-cased text, each
    weighted by its frequency; the per-token hash is the first 8 bytes of
    its MD5 digest (a content fingerprint, not a security primitive).

    Args:
        body_text: Decoded message body text. May be empty.

    Returns:
        A 16-hex-char string, or ``""`` when the input is empty or
        contains no word tokens ("no signal").
    """
    tokens = _SIMHASH_TOKEN_RE.findall(body_text.lower()) if body_text else []
    if not tokens:
        return ""

    # Frequency of each distinct token is its vote weight.
    counts: dict[str, int] = {}
    for tok in tokens:
        counts[tok] = counts.get(tok, 0) + 1

    # One signed tally per output bit: a token votes +weight where its
    # hash bit is set and -weight where it is clear.
    tallies = [0] * 64
    for tok, weight in counts.items():
        digest = hashlib.md5(  # noqa: S324
            tok.encode("utf-8", errors="replace")
        ).digest()
        token_hash = int.from_bytes(digest[:8], "big")
        for bit in range(64):
            if token_hash & (1 << bit):
                tallies[bit] += weight
            else:
                tallies[bit] -= weight

    fingerprint = 0
    for bit, tally in enumerate(tallies):
        if tally > 0:
            fingerprint |= 1 << bit
    return format(fingerprint, "016x")


def _body_base64_bytes(body_text: str) -> int:
    """Largest decoded base64 chunk's byte count in the body, or 0.

    Iterates ``_BASE64_RE`` matches, attempts a strict decode of each,
    and returns the largest decoded length seen. Chunks that fail strict
    decoding (for example a length that is not a multiple of four) are
    ignored. The original notes say this mirrors the EmailLifter's
    fallback, so the matching and decoding rules are kept as they were.

    Args:
        body_text: Decoded message body text. May be empty.

    Returns:
        Byte length of the largest successfully decoded chunk, or ``0``
        when there is no text or no decodable chunk.
    """
    if not body_text:
        return 0
    largest = 0
    for match in _BASE64_RE.finditer(body_text):
        try:
            decoded = base64.b64decode(match.group(0), validate=True)
        except (binascii.Error, ValueError):
            continue
        largest = max(largest, len(decoded))
    return largest


def _attachment_macro_indicator(payload: bytes, filename: str) -> bool:
    """True if the attachment is an OOXML container with a VBA macro
    stream (``vbaProject.bin``).

    Modern macro-bearing Office files (.docm / .xlsm / .pptm and .docx
    with injected macros) are zip containers carrying a
    ``word/vbaProject.bin`` (or analogous) entry. Legacy CFBF files
    (not zip) are not detected here.

    The entry name is compared case-insensitively: OPC part names are
    case-insensitive, so ``WORD/VBAPROJECT.BIN`` is the same part.

    Args:
        payload: Raw decoded attachment bytes.
        filename: Attachment filename; currently unused, kept for
            signature parity with ``_attachment_encrypted``.

    Returns:
        ``True`` when a zip entry name ends with ``vbaProject.bin``;
        ``False`` for non-zip, unreadable, or macro-free payloads.
    """
    if not payload or len(payload) < 4 or payload[:2] != b"PK":
        return False
    try:
        with zipfile.ZipFile(io.BytesIO(payload)) as zf:
            return any(
                name.lower().endswith("vbaproject.bin")
                for name in zf.namelist()
            )
    except (zipfile.BadZipFile, OSError, ValueError):
        # Malformed archive from an untrusted sender: treat as no signal.
        return False


def _attachment_encrypted(payload: bytes, filename: str) -> bool:
    """True if the attachment looks like an encrypted/password-protected
    archive or Office container.

    ZIP / OOXML: the encryption bit (``flag_bits & 0x1``) is set on any
    central-directory entry.
    7z / RAR: file-magic match only; the archive is flagged whether or
    not it is actually password-protected.
    CFBF (magic ``D0 CF 11 E0``): any CFBF payload is flagged, because
    password-protected OOXML is wrapped in a CFBF container. No filename
    check is applied.

    Args:
        payload: Raw decoded attachment bytes.
        filename: Attachment filename; currently unused.

    Returns:
        ``True`` when one of the heuristics above matches; ``False``
        for short, unrecognised, or unreadable payloads.
    """
    if not payload or len(payload) < 8:
        return False
    head = payload[:8]
    # Treat any CFBF as potentially encrypted Office for the bool flag,
    # alongside the 7z / RAR magic matches.
    if head.startswith((_MAGIC_7Z, _MAGIC_RAR, _MAGIC_CFBF)):
        return True
    if head[:2] == b"PK":
        try:
            with zipfile.ZipFile(io.BytesIO(payload)) as zf:
                return any(info.flag_bits & 0x1 for info in zf.infolist())
        except (zipfile.BadZipFile, OSError, ValueError):
            return False
    return False


# NOTE(review): the patch continues here with
# ``def _html_smuggling(msg: Message) -> bool`` ("True if any text/html
# part exhibits the HTML-smuggling shape."); the rest of that definition
# lies outside this block.
+ + Structural lxml parse first: walk anchors and scripts, fire when + an ```` carries a ``download`` attribute AND a sibling / + near-ancestor ``" + "Download invoice" + "" + ) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: smuggle", + f"Content-Type: multipart/alternative; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/html; charset=utf-8", + "", + html_body, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + assert rec["html_smuggling"] == 1 + + def test_html_smuggling_skips_legit_download_link(self, tmp_path): + """A page with `` but no Blob/createObjectURL + script does NOT fire — the "click to download our report" + FP class is precisely what the structural check excludes.""" + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + boundary = "----LEGITDOWNLOAD" + html_body = ( + "" + "

Quarterly report is ready.

" + "
Download" + "" + ) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: legit", + f"Content-Type: multipart/alternative; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/html; charset=utf-8", + "", + html_body, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + assert rec["html_smuggling"] == 0 + + def test_attachment_manifest_carries_macro_and_encrypted_flags(self, tmp_path): + """The attachments JSON manifest now includes per-attachment + macro_indicator + encrypted booleans — the ingester reduces + these to top-level flags at publish time.""" + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + boundary = "----MANIFESTBOOLS" + # Build a docm-shaped attachment in-line. + import zipfile as _zf + import io as _io + import base64 as _b64 + zbuf = _io.BytesIO() + with _zf.ZipFile(zbuf, "w") as zf: + zf.writestr("[Content_Types].xml", "") + zf.writestr("word/vbaProject.bin", b"VBA") + encoded = _b64.b64encode(zbuf.getvalue()).decode() + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: macro", + f"Content-Type: multipart/mixed; boundary={boundary}", + "MIME-Version: 1.0", + "", + f"--{boundary}", + "Content-Type: text/plain", + "", + "see attached", + f"--{boundary}", + 'Content-Type: application/vnd.ms-word.document.macroEnabled.12; name="report.docm"', + 'Content-Disposition: attachment; filename="report.docm"', + "Content-Transfer-Encoding: base64", + "", + encoded, + f"--{boundary}--", + ".", + ) + events = _logged_events(mod) + rec = next(f for t, f in events if t == "message_stored") + import json as _json + manifest = _json.loads(rec["attachments_json"]) + assert len(manifest) == 1 + assert manifest[0]["macro_indicator"] is True + assert manifest[0]["encrypted"] is False + def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod): 
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no message_stored event and no files are written."""