feat(smtp): emit X-Mailer / Return-Path / dkim+spf / URLs on message_stored
The EmailLifter (R0041–R0048) keys on header-derived signals that the v0 _summarize_message did not extract. Add cheap Layer 2 projections inside the existing single-pass parse:

* return_path / x_mailer — direct header reads, RFC 2047 decoded
* dkim_signed / spf_pass — booleans derived from any Authentication-Results header (multiple lines tolerated; a positive verdict on any line wins)
* urls — http(s) URLs lifted from text/* body parts via a tight regex, deduplicated first-seen-wins, capped at 64 in the wire payload to bound the syslog SD value

Heavyweight extraction (body simhash, office-macro detection, HTML-smuggling, password-protected archives, mal-hash-match, body_text projection) stays deferred per the EmailLifter heavyweight DEBT entry — those rules need privacy / extractor decisions before they ship.
This commit is contained in:
@@ -121,24 +121,79 @@ def _decode_header(raw: str | None) -> str:
|
||||
# accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename
|
||||
# always ends in .eml so operators can open it in any MUA.
|
||||
_STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]")
|
||||
# Body-URL extraction. Tight enough to skip stray text that happens to
|
||||
# start with "http"; loose enough to catch IDN punycode, query strings,
|
||||
# and the trailing-paren / trailing-period tokens that bare-URL regexes
|
||||
# typically over-capture. Anchored on whitespace / quote / angle-bracket
|
||||
# boundaries so URLs inside `<a href="...">` round-trip cleanly.
|
||||
_URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+")
|
||||
# Authentication-Results parsing. We only care about the binary
|
||||
# pass-or-not for dkim and spf — finer-grained verdicts (neutral /
|
||||
# softfail / temperror) are evidence at best and the EmailLifter does
|
||||
# not key on them.
|
||||
_DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE)
|
||||
_SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE)
|
||||
|
||||
|
||||
def _empty_summary() -> dict:
|
||||
return {
|
||||
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
|
||||
"message_id_hdr": "", "content_type": "",
|
||||
"return_path": "", "x_mailer": "",
|
||||
"dkim_signed": False, "spf_pass": False,
|
||||
"attachments": [], "urls": [],
|
||||
}
|
||||
|
||||
|
||||
def _extract_urls(msg: Message) -> list[str]:
    """Return the unique http(s) URLs found in the text/* parts of *msg*.

    Every non-multipart part whose content type starts with ``text/`` is
    transfer-decoded, converted to text and scanned with ``_URL_RE``.
    The text conversion honours the charset declared on the part and falls
    back to UTF-8 when no charset is declared or the declared codec is
    unknown / unusable. Decoding errors are replaced rather than raised.

    Trailing ``. , ; : ! ?`` characters are stripped from each match, and
    the result keeps first-seen order with duplicates removed. Non-text
    parts are skipped entirely.

    Args:
        msg: Parsed message to walk.

    Returns:
        List of URL strings, possibly empty.
    """
    seen: dict[str, None] = {}
    for part in msg.walk():
        if part.is_multipart():
            continue
        ctype = (part.get_content_type() or "").lower()
        if not ctype.startswith("text/"):
            continue
        try:
            raw = part.get_payload(decode=True)
            charset = part.get_content_charset() or "utf-8"
        except Exception:
            # Deliberate best-effort: a part that cannot be decoded must not
            # abort the summary of the whole message.
            continue
        if not isinstance(raw, bytes) or not raw:
            continue
        try:
            # Decode with the sender-declared charset. Treating e.g. a
            # UTF-16 body as UTF-8 interleaves NULs between characters and
            # hides every URL from the regex.
            text = raw.decode(charset, errors="replace")
        except (LookupError, ValueError):
            # Unknown codec name, a non-text codec, or a codec that rejects
            # the "replace" handler: fall back to the UTF-8 default.
            text = raw.decode("utf-8", errors="replace")
        for match in _URL_RE.findall(text):
            # Strip trailing punctuation that frequently rides on URLs in
            # natural-language bodies ("see https://x.com.").
            url = match.rstrip(".,;:!?")
            if url:
                # setdefault keeps the first-seen insertion position.
                seen.setdefault(url, None)
    return list(seen)
|
||||
|
||||
|
||||
def _summarize_message(body: bytes, msg_id: str) -> dict:
|
||||
"""Parse the DATA body and extract forensic metadata.
|
||||
|
||||
Returns a dict with:
|
||||
subject, from_hdr, to_hdr, date_hdr, message_id_hdr, content_type,
|
||||
attachments: list of {filename, content_type, size, sha256}.
|
||||
subject, from_hdr, to_hdr, date_hdr, message_id_hdr,
|
||||
content_type, return_path, x_mailer, dkim_signed, spf_pass,
|
||||
attachments (list of {filename, content_type, size, sha256}),
|
||||
urls (list of http(s) URLs from text/* parts).
|
||||
|
||||
Headers are RFC 2047 decoded. Attachment hashing uses the *decoded*
|
||||
payload so operators can match against VT / MalwareBazaar directly.
|
||||
`dkim_signed` / `spf_pass` are derived from any
|
||||
``Authentication-Results:`` header line (multiple lines tolerated;
|
||||
a positive verdict on any line counts).
|
||||
"""
|
||||
try:
|
||||
msg: Message = message_from_bytes(body)
|
||||
except Exception:
|
||||
return {
|
||||
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
|
||||
"message_id_hdr": "", "content_type": "", "attachments": [],
|
||||
}
|
||||
return _empty_summary()
|
||||
|
||||
attachments: list[dict] = []
|
||||
for part in msg.walk():
|
||||
@@ -162,6 +217,9 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
|
||||
"sha256": hashlib.sha256(payload).hexdigest() if payload else "",
|
||||
})
|
||||
|
||||
auth_results = " | ".join(
|
||||
v for v in msg.get_all("Authentication-Results") or [] if v
|
||||
)
|
||||
return {
|
||||
"subject": _decode_header(msg.get("Subject")),
|
||||
"from_hdr": _decode_header(msg.get("From")),
|
||||
@@ -169,7 +227,12 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
|
||||
"date_hdr": _decode_header(msg.get("Date")),
|
||||
"message_id_hdr": _decode_header(msg.get("Message-ID")),
|
||||
"content_type": msg.get_content_type(),
|
||||
"return_path": _decode_header(msg.get("Return-Path")),
|
||||
"x_mailer": _decode_header(msg.get("X-Mailer")),
|
||||
"dkim_signed": bool(_DKIM_PASS_RE.search(auth_results)),
|
||||
"spf_pass": bool(_SPF_PASS_RE.search(auth_results)),
|
||||
"attachments": attachments,
|
||||
"urls": _extract_urls(msg),
|
||||
}
|
||||
|
||||
|
||||
@@ -289,11 +352,24 @@ class SMTPProtocol(asyncio.Protocol):
|
||||
date_hdr=summary["date_hdr"][:64],
|
||||
message_id_hdr=summary["message_id_hdr"][:256],
|
||||
content_type=summary["content_type"],
|
||||
# Header-derived signals consumed by EmailLifter
|
||||
# R0043 / R0044 / R0045. Truncated to bound the
|
||||
# SD-value size; the lifter only needs presence
|
||||
# + domain extraction.
|
||||
return_path=summary["return_path"][:256],
|
||||
x_mailer=summary["x_mailer"][:256],
|
||||
dkim_signed=int(summary["dkim_signed"]),
|
||||
spf_pass=int(summary["spf_pass"]),
|
||||
attachment_count=len(summary["attachments"]),
|
||||
# Full manifest (filename/sha256/size/content_type)
|
||||
# rides as a compact JSON blob — the SD-value escape
|
||||
# in syslog_bridge handles the quotes and brackets.
|
||||
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
|
||||
# URL list extracted from text/* body parts;
|
||||
# capped at 64 entries to bound the syslog SD
|
||||
# value. Spam kits with hundreds of unique URLs
|
||||
# are rare and the cap is loud-friendly.
|
||||
urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")),
|
||||
)
|
||||
# Real MTAs take tens of ms to queue; instantaneous replies
|
||||
# on DATA are a tell.
|
||||
|
||||
@@ -605,6 +605,66 @@ class TestMessageCapture:
|
||||
assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest()
|
||||
assert manifest[0]["size"] == len(payload)
|
||||
|
||||
def test_message_stored_carries_layer2_signals(self, tmp_path):
    """A message carrying X-Mailer, Return-Path and Authentication-Results
    headers plus two body URLs must surface all of them on the single
    ``message_stored`` event (the cheap Layer 2 fields behind EmailLifter
    R0043 / R0044 / R0045)."""
    import json as _json

    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    dialogue = (
        "EHLO x.com",
        "MAIL FROM:<spoof@evil.com>",
        "RCPT TO:<victim@target.com>",
        "DATA",
        "Subject: phish",
        "From: ceo@bigcorp.com",
        "Return-Path: <mailer@kit.evil>",
        "X-Mailer: PHPMailer 6.0.7",
        "Authentication-Results: relay.example; dkim=pass header.d=evil.com; spf=pass smtp.mailfrom=mailer@kit.evil",
        "",
        "Click https://xn--80ak6aa92e.example/login. and also http://safe.test/ok",
        ".",
    )
    _send(proto, *dialogue)

    stored = [
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    ]
    assert len(stored) == 1
    rec = stored[0]

    # Header-derived fields.
    assert rec["x_mailer"] == "PHPMailer 6.0.7"
    assert rec["return_path"] == "<mailer@kit.evil>"
    assert rec["dkim_signed"] == 1
    assert rec["spf_pass"] == 1

    # Body URLs: the first one must arrive without its trailing period.
    urls = _json.loads(rec["urls_json"])
    assert "https://xn--80ak6aa92e.example/login" in urls
    assert "http://safe.test/ok" in urls
|
||||
|
||||
def test_message_stored_dkim_spf_default_false_when_no_auth_header(
    self, tmp_path,
):
    """A message with only a Subject header must report zeroed auth
    flags, empty X-Mailer / Return-Path strings and an empty URL list."""
    import json as _json

    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    dialogue = (
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: bare",
        "",
        "no auth header here",
        ".",
    )
    _send(proto, *dialogue)

    stored = [
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    ]
    rec = stored[0]

    assert rec["dkim_signed"] == 0
    assert rec["spf_pass"] == 0
    assert rec["x_mailer"] == ""
    assert rec["return_path"] == ""
    assert _json.loads(rec["urls_json"]) == []
|
||||
|
||||
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
|
||||
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
|
||||
message_stored event and no files are written."""
|
||||
|
||||
Reference in New Issue
Block a user