feat(smtp_relay): forward probe emails upstream so attackers verify relay works

First SMTP_PROBE_LIMIT messages per source IP are forwarded via a real
upstream relay (SMTP_UPSTREAM_HOST/PORT/USER/PASS) so the attacker's
test email actually lands in their inbox. All subsequent messages from
the same IP get 250 Ok but only hit the quarantine — campaign content
captured, nothing delivered.
This commit is contained in:
2026-04-30 11:21:04 -04:00
parent 4b7cb42ab1
commit 9a4fe2677b
3 changed files with 139 additions and 0 deletions

View File

@@ -34,6 +34,39 @@ class SMTPRelayService(BaseService):
default="postfix",
help="Shapes EHLO capability list and error wording.",
),
ServiceConfigField(
key="upstream_host",
label="Upstream relay host",
type="string",
placeholder="smtp.sendgrid.net",
help="Real SMTP relay used to forward probe emails. Leave blank to disable forwarding.",
),
ServiceConfigField(
key="upstream_port",
label="Upstream relay port",
type="int",
default=25,
help="Port on the upstream relay (25 or 587).",
),
ServiceConfigField(
key="upstream_user",
label="Upstream relay username",
type="string",
help="AUTH username for the upstream relay (optional).",
),
ServiceConfigField(
key="upstream_pass",
label="Upstream relay password",
type="string",
help="AUTH password for the upstream relay (optional).",
),
ServiceConfigField(
key="probe_limit",
label="Probe forward limit",
type="int",
default=1,
help="Number of emails per source IP to actually deliver upstream. All subsequent emails are silently quarantined.",
),
]
def compose_fragment(
@@ -62,6 +95,16 @@ class SMTPRelayService(BaseService):
fragment["environment"]["SMTP_BANNER"] = cfg["banner"]
if "mta" in cfg:
fragment["environment"]["SMTP_MTA"] = cfg["mta"]
if "upstream_host" in cfg:
fragment["environment"]["SMTP_UPSTREAM_HOST"] = cfg["upstream_host"]
if "upstream_port" in cfg:
fragment["environment"]["SMTP_UPSTREAM_PORT"] = str(cfg["upstream_port"])
if "upstream_user" in cfg:
fragment["environment"]["SMTP_UPSTREAM_USER"] = cfg["upstream_user"]
if "upstream_pass" in cfg:
fragment["environment"]["SMTP_UPSTREAM_PASS"] = cfg["upstream_pass"]
if "probe_limit" in cfg:
fragment["environment"]["SMTP_PROBE_LIMIT"] = str(cfg["probe_limit"])
return fragment
def dockerfile_context(self) -> Path:

View File

@@ -25,7 +25,9 @@ import json
import os
import random as _rand
import re
import smtplib
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from email import message_from_bytes
from email.header import decode_header, make_header
@@ -46,6 +48,22 @@ LOG_TARGET = os.environ.get("LOG_TARGET", "")
PORT = int(os.environ.get("PORT", "25"))
OPEN_RELAY = os.environ.get("SMTP_OPEN_RELAY", "0").strip() == "1"
# Upstream relay for probe forwarding. When set, the first SMTP_PROBE_LIMIT
# messages per source IP are actually delivered via this upstream so the
# attacker can verify receipt and proceeds to run their campaign. All subsequent
# messages get 250 OK but only land in quarantine.
_UPSTREAM_HOST = os.environ.get("SMTP_UPSTREAM_HOST", "").strip()
_UPSTREAM_PORT = int(os.environ.get("SMTP_UPSTREAM_PORT", "25"))
_UPSTREAM_USER = os.environ.get("SMTP_UPSTREAM_USER", "").strip()
_UPSTREAM_PASS = os.environ.get("SMTP_UPSTREAM_PASS", "").strip()
_PROBE_LIMIT = int(os.environ.get("SMTP_PROBE_LIMIT", "1"))
# Per-source-IP count of messages that have been actually forwarded upstream.
# Bounded at _IP_COUNT_MAX entries to avoid unbounded growth over long runs.
_ip_delivery_count: dict[str, int] = {}
_IP_COUNT_MAX = 20_000
_forward_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="smtp-fwd")
# In open-relay mode, optionally restrict which creds succeed. Blank means
# "accept anything". Format: "user1,user2,..." — any name not in the list
# gets a 535 instead of 235, so the relay looks realistically selective.
@@ -77,6 +95,28 @@ _QUEUE_CHARS = "BCDFGHJKLMNPQRSTVWXYZ23456789"
_Q_BASE = len(_QUEUE_CHARS)
def _forward_probe_sync(
mail_from: str,
rcpt_to: list[str],
body: bytes,
msg_id: str,
) -> bool:
"""Forward a probe email to the real upstream relay (blocking, runs in thread pool).
Returns True on success. Any exception is swallowed — the honeypot always
replies 250 regardless of whether the upstream accepted the message.
"""
try:
with smtplib.SMTP(_UPSTREAM_HOST, _UPSTREAM_PORT, timeout=15) as conn:
conn.ehlo(NODE_NAME)
if _UPSTREAM_USER and _UPSTREAM_PASS:
conn.login(_UPSTREAM_USER, _UPSTREAM_PASS)
conn.sendmail(mail_from, rcpt_to, body)
return True
except Exception:
return False
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
write_syslog_file(line)
@@ -260,6 +300,34 @@ class SMTPProtocol(asyncio.Protocol):
body_bytes=len(body),
truncated=int(self._data_truncated),
msg_id=msg_id)
# Forward the probe email upstream so the attacker can verify
# receipt from their own inbox and proceeds to run their
# campaign. Only the first SMTP_PROBE_LIMIT messages per
# source IP are forwarded; the rest get 250 OK but only land
# in quarantine — the attacker never notices.
src_ip = self._peer[0]
delivery_count = _ip_delivery_count.get(src_ip, 0)
if _UPSTREAM_HOST and delivery_count < _PROBE_LIMIT:
if len(_ip_delivery_count) >= _IP_COUNT_MAX:
_ip_delivery_count.clear()
_ip_delivery_count[src_ip] = delivery_count + 1
_new_count = delivery_count + 1
_fwd_from = self._mail_from
_fwd_rcpt = list(self._rcpt_to)
_fwd_body = body
_fwd_id = msg_id
_fwd_src = src_ip
def _on_fwd_done(fut, _src=_fwd_src, _mid=_fwd_id, _n=_new_count):
ok = fut.result() if not fut.exception() else False
_log("probe_forwarded", src=_src, msg_id=_mid,
forwarded=int(ok), delivery_count=_n)
fut = asyncio.get_event_loop().run_in_executor(
_forward_pool, _forward_probe_sync,
_fwd_from, _fwd_rcpt, _fwd_body, _fwd_id,
)
fut.add_done_callback(_on_fwd_done)
# Persist the full .eml into the quarantine bind mount
# (if configured) and emit a richer event so the collector
# can index attachments + headers. This is the hook the

View File

@@ -28,6 +28,34 @@ def test_smtp_relay_dockerfile_context():
assert ctx.is_dir()
def test_smtp_relay_upstream_cfg():
svc = SMTPRelayService()
fragment = svc.compose_fragment(
"test-decky",
service_cfg={
"upstream_host": "smtp.sendgrid.net",
"upstream_port": 587,
"upstream_user": "apikey",
"upstream_pass": "SG.secret",
"probe_limit": 2,
},
)
env = fragment["environment"]
assert env["SMTP_UPSTREAM_HOST"] == "smtp.sendgrid.net"
assert env["SMTP_UPSTREAM_PORT"] == "587"
assert env["SMTP_UPSTREAM_USER"] == "apikey"
assert env["SMTP_UPSTREAM_PASS"] == "SG.secret"
assert env["SMTP_PROBE_LIMIT"] == "2"
def test_smtp_relay_upstream_not_set_by_default():
svc = SMTPRelayService()
fragment = svc.compose_fragment("test-decky")
env = fragment["environment"]
assert "SMTP_UPSTREAM_HOST" not in env
assert "SMTP_PROBE_LIMIT" not in env
def test_smtp_relay_quarantine_bind_mount():
"""Full-message capture: each decky gets its own host quarantine dir
bind-mounted into the container, and the in-container path is exposed