Files
DECNET/decnet/templates/smtp/server.py
anti 3fb84ac5d0 feat(templates): per-instance stealth via instance_seed in service servers
Every service template now pulls version strings, cluster/node UUIDs, auth
salts, greeting banners, and uptime from the seeded per-instance RNG instead
of hard-coded defaults. Scanners sweeping the fleet now see legitimately
diverging fingerprints per decky while each decky's own responses stay
internally consistent across restarts.

Covers elasticsearch, ftp, http, https, ldap, mongodb, mqtt, mssql, mysql,
postgres, redis, and smtp templates.
2026-04-22 09:24:16 -04:00

332 lines
14 KiB
Python

#!/usr/bin/env python3
"""
SMTP server — emulates a realistic ESMTP server (Postfix-style).
Two modes of operation, controlled by SMTP_OPEN_RELAY:
SMTP_OPEN_RELAY=0 (default) — credential harvester
AUTH attempts are logged and rejected (535).
RCPT TO is rejected with 554 (relay denied) for all recipients.
This captures credential stuffing and scanning activity.
SMTP_OPEN_RELAY=1 — open relay bait
AUTH is accepted for any credentials (235).
RCPT TO is accepted for any domain (250).
DATA is fully buffered until CRLF.CRLF and acknowledged with a
queued-as message ID. Attractive to spam relay operators.
The DATA state machine (and the 502-per-line bug) is fixed in both modes.
"""
import asyncio
import base64
import os
import random as _rand
import re
import time
import instance_seed as _seed
from syslog_bridge import SEVERITY_WARNING, syslog_line, write_syslog_file, forward_syslog
NODE_NAME = os.environ.get("NODE_NAME", "mailserver")
SERVICE_NAME = "smtp"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
PORT = int(os.environ.get("PORT", "25"))
OPEN_RELAY = os.environ.get("SMTP_OPEN_RELAY", "0").strip() == "1"
# In open-relay mode, optionally restrict which creds succeed. Blank means
# "accept anything". Format: "user1,user2,..." — any name not in the list
# gets a 535 instead of 235, so the relay looks realistically selective.
_AUTH_WHITELIST = {u.strip() for u in os.environ.get("SMTP_AUTH_WHITELIST", "").split(",") if u.strip()}
# Open-relay filtering. Even compromised/misconfigured relays aren't pure
# tarpits — Postfix rejects malformed addresses at RCPT time, and many drop
# a small fraction of external recipients under greylisting or reputation
# checks. Accepting literally every RCPT is a honeypot tell.
_ADDR_RE = re.compile(r"^<?([^\s<>@]+)@([A-Za-z0-9.-]+\.[A-Za-z]{2,})>?$")
_BLOCKED_TLDS = {"invalid", "test", "localhost", "local", "example"}
_RCPT_DROP_RATE = float(os.environ.get("SMTP_RCPT_DROP_RATE", "0.08"))
_SMTP_BANNER = os.environ.get("SMTP_BANNER", f"220 {NODE_NAME} ESMTP Postfix (Debian/GNU)")
_SMTP_MTA = os.environ.get("SMTP_MTA", NODE_NAME)
# Postfix's queue-ID character set (real one: excludes vowels and look-alikes
# like 0/O, 1/I, so scanners that know Postfix's alphabet are satisfied).
_QUEUE_CHARS = "BCDFGHJKLMNPQRSTVWXYZ23456789"
_Q_BASE = len(_QUEUE_CHARS)
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
write_syslog_file(line)
forward_syslog(line, LOG_TARGET)
def _rand_msg_id() -> str:
"""Postfix-style queue ID.
Real Postfix derives its short queue IDs from the message's arrival
microseconds, base-encoded with a vowel-free alphabet — so IDs are
monotonically increasing and visually distinctive. We encode the current
microsecond count with Postfix's actual character set, then append a
short per-instance suffix so two deckies never emit identical IDs at
the same instant.
"""
us = int(time.time() * 1_000_000)
out: list[str] = []
while us and len(out) < 10:
us, r = divmod(us, _Q_BASE)
out.append(_QUEUE_CHARS[r])
base = "".join(reversed(out)) or _QUEUE_CHARS[0]
suffix_idx = _seed.rng.randint(0, _Q_BASE - 1)
return base + _QUEUE_CHARS[suffix_idx]
def _decode_auth_plain(blob: str) -> tuple[str, str]:
"""Decode SASL PLAIN: base64(authzid\0authcid\0passwd) → (user, pass)."""
try:
decoded = base64.b64decode(blob + "==").decode(errors="replace")
parts = decoded.split("\x00")
if len(parts) >= 3:
return parts[1], parts[2]
if len(parts) == 2:
return parts[0], parts[1]
except Exception:
pass
return blob, ""
class SMTPProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = ("?", 0)
self._buf = b""
# per-transaction state
self._mail_from = ""
self._rcpt_to: list[str] = []
# DATA accumulation
self._in_data = False
self._data_buf: list[str] = []
# AUTH multi-step state (LOGIN mechanism sends user/pass in separate lines)
self._auth_state = "" # "" | "await_user" | "await_pass"
self._auth_user = ""
# ── asyncio.Protocol ──────────────────────────────────────────────────────
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
transport.write(f"{_SMTP_BANNER}\r\n".encode())
def data_received(self, data):
self._buf += data
while b"\n" in self._buf:
line, self._buf = self._buf.split(b"\n", 1)
# Strip trailing \r so both CRLF and bare LF work
self._handle_line(line.rstrip(b"\r").decode(errors="replace"))
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
# ── Command dispatch ──────────────────────────────────────────────────────
def _handle_line(self, line: str) -> None:
# ── DATA body accumulation ────────────────────────────────────────────
if self._in_data:
if line == ".":
body = "\r\n".join(self._data_buf)
msg_id = _rand_msg_id()
_log("message_accepted",
src=self._peer[0],
mail_from=self._mail_from,
rcpt_to=",".join(self._rcpt_to),
body_bytes=len(body),
msg_id=msg_id)
# Real MTAs take tens of ms to queue; instantaneous replies
# on DATA are a tell.
_seed.jitter_sync(30, 180)
self._transport.write(f"250 2.0.0 Ok: queued as {msg_id}\r\n".encode())
self._in_data = False
self._data_buf = []
self._mail_from = ""
self._rcpt_to = []
else:
# RFC 5321 dot-stuffing: strip leading dot
self._data_buf.append(line[1:] if line.startswith(".") else line)
return
# ── AUTH multi-step (LOGIN / PLAIN continuation) ─────────────────────
if self._auth_state == "await_plain":
user, password = _decode_auth_plain(line)
self._finish_auth(user, password)
self._auth_state = ""
return
if self._auth_state == "await_user":
self._auth_user = base64.b64decode(line + "==").decode(errors="replace")
self._auth_state = "await_pass"
self._transport.write(b"334 UGFzc3dvcmQ6\r\n") # "Password:"
return
if self._auth_state == "await_pass":
password = base64.b64decode(line + "==").decode(errors="replace")
self._finish_auth(self._auth_user, password)
self._auth_state = ""
self._auth_user = ""
return
# ── Normal command dispatch ───────────────────────────────────────────
parts = line.split(None, 1)
cmd = parts[0].upper() if parts else ""
args = parts[1] if len(parts) > 1 else ""
if cmd in ("EHLO", "HELO"):
if not args:
self._transport.write(
f"501 5.5.4 Syntax: {cmd} hostname\r\n".encode()
)
return
_log("ehlo", src=self._peer[0], domain=args)
self._transport.write(
f"250-{_SMTP_MTA}\r\n"
f"250-PIPELINING\r\n"
f"250-SIZE 10240000\r\n"
f"250-VRFY\r\n"
f"250-ETRN\r\n"
f"250-AUTH PLAIN LOGIN\r\n"
f"250-ENHANCEDSTATUSCODES\r\n"
f"250-8BITMIME\r\n"
f"250 DSN\r\n".encode()
)
elif cmd == "AUTH":
self._handle_auth(args)
elif cmd == "MAIL":
addr = args.split(":", 1)[1].strip() if ":" in args else args
self._mail_from = addr
_log("mail_from", src=self._peer[0], value=addr)
self._transport.write(b"250 2.1.0 Ok\r\n")
elif cmd == "RCPT":
addr = args.split(":", 1)[1].strip() if ":" in args else args
if OPEN_RELAY:
match = _ADDR_RE.match(addr)
if not match:
_log("rcpt_rejected_syntax", src=self._peer[0], value=addr,
severity=SEVERITY_WARNING)
self._transport.write(
b"501 5.1.3 Bad recipient address syntax\r\n"
)
elif match.group(2).rsplit(".", 1)[-1].lower() in _BLOCKED_TLDS:
_log("rcpt_rejected_tld", src=self._peer[0], value=addr,
severity=SEVERITY_WARNING)
self._transport.write(
b"550 5.1.2 <" + addr.encode()
+ b">: Recipient address rejected: Domain not found\r\n"
)
elif _rand.random() < _RCPT_DROP_RATE:
_log("rcpt_greylisted", src=self._peer[0], value=addr)
self._transport.write(
b"451 4.7.1 <" + addr.encode()
+ b">: Recipient address rejected: Greylisted, try again later\r\n"
)
else:
self._rcpt_to.append(addr)
_log("rcpt_to", src=self._peer[0], value=addr)
self._transport.write(b"250 2.1.5 Ok\r\n")
else:
_log("rcpt_denied", src=self._peer[0], value=addr,
severity=SEVERITY_WARNING)
self._transport.write(
b"554 5.7.1 <" + addr.encode() + b">: Relay access denied\r\n"
)
elif cmd == "DATA":
if not self._rcpt_to:
self._transport.write(b"503 5.5.1 Error: need RCPT command\r\n")
else:
self._in_data = True
self._transport.write(b"354 End data with <CR><LF>.<CR><LF>\r\n")
elif cmd == "RSET":
self._mail_from = ""
self._rcpt_to = []
self._in_data = False
self._data_buf = []
self._auth_state = ""
self._auth_user = ""
self._transport.write(b"250 2.0.0 Ok\r\n")
elif cmd == "VRFY":
_log("vrfy", src=self._peer[0], value=args)
self._transport.write(b"252 2.0.0 Cannot VRFY user\r\n")
elif cmd == "NOOP":
self._transport.write(b"250 2.0.0 Ok\r\n")
elif cmd == "STARTTLS":
self._transport.write(b"454 4.7.0 TLS not available due to local problem\r\n")
elif cmd == "QUIT":
self._transport.write(b"221 2.0.0 Bye\r\n")
self._transport.close()
else:
_log("unknown_command", src=self._peer[0], command=line[:128])
self._transport.write(b"502 5.5.2 Error: command not recognized\r\n")
# ── AUTH helpers ──────────────────────────────────────────────────────────
def _handle_auth(self, args: str) -> None:
parts = args.split(None, 1)
mech = parts[0].upper() if parts else ""
initial = parts[1] if len(parts) > 1 else ""
if mech == "PLAIN":
if initial:
user, password = _decode_auth_plain(initial)
self._finish_auth(user, password)
else:
# Client will send credentials on next line
self._auth_state = "await_plain"
self._transport.write(b"334 \r\n")
elif mech == "LOGIN":
if initial:
self._auth_user = base64.b64decode(initial + "==").decode(errors="replace")
self._auth_state = "await_pass"
self._transport.write(b"334 UGFzc3dvcmQ6\r\n") # "Password:"
else:
self._auth_state = "await_user"
self._transport.write(b"334 VXNlcm5hbWU6\r\n") # "Username:"
else:
self._transport.write(b"504 5.5.4 Unrecognized authentication mechanism\r\n")
def _finish_auth(self, username: str, password: str) -> None:
_log("auth_attempt", src=self._peer[0],
username=username, password=password,
severity=SEVERITY_WARNING)
if not OPEN_RELAY:
self._transport.write(b"535 5.7.8 Error: authentication failed\r\n")
return
# Open-relay mode: still be selective so the decoy doesn't look like a
# tarpit that accepts literally anything. If no whitelist is set,
# accept; otherwise gate on username presence.
accepted = not _AUTH_WHITELIST or username in _AUTH_WHITELIST
if accepted:
self._transport.write(b"235 2.7.0 Authentication successful\r\n")
else:
self._transport.write(b"535 5.7.8 Error: authentication failed\r\n")
async def main():
mode = "open-relay" if OPEN_RELAY else "credential-harvester"
_log("startup", msg=f"SMTP server starting as {NODE_NAME} ({mode})")
loop = asyncio.get_running_loop()
server = await loop.create_server(SMTPProtocol, "0.0.0.0", PORT) # nosec B104
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())