diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py
index bbf62482..35eecf6e 100644
--- a/decnet/web/ingester.py
+++ b/decnet/web/ingester.py
@@ -613,6 +613,13 @@ async def _extract_bounty(
"content_type": _fields.get("content_type"),
},
})
+ # Fan the captured email out to the TTP worker — same hook
+ # ``attacker.intel.enriched`` uses, but for the EmailLifter
+ # (R0041–R0048). Always fires for both relay and non-relay
+ # services; the TTP path doesn't care which mode the decky was
+ # in. Best-effort — bus-down or unresolved attacker UUID
+ # downgrades to a no-op silently.
+ await _publish_email_received(repo, log_data, _fields)
# Signal the realism worker to forward this as a probe if it's the
# first message from this IP on an smtp_relay decky. The worker has
# real internet access (the container is on MACVLAN and doesn't).
@@ -620,6 +627,153 @@ async def _extract_bounty(
await _publish_probe_pending(log_data, _fields)
+_RCPT_SPLIT_RE = re.compile(r"[,\s]+")
+_ADDR_AT_RE = re.compile(r"@([A-Za-z0-9.\-]+)")
+
+
+def _domain_of(addr_or_header: str | None) -> str | None:
+ """Extract the trailing domain from an address-like string.
+
+ Tolerant of full RFC 5322 headers (``"Name" ``), bare
+ addresses (``a@b.com``), and angle-bracketed envelope values
+ (````). Returns the domain lowercased, or ``None`` when
+ no ``@``-delimited domain is present. The EmailLifter expects
+ just the domain — display names and local-parts ride in the
+ artifact, not on the bus.
+ """
+ if not addr_or_header:
+ return None
+ match = _ADDR_AT_RE.search(addr_or_header)
+ if not match:
+ return None
+ domain = match.group(1).strip(".").lower()
+ return domain or None
+
+
+def _rcpt_projection(rcpt_to_field: str | None) -> tuple[int, list[str]]:
+ """Split the comma-or-whitespace-separated RCPT TO blob into
+ ``(count, unique_domains_first_seen_order)``."""
+ if not rcpt_to_field:
+ return 0, []
+ parts = [p for p in _RCPT_SPLIT_RE.split(rcpt_to_field) if p]
+ domains: dict[str, None] = {}
+ for part in parts:
+ domain = _domain_of(part)
+ if domain:
+ domains.setdefault(domain, None)
+ return len(parts), list(domains.keys())
+
+
+def _attachment_extensions(manifest: list[dict]) -> list[str]:
+ """Return unique lowercased file extensions (with dot) from the
+ attachment manifest, preserving first-seen order. ``"payload.exe"``
+ → ``".exe"``; missing / dotless filenames are skipped."""
+ seen: dict[str, None] = {}
+ for entry in manifest:
+ if not isinstance(entry, dict):
+ continue
+ filename = entry.get("filename") or ""
+ if not isinstance(filename, str):
+ continue
+ idx = filename.rfind(".")
+ if idx < 0 or idx == len(filename) - 1:
+ continue
+ ext = filename[idx:].lower()
+ seen.setdefault(ext, None)
+ return list(seen.keys())
+
+
+async def _publish_email_received(
+ repo: BaseRepository, log_data: dict, fields: dict,
+) -> None:
+ """Project the SMTP capture event onto the EmailLifter wire contract.
+
+ Mirrors :func:`_publish_probe_pending`: a fresh bus connection per
+ publish, fire-and-forget, all exceptions swallowed (the bus is the
+ notification layer, not the source of truth). Producer for
+ ``email.received`` per the 2026-05-02 DEBT #3 paydown.
+ """
+ attacker_ip = log_data.get("attacker_ip")
+ try:
+ attacker_uuid = (
+ await repo.get_attacker_uuid_by_ip(attacker_ip)
+ if attacker_ip else None
+ )
+ except Exception as exc: # noqa: BLE001
+ logger.debug("email_received: attacker resolve failed: %s", exc)
+ attacker_uuid = None
+ if not attacker_uuid:
+ # Without an attacker_uuid the TTP worker cannot anchor the
+ # tags to a row. Drop rather than emit an orphan event.
+ return
+
+ rcpt_count, rcpt_domains = _rcpt_projection(fields.get("rcpt_to"))
+ try:
+ attachment_manifest = json.loads(fields.get("attachments_json") or "[]")
+ except (TypeError, ValueError):
+ attachment_manifest = []
+ if not isinstance(attachment_manifest, list):
+ attachment_manifest = []
+ attachment_sha256s = [
+ entry.get("sha256") for entry in attachment_manifest
+ if isinstance(entry, dict) and isinstance(entry.get("sha256"), str)
+ and entry.get("sha256")
+ ]
+ try:
+ urls = json.loads(fields.get("urls_json") or "[]")
+ except (TypeError, ValueError):
+ urls = []
+ if not isinstance(urls, list):
+ urls = []
+
+ # The decky writes ``dkim_signed`` / ``spf_pass`` as 0/1 ints
+ # because syslog SD-values are strings; coerce back to the bool
+ # the EmailLifter predicates check with ``is True`` / ``is False``.
+ def _to_bool(raw: Any) -> bool:
+ if isinstance(raw, bool):
+ return raw
+ if isinstance(raw, (int, float)):
+ return raw != 0
+ if isinstance(raw, str):
+ return raw.strip() in {"1", "true", "True", "yes"}
+ return False
+
+ payload: dict[str, Any] = {
+ "source_id": fields.get("msg_id") or fields.get("stored_as"),
+ "attacker_uuid": attacker_uuid,
+ "attacker_ip": attacker_ip,
+ "decky_id": log_data.get("decky"),
+ "service": log_data.get("service"),
+ "subject": fields.get("subject"),
+ "from_domain": _domain_of(fields.get("from_hdr")),
+ "mail_from_domain": _domain_of(fields.get("mail_from")),
+ "return_path_domain": _domain_of(fields.get("return_path")),
+ "rcpt_count": rcpt_count,
+ "rcpt_domains": rcpt_domains,
+ "x_mailer": fields.get("x_mailer") or None,
+ "dkim_signed": _to_bool(fields.get("dkim_signed")),
+ "spf_pass": _to_bool(fields.get("spf_pass")),
+ "urls": [u for u in urls if isinstance(u, str)],
+ "attachment_count": fields.get("attachment_count"),
+ "attachment_sha256s": attachment_sha256s,
+ "attachment_extensions": _attachment_extensions(attachment_manifest),
+ "stored_as": fields.get("stored_as"),
+ "body_sha256": fields.get("sha256"),
+ }
+ try:
+ bus = get_bus(client_name="ingester-email")
+ await bus.connect()
+ await publish_safely(
+ bus,
+ _topics.email_topic(_topics.EMAIL_RECEIVED),
+ payload,
+ event_type=_topics.EMAIL_RECEIVED,
+ )
+ await bus.close()
+ except Exception as exc: # noqa: BLE001
+ logger.debug("email.received publish failed: %s", exc)
+
+
async def _publish_probe_pending(log_data: dict, fields: dict) -> None:
try:
bus = get_bus(client_name="ingester-probe")
diff --git a/tests/web/test_ingester.py b/tests/web/test_ingester.py
index f27eb840..d978a71e 100644
--- a/tests/web/test_ingester.py
+++ b/tests/web/test_ingester.py
@@ -154,6 +154,160 @@ class TestExtractBounty:
assert bounty["payload"]["subject"] == "URGENT: invoice"
assert bounty["payload"]["mail_from"] == "spammer@spammer.example"
+ @pytest.mark.asyncio
+ async def test_message_stored_publishes_email_received(self):
+ """SMTP message_stored persists the artifact AND publishes
+ ``email.received`` with the EmailLifter wire contract: domains,
+ rcpt_count + rcpt_domains, attachment shas + extensions, urls,
+ dkim/spf bools, x_mailer."""
+ from decnet.web import ingester as _ing
+ from decnet.web.ingester import _extract_bounty
+ mock_repo = MagicMock()
+ mock_repo.add_bounty = AsyncMock()
+ mock_repo.upsert_credential = AsyncMock()
+ mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value="att-7")
+
+ published: list = []
+
+ async def fake_publish(_bus, topic, payload, event_type=""):
+ published.append((topic, payload, event_type))
+
+ fake_bus = MagicMock()
+ fake_bus.connect = AsyncMock()
+ fake_bus.close = AsyncMock()
+
+ with patch.object(_ing, "get_bus", return_value=fake_bus), \
+ patch.object(_ing, "publish_safely", side_effect=fake_publish):
+ await _extract_bounty(mock_repo, {
+ "decky": "mail-decky",
+ "service": "smtp",
+ "attacker_ip": "203.0.113.7",
+ "event_type": "message_stored",
+ "fields": {
+ "msg_id": "ABCD1234",
+ "stored_as": "2026-04-28T12:00:00Z_abc_msg.eml",
+ "sha256": "cafebabe" * 8,
+ "size": "8192",
+ "subject": "URGENT: invoice",
+ "from_hdr": '"CEO" ',
+ "to_hdr": "victim@target.tld",
+ "mail_from": "",
+ "rcpt_to": (
+ "victim1@target.tld, victim2@target.tld, "
+ "victim3@other.tld"
+ ),
+ "return_path": "",
+ "x_mailer": "PHPMailer 6.0.7",
+ "dkim_signed": "1",
+ "spf_pass": "0",
+ "attachment_count": "2",
+ "attachments_json": (
+ '[{"filename":"payload.exe","sha256":"deadbeef",'
+ '"size":12,"content_type":"application/octet-stream"},'
+ '{"filename":"resume.docx","sha256":"feedface",'
+ '"size":34,"content_type":"application/msword"}]'
+ ),
+ "urls_json": (
+ '["https://xn--80ak6aa92e.example/login",'
+ '"http://kit.evil/payload.bin"]'
+ ),
+ "content_type": "multipart/mixed",
+ },
+ })
+
+ # Bounty still lands.
+ mock_repo.add_bounty.assert_awaited_once()
+ # And exactly one email.received publish.
+ email_publishes = [
+ p for p in published
+ if p[0].endswith("email.received")
+ ]
+ assert len(email_publishes) == 1
+ topic, payload, event_type = email_publishes[0]
+ assert event_type == "received"
+ assert topic == "email.received"
+ assert payload["attacker_uuid"] == "att-7"
+ assert payload["from_domain"] == "bigcorp.com"
+ assert payload["mail_from_domain"] == "evil.example"
+ assert payload["return_path_domain"] == "kit.evil"
+ assert payload["rcpt_count"] == 3
+ assert payload["rcpt_domains"] == ["target.tld", "other.tld"]
+ assert payload["x_mailer"] == "PHPMailer 6.0.7"
+ assert payload["dkim_signed"] is True
+ assert payload["spf_pass"] is False
+ assert payload["urls"] == [
+ "https://xn--80ak6aa92e.example/login",
+ "http://kit.evil/payload.bin",
+ ]
+ assert payload["attachment_sha256s"] == ["deadbeef", "feedface"]
+ assert payload["attachment_extensions"] == [".exe", ".docx"]
+ assert payload["source_id"] == "ABCD1234"
+
+ @pytest.mark.asyncio
+ async def test_message_stored_skips_publish_when_attacker_unresolved(self):
+ """If get_attacker_uuid_by_ip returns None, no orphan
+ email.received event lands."""
+ from decnet.web import ingester as _ing
+ from decnet.web.ingester import _extract_bounty
+ mock_repo = MagicMock()
+ mock_repo.add_bounty = AsyncMock()
+ mock_repo.upsert_credential = AsyncMock()
+ mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value=None)
+
+ with patch.object(_ing, "get_bus") as p_bus, \
+ patch.object(_ing, "publish_safely", new=AsyncMock()) as p_pub:
+ await _extract_bounty(mock_repo, {
+ "decky": "d",
+ "service": "smtp",
+ "attacker_ip": "10.0.0.1",
+ "event_type": "message_stored",
+ "fields": {
+ "stored_as": "x.eml",
+ "sha256": "h",
+ "size": "1",
+ "subject": "s",
+ "from_hdr": "a@b.c",
+ "to_hdr": "v@t.t",
+ "mail_from": "a@b.c",
+ "rcpt_to": "v@t.t",
+ "attachment_count": "0",
+ "content_type": "text/plain",
+ },
+ })
+ mock_repo.add_bounty.assert_awaited_once()
+ p_bus.assert_not_called()
+ p_pub.assert_not_called()
+
+ def test_domain_of_handles_common_shapes(self):
+ from decnet.web.ingester import _domain_of
+ assert _domain_of('"CEO" ') == "bigcorp.com"
+ assert _domain_of("ceo@bigcorp.com") == "bigcorp.com"
+ assert _domain_of("") == "b.com"
+ assert _domain_of("BIGCORP@EXAMPLE.COM") == "example.com"
+ assert _domain_of("") is None
+ assert _domain_of(None) is None
+ assert _domain_of("no-at-sign-here") is None
+
+ def test_attachment_extensions_unique_first_seen(self):
+ from decnet.web.ingester import _attachment_extensions
+ manifest = [
+ {"filename": "a.EXE"},
+ {"filename": "b.exe"}, # dedup'd against ".EXE"->".exe"
+ {"filename": "noext"},
+ {"filename": "report.pdf"},
+ {"filename": "trailing."}, # dotless tail → skip
+ ]
+ assert _attachment_extensions(manifest) == [".exe", ".pdf"]
+
+ def test_rcpt_projection_dedups_domains(self):
+ from decnet.web.ingester import _rcpt_projection
+ count, domains = _rcpt_projection(
+ "a@x.com, b@x.com, c@y.com d@y.com",
+ )
+ # Whitespace-and-comma split gives 4 raw rcpts; domain set is 2.
+ assert count == 4
+ assert domains == ["x.com", "y.com"]
+
@pytest.mark.asyncio
async def test_no_secret_b64_no_credential(self):
"""The native branch keys off `secret_b64`. Fields lacking it