feat(smtp): emit X-Mailer / Return-Path / dkim+spf / URLs on message_stored

The EmailLifter (R0041–R0048) keys on header-derived signals that the
v0 _summarize_message did not extract. Add cheap Layer 2 projections
inside the existing single-pass parse:

* return_path / x_mailer — direct header reads, RFC 2047-decoded
* dkim_signed / spf_pass — booleans derived from any
  Authentication-Results header (multiple lines tolerated; positive
  verdict on any line wins)
* urls — http(s) URLs lifted from text/* body parts via a tight
  regex, deduplicated first-seen-wins, capped at 64 in the wire
  payload to bound the syslog SD value

Heavyweight extraction (body simhash, office-macro detection,
HTML-smuggling, password-protected archives, mal-hash-match,
body_text projection) stays deferred per the EmailLifter heavyweight
DEBT entry — those rules need privacy / extractor decisions before
they ship.
This commit is contained in:
2026-05-02 18:37:11 -04:00
parent 2ce150a53e
commit e9324acac7
2 changed files with 142 additions and 6 deletions

View File

@@ -121,24 +121,79 @@ def _decode_header(raw: str | None) -> str:
# accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename # accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename
# always ends in .eml so operators can open it in any MUA. # always ends in .eml so operators can open it in any MUA.
_STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]") _STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]")
# Body-URL extraction. Matches an "http://" or "https://" scheme followed
# by one or more characters that are not whitespace, an angle bracket, a
# single or double quote, a closing paren, or a closing square bracket.
# Stopping at those characters is what lets URLs inside `<a href="...">`
# come out without the surrounding markup and drops a trailing ")" / "]".
# The pattern is not anchored on the left, and sentence punctuation such
# as a trailing "." or "," is NOT excluded by the character class — the
# caller strips that after matching. Punycode labels ("xn--...") and
# query strings are made only of permitted characters, so they match.
_URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+")
# Authentication-Results parsing. We only care about the binary
# pass-or-not for dkim and spf — finer-grained verdicts (neutral /
# softfail / temperror) are evidence at best and the EmailLifter does
# not key on them. Each pattern matches the method name, optional
# whitespace around "=", and the literal result "pass", case-insensitively.
# NOTE(review): the patterns are unstructured text searches, so they also
# match "dkim=pass" / "spf=pass" appearing inside a comment or another
# property value of the header — confirm that is acceptable for the lifter.
_DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE)
_SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE)
def _empty_summary() -> dict:
    """Return the all-defaults summary dict.

    Every string field is the empty string, the two authentication flags
    are False, and ``attachments`` / ``urls`` are new empty lists on each
    call, so a caller may mutate the result without affecting later calls.
    """
    text_fields = (
        "subject", "from_hdr", "to_hdr", "date_hdr",
        "message_id_hdr", "content_type",
        "return_path", "x_mailer",
    )
    summary: dict = dict.fromkeys(text_fields, "")
    # Insertion order matters only for readability of dumps; keep it the
    # same as the populated summary: strings, flags, then lists.
    summary.update(dkim_signed=False, spf_pass=False)
    summary.update(attachments=[], urls=[])
    return summary
def _extract_urls(msg: Message) -> list[str]:
    """Collect the unique http(s) URLs found in the text/* parts of *msg*.

    Order is preserved (first-seen wins) so downstream consumers see a
    stable list across runs. Non-text parts are skipped on purpose:
    decoding binary blobs to hunt for URLs is a heavyweight detector and
    out of scope for this cheap projection.

    Each text part is decoded with the charset declared in its
    Content-Type header; when none is declared, or the declared name is
    not a usable text codec, UTF-8 is used. Undecodable bytes are
    replaced rather than raising.

    Args:
        msg: Parsed message (single-part or multipart).

    Returns:
        De-duplicated URLs in first-seen order, with trailing sentence
        punctuation removed. Tokens that are only a scheme (nothing
        after ``://``) are dropped. The list is not capped here.
    """
    seen: dict[str, None] = {}
    for part in msg.walk():
        if part.is_multipart():
            continue
        if not (part.get_content_type() or "").lower().startswith("text/"):
            continue
        try:
            raw = part.get_payload(decode=True)
            charset = part.get_content_charset() or "utf-8"
        except Exception:
            # Deliberate best-effort: this runs on hostile input and a
            # malformed part must never break the whole summary.
            continue
        if not isinstance(raw, bytes) or not raw:
            continue
        try:
            text = raw.decode(charset, errors="replace")
        except (LookupError, ValueError):
            # Unknown or non-text codec name supplied by the sender.
            text = raw.decode("utf-8", errors="replace")
        for match in _URL_RE.findall(text):
            # Strip trailing punctuation that frequently rides on URLs in
            # natural-language bodies ("see https://x.com.").
            url = match.rstrip(".,;:!?")
            # "http://..." collapses to a bare scheme after stripping;
            # that is not a URL worth reporting.
            if not url.partition("://")[2]:
                continue
            seen.setdefault(url, None)
    return list(seen)
def _summarize_message(body: bytes, msg_id: str) -> dict: def _summarize_message(body: bytes, msg_id: str) -> dict:
"""Parse the DATA body and extract forensic metadata. """Parse the DATA body and extract forensic metadata.
Returns a dict with: Returns a dict with:
subject, from_hdr, to_hdr, date_hdr, message_id_hdr, content_type, subject, from_hdr, to_hdr, date_hdr, message_id_hdr,
attachments: list of {filename, content_type, size, sha256}. content_type, return_path, x_mailer, dkim_signed, spf_pass,
attachments (list of {filename, content_type, size, sha256}),
urls (list of http(s) URLs from text/* parts).
Headers are RFC 2047 decoded. Attachment hashing uses the *decoded* Headers are RFC 2047 decoded. Attachment hashing uses the *decoded*
payload so operators can match against VT / MalwareBazaar directly. payload so operators can match against VT / MalwareBazaar directly.
`dkim_signed` / `spf_pass` are derived from any
``Authentication-Results:`` header line (multiple lines tolerated;
a positive verdict on any line counts).
""" """
try: try:
msg: Message = message_from_bytes(body) msg: Message = message_from_bytes(body)
except Exception: except Exception:
return { return _empty_summary()
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
"message_id_hdr": "", "content_type": "", "attachments": [],
}
attachments: list[dict] = [] attachments: list[dict] = []
for part in msg.walk(): for part in msg.walk():
@@ -162,6 +217,9 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
"sha256": hashlib.sha256(payload).hexdigest() if payload else "", "sha256": hashlib.sha256(payload).hexdigest() if payload else "",
}) })
auth_results = " | ".join(
v for v in msg.get_all("Authentication-Results") or [] if v
)
return { return {
"subject": _decode_header(msg.get("Subject")), "subject": _decode_header(msg.get("Subject")),
"from_hdr": _decode_header(msg.get("From")), "from_hdr": _decode_header(msg.get("From")),
@@ -169,7 +227,12 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
"date_hdr": _decode_header(msg.get("Date")), "date_hdr": _decode_header(msg.get("Date")),
"message_id_hdr": _decode_header(msg.get("Message-ID")), "message_id_hdr": _decode_header(msg.get("Message-ID")),
"content_type": msg.get_content_type(), "content_type": msg.get_content_type(),
"return_path": _decode_header(msg.get("Return-Path")),
"x_mailer": _decode_header(msg.get("X-Mailer")),
"dkim_signed": bool(_DKIM_PASS_RE.search(auth_results)),
"spf_pass": bool(_SPF_PASS_RE.search(auth_results)),
"attachments": attachments, "attachments": attachments,
"urls": _extract_urls(msg),
} }
@@ -289,11 +352,24 @@ class SMTPProtocol(asyncio.Protocol):
date_hdr=summary["date_hdr"][:64], date_hdr=summary["date_hdr"][:64],
message_id_hdr=summary["message_id_hdr"][:256], message_id_hdr=summary["message_id_hdr"][:256],
content_type=summary["content_type"], content_type=summary["content_type"],
# Header-derived signals consumed by EmailLifter
# R0043 / R0044 / R0045. Truncated to bound the
# SD-value size; the lifter only needs presence
# + domain extraction.
return_path=summary["return_path"][:256],
x_mailer=summary["x_mailer"][:256],
dkim_signed=int(summary["dkim_signed"]),
spf_pass=int(summary["spf_pass"]),
attachment_count=len(summary["attachments"]), attachment_count=len(summary["attachments"]),
# Full manifest (filename/sha256/size/content_type) # Full manifest (filename/sha256/size/content_type)
# rides as a compact JSON blob — the SD-value escape # rides as a compact JSON blob — the SD-value escape
# in syslog_bridge handles the quotes and brackets. # in syslog_bridge handles the quotes and brackets.
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")), attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
# URL list extracted from text/* body parts;
# capped at 64 entries to bound the syslog SD
# value. Spam kits with hundreds of unique URLs
# are rare and the cap is loud-friendly.
urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")),
) )
# Real MTAs take tens of ms to queue; instantaneous replies # Real MTAs take tens of ms to queue; instantaneous replies
# on DATA are a tell. # on DATA are a tell.

View File

@@ -605,6 +605,66 @@ class TestMessageCapture:
assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest() assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest()
assert manifest[0]["size"] == len(payload) assert manifest[0]["size"] == len(payload)
def test_message_stored_carries_layer2_signals(self, tmp_path):
    """message_stored carries the cheap Layer 2 fields EmailLifter reads.

    Covers R0043 / R0044 / R0045: X-Mailer, Return-Path, the dkim/spf
    verdicts taken from Authentication-Results, and URLs lifted from the
    text body (including removal of a trailing period).
    """
    import json as _json

    smtp_mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(smtp_mod)
    session = (
        "EHLO x.com",
        "MAIL FROM:<spoof@evil.com>",
        "RCPT TO:<victim@target.com>",
        "DATA",
        "Subject: phish",
        "From: ceo@bigcorp.com",
        "Return-Path: <mailer@kit.evil>",
        "X-Mailer: PHPMailer 6.0.7",
        "Authentication-Results: relay.example; dkim=pass header.d=evil.com; spf=pass smtp.mailfrom=mailer@kit.evil",
        "",
        "Click https://xn--80ak6aa92e.example/login. and also http://safe.test/ok",
        ".",
    )
    _send(proto, *session)

    stored = [
        fields
        for kind, fields in _logged_events(smtp_mod)
        if kind == "message_stored"
    ]
    assert len(stored) == 1
    rec = stored[0]

    # Checked in this order so the first failing field is reported the
    # same way regardless of how the record is laid out.
    expected = {
        "x_mailer": "PHPMailer 6.0.7",
        "return_path": "<mailer@kit.evil>",
        "dkim_signed": 1,
        "spf_pass": 1,
    }
    for field, want in expected.items():
        assert rec[field] == want

    urls = _json.loads(rec["urls_json"])
    for wanted_url in (
        "https://xn--80ak6aa92e.example/login",
        "http://safe.test/ok",
    ):
        assert wanted_url in urls
def test_message_stored_dkim_spf_default_false_when_no_auth_header(
    self, tmp_path,
):
    """Without the relevant headers every Layer 2 field takes its default.

    A message carrying no Authentication-Results, X-Mailer or Return-Path
    header and no URL in the body must still produce exactly one
    message_stored event, with the flags at 0, the strings empty and an
    empty URL list.
    """
    import json as _json

    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    _send(
        proto,
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: bare",
        "",
        "no auth header here",
        ".",
    )
    events = _logged_events(mod)
    stored = [f for t, f in events if t == "message_stored"]
    # Fail with a clear assertion (not an IndexError) when the event is
    # missing, and catch accidental duplicate emission.
    assert len(stored) == 1
    rec = stored[0]
    assert rec["dkim_signed"] == 0
    assert rec["spf_pass"] == 0
    assert rec["x_mailer"] == ""
    assert rec["return_path"] == ""
    assert _json.loads(rec["urls_json"]) == []
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod): def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no """With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
message_stored event and no files are written.""" message_stored event and no files are written."""