feat(bus): project EmailLifter heavyweight fields onto email.received

The decky's Layer-2 extension (commit 291b78c1) emits body_simhash /
body_base64_bytes / html_smuggling on the message_stored log and adds
macro_indicator / encrypted booleans to each attachments_json
manifest entry. Lift them all onto the email.received bus payload:

* body_simhash — passes through as-is (16 hex chars or "")
* body_base64_bytes — coerced to int (0 on absent / malformed)
* attachment_macros / attachment_password_protected — OR-reduced
  across the per-attachment manifest booleans; matches R0046's
  matched_trigger semantics where a single positive lane fires the
  rule
* html_smuggling — coerced bool from the decky's 0/1 int

Pre-Layer-2 message_stored events (older deckies, malformed log
rows) project to safe defaults: empty simhash, zero base64-bytes,
all booleans False. The EmailLifter then stays silent and never
fires a false positive on missing data.

R0042 (mass-phish) / R0046 macro / R0046 password / R0046 smuggling
/ R0048 (encoded payload) all fire end-to-end after this commit.
R0046 mal_hash_match and R0047 BEC remain deferred per their
respective DEBT entries (filed in the next commit).
This commit is contained in:
2026-05-02 19:10:30 -04:00
parent 291b78c1d0
commit c714941069
2 changed files with 157 additions and 0 deletions

View File

@@ -738,6 +738,29 @@ async def _publish_email_received(
return raw.strip() in {"1", "true", "True", "yes"}
return False
# Reduce per-attachment booleans (added by the decky's
# _summarize_message Layer-2 extension) to top-level rule fields.
# OR across all attachments — R0046 fires on a single positive.
attachment_macros = any(
bool(entry.get("macro_indicator"))
for entry in attachment_manifest
if isinstance(entry, dict)
)
attachment_password_protected = any(
bool(entry.get("encrypted"))
for entry in attachment_manifest
if isinstance(entry, dict)
)
body_base64_bytes_raw = fields.get("body_base64_bytes")
try:
body_base64_bytes = (
int(body_base64_bytes_raw)
if body_base64_bytes_raw is not None else 0
)
except (TypeError, ValueError):
body_base64_bytes = 0
payload: dict[str, Any] = {
"source_id": fields.get("msg_id") or fields.get("stored_as"),
"attacker_uuid": attacker_uuid,
@@ -757,6 +780,18 @@ async def _publish_email_received(
"attachment_count": fields.get("attachment_count"),
"attachment_sha256s": attachment_sha256s,
"attachment_extensions": _attachment_extensions(attachment_manifest),
# Heavyweight Layer-2 fields consumed by R0042 / R0046 / R0048.
# body_simhash is a 16-hex-char string per the decky's
# _body_simhash, or ``""`` when no body text was extractable.
# The empty string is the "no signal" value: it keeps the field
# a string and is falsy.
# NOTE(review): this comment previously said both that the lifter's
# _p_mass_phish predicate rejects non-strings and that it accepts
# str|int (via the rcpt threshold gate fallback) — confirm which
# holds against the predicate itself.
"body_simhash": fields.get("body_simhash") or "",
"body_base64_bytes": body_base64_bytes,
"attachment_macros": attachment_macros,
"attachment_password_protected": attachment_password_protected,
"html_smuggling": _to_bool(fields.get("html_smuggling")),
"stored_as": fields.get("stored_as"),
"body_sha256": fields.get("sha256"),
}

View File

@@ -243,6 +243,128 @@ class TestExtractBounty:
assert payload["attachment_extensions"] == [".exe", ".docx"]
assert payload["source_id"] == "ABCD1234"
@pytest.mark.asyncio
async def test_message_stored_projects_heavyweight_fields(self):
    """Layer-2 heavyweight fields land on the email.received payload.

    Covers body_simhash + body_base64_bytes (R0042 / R0048),
    attachment_macros + attachment_password_protected (R0046 macro /
    password lanes) and html_smuggling (R0046 smuggling lane).
    Per-attachment manifest booleans reduce to top-level flags via OR:
    the fixture sets each flag on a different attachment, so both
    top-level flags must come out True.
    """
    from decnet.web import ingester as _ing
    from decnet.web.ingester import _extract_bounty

    mock_repo = MagicMock()
    mock_repo.add_bounty = AsyncMock()
    mock_repo.upsert_credential = AsyncMock()
    mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value="att-9")

    # Capture every publish instead of touching a real bus.
    published: list = []

    async def fake_publish(_bus, topic, payload, event_type=""):
        published.append((topic, payload, event_type))

    fake_bus = MagicMock()
    fake_bus.connect = AsyncMock()
    fake_bus.close = AsyncMock()

    with patch.object(_ing, "get_bus", return_value=fake_bus), \
            patch.object(_ing, "publish_safely", side_effect=fake_publish):
        await _extract_bounty(mock_repo, {
            "decky": "mail-decky",
            "service": "smtp",
            "attacker_ip": "203.0.113.7",
            "event_type": "message_stored",
            "fields": {
                "msg_id": "ABCD9999",
                "stored_as": "2026-04-28T12:00:00Z_def_msg.eml",
                "sha256": "babecafe" * 8,
                "size": "12345",
                "subject": "invoice",
                "from_hdr": "ceo@bigcorp.com",
                "to_hdr": "victim@target.tld",
                "mail_from": "<spammer@evil.example>",
                "rcpt_to": "victim@target.tld",
                "attachment_count": "2",
                # First attachment carries the macro flag, the second
                # the encrypted flag: exercises the OR-reduction.
                "attachments_json": (
                    '[{"filename":"r.docm","sha256":"a","size":1,'
                    '"content_type":"application/vnd.ms-word.document.macroEnabled.12",'
                    '"macro_indicator":true,"encrypted":false},'
                    '{"filename":"s.zip","sha256":"b","size":2,'
                    '"content_type":"application/zip",'
                    '"macro_indicator":false,"encrypted":true}]'
                ),
                "urls_json": "[]",
                "body_simhash": "deadbeefcafebabe",
                # Passed as a string, like the other scalar fields in
                # this fixture, so the int coercion is actually
                # exercised rather than handed a ready-made int.
                "body_base64_bytes": "8192",
                "html_smuggling": "1",
                "content_type": "multipart/mixed",
            },
        })

    email_publishes = [
        p for p in published if p[0].endswith("email.received")
    ]
    assert len(email_publishes) == 1
    _topic, payload, _event_type = email_publishes[0]
    assert payload["body_simhash"] == "deadbeefcafebabe"
    assert payload["body_base64_bytes"] == 8192
    assert payload["attachment_macros"] is True
    assert payload["attachment_password_protected"] is True
    assert payload["html_smuggling"] is True
@pytest.mark.asyncio
async def test_message_stored_heavyweight_fields_safe_when_absent(self):
    """Pre-Layer-2 message_stored events project to safe defaults.

    The fixture omits body_simhash, body_base64_bytes, html_smuggling
    and attachments_json entirely. The published payload must then
    carry an empty simhash, zero base64-bytes, and all booleans False.
    """
    from decnet.web import ingester as _ing
    from decnet.web.ingester import _extract_bounty

    mock_repo = MagicMock()
    mock_repo.add_bounty = AsyncMock()
    mock_repo.upsert_credential = AsyncMock()
    mock_repo.get_attacker_uuid_by_ip = AsyncMock(return_value="att-10")

    # Capture every publish instead of touching a real bus.
    published: list = []

    async def fake_publish(_bus, topic, payload, event_type=""):
        published.append((topic, payload, event_type))

    fake_bus = MagicMock()
    fake_bus.connect = AsyncMock()
    fake_bus.close = AsyncMock()

    with patch.object(_ing, "get_bus", return_value=fake_bus), \
            patch.object(_ing, "publish_safely", side_effect=fake_publish):
        await _extract_bounty(mock_repo, {
            "decky": "old-decky",
            "service": "smtp",
            "attacker_ip": "10.0.0.99",
            "event_type": "message_stored",
            "fields": {
                "stored_as": "x.eml",
                "sha256": "h",
                "size": "1",
                "subject": "s",
                "from_hdr": "a@b.c",
                "to_hdr": "v@t.t",
                "mail_from": "a@b.c",
                "rcpt_to": "v@t.t",
                "attachment_count": "0",
                "content_type": "text/plain",
                # No body_simhash / body_base64_bytes /
                # html_smuggling / per-attachment manifest booleans.
            },
        })

    # Collect into a list and assert on its length rather than calling
    # next() on a generator: a missing publish then fails as a plain
    # assertion instead of a StopIteration escaping the coroutine, and
    # a duplicate publish is caught too.
    email_publishes = [
        p for p in published if p[0].endswith("email.received")
    ]
    assert len(email_publishes) == 1
    _topic, payload, _event_type = email_publishes[0]
    assert payload["body_simhash"] == ""
    assert payload["body_base64_bytes"] == 0
    assert payload["attachment_macros"] is False
    assert payload["attachment_password_protected"] is False
    assert payload["html_smuggling"] is False
@pytest.mark.asyncio
async def test_message_stored_skips_publish_when_attacker_unresolved(self):
"""If get_attacker_uuid_by_ip returns None, no orphan