diff --git a/decnet/ttp/impl/email_lifter.py b/decnet/ttp/impl/email_lifter.py index 86e7f6ba..cb9c8d81 100644 --- a/decnet/ttp/impl/email_lifter.py +++ b/decnet/ttp/impl/email_lifter.py @@ -19,11 +19,16 @@ from __future__ import annotations import base64 import binascii +import email +import email.errors +import email.message +import email.policy import hashlib import re from collections.abc import Callable from typing import Any, Final +from decnet.artifacts.paths import ArtifactPathError, resolve_artifact_path from decnet.ttp.base import TaggerEvent, TolerantTagger from decnet.ttp.impl._emit import emit_tags from decnet.ttp.impl._rule_index import RuleIndex @@ -241,12 +246,78 @@ def _p_malicious_attachment( return None +def _extract_body_text(msg: email.message.EmailMessage) -> str | None: + """Best-effort plain-text body extraction from a parsed email. + + Prefers ``text/plain``. Falls back to ``text/html`` (raw — predicates + here are substring-matchers, no need to de-tag). Returns None when + the message has no readable text part. Requires the message to have + been parsed with ``policy=email.policy.default`` so parts are + ``EmailMessage`` instances (``get_content`` is policy-conditional). + """ + candidates: list[email.message.EmailMessage] = list(msg.walk()) + for content_type in ("text/plain", "text/html"): + for part in candidates: + if part.get_content_type() != content_type: + continue + try: + content = part.get_content() + except (LookupError, ValueError, KeyError): + continue + if isinstance(content, str): + return content + return None + + +def _load_body_text(payload: dict[str, Any]) -> str | None: + """Return the email body text for predicates that need it. + + If the bus payload already carries ``body_text`` (older deployments + or master-side producers), use it. Otherwise disk-reach: open the + ``.eml`` from ``/var/lib/decnet/artifacts/{decky_id}/smtp/{stored_as}`` + and parse the body in-process. + + The decoded body is memoized back into the payload dict so the next + predicate on the same event reuses it without re-opening the file. + The bus envelope only carries the artifact pointer (``decky_id`` + + ``stored_as``); raw body bytes never cross the host boundary + (DEBT-047). Returns None on any failure — predicates then short + circuit to no-match, matching pre-disk-reach behavior when fields + were absent. + """ + existing = payload.get("body_text") + if isinstance(existing, str): + return existing + decky_id = payload.get("decky_id") + stored_as = payload.get("stored_as") + if not isinstance(decky_id, str) or not isinstance(stored_as, str): + return None + try: + path = resolve_artifact_path(decky_id, stored_as, "smtp") + except ArtifactPathError: + return None + try: + with open(path, "rb") as fh: + msg = email.message_from_binary_file( + fh, policy=email.policy.default, + ) + except (OSError, email.errors.MessageError): + return None + body = _extract_body_text(msg) + if body is None: + return None + payload["body_text"] = body + return body + + def _p_bec( spec: dict[str, Any], payload: dict[str, Any], ) -> dict[str, Any] | None: subject = payload.get("subject") - body_text = payload.get("body_text") - if not isinstance(subject, str) or not isinstance(body_text, str): + if not isinstance(subject, str): + return None + body_text = _load_body_text(payload) + if body_text is None: return None subj_kws = spec.get("subject_keywords", []) body_kws = spec.get("body_action_keywords", []) @@ -278,8 +349,8 @@ def _p_encoded_payload( spec: dict[str, Any], payload: dict[str, Any], ) -> dict[str, Any] | None: min_bytes = int(spec.get("min_bytes", 4096)) - body_text = payload.get("body_text") - if not isinstance(body_text, str) or not body_text: + body_text = _load_body_text(payload) + if not body_text: return None # Upstream may pre-compute the largest decoded base64 length. body_b64_bytes = payload.get("body_base64_bytes") diff --git a/tests/ttp/test_email_lifter_disk_reach.py b/tests/ttp/test_email_lifter_disk_reach.py new file mode 100644 index 00000000..794a5c1d --- /dev/null +++ b/tests/ttp/test_email_lifter_disk_reach.py @@ -0,0 +1,150 @@ +"""Disk-reach tests for EmailLifter (DEBT-047). + +When the bus payload omits ``body_text`` but carries ``decky_id`` + +``stored_as``, body-aware predicates (R0047 BEC, encoded-payload) must +open the stored ``.eml`` from the artifact tree and parse the body +in-process. Bus carries only the pointer; raw body bytes stay on +host disk. +""" + +from __future__ import annotations + +from email.message import EmailMessage + +import pytest + +from decnet.artifacts import paths as artifact_paths +from decnet.ttp.impl import email_lifter as lifter_mod + + +_DECKY = "test-decky-01" +_STORED_AS = "2026-04-18T02:22:56Z_abc123def456_msg.eml" + + +def _write_eml(root, body_text, *, content_type="text/plain"): + msg = EmailMessage() + msg["From"] = "alice@evil.example" + msg["To"] = "victim@target.example" + msg["Subject"] = "URGENT: wire transfer needed" + if content_type == "text/plain": + msg.set_content(body_text) + else: + msg.set_content("plain fallback") + msg.add_alternative(body_text, subtype="html") + smtp_dir = root / _DECKY / "smtp" + smtp_dir.mkdir(parents=True, exist_ok=True) + p = smtp_dir / _STORED_AS + p.write_bytes(bytes(msg)) + return p + + +@pytest.fixture +def root(tmp_path, monkeypatch): + monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", tmp_path) + return tmp_path + + +_BEC_SPEC = { + "subject_keywords": ["wire transfer", "urgent"], + "body_action_keywords": ["bank", "iban", "account"], +} + + +def test_p_bec_matches_via_disk_reach(root): + _write_eml( + root, "Please update our bank account / IBAN before EOD.", + ) + payload = { + "subject": "URGENT: wire transfer needed", + "decky_id": _DECKY, + "stored_as": _STORED_AS, + } + result = lifter_mod._p_bec(_BEC_SPEC, payload) + assert result is not None + assert result["matched_subject_kw"] == "wire transfer" + assert result["matched_body_kw"] in {"bank", "iban"} + # Helper must have memoized the body back into the payload. + assert "bank" in payload["body_text"].lower() + + +def test_p_bec_no_match_when_eml_missing(root): + payload = { + "subject": "URGENT: wire transfer needed", + "decky_id": _DECKY, + "stored_as": _STORED_AS, + } + assert lifter_mod._p_bec(_BEC_SPEC, payload) is None + + +def test_p_bec_no_match_without_pointer(root): + payload = {"subject": "URGENT: wire transfer needed"} + assert lifter_mod._p_bec(_BEC_SPEC, payload) is None + + +def test_inline_body_text_takes_precedence(root, monkeypatch): + """If the producer ships body_text inline, no file IO happens.""" + sentinel = "Please remit IBAN bank details now." + payload = { + "subject": "URGENT: wire transfer needed", + "body_text": sentinel, + "decky_id": _DECKY, + "stored_as": _STORED_AS, + } + + def _explode(*a, **kw): + raise AssertionError("disk-reach must not run when body_text inline") + + monkeypatch.setattr(lifter_mod, "resolve_artifact_path", _explode) + res = lifter_mod._p_bec(_BEC_SPEC, payload) + assert res is not None + + +def test_body_cache_avoids_second_open(root, monkeypatch): + _write_eml(root, "wire to our bank IBAN now") + opens: list[str] = [] + real_open = lifter_mod.email.message_from_binary_file + + def _spy(fh, *a, **kw): + opens.append("opened") + return real_open(fh, *a, **kw) + + monkeypatch.setattr( + lifter_mod.email, "message_from_binary_file", _spy, + ) + payload = { + "subject": "URGENT: wire transfer needed", + "decky_id": _DECKY, + "stored_as": _STORED_AS, + } + lifter_mod._p_bec(_BEC_SPEC, payload) + lifter_mod._p_bec(_BEC_SPEC, payload) + assert len(opens) == 1 + + +def test_html_fallback_when_no_text_plain(tmp_path, monkeypatch): + monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", tmp_path) + smtp = tmp_path / _DECKY / "smtp" + smtp.mkdir(parents=True) + raw = ( + b"From: a@b\r\nTo: c@d\r\nSubject: t\r\n" + b"Content-Type: text/html; charset=utf-8\r\n\r\n" + b"please send our IBAN bank info" + ) + (smtp / _STORED_AS).write_bytes(raw) + payload = { + "subject": "URGENT: wire transfer needed", + "decky_id": _DECKY, + "stored_as": _STORED_AS, + } + result = lifter_mod._p_bec(_BEC_SPEC, payload) + assert result is not None + + +def test_invalid_pointer_rejected(root): + """Bad decky/stored_as values must not crash and must yield no body.""" + payload = { + "subject": "URGENT: wire transfer needed", + "decky_id": "../etc", + "stored_as": _STORED_AS, + } + assert lifter_mod._p_bec(_BEC_SPEC, payload) is None