feat(smtp): capture full messages + attachments to disk

SMTP template now writes each accepted DATA body as a .eml file into a
bind-mounted per-decky quarantine dir and emits a `message_stored` log
with sha256, size, decoded headers, and an attachment manifest
(filename + sha256 + size + content-type). Attachment hashing uses the
*decoded* payload so operators can match against VT / MalwareBazaar
directly. Body accumulator is capped at SMTP_MAX_BODY_BYTES (default
10 MB, matching the EHLO SIZE advert) so a streaming client can't OOM
the container.

The existing /api/v1/artifacts/{decky}/{stored_as} endpoint now takes
an optional ?service= query param (defaults to ssh for back-compat)
and can serve .eml files out of the smtp subdir. Forensic metadata
rides the normal log pipeline, same as SSH file_captured.
This commit is contained in:
2026-04-22 22:17:50 -04:00
parent d47a84c90b
commit c50448995b
7 changed files with 430 additions and 13 deletions

View File

@@ -1,8 +1,14 @@
import os
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent / "templates" / "smtp"
ARTIFACTS_ROOT = os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")
# In-container path for full-message capture. /var/spool/mqueue is where
# sendmail historically parks unsent messages, so `ls` / `mount` inside the
# container looks benign to an attacker poking around.
_IN_CONTAINER_QUARANTINE = "/var/spool/mqueue"
class SMTPService(BaseService):
@@ -17,6 +23,7 @@ class SMTPService(BaseService):
service_cfg: dict | None = None,
) -> dict:
cfg = service_cfg or {}
quarantine_host = f"{ARTIFACTS_ROOT}/{decky_name}/smtp"
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-smtp",
@@ -24,7 +31,9 @@ class SMTPService(BaseService):
"cap_add": ["NET_BIND_SERVICE"],
"environment": {
"NODE_NAME": decky_name,
"SMTP_QUARANTINE_DIR": _IN_CONTAINER_QUARANTINE,
},
"volumes": [f"{quarantine_host}:{_IN_CONTAINER_QUARANTINE}:rw"],
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target

View File

@@ -1,3 +1,4 @@
import os
from pathlib import Path
from decnet.services.base import BaseService
@@ -5,6 +6,9 @@ from decnet.services.base import BaseService
# Reuses the same template as the smtp service — only difference is
# SMTP_OPEN_RELAY=1 in the environment, which enables the open relay persona.
_TEMPLATES_DIR = Path(__file__).parent.parent / "templates" / "smtp"
ARTIFACTS_ROOT = os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")
# See decnet/services/smtp.py — benign-looking in-container quarantine path.
_IN_CONTAINER_QUARANTINE = "/var/spool/mqueue"
class SMTPRelayService(BaseService):
@@ -21,6 +25,7 @@ class SMTPRelayService(BaseService):
service_cfg: dict | None = None,
) -> dict:
cfg = service_cfg or {}
quarantine_host = f"{ARTIFACTS_ROOT}/{decky_name}/smtp"
fragment: dict = {
"build": {"context": str(_TEMPLATES_DIR)},
"container_name": f"{decky_name}-smtp_relay",
@@ -29,7 +34,9 @@ class SMTPRelayService(BaseService):
"environment": {
"NODE_NAME": decky_name,
"SMTP_OPEN_RELAY": "1",
"SMTP_QUARANTINE_DIR": _IN_CONTAINER_QUARANTINE,
},
"volumes": [f"{quarantine_host}:{_IN_CONTAINER_QUARANTINE}:rw"],
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target

View File

@@ -20,10 +20,16 @@ The DATA state machine (and the 502-per-line bug) is fixed in both modes.
import asyncio
import base64
import hashlib
import json
import os
import random as _rand
import re
import time
from datetime import datetime, timezone
from email import message_from_bytes
from email.header import decode_header, make_header
from email.message import Message
import instance_seed as _seed
from syslog_bridge import SEVERITY_WARNING, syslog_line, write_syslog_file, forward_syslog
@@ -50,6 +56,15 @@ _RCPT_DROP_RATE = float(os.environ.get("SMTP_RCPT_DROP_RATE", "0.08"))
_SMTP_BANNER = os.environ.get("SMTP_BANNER", f"220 {NODE_NAME} ESMTP Postfix (Debian/GNU)")
_SMTP_MTA = os.environ.get("SMTP_MTA", NODE_NAME)
# Full-message capture: bind-mounted quarantine dir (host path
# /var/lib/decnet/artifacts/{decky}/smtp). When unset, capture is skipped —
# the container still accepts mail, it just doesn't persist the body. Used by
# tests and by deployments that don't want disk persistence.
_QUARANTINE_DIR = os.environ.get("SMTP_QUARANTINE_DIR", "")
# EHLO advertises SIZE 10240000 (10 MB). Cap the accumulator at the same
# value so a crafted client can't OOM the container by streaming forever.
_MAX_BODY_BYTES = int(os.environ.get("SMTP_MAX_BODY_BYTES", "10485760"))
# Postfix's queue-ID character set (real one: excludes vowels and look-alikes
# like 0/O, 1/I, so scanners that know Postfix's alphabet are satisfied).
_QUEUE_CHARS = "BCDFGHJKLMNPQRSTVWXYZ23456789"
@@ -82,6 +97,95 @@ def _rand_msg_id() -> str:
return base + _QUEUE_CHARS[suffix_idx]
def _decode_header(raw: str | None) -> str:
"""Best-effort decode of an RFC 2047 encoded-word header to Unicode.
Returns "" for missing / undecodable values so callers can treat the
result as a plain string.
"""
if not raw:
return ""
try:
return str(make_header(decode_header(raw)))
except Exception:
return raw
# Stored_as format mirrors the SSH artifact convention so the existing
# /api/v1/artifacts/{decky}/{stored_as} endpoint and its filename regex
# accept SMTP drops unchanged: <iso_ts>_<sha12>_<basename>. The basename
# always ends in .eml so operators can open it in any MUA.
_STORED_AS_BASE_RE = re.compile(r"[^A-Za-z0-9._-]")
def _summarize_message(body: bytes, msg_id: str) -> dict:
"""Parse the DATA body and extract forensic metadata.
Returns a dict with:
subject, from_hdr, to_hdr, date_hdr, message_id_hdr, content_type,
attachments: list of {filename, content_type, size, sha256}.
Headers are RFC 2047 decoded. Attachment hashing uses the *decoded*
payload so operators can match against VT / MalwareBazaar directly.
"""
try:
msg: Message = message_from_bytes(body)
except Exception:
return {
"subject": "", "from_hdr": "", "to_hdr": "", "date_hdr": "",
"message_id_hdr": "", "content_type": "", "attachments": [],
}
attachments: list[dict] = []
for part in msg.walk():
if part.is_multipart():
continue
disposition = (part.get("Content-Disposition") or "").lower()
filename = part.get_filename()
# Treat any part with an explicit filename as an attachment, even
# when Content-Disposition is missing — spam kits frequently omit it.
if not filename and "attachment" not in disposition:
continue
try:
payload = part.get_payload(decode=True) or b""
except Exception:
payload = b""
attachments.append({
"filename": _decode_header(filename) or "",
"content_type": part.get_content_type(),
"size": len(payload),
"sha256": hashlib.sha256(payload).hexdigest() if payload else "",
})
return {
"subject": _decode_header(msg.get("Subject")),
"from_hdr": _decode_header(msg.get("From")),
"to_hdr": _decode_header(msg.get("To")),
"date_hdr": _decode_header(msg.get("Date")),
"message_id_hdr": _decode_header(msg.get("Message-ID")),
"content_type": msg.get_content_type(),
"attachments": attachments,
}
def _persist_message(body: bytes, msg_id: str) -> str | None:
"""Write the raw DATA body to the quarantine dir as a .eml file.
Returns the stored_as basename on success, None if capture is disabled
or the write failed. The SMTP reply is always 250 regardless — a real
relay is opaque about its storage path.
"""
if not _QUARANTINE_DIR:
return None
sha = hashlib.sha256(body).hexdigest()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
safe_id = _STORED_AS_BASE_RE.sub("_", msg_id)[:32] or "msg"
stored_as = f"{ts}_{sha[:12]}_{safe_id}.eml"
try:
with open(os.path.join(_QUARANTINE_DIR, stored_as), "wb") as fh:
fh.write(body)
return stored_as
except OSError:
return None
def _decode_auth_plain(blob: str) -> tuple[str, str]:
"""Decode SASL PLAIN: base64(authzid\0authcid\0passwd) → (user, pass)."""
try:
@@ -107,6 +211,11 @@ class SMTPProtocol(asyncio.Protocol):
# DATA accumulation
self._in_data = False
self._data_buf: list[str] = []
# Running byte count for the DATA body; once this exceeds
# _MAX_BODY_BYTES we stop appending to _data_buf but keep
# consuming lines so the session still terminates cleanly.
self._data_bytes = 0
self._data_truncated = False
# AUTH multi-step state (LOGIN mechanism sends user/pass in separate lines)
self._auth_state = "" # "" | "await_user" | "await_pass"
self._auth_user = ""
@@ -135,25 +244,67 @@ class SMTPProtocol(asyncio.Protocol):
# ── DATA body accumulation ────────────────────────────────────────────
if self._in_data:
if line == ".":
body = "\r\n".join(self._data_buf)
body_str = "\r\n".join(self._data_buf)
body = body_str.encode("utf-8", errors="replace")
msg_id = _rand_msg_id()
_log("message_accepted",
src=self._peer[0],
mail_from=self._mail_from,
rcpt_to=",".join(self._rcpt_to),
body_bytes=len(body),
truncated=int(self._data_truncated),
msg_id=msg_id)
# Persist the full .eml into the quarantine bind mount
# (if configured) and emit a richer event so the collector
# can index attachments + headers. This is the hook the
# dashboard's "sent mail" viewer reads.
stored_as = _persist_message(body, msg_id)
if stored_as is not None:
summary = _summarize_message(body, msg_id)
_log(
"message_stored",
src=self._peer[0],
msg_id=msg_id,
stored_as=stored_as,
sha256=hashlib.sha256(body).hexdigest(),
size=len(body),
truncated=int(self._data_truncated),
mail_from=self._mail_from,
rcpt_to=",".join(self._rcpt_to),
subject=summary["subject"][:512],
from_hdr=summary["from_hdr"][:256],
to_hdr=summary["to_hdr"][:512],
date_hdr=summary["date_hdr"][:64],
message_id_hdr=summary["message_id_hdr"][:256],
content_type=summary["content_type"],
attachment_count=len(summary["attachments"]),
# Full manifest (filename/sha256/size/content_type)
# rides as a compact JSON blob — the SD-value escape
# in syslog_bridge handles the quotes and brackets.
attachments_json=json.dumps(summary["attachments"], separators=(",", ":")),
)
# Real MTAs take tens of ms to queue; instantaneous replies
# on DATA are a tell.
_seed.jitter_sync(30, 180)
self._transport.write(f"250 2.0.0 Ok: queued as {msg_id}\r\n".encode())
self._in_data = False
self._data_buf = []
self._mail_from = ""
self._rcpt_to = []
self._in_data = False
self._data_buf = []
self._data_bytes = 0
self._data_truncated = False
self._mail_from = ""
self._rcpt_to = []
else:
# RFC 5321 dot-stuffing: strip leading dot
self._data_buf.append(line[1:] if line.startswith(".") else line)
decoded = line[1:] if line.startswith(".") else line
# +2 accounts for the CRLF that rejoins this line to the body.
new_total = self._data_bytes + len(decoded.encode("utf-8", errors="replace")) + 2
if new_total <= _MAX_BODY_BYTES:
self._data_buf.append(decoded)
self._data_bytes = new_total
else:
# Stop appending but keep consuming so the client's
# final CRLF.CRLF still terminates the state machine.
self._data_truncated = True
return
# ── AUTH multi-step (LOGIN / PLAIN continuation) ─────────────────────
@@ -253,6 +404,8 @@ class SMTPProtocol(asyncio.Protocol):
self._rcpt_to = []
self._in_data = False
self._data_buf = []
self._data_bytes = 0
self._data_truncated = False
self._auth_state = ""
self._auth_user = ""
self._transport.write(b"250 2.0.0 Ok\r\n")

View File

@@ -16,7 +16,7 @@ import os
import re
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import FileResponse
from decnet.telemetry import traced as _traced
@@ -25,13 +25,17 @@ from decnet.web.dependencies import require_admin
router = APIRouter()
# Override via env for tests; the prod path matches the bind mount declared in
# decnet/services/ssh.py.
# decnet/services/ssh.py and decnet/services/smtp.py.
ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"))
# decky names come from the deployer — lowercase alnum plus hyphens.
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
# stored_as is assembled by capture.sh as:
# Services that own an artifacts subdir. Kept explicit so a caller can't
# pivot into arbitrary subpaths via the query string.
_ALLOWED_SERVICES = {"ssh", "smtp"}
# stored_as is assembled by the capturing template as:
# ${ts}_${sha:0:12}_${base}
# where ts is ISO-8601 UTC (e.g. 2026-04-18T02:22:56Z), sha is 12 hex chars,
# and base is the original filename's basename. Keep the filename charset
@@ -41,16 +45,18 @@ _STORED_AS_RE = re.compile(
)
def _resolve_artifact_path(decky: str, stored_as: str) -> Path:
def _resolve_artifact_path(decky: str, stored_as: str, service: str) -> Path:
"""Validate inputs, resolve the on-disk path, and confirm it stays inside
the artifacts root. Raises HTTPException(400) on any violation."""
if service not in _ALLOWED_SERVICES:
raise HTTPException(status_code=400, detail="invalid service")
if not _DECKY_RE.fullmatch(decky):
raise HTTPException(status_code=400, detail="invalid decky name")
if not _STORED_AS_RE.fullmatch(stored_as):
raise HTTPException(status_code=400, detail="invalid stored_as")
root = ARTIFACTS_ROOT.resolve()
candidate = (root / decky / "ssh" / stored_as).resolve()
candidate = (root / decky / service / stored_as).resolve()
# defence-in-depth: even though the regexes reject `..`, make sure a
# symlink or weird filesystem state can't escape the root.
if root not in candidate.parents and candidate != root:
@@ -62,7 +68,7 @@ def _resolve_artifact_path(decky: str, stored_as: str) -> Path:
"/artifacts/{decky}/{stored_as}",
tags=["Artifacts"],
responses={
400: {"description": "Invalid decky or stored_as parameter"},
400: {"description": "Invalid decky, service, or stored_as parameter"},
401: {"description": "Could not validate credentials"},
403: {"description": "Admin access required"},
404: {"description": "Artifact not found"},
@@ -72,9 +78,10 @@ def _resolve_artifact_path(decky: str, stored_as: str) -> Path:
async def get_artifact(
decky: str,
stored_as: str,
service: str = Query("ssh", pattern=r"^[a-z]{1,16}$"),
admin: dict = Depends(require_admin),
) -> FileResponse:
path = _resolve_artifact_path(decky, stored_as)
path = _resolve_artifact_path(decky, stored_as, service)
if not path.is_file():
raise HTTPException(status_code=404, detail="artifact not found")
return FileResponse(