From fb85762703602865b1f30f626586140b850e82f6 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 2 May 2026 18:39:13 -0400 Subject: [PATCH] feat(bus): publish email.received from ingester after SMTP artifact persist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the EmailLifter (R0041–R0048) producer that DEBT.md item #3 deferred. After the existing add_bounty() call in _extract_bounty (line 615), call _publish_email_received() which: * resolves the attacker_uuid via repo.get_attacker_uuid_by_ip; drops the publish if unresolved (the TTP worker can't anchor orphan events) * projects the message_stored fields onto the EmailLifter wire contract: from_domain / mail_from_domain / return_path_domain parsed via _domain_of, rcpt_count + rcpt_domains via _rcpt_projection, attachment_sha256s + attachment_extensions derived from the existing attachments_json manifest, urls from urls_json, dkim_signed/spf_pass coerced from 0/1 ints to bool * mirrors _publish_probe_pending's bus-per-call pattern and swallows all exceptions (the bus is the notification layer, not the source of truth) Fires for both relay and non-relay SMTP services. R0041 / R0043 / R0044 / R0045 are now live end-to-end; R0046 partial (extension lane). Heavyweight predicates (R0042 simhash, R0046-deep, R0047 / R0048 body_text) stay deferred per the EmailLifter heavyweight DEBT entry. --- decnet/web/ingester.py | 154 +++++++++++++++++++++++++++++++++++++ tests/web/test_ingester.py | 154 +++++++++++++++++++++++++++++++++++++ 2 files changed, 308 insertions(+) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index bbf62482..35eecf6e 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -613,6 +613,13 @@ async def _extract_bounty( "content_type": _fields.get("content_type"), }, }) + # Fan the captured email out to the TTP worker — same hook + # ``attacker.intel.enriched`` uses, but for the EmailLifter + # (R0041–R0048). Always fires for both relay and non-relay + # services; the TTP path doesn't care which mode the decky was + # in. Best-effort — bus-down or unresolved attacker UUID + # downgrades to a no-op silently. + await _publish_email_received(repo, log_data, _fields) # Signal the realism worker to forward this as a probe if it's the # first message from this IP on an smtp_relay decky. The worker has # real internet access (the container is on MACVLAN and doesn't). @@ -620,6 +627,153 @@ async def _extract_bounty( await _publish_probe_pending(log_data, _fields) +_RCPT_SPLIT_RE = re.compile(r"[,\s]+") +_ADDR_AT_RE = re.compile(r"@([A-Za-z0-9.\-]+)") + + +def _domain_of(addr_or_header: str | None) -> str | None: + """Extract the trailing domain from an address-like string. + + Tolerant of full RFC 5322 headers (``"Name" ``), bare + addresses (``a@b.com``), and angle-bracketed envelope values + (````). Returns the domain lowercased, or ``None`` when + no ``@``-delimited domain is present. The EmailLifter expects + just the domain — display names and local-parts ride in the + artifact, not on the bus. + """ + if not addr_or_header: + return None + match = _ADDR_AT_RE.search(addr_or_header) + if not match: + return None + domain = match.group(1).strip(".").lower() + return domain or None + + +def _rcpt_projection(rcpt_to_field: str | None) -> tuple[int, list[str]]: + """Split the comma-or-whitespace-separated RCPT TO blob into + ``(count, unique_domains_first_seen_order)``.""" + if not rcpt_to_field: + return 0, [] + parts = [p for p in _RCPT_SPLIT_RE.split(rcpt_to_field) if p] + domains: dict[str, None] = {} + for part in parts: + domain = _domain_of(part) + if domain: + domains.setdefault(domain, None) + return len(parts), list(domains.keys()) + + +def _attachment_extensions(manifest: list[dict]) -> list[str]: + """Return unique lowercased file extensions (with dot) from the + attachment manifest, preserving first-seen order. ``"payload.exe"`` + → ``".exe"``; missing / dotless filenames are skipped.""" + seen: dict[str, None] = {} + for entry in manifest: + if not isinstance(entry, dict): + continue + filename = entry.get("filename") or "" + if not isinstance(filename, str): + continue + idx = filename.rfind(".") + if idx < 0 or idx == len(filename) - 1: + continue + ext = filename[idx:].lower() + seen.setdefault(ext, None) + return list(seen.keys()) + + +async def _publish_email_received( + repo: BaseRepository, log_data: dict, fields: dict, +) -> None: + """Project the SMTP capture event onto the EmailLifter wire contract. + + Mirrors :func:`_publish_probe_pending`: a fresh bus connection per + publish, fire-and-forget, all exceptions swallowed (the bus is the + notification layer, not the source of truth). Producer for + ``email.received`` per the 2026-05-02 DEBT #3 paydown. + """ + attacker_ip = log_data.get("attacker_ip") + try: + attacker_uuid = ( + await repo.get_attacker_uuid_by_ip(attacker_ip) + if attacker_ip else None + ) + except Exception as exc: # noqa: BLE001 + logger.debug("email_received: attacker resolve failed: %s", exc) + attacker_uuid = None + if not attacker_uuid: + # Without an attacker_uuid the TTP worker cannot anchor the + # tags to a row. Drop rather than emit an orphan event. + return + + rcpt_count, rcpt_domains = _rcpt_projection(fields.get("rcpt_to")) + try: + attachment_manifest = json.loads(fields.get("attachments_json") or "[]") + except (TypeError, ValueError): + attachment_manifest = [] + if not isinstance(attachment_manifest, list): + attachment_manifest = [] + attachment_sha256s = [ + entry.get("sha256") for entry in attachment_manifest + if isinstance(entry, dict) and isinstance(entry.get("sha256"), str) + and entry.get("sha256") + ] + try: + urls = json.loads(fields.get("urls_json") or "[]") + except (TypeError, ValueError): + urls = [] + if not isinstance(urls, list): + urls = [] + + # The decky writes ``dkim_signed`` / ``spf_pass`` as 0/1 ints + # because syslog SD-values are strings; coerce back to the bool + # the EmailLifter predicates check with ``is True`` / ``is False``. + def _to_bool(raw: Any) -> bool: + if isinstance(raw, bool): + return raw + if isinstance(raw, (int, float)): + return raw != 0 + if isinstance(raw, str): + return raw.strip() in {"1", "true", "True", "yes"} + return False + + payload: dict[str, Any] = { + "source_id": fields.get("msg_id") or fields.get("stored_as"), + "attacker_uuid": attacker_uuid, + "attacker_ip": attacker_ip, + "decky_id": log_data.get("decky"), + "service": log_data.get("service"), + "subject": fields.get("subject"), + "from_domain": _domain_of(fields.get("from_hdr")), + "mail_from_domain": _domain_of(fields.get("mail_from")), + "return_path_domain": _domain_of(fields.get("return_path")), + "rcpt_count": rcpt_count, + "rcpt_domains": rcpt_domains, + "x_mailer": fields.get("x_mailer") or None, + "dkim_signed": _to_bool(fields.get("dkim_signed")), + "spf_pass": _to_bool(fields.get("spf_pass")), + "urls": [u for u in urls if isinstance(u, str)], + "attachment_count": fields.get("attachment_count"), + "attachment_sha256s": attachment_sha256s, + "attachment_extensions": _attachment_extensions(attachment_manifest), + "stored_as": fields.get("stored_as"), + "body_sha256": fields.get("sha256"), + } + try: + bus = get_bus(client_name="ingester-email") + await bus.connect() + await publish_safely( + bus, + _topics.email_topic(_topics.EMAIL_RECEIVED), + payload, + event_type=_topics.EMAIL_RECEIVED, + ) + await bus.close() + except Exception as exc: # noqa: BLE001 + logger.debug("email.received publish failed: %s", exc) + + async def _publish_probe_pending(log_data: dict, fields: dict) -> None: try: bus = get_bus(client_name="ingester-probe") diff --git a/tests/web/test_ingester.py b/tests/web/test_ingester.py index f27eb840..d978a71e 100644 --- a/tests/web/test_ingester.py +++ b/tests/web/test_ingester.py @@ -154,6 +154,160 @@ class TestExtractBounty: assert bounty["payload"]["subject"] == "URGENT: invoice" assert bounty["payload"]["mail_from"] == "spammer@spammer.example" + @pytest.mark.asyncio + async def test_message_stored_publishes_email_received(self): + """SMTP message_stored persists the artifact AND publishes + ``email.received`` with the EmailLifter wire contract: domains, + rcpt_count + rcpt_domains, attachment shas + extensions, urls, + dkim/spf bools, x_mailer.""" + from decnet.web import ingester as _ing + from decnet.web.ingester import _extract_bounty + mock_repo = MagicMock() + mock_repo.add_bounty = AsyncMock() + mock_repo.upsert_credential = AsyncMock() + mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value="att-7") + + published: list = [] + + async def fake_publish(_bus, topic, payload, event_type=""): + published.append((topic, payload, event_type)) + + fake_bus = MagicMock() + fake_bus.connect = AsyncMock() + fake_bus.close = AsyncMock() + + with patch.object(_ing, "get_bus", return_value=fake_bus), \ + patch.object(_ing, "publish_safely", side_effect=fake_publish): + await _extract_bounty(mock_repo, { + "decky": "mail-decky", + "service": "smtp", + "attacker_ip": "203.0.113.7", + "event_type": "message_stored", + "fields": { + "msg_id": "ABCD1234", + "stored_as": "2026-04-28T12:00:00Z_abc_msg.eml", + "sha256": "cafebabe" * 8, + "size": "8192", + "subject": "URGENT: invoice", + "from_hdr": '"CEO" ', + "to_hdr": "victim@target.tld", + "mail_from": "", + "rcpt_to": ( + "victim1@target.tld, victim2@target.tld, " + "victim3@other.tld" + ), + "return_path": "", + "x_mailer": "PHPMailer 6.0.7", + "dkim_signed": "1", + "spf_pass": "0", + "attachment_count": "2", + "attachments_json": ( + '[{"filename":"payload.exe","sha256":"deadbeef",' + '"size":12,"content_type":"application/octet-stream"},' + '{"filename":"resume.docx","sha256":"feedface",' + '"size":34,"content_type":"application/msword"}]' + ), + "urls_json": ( + '["https://xn--80ak6aa92e.example/login",' + '"http://kit.evil/payload.bin"]' + ), + "content_type": "multipart/mixed", + }, + }) + + # Bounty still lands. + mock_repo.add_bounty.assert_awaited_once() + # And exactly one email.received publish. + email_publishes = [ + p for p in published + if p[0].endswith("email.received") + ] + assert len(email_publishes) == 1 + topic, payload, event_type = email_publishes[0] + assert event_type == "received" + assert topic == "email.received" + assert payload["attacker_uuid"] == "att-7" + assert payload["from_domain"] == "bigcorp.com" + assert payload["mail_from_domain"] == "evil.example" + assert payload["return_path_domain"] == "kit.evil" + assert payload["rcpt_count"] == 3 + assert payload["rcpt_domains"] == ["target.tld", "other.tld"] + assert payload["x_mailer"] == "PHPMailer 6.0.7" + assert payload["dkim_signed"] is True + assert payload["spf_pass"] is False + assert payload["urls"] == [ + "https://xn--80ak6aa92e.example/login", + "http://kit.evil/payload.bin", + ] + assert payload["attachment_sha256s"] == ["deadbeef", "feedface"] + assert payload["attachment_extensions"] == [".exe", ".docx"] + assert payload["source_id"] == "ABCD1234" + + @pytest.mark.asyncio + async def test_message_stored_skips_publish_when_attacker_unresolved(self): + """If get_attacker_uuid_by_ip returns None, no orphan + email.received event lands.""" + from decnet.web import ingester as _ing + from decnet.web.ingester import _extract_bounty + mock_repo = MagicMock() + mock_repo.add_bounty = AsyncMock() + mock_repo.upsert_credential = AsyncMock() + mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value=None) + + with patch.object(_ing, "get_bus") as p_bus, \ + patch.object(_ing, "publish_safely", new=AsyncMock()) as p_pub: + await _extract_bounty(mock_repo, { + "decky": "d", + "service": "smtp", + "attacker_ip": "10.0.0.1", + "event_type": "message_stored", + "fields": { + "stored_as": "x.eml", + "sha256": "h", + "size": "1", + "subject": "s", + "from_hdr": "a@b.c", + "to_hdr": "v@t.t", + "mail_from": "a@b.c", + "rcpt_to": "v@t.t", + "attachment_count": "0", + "content_type": "text/plain", + }, + }) + mock_repo.add_bounty.assert_awaited_once() + p_bus.assert_not_called() + p_pub.assert_not_called() + + def test_domain_of_handles_common_shapes(self): + from decnet.web.ingester import _domain_of + assert _domain_of('"CEO" ') == "bigcorp.com" + assert _domain_of("ceo@bigcorp.com") == "bigcorp.com" + assert _domain_of("") == "b.com" + assert _domain_of("BIGCORP@EXAMPLE.COM") == "example.com" + assert _domain_of("") is None + assert _domain_of(None) is None + assert _domain_of("no-at-sign-here") is None + + def test_attachment_extensions_unique_first_seen(self): + from decnet.web.ingester import _attachment_extensions + manifest = [ + {"filename": "a.EXE"}, + {"filename": "b.exe"}, # dedup'd against ".EXE"->".exe" + {"filename": "noext"}, + {"filename": "report.pdf"}, + {"filename": "trailing."}, # dotless tail → skip + ] + assert _attachment_extensions(manifest) == [".exe", ".pdf"] + + def test_rcpt_projection_dedups_domains(self): + from decnet.web.ingester import _rcpt_projection + count, domains = _rcpt_projection( + "a@x.com, b@x.com, c@y.com d@y.com", + ) + # Whitespace-and-comma split gives 4 raw rcpts; domain set is 2. + assert count == 4 + assert domains == ["x.com", "y.com"] + @pytest.mark.asyncio async def test_no_secret_b64_no_credential(self): """The native branch keys off `secret_b64`. Fields lacking it