feat(smtp): emit X-Mailer / Return-Path / dkim+spf / URLs on message_stored

The EmailLifter (R0041–R0048) keys on header-derived signals that the
v0 _summarize_message did not extract. Add cheap Layer 2 projections
inside the existing single-pass parse:

* return_path / x_mailer — direct header reads, RFC 2047-decoded
* dkim_signed / spf_pass — booleans derived from any
  Authentication-Results header (multiple lines tolerated; positive
  verdict on any line wins)
* urls — http(s) URLs lifted from text/* body parts via a tight
  regex, deduplicated first-seen-wins, capped at 64 in the wire
  payload to bound the syslog SD value

Heavyweight extraction (body simhash, office-macro detection,
HTML-smuggling, password-protected archives, mal-hash-match,
body_text projection) stays deferred per the EmailLifter heavyweight
DEBT entry — those rules need privacy / extractor decisions before
they ship.
This commit is contained in:
2026-05-02 18:37:11 -04:00
parent 2ce150a53e
commit e9324acac7
2 changed files with 142 additions and 6 deletions

View File

@@ -605,6 +605,66 @@ class TestMessageCapture:
assert manifest[0]["sha256"] == _hashlib.sha256(payload).hexdigest()
assert manifest[0]["size"] == len(payload)
def test_message_stored_carries_layer2_signals(self, tmp_path):
    """message_stored carries the cheap Layer 2 signals (R0043 / R0044 / R0045).

    One message is submitted with X-Mailer, Return-Path and an
    Authentication-Results header reporting dkim=pass and spf=pass, plus
    two http(s) URLs in the text body. The single resulting
    message_stored record must expose x_mailer, return_path,
    dkim_signed, spf_pass, and both URLs inside urls_json.
    """
    smtp_mod = _load_smtp_with_quarantine(str(tmp_path))
    protocol, _, _ = _make_protocol(smtp_mod)

    # Full SMTP dialogue: envelope, headers, blank separator, body, end-of-data dot.
    wire_lines = (
        "EHLO x.com",
        "MAIL FROM:<spoof@evil.com>",
        "RCPT TO:<victim@target.com>",
        "DATA",
        "Subject: phish",
        "From: ceo@bigcorp.com",
        "Return-Path: <mailer@kit.evil>",
        "X-Mailer: PHPMailer 6.0.7",
        "Authentication-Results: relay.example; dkim=pass header.d=evil.com; spf=pass smtp.mailfrom=mailer@kit.evil",
        "",
        "Click https://xn--80ak6aa92e.example/login. and also http://safe.test/ok",
        ".",
    )
    _send(protocol, *wire_lines)

    # Keep only the field dicts of message_stored events.
    stored_records = []
    for event_type, fields in _logged_events(smtp_mod):
        if event_type == "message_stored":
            stored_records.append(fields)
    assert len(stored_records) == 1
    record = stored_records[0]

    expected_fields = {
        "x_mailer": "PHPMailer 6.0.7",
        "return_path": "<mailer@kit.evil>",
        "dkim_signed": 1,
        "spf_pass": 1,
    }
    for key, wanted in expected_fields.items():
        assert record[key] == wanted

    import json as _json
    lifted_urls = _json.loads(record["urls_json"])
    # The first URL is followed by a period in the body text; the value
    # expected in the payload is the URL without that trailing dot.
    for wanted_url in (
        "https://xn--80ak6aa92e.example/login",
        "http://safe.test/ok",
    ):
        assert wanted_url in lifted_urls
def test_message_stored_dkim_spf_default_false_when_no_auth_header(
    self, tmp_path,
):
    """Layer 2 fields fall back to empty defaults on a bare message.

    The message carries only a Subject header: no
    Authentication-Results, X-Mailer or Return-Path, and no URL in the
    body. The message_stored record must then report dkim_signed and
    spf_pass as 0, x_mailer and return_path as empty strings, and a
    urls_json value that decodes to an empty list.
    """
    mod = _load_smtp_with_quarantine(str(tmp_path))
    proto, _, _ = _make_protocol(mod)
    _send(
        proto,
        "EHLO x.com",
        "MAIL FROM:<a@b.com>",
        "RCPT TO:<c@d.com>",
        "DATA",
        "Subject: bare",
        "",
        "no auth header here",
        ".",
    )
    events = _logged_events(mod)
    stored = [f for t, f in events if t == "message_stored"]
    # Guard the index below: a missing event should fail as an assertion
    # rather than an IndexError, and a duplicate event should not pass
    # silently. Mirrors the check in the sibling Layer 2 signals test.
    assert len(stored) == 1
    rec = stored[0]
    assert rec["dkim_signed"] == 0
    assert rec["spf_pass"] == 0
    assert rec["x_mailer"] == ""
    assert rec["return_path"] == ""
    import json as _json
    assert _json.loads(rec["urls_json"]) == []
def test_capture_disabled_when_dir_unset(self, tmp_path, relay_mod):
"""With SMTP_QUARANTINE_DIR unset, message_accepted fires but no
message_stored event and no files are written."""