diff --git a/decnet/templates/smtp/server.py b/decnet/templates/smtp/server.py index fbc888ef..d3b0d422 100644 --- a/decnet/templates/smtp/server.py +++ b/decnet/templates/smtp/server.py @@ -121,24 +121,79 @@ def _decode_header(raw: str | None) -> str: # accept SMTP drops unchanged: __. The basename # always ends in .eml so operators can open it in any MUA. _STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]") +# Body-URL extraction. Tight enough to skip stray text that happens to +# start with "http"; loose enough to catch IDN punycode, query strings, +# and the trailing-paren / trailing-period tokens that bare-URL regexes +# typically over-capture. Anchored on whitespace / quote / angle-bracket +# boundaries so URLs inside `` round-trip cleanly. +_URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+") +# Authentication-Results parsing. We only care about the binary +# pass-or-not for dkim and spf — finer-grained verdicts (neutral / +# softfail / temperror) are evidence at best and the EmailLifter does +# not key on them. +_DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE) +_SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE) + + +def _empty_summary() -> dict: + return { + "subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "", + "message_id_hdr": "", "content_type": "", + "return_path": "", "x_mailer": "", + "dkim_signed": False, "spf_pass": False, + "attachments": [], "urls": [], + } + + +def _extract_urls(msg: Message) -> list[str]: + """Walk text/* parts and return the unique http(s) URLs found. + + Order is preserved (first-seen wins) so the lifter's IDN-punycode + check and the SIEM evidence list are stable across runs. The walker + intentionally skips non-text parts: HTML-smuggling decode of binary + blobs is a heavyweight detector deferred to the EmailLifter follow- + up DEBT entry, not in scope for the cheap projection. + """ + seen: dict[str, None] = {} + for part in msg.walk(): + if part.is_multipart(): + continue + ctype = (part.get_content_type() or "").lower() + if not ctype.startswith("text/"): + continue + try: + raw = part.get_payload(decode=True) or b"" + text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else "" + except Exception: + text = "" + for match in _URL_RE.findall(text): + # Strip trailing punctuation that frequently rides on URLs in + # natural-language bodies ("see https://x.com."). + url = match.rstrip(".,;:!?") + if url and url not in seen: + seen[url] = None + return list(seen.keys()) def _summarize_message(body: bytes, msg_id: str) -> dict: """Parse the DATA body and extract forensic metadata. Returns a dict with: - subject, from_hdr, to_hdr, date_hdr, message_id_hdr, content_type, - attachments: list of {filename, content_type, size, sha256}. + subject, from_hdr, to_hdr, date_hdr, message_id_hdr, + content_type, return_path, x_mailer, dkim_signed, spf_pass, + attachments (list of {filename, content_type, size, sha256}), + urls (list of http(s) URLs from text/* parts). + Headers are RFC 2047 decoded. Attachment hashing uses the *decoded* payload so operators can match against VT / MalwareBazaar directly. + `dkim_signed` / `spf_pass` are derived from any + ``Authentication-Results:`` header line (multiple lines tolerated; + a positive verdict on any line counts). """ try: msg: Message = message_from_bytes(body) except Exception: - return { - "subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "", - "message_id_hdr": "", "content_type": "", "attachments": [], - } + return _empty_summary() attachments: list[dict] = [] for part in msg.walk(): @@ -162,6 +217,9 @@ def _summarize_message(body: bytes, msg_id: str) -> dict: "sha256": hashlib.sha256(payload).hexdigest() if payload else "", }) + auth_results = " | ".join( + v for v in msg.get_all("Authentication-Results") or [] if v + ) return { "subject": _decode_header(msg.get("Subject")), "from_hdr": _decode_header(msg.get("From")), @@ -169,7 +227,12 @@ def _summarize_message(body: bytes, msg_id: str) -> dict: "date_hdr": _decode_header(msg.get("Date")), "message_id_hdr": _decode_header(msg.get("Message-ID")), "content_type": msg.get_content_type(), + "return_path": _decode_header(msg.get("Return-Path")), + "x_mailer": _decode_header(msg.get("X-Mailer")), + "dkim_signed": bool(_DKIM_PASS_RE.search(auth_results)), + "spf_pass": bool(_SPF_PASS_RE.search(auth_results)), "attachments": attachments, + "urls": _extract_urls(msg), } @@ -289,11 +352,24 @@ class SMTPProtocol(asyncio.Protocol): date_hdr=summary["date_hdr"][:64], message_id_hdr=summary["message_id_hdr"][:256], content_type=summary["content_type"], + # Header-derived signals consumed by EmailLifter + # R0043 / R0044 / R0045. Truncated to bound the + # SD-value size; the lifter only needs presence + # + domain extraction. + return_path=summary["return_path"][:256], + x_mailer=summary["x_mailer"][:256], + dkim_signed=int(summary["dkim_signed"]), + spf_pass=int(summary["spf_pass"]), attachment_count=len(summary["attachments"]), # Full manifest (filename/sha256/size/content_type) # rides as a compact JSON blob — the SD-value escape # in syslog_bridge handles the quotes and brackets. attachments_json=json.dumps(summary["attachments"], separators=(",", ":")), + # URL list extracted from text/* body parts; + # capped at 64 entries to bound the syslog SD + # value. Spam kits with hundreds of unique URLs + # are rare and the cap is loud-friendly. + urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")), ) # Real MTAs take tens of ms to queue; instantaneous replies # on DATA are a tell. diff --git a/tests/service_testing/test_smtp.py b/tests/service_testing/test_smtp.py index 240d45a4..58a80aa1 100644 --- a/tests/service_testing/test_smtp.py +++ b/tests/service_testing/test_smtp.py @@ -605,6 +605,66 @@ class TestMessageCapture: assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest() assert manifest[0]["size"] == len(payload) + def test_message_stored_carries_layer2_signals(self, tmp_path): + """Cheap Layer 2 fields the EmailLifter consumes (R0043 / R0044 / + R0045): X-Mailer, Return-Path, Authentication-Results dkim/spf + verdicts, and URLs lifted from text body parts.""" + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: phish", + "From: ceo@bigcorp.com", + "Return-Path: ", + "X-Mailer: PHPMailer 6.0.7", + "Authentication-Results: relay.example; dkim=pass header.d=evil.com; spf=pass smtp.mailfrom=mailer@kit.evil", + "", + "Click https://xn--80ak6aa92e.example/login. and also http://safe.test/ok", + ".", + ) + events = _logged_events(mod) + stored = [f for t, f in events if t == "message_stored"] + assert len(stored) == 1 + rec = stored[0] + assert rec["x_mailer"] == "PHPMailer 6.0.7" + assert rec["return_path"] == "" + assert rec["dkim_signed"] == 1 + assert rec["spf_pass"] == 1 + import json as _json + urls = _json.loads(rec["urls_json"]) + assert "https://xn--80ak6aa92e.example/login" in urls + assert "http://safe.test/ok" in urls + + def test_message_stored_dkim_spf_default_false_when_no_auth_header( + self, tmp_path, + ): + mod = _load_smtp_with_quarantine(str(tmp_path)) + proto, _, _ = _make_protocol(mod) + _send( + proto, + "EHLO x.com", + "MAIL FROM:", + "RCPT TO:", + "DATA", + "Subject: bare", + "", + "no auth header here", + ".", + ) + events = _logged_events(mod) + stored = [f for t, f in events if t == "message_stored"] + rec = stored[0] + assert rec["dkim_signed"] == 0 + assert rec["spf_pass"] == 0 + assert rec["x_mailer"] == "" + assert rec["return_path"] == "" + import json as _json + assert _json.loads(rec["urls_json"]) == [] + def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod): """With SMTP_QUARANTINE_DIR unset, message_accepted fires but no message_stored event and no files are written."""