feat(bus): publish email.received from ingester after SMTP artifact persist

Wires the EmailLifter (R0041–R0048) producer that DEBT.md item #3
deferred. After the existing add_bounty() call in _extract_bounty
(line 615), call _publish_email_received() which:

* resolves the attacker_uuid via repo.get_attacker_uuid_by_ip; drops
  the publish if unresolved (the TTP worker can't anchor orphan
  events)
* projects the message_stored fields onto the EmailLifter wire
  contract: from_domain / mail_from_domain / return_path_domain
  parsed via _domain_of, rcpt_count + rcpt_domains via
  _rcpt_projection, attachment_sha256s + attachment_extensions
  derived from the existing attachments_json manifest, urls from
  urls_json, dkim_signed/spf_pass coerced from 0/1 ints to bool
* mirrors _publish_probe_pending's bus-per-call pattern and
  swallows all exceptions (the bus is the notification layer, not
  the source of truth)

Fires for both relay and non-relay SMTP services. R0041 / R0043 /
R0044 / R0045 are now live end-to-end; R0046 partial (extension
lane). Heavyweight predicates (R0042 simhash, R0046-deep, R0047 /
R0048 body_text) stay deferred per the EmailLifter heavyweight
DEBT entry.
This commit is contained in:
2026-05-02 18:39:13 -04:00
parent e9324acac7
commit fb85762703
2 changed files with 308 additions and 0 deletions

View File

@@ -613,6 +613,13 @@ async def _extract_bounty(
"content_type": _fields.get("content_type"),
},
})
# Fan the captured email out to the TTP worker — same hook
# ``attacker.intel.enriched`` uses, but for the EmailLifter
# (R0041R0048). Always fires for both relay and non-relay
# services; the TTP path doesn't care which mode the decky was
# in. Best-effort — bus-down or unresolved attacker UUID
# downgrades to a no-op silently.
await _publish_email_received(repo, log_data, _fields)
# Signal the realism worker to forward this as a probe if it's the
# first message from this IP on an smtp_relay decky. The worker has
# real internet access (the container is on MACVLAN and doesn't).
@@ -620,6 +627,153 @@ async def _extract_bounty(
await _publish_probe_pending(log_data, _fields)
_RCPT_SPLIT_RE = re.compile(r"[,\s]+")
_ADDR_AT_RE = re.compile(r"@([A-Za-z0-9.\-]+)")
def _domain_of(addr_or_header: str | None) -> str | None:
"""Extract the trailing domain from an address-like string.
Tolerant of full RFC 5322 headers (``"Name" <a@b.com>``), bare
addresses (``a@b.com``), and angle-bracketed envelope values
(``<a@b.com>``). Returns the domain lowercased, or ``None`` when
no ``@``-delimited domain is present. The EmailLifter expects
just the domain — display names and local-parts ride in the
artifact, not on the bus.
"""
if not addr_or_header:
return None
match = _ADDR_AT_RE.search(addr_or_header)
if not match:
return None
domain = match.group(1).strip(".").lower()
return domain or None
def _rcpt_projection(rcpt_to_field: str | None) -> tuple[int, list[str]]:
"""Split the comma-or-whitespace-separated RCPT TO blob into
``(count, unique_domains_first_seen_order)``."""
if not rcpt_to_field:
return 0, []
parts = [p for p in _RCPT_SPLIT_RE.split(rcpt_to_field) if p]
domains: dict[str, None] = {}
for part in parts:
domain = _domain_of(part)
if domain:
domains.setdefault(domain, None)
return len(parts), list(domains.keys())
def _attachment_extensions(manifest: list[dict]) -> list[str]:
"""Return unique lowercased file extensions (with dot) from the
attachment manifest, preserving first-seen order. ``"payload.exe"``
→ ``".exe"``; missing / dotless filenames are skipped."""
seen: dict[str, None] = {}
for entry in manifest:
if not isinstance(entry, dict):
continue
filename = entry.get("filename") or ""
if not isinstance(filename, str):
continue
idx = filename.rfind(".")
if idx < 0 or idx == len(filename) - 1:
continue
ext = filename[idx:].lower()
seen.setdefault(ext, None)
return list(seen.keys())
async def _publish_email_received(
repo: BaseRepository, log_data: dict, fields: dict,
) -> None:
"""Project the SMTP capture event onto the EmailLifter wire contract.
Mirrors :func:`_publish_probe_pending`: a fresh bus connection per
publish, fire-and-forget, all exceptions swallowed (the bus is the
notification layer, not the source of truth). Producer for
``email.received`` per the 2026-05-02 DEBT #3 paydown.
"""
attacker_ip = log_data.get("attacker_ip")
try:
attacker_uuid = (
await repo.get_attacker_uuid_by_ip(attacker_ip)
if attacker_ip else None
)
except Exception as exc: # noqa: BLE001
logger.debug("email_received: attacker resolve failed: %s", exc)
attacker_uuid = None
if not attacker_uuid:
# Without an attacker_uuid the TTP worker cannot anchor the
# tags to a row. Drop rather than emit an orphan event.
return
rcpt_count, rcpt_domains = _rcpt_projection(fields.get("rcpt_to"))
try:
attachment_manifest = json.loads(fields.get("attachments_json") or "[]")
except (TypeError, ValueError):
attachment_manifest = []
if not isinstance(attachment_manifest, list):
attachment_manifest = []
attachment_sha256s = [
entry.get("sha256") for entry in attachment_manifest
if isinstance(entry, dict) and isinstance(entry.get("sha256"), str)
and entry.get("sha256")
]
try:
urls = json.loads(fields.get("urls_json") or "[]")
except (TypeError, ValueError):
urls = []
if not isinstance(urls, list):
urls = []
# The decky writes ``dkim_signed`` / ``spf_pass`` as 0/1 ints
# because syslog SD-values are strings; coerce back to the bool
# the EmailLifter predicates check with ``is True`` / ``is False``.
def _to_bool(raw: Any) -> bool:
if isinstance(raw, bool):
return raw
if isinstance(raw, (int, float)):
return raw != 0
if isinstance(raw, str):
return raw.strip() in {"1", "true", "True", "yes"}
return False
payload: dict[str, Any] = {
"source_id": fields.get("msg_id") or fields.get("stored_as"),
"attacker_uuid": attacker_uuid,
"attacker_ip": attacker_ip,
"decky_id": log_data.get("decky"),
"service": log_data.get("service"),
"subject": fields.get("subject"),
"from_domain": _domain_of(fields.get("from_hdr")),
"mail_from_domain": _domain_of(fields.get("mail_from")),
"return_path_domain": _domain_of(fields.get("return_path")),
"rcpt_count": rcpt_count,
"rcpt_domains": rcpt_domains,
"x_mailer": fields.get("x_mailer") or None,
"dkim_signed": _to_bool(fields.get("dkim_signed")),
"spf_pass": _to_bool(fields.get("spf_pass")),
"urls": [u for u in urls if isinstance(u, str)],
"attachment_count": fields.get("attachment_count"),
"attachment_sha256s": attachment_sha256s,
"attachment_extensions": _attachment_extensions(attachment_manifest),
"stored_as": fields.get("stored_as"),
"body_sha256": fields.get("sha256"),
}
try:
bus = get_bus(client_name="ingester-email")
await bus.connect()
await publish_safely(
bus,
_topics.email_topic(_topics.EMAIL_RECEIVED),
payload,
event_type=_topics.EMAIL_RECEIVED,
)
await bus.close()
except Exception as exc: # noqa: BLE001
logger.debug("email.received publish failed: %s", exc)
async def _publish_probe_pending(log_data: dict, fields: dict) -> None:
try:
bus = get_bus(client_name="ingester-probe")

View File

@@ -154,6 +154,160 @@ class TestExtractBounty:
assert bounty["payload"]["subject"] == "URGENT: invoice"
assert bounty["payload"]["mail_from"] == "spammer@spammer.example"
@pytest.mark.asyncio
async def test_message_stored_publishes_email_received(self):
"""SMTP message_stored persists the artifact AND publishes
``email.received`` with the EmailLifter wire contract: domains,
rcpt_count + rcpt_domains, attachment shas + extensions, urls,
dkim/spf bools, x_mailer."""
from decnet.web import ingester as _ing
from decnet.web.ingester import _extract_bounty
mock_repo = MagicMock()
mock_repo.add_bounty = AsyncMock()
mock_repo.upsert_credential = AsyncMock()
mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value="att-7")
published: list = []
async def fake_publish(_bus, topic, payload, event_type=""):
published.append((topic, payload, event_type))
fake_bus = MagicMock()
fake_bus.connect = AsyncMock()
fake_bus.close = AsyncMock()
with patch.object(_ing, "get_bus", return_value=fake_bus), \
patch.object(_ing, "publish_safely", side_effect=fake_publish):
await _extract_bounty(mock_repo, {
"decky": "mail-decky",
"service": "smtp",
"attacker_ip": "203.0.113.7",
"event_type": "message_stored",
"fields": {
"msg_id": "ABCD1234",
"stored_as": "2026-04-28T12:00:00Z_abc_msg.eml",
"sha256": "cafebabe" * 8,
"size": "8192",
"subject": "URGENT: invoice",
"from_hdr": '"CEO" <ceo@bigcorp.com>',
"to_hdr": "victim@target.tld",
"mail_from": "<spammer@evil.example>",
"rcpt_to": (
"victim1@target.tld, victim2@target.tld, "
"victim3@other.tld"
),
"return_path": "<bounce@kit.evil>",
"x_mailer": "PHPMailer 6.0.7",
"dkim_signed": "1",
"spf_pass": "0",
"attachment_count": "2",
"attachments_json": (
'[{"filename":"payload.exe","sha256":"deadbeef",'
'"size":12,"content_type":"application/octet-stream"},'
'{"filename":"resume.docx","sha256":"feedface",'
'"size":34,"content_type":"application/msword"}]'
),
"urls_json": (
'["https://xn--80ak6aa92e.example/login",'
'"http://kit.evil/payload.bin"]'
),
"content_type": "multipart/mixed",
},
})
# Bounty still lands.
mock_repo.add_bounty.assert_awaited_once()
# And exactly one email.received publish.
email_publishes = [
p for p in published
if p[0].endswith("email.received")
]
assert len(email_publishes) == 1
topic, payload, event_type = email_publishes[0]
assert event_type == "received"
assert topic == "email.received"
assert payload["attacker_uuid"] == "att-7"
assert payload["from_domain"] == "bigcorp.com"
assert payload["mail_from_domain"] == "evil.example"
assert payload["return_path_domain"] == "kit.evil"
assert payload["rcpt_count"] == 3
assert payload["rcpt_domains"] == ["target.tld", "other.tld"]
assert payload["x_mailer"] == "PHPMailer 6.0.7"
assert payload["dkim_signed"] is True
assert payload["spf_pass"] is False
assert payload["urls"] == [
"https://xn--80ak6aa92e.example/login",
"http://kit.evil/payload.bin",
]
assert payload["attachment_sha256s"] == ["deadbeef", "feedface"]
assert payload["attachment_extensions"] == [".exe", ".docx"]
assert payload["source_id"] == "ABCD1234"
@pytest.mark.asyncio
async def test_message_stored_skips_publish_when_attacker_unresolved(self):
"""If get_attacker_uuid_by_ip returns None, no orphan
email.received event lands."""
from decnet.web import ingester as _ing
from decnet.web.ingester import _extract_bounty
mock_repo = MagicMock()
mock_repo.add_bounty = AsyncMock()
mock_repo.upsert_credential = AsyncMock()
mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value=None)
with patch.object(_ing, "get_bus") as p_bus, \
patch.object(_ing, "publish_safely", new=AsyncMock()) as p_pub:
await _extract_bounty(mock_repo, {
"decky": "d",
"service": "smtp",
"attacker_ip": "10.0.0.1",
"event_type": "message_stored",
"fields": {
"stored_as": "x.eml",
"sha256": "h",
"size": "1",
"subject": "s",
"from_hdr": "a@b.c",
"to_hdr": "v@t.t",
"mail_from": "a@b.c",
"rcpt_to": "v@t.t",
"attachment_count": "0",
"content_type": "text/plain",
},
})
mock_repo.add_bounty.assert_awaited_once()
p_bus.assert_not_called()
p_pub.assert_not_called()
def test_domain_of_handles_common_shapes(self):
from decnet.web.ingester import _domain_of
assert _domain_of('"CEO" <ceo@bigcorp.com>') == "bigcorp.com"
assert _domain_of("ceo@bigcorp.com") == "bigcorp.com"
assert _domain_of("<a@b.com>") == "b.com"
assert _domain_of("BIGCORP@EXAMPLE.COM") == "example.com"
assert _domain_of("") is None
assert _domain_of(None) is None
assert _domain_of("no-at-sign-here") is None
def test_attachment_extensions_unique_first_seen(self):
from decnet.web.ingester import _attachment_extensions
manifest = [
{"filename": "a.EXE"},
{"filename": "b.exe"}, # dedup'd against ".EXE"->".exe"
{"filename": "noext"},
{"filename": "report.pdf"},
{"filename": "trailing."}, # dotless tail → skip
]
assert _attachment_extensions(manifest) == [".exe", ".pdf"]
def test_rcpt_projection_dedups_domains(self):
from decnet.web.ingester import _rcpt_projection
count, domains = _rcpt_projection(
"a@x.com, b@x.com, c@y.com d@y.com",
)
# Whitespace-and-comma split gives 4 raw rcpts; domain set is 2.
assert count == 4
assert domains == ["x.com", "y.com"]
@pytest.mark.asyncio
async def test_no_secret_b64_no_credential(self):
"""The native branch keys off `secret_b64`. Fields lacking it