feat(smtp): emit X-Mailer / Return-Path / dkim+spf / URLs on message_stored
The EmailLifter (R0041–R0048) keys on header-derived signals that the v0 _summarize_message did not extract. Add cheap Layer 2 projections inside the existing single-pass parse:

* return_path / x_mailer — direct header reads, RFC 2047 decoded
* dkim_signed / spf_pass — booleans derived from any Authentication-Results header (multiple lines tolerated; a positive verdict on any line wins)
* urls — http(s) URLs lifted from text/* body parts via a tight regex, deduplicated first-seen-wins, and capped at 64 in the wire payload to bound the syslog SD value

Heavyweight extraction (body simhash, office-macro detection, HTML-smuggling, password-protected archives, mal-hash-match, body_text projection) stays deferred per the EmailLifter heavyweight DEBT entry — those rules need privacy / extractor decisions before they ship.
This commit is contained in:
@@ -121,24 +121,79 @@ def _decode_header(raw: str | None) -> str:
|
|||||||
# accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename
|
# accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename
|
||||||
# always ends in .eml so operators can open it in any MUA.
|
# always ends in .eml so operators can open it in any MUA.
|
||||||
_STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]")
|
_STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]")
|
||||||
|
# Body-URL extraction. Tight enough to skip stray text that happens to
|
||||||
|
# start with "http"; loose enough to catch IDN punycode, query strings,
|
||||||
|
# and the trailing-paren / trailing-period tokens that bare-URL regexes
|
||||||
|
# typically over-capture. Matches stop at whitespace / quote / angle-bracket
|
||||||
|
# boundaries so URLs inside `<a href="...">` round-trip cleanly.
|
||||||
|
_URL_RE = re.compile(r"https?://[^\s<>\"'\)\]]+")
|
||||||
|
# Authentication-Results parsing. We only care about the binary
|
||||||
|
# pass-or-not for dkim and spf — finer-grained verdicts (neutral /
|
||||||
|
# softfail / temperror) are evidence at best and the EmailLifter does
|
||||||
|
# not key on them.
|
||||||
|
_DKIM_PASS_RE = re.compile(r"\bdkim\s*=\s*pass\b", re.IGNORECASE)
|
||||||
|
_SPF_PASS_RE = re.compile(r"\bspf\s*=\s*pass\b", re.IGNORECASE)
|
||||||
|
|
||||||
|
|
||||||
|
def _empty_summary() -> dict:
|
||||||
|
return {
|
||||||
|
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
|
||||||
|
"message_id_hdr": "", "content_type": "",
|
||||||
|
"return_path": "", "x_mailer": "",
|
||||||
|
"dkim_signed": False, "spf_pass": False,
|
||||||
|
"attachments": [], "urls": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_urls(msg: Message) -> list[str]:
    """Walk text/* parts and return the unique http(s) URLs found.

    Order is preserved (first-seen wins) so the lifter's IDN-punycode
    check and the SIEM evidence list are stable across runs. The walker
    intentionally skips non-text parts: HTML-smuggling decode of binary
    blobs is a heavyweight detector deferred to the EmailLifter
    follow-up DEBT entry, not in scope for the cheap projection.

    Each part is decoded with its declared ``charset`` parameter. A
    missing, unknown, or unusable charset falls back to UTF-8 with
    replacement characters, so a body sent as e.g. UTF-16 cannot hide its
    URLs from the extractor. A part whose payload cannot be read at all
    contributes no URLs; extraction is best-effort and never raises.

    Args:
        msg: The parsed message whose leaf parts are scanned.

    Returns:
        The de-duplicated URLs in first-seen order, with trailing
        sentence punctuation removed. The list is not capped here.
    """
    seen: dict[str, None] = {}
    for part in msg.walk():
        if part.is_multipart():
            continue
        ctype = (part.get_content_type() or "").lower()
        if not ctype.startswith("text/"):
            continue
        try:
            raw = part.get_payload(decode=True)
            charset = part.get_content_charset() or "utf-8"
        except Exception:
            # Hostile / malformed MIME: skip this part rather than abort
            # the whole summary.
            continue
        if not isinstance(raw, bytes) or not raw:
            continue
        try:
            text = raw.decode(charset, errors="replace")
        except (LookupError, ValueError):
            # Unknown codec name, a non-text codec ("base64", "zlib", ...),
            # or a codec that rejects the "replace" handler.
            text = raw.decode("utf-8", errors="replace")
        for match in _URL_RE.findall(text):
            # Strip trailing punctuation that frequently rides on URLs in
            # natural-language bodies ("see https://x.com.").
            url = match.rstrip(".,;:!?")
            if url:
                # setdefault keeps the first insertion position.
                seen.setdefault(url, None)
    return list(seen)
|
||||||
|
|
||||||
|
|
||||||
def _summarize_message(body: bytes, msg_id: str) -> dict:
|
def _summarize_message(body: bytes, msg_id: str) -> dict:
|
||||||
"""Parse the DATA body and extract forensic metadata.
|
"""Parse the DATA body and extract forensic metadata.
|
||||||
|
|
||||||
Returns a dict with:
|
Returns a dict with:
|
||||||
subject, from_hdr, to_hdr, date_hdr, message_id_hdr, content_type,
|
subject, from_hdr, to_hdr, date_hdr, message_id_hdr,
|
||||||
attachments: list of {filename, content_type, size, sha256}.
|
content_type, return_path, x_mailer, dkim_signed, spf_pass,
|
||||||
|
attachments (list of {filename, content_type, size, sha256}),
|
||||||
|
urls (list of http(s) URLs from text/* parts).
|
||||||
|
|
||||||
Headers are RFC 2047 decoded. Attachment hashing uses the *decoded*
|
Headers are RFC 2047 decoded. Attachment hashing uses the *decoded*
|
||||||
payload so operators can match against VT / MalwareBazaar directly.
|
payload so operators can match against VT / MalwareBazaar directly.
|
||||||
|
`dkim_signed` / `spf_pass` are derived from any
|
||||||
|
``Authentication-Results:`` header line (multiple lines tolerated;
|
||||||
|
a positive verdict on any line counts).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
msg: Message = message_from_bytes(body)
|
msg: Message = message_from_bytes(body)
|
||||||
except Exception:
|
except Exception:
|
||||||
return {
|
return _empty_summary()
|
||||||
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
|
|
||||||
"message_id_hdr": "", "content_type": "", "attachments": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
attachments: list[dict] = []
|
attachments: list[dict] = []
|
||||||
for part in msg.walk():
|
for part in msg.walk():
|
||||||
@@ -162,6 +217,9 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
|
|||||||
"sha256": hashlib.sha256(payload).hexdigest() if payload else "",
|
"sha256": hashlib.sha256(payload).hexdigest() if payload else "",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
auth_results = " | ".join(
|
||||||
|
v for v in msg.get_all("Authentication-Results") or [] if v
|
||||||
|
)
|
||||||
return {
|
return {
|
||||||
"subject": _decode_header(msg.get("Subject")),
|
"subject": _decode_header(msg.get("Subject")),
|
||||||
"from_hdr": _decode_header(msg.get("From")),
|
"from_hdr": _decode_header(msg.get("From")),
|
||||||
@@ -169,7 +227,12 @@ def _summarize_message(body: bytes, msg_id: str) -> dict:
|
|||||||
"date_hdr": _decode_header(msg.get("Date")),
|
"date_hdr": _decode_header(msg.get("Date")),
|
||||||
"message_id_hdr": _decode_header(msg.get("Message-ID")),
|
"message_id_hdr": _decode_header(msg.get("Message-ID")),
|
||||||
"content_type": msg.get_content_type(),
|
"content_type": msg.get_content_type(),
|
||||||
|
"return_path": _decode_header(msg.get("Return-Path")),
|
||||||
|
"x_mailer": _decode_header(msg.get("X-Mailer")),
|
||||||
|
"dkim_signed": bool(_DKIM_PASS_RE.search(auth_results)),
|
||||||
|
"spf_pass": bool(_SPF_PASS_RE.search(auth_results)),
|
||||||
"attachments": attachments,
|
"attachments": attachments,
|
||||||
|
"urls": _extract_urls(msg),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -289,11 +352,24 @@ class SMTPProtocol(asyncio.Protocol):
|
|||||||
date_hdr=summary["date_hdr"][:64],
|
date_hdr=summary["date_hdr"][:64],
|
||||||
message_id_hdr=summary["message_id_hdr"][:256],
|
message_id_hdr=summary["message_id_hdr"][:256],
|
||||||
content_type=summary["content_type"],
|
content_type=summary["content_type"],
|
||||||
|
# Header-derived signals consumed by EmailLifter
|
||||||
|
# R0043 / R0044 / R0045. Truncated to bound the
|
||||||
|
# SD-value size; the lifter only needs presence
|
||||||
|
# + domain extraction.
|
||||||
|
return_path=summary["return_path"][:256],
|
||||||
|
x_mailer=summary["x_mailer"][:256],
|
||||||
|
dkim_signed=int(summary["dkim_signed"]),
|
||||||
|
spf_pass=int(summary["spf_pass"]),
|
||||||
attachment_count=len(summary["attachments"]),
|
attachment_count=len(summary["attachments"]),
|
||||||
# Full manifest (filename/sha256/size/content_type)
|
# Full manifest (filename/sha256/size/content_type)
|
||||||
# rides as a compact JSON blob — the SD-value escape
|
# rides as a compact JSON blob — the SD-value escape
|
||||||
# in syslog_bridge handles the quotes and brackets.
|
# in syslog_bridge handles the quotes and brackets.
|
||||||
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
|
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
|
||||||
|
# URL list extracted from text/* body parts;
|
||||||
|
# capped at 64 entries to bound the syslog SD
|
||||||
|
# value. Spam kits with hundreds of unique URLs
|
||||||
|
# are rare and the cap is loud-friendly.
|
||||||
|
urls_json=json.dumps(summary["urls"][:64], separators=(",", ":")),
|
||||||
)
|
)
|
||||||
# Real MTAs take tens of ms to queue; instantaneous replies
|
# Real MTAs take tens of ms to queue; instantaneous replies
|
||||||
# on DATA are a tell.
|
# on DATA are a tell.
|
||||||
|
|||||||
@@ -605,6 +605,66 @@ class TestMessageCapture:
|
|||||||
assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest()
|
assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest()
|
||||||
assert manifest[0]["size"] == len(payload)
|
assert manifest[0]["size"] == len(payload)
|
||||||
|
|
||||||
|
def test_message_stored_carries_layer2_signals(self, tmp_path):
    """The message_stored event exposes the cheap Layer 2 fields.

    Covers what the EmailLifter consumes (R0043 / R0044 / R0045):
    X-Mailer, Return-Path, the dkim/spf verdicts taken from
    Authentication-Results, and the URLs lifted from the text body
    (including removal of a trailing sentence period).
    """
    import json as _json

    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    session = (
        "EHLO x.com",
        "MAIL FROM:<spoof@evil.com>",
        "RCPT TO:<victim@target.com>",
        "DATA",
        "Subject: phish",
        "From: ceo@bigcorp.com",
        "Return-Path: <mailer@kit.evil>",
        "X-Mailer: PHPMailer 6.0.7",
        "Authentication-Results: relay.example; dkim=pass header.d=evil.com; spf=pass smtp.mailfrom=mailer@kit.evil",
        "",
        "Click https://xn--80ak6aa92e.example/login. and also http://safe.test/ok",
        ".",
    )
    _send(proto, *session)

    stored = [
        fields
        for event_type, fields in _logged_events(mod)
        if event_type == "message_stored"
    ]
    assert len(stored) == 1
    rec = stored[0]

    assert rec["x_mailer"] == "PHPMailer 6.0.7"
    assert rec["return_path"] == "<mailer@kit.evil>"
    assert rec["dkim_signed"] == 1
    assert rec["spf_pass"] == 1

    urls = _json.loads(rec["urls_json"])
    assert "https://xn--80ak6aa92e.example/login" in urls
    assert "http://safe.test/ok" in urls
|
||||||
|
|
||||||
|
def test_message_stored_dkim_spf_default_false_when_no_auth_header(
    self, tmp_path,
):
    """Layer 2 fields fall back to empty defaults on a bare message.

    With no Authentication-Results, X-Mailer, or Return-Path header and
    no URL in the body, the message_stored event must report
    dkim_signed=0, spf_pass=0, empty strings, and an empty URL list.
    """
    import json as _json

    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    _send(
        proto,
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: bare",
        "",
        "no auth header here",
        ".",
    )
    events = _logged_events(mod)
    stored = [f for t, f in events if t == "message_stored"]
    # Same guard as the sibling Layer 2 test: fail with a clear assertion
    # (not an IndexError) if the event is missing, and catch duplicates.
    assert len(stored) == 1
    rec = stored[0]
    assert rec["dkim_signed"] == 0
    assert rec["spf_pass"] == 0
    assert rec["x_mailer"] == ""
    assert rec["return_path"] == ""
    assert _json.loads(rec["urls_json"]) == []
|
||||||
|
|
||||||
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
|
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
|
||||||
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
|
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
|
||||||
message_stored event and no files are written."""
|
message_stored event and no files are written."""
|
||||||
|
|||||||
Reference in New Issue
Block a user