feat(templates): IMAP/POP3 servers read EML spool from emailgen

When IMAP_EMAIL_SEED / POP3_EMAIL_SEED points at a directory of .eml
files (the orchestrator emailgen worker's drop path,
/var/spool/decnet-emails/ by convention), the bait mailbox is replaced
with those LLM-generated, persona-driven, threaded messages.  Empty /
missing dir keeps the hardcoded fallback so a fresh deployment is never
silent.  Cached with mtime invalidation + a short TTL so a hot mailbox
doesn't pay the parse cost on every IMAP/POP3 command.

Replaces the DEBT-026 stub on both templates that named the env var but
never wired it through.
This commit is contained in:
2026-04-26 22:21:01 -04:00
parent 3ee55ec341
commit 2979997442
4 changed files with 463 additions and 21 deletions

View File

@@ -11,7 +11,12 @@ Banner advertises Dovecot so nmap fingerprints correctly.
""" """
import asyncio import asyncio
import email
import email.policy
import os import os
import time
from email.utils import getaddresses
from pathlib import Path
from syslog_bridge import ( from syslog_bridge import (
SEVERITY_WARNING, SEVERITY_WARNING,
encode_secret, encode_secret,
@@ -31,10 +36,20 @@ VALID_USERS: dict[str, str] = {
u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)] u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)]
} }
# DEBT-026: path to a JSON file with custom email definitions. # Path to a directory of ``*.eml`` files that the orchestrator emailgen
# When set, _BAIT_EMAILS should be replaced/extended from that file. # worker drops into the container (``/var/spool/decnet-emails/`` by
# Wiring (service_cfg["email_seed"] → compose_fragment → env var → here) is deferred. # convention). When set AND the directory contains parseable EMLs,
_EMAIL_SEED_PATH = os.environ.get("IMAP_EMAIL_SEED", "") # stub — currently unused # they replace the hardcoded ``_BAIT_EMAILS`` fallback below — meaning
# every mail an attacker reads is the LLM-generated, persona-driven,
# language-aware version, not the static credential-stuffed bait list.
# Empty / missing / unparseable: the fallback list still serves so a
# fresh deployment is never silent.
_EMAIL_SEED_PATH = os.environ.get("IMAP_EMAIL_SEED", "")
# Re-scan the seed directory at most this often. Cheap: walking a few
# dozen .eml files is sub-millisecond, but caching keeps an attacker's
# rapid LIST/FETCH burst from re-parsing the same files on every
# command. Mtime check still triggers a re-load on real changes.
_SEED_RESCAN_INTERVAL = float(os.environ.get("IMAP_EMAIL_SEED_RESCAN", "5"))
# ── Bait emails ─────────────────────────────────────────────────────────────── # ── Bait emails ───────────────────────────────────────────────────────────────
# All 10 live in INBOX. UID == sequence number. # All 10 live in INBOX. UID == sequence number.
@@ -238,6 +253,119 @@ _BAIT_EMAILS: list[dict] = [
_MAILBOXES = ["INBOX", "Sent", "Drafts", "Archive"] _MAILBOXES = ["INBOX", "Sent", "Drafts", "Archive"]
# ── Spool-backed email loader ─────────────────────────────────────────────────
# When IMAP_EMAIL_SEED points at a directory of .eml files the
# orchestrator emailgen worker has dropped into the container, parse
# them on demand and serve them as the INBOX. Cached between requests
# with a short TTL + mtime check so a hot mailbox doesn't pay the parse
# cost on every IMAP command.
#
# Failure modes (missing dir, unparseable EMLs, empty dir) all return
# the hardcoded fallback rather than 0 messages — a silent INBOX is a
# stronger tell than a slightly-stale one.
_seed_cache: list[dict] | None = None
_seed_cache_dir_mtime: float = 0.0
_seed_cache_loaded_at: float = 0.0
def _split_addr(value: str) -> tuple[str, str]:
"""Return (display_name, email) from a header value, falling back to
the raw string when the parse fails. Worker side; we don't need
real RFC 5322 — just enough to populate the IMAP envelope tuple."""
if not value:
return "", ""
pairs = getaddresses([value])
if not pairs:
return "", value
name, addr = pairs[0]
return (name or "").strip(), (addr or value).strip()
def _eml_to_dict(path: Path, uid: int) -> dict | None:
"""Parse one .eml into the dict shape the rest of this server uses.
Returns None when the file isn't parseable; callers skip + continue
so one corrupt EML does not kill the whole INBOX listing.
"""
try:
raw = path.read_bytes()
msg = email.message_from_bytes(raw, policy=email.policy.compat32)
except Exception: # noqa: BLE001
return None
from_name, from_addr = _split_addr(msg.get("From", ""))
_to_name, to_addr = _split_addr(msg.get("To", ""))
subject = (msg.get("Subject") or "").strip()
date = msg.get("Date") or ""
return {
"uid": uid,
"flags": [], # never \Seen for spool emails — fresh delivery
"from_name": from_name or from_addr.split("@", 1)[0] if from_addr else "Unknown",
"from_addr": from_addr or "unknown@localhost",
"to_addr": to_addr or "unknown@localhost",
"subject": subject or "(no subject)",
"date": date,
# The body field carries the full RFC 822 message — headers + body.
# That mirrors how the hardcoded _BAIT_EMAILS entries are shaped.
"body": raw.decode("utf-8", errors="replace"),
}
def _scan_seed_dir(path: Path) -> list[dict]:
"""Walk *path* recursively, parse every ``*.eml``, sort by mtime."""
eml_paths: list[Path] = []
try:
for p in path.rglob("*.eml"):
if p.is_file():
eml_paths.append(p)
except OSError:
return []
eml_paths.sort(key=lambda p: p.stat().st_mtime)
out: list[dict] = []
for i, p in enumerate(eml_paths, start=1):
d = _eml_to_dict(p, uid=i)
if d is not None:
out.append(d)
return out
def _get_emails() -> list[dict]:
"""Return the active mailbox list.
Resolution order:
1. ``IMAP_EMAIL_SEED`` set + dir exists + at least one parseable EML
→ that list (rescan-throttled).
2. Else → the hardcoded ``_BAIT_EMAILS`` fallback.
"""
global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at
if not _EMAIL_SEED_PATH:
return _BAIT_EMAILS
seed_dir = Path(_EMAIL_SEED_PATH)
try:
dir_stat = seed_dir.stat()
except OSError:
return _BAIT_EMAILS
now = time.monotonic()
fresh_enough = (
_seed_cache is not None
and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL
and dir_stat.st_mtime == _seed_cache_dir_mtime
)
if fresh_enough:
return _seed_cache or _BAIT_EMAILS
scanned = _scan_seed_dir(seed_dir)
if not scanned:
# Don't poison the cache with an empty list; a single early
# FETCH before emailgen has run would otherwise stick the
# mailbox at 0 for _SEED_RESCAN_INTERVAL seconds.
return _BAIT_EMAILS
_seed_cache = scanned
_seed_cache_dir_mtime = dir_stat.st_mtime
_seed_cache_loaded_at = now
return scanned
# ── Logging ─────────────────────────────────────────────────────────────────── # ── Logging ───────────────────────────────────────────────────────────────────
def _log(event_type: str, severity: int = 6, **kwargs) -> None: def _log(event_type: str, severity: int = 6, **kwargs) -> None:
@@ -464,7 +592,8 @@ class IMAPProtocol(asyncio.Protocol):
mailbox = parts[0].strip('"') if parts else "INBOX" mailbox = parts[0].strip('"') if parts else "INBOX"
attr_str = parts[1].strip("()").upper() if len(parts) > 1 else "MESSAGES" attr_str = parts[1].strip("()").upper() if len(parts) > 1 else "MESSAGES"
counts = {"MESSAGES": 10, "RECENT": 0, "UNSEEN": 10} if mailbox == "INBOX" \ n = len(_get_emails()) if mailbox == "INBOX" else 0
counts = {"MESSAGES": n, "RECENT": 0, "UNSEEN": n} if mailbox == "INBOX" \
else {"MESSAGES": 0, "RECENT": 0, "UNSEEN": 0} else {"MESSAGES": 0, "RECENT": 0, "UNSEEN": 0}
result_parts = [] result_parts = []
@@ -479,7 +608,8 @@ class IMAPProtocol(asyncio.Protocol):
self._w(f"{tag} BAD Not authenticated\r\n") self._w(f"{tag} BAD Not authenticated\r\n")
return return
mailbox = args.strip('"') mailbox = args.strip('"')
total = len(_BAIT_EMAILS) if mailbox == "INBOX" else 0 emails = _get_emails()
total = len(emails) if mailbox == "INBOX" else 0
self._selected = mailbox self._selected = mailbox
self._state = "SELECTED" self._state = "SELECTED"
self._w(f"* {total} EXISTS\r\n") self._w(f"* {total} EXISTS\r\n")
@@ -500,7 +630,8 @@ class IMAPProtocol(asyncio.Protocol):
range_str = parts[0] if parts else "1:*" range_str = parts[0] if parts else "1:*"
items_str = parts[1] if len(parts) > 1 else "FLAGS" items_str = parts[1] if len(parts) > 1 else "FLAGS"
total = len(_BAIT_EMAILS) emails = _get_emails()
total = len(emails)
indices = _parse_seq_range(range_str, total) indices = _parse_seq_range(range_str, total)
items = _parse_fetch_items(items_str) items = _parse_fetch_items(items_str)
# Ensure UID is included when using UID FETCH # Ensure UID is included when using UID FETCH
@@ -509,14 +640,14 @@ class IMAPProtocol(asyncio.Protocol):
for seq in indices: for seq in indices:
if 1 <= seq <= total: if 1 <= seq <= total:
self._transport.write(_build_fetch_response(seq, _BAIT_EMAILS[seq - 1], items)) self._transport.write(_build_fetch_response(seq, emails[seq - 1], items))
self._w(f"{tag} OK FETCH completed\r\n") self._w(f"{tag} OK FETCH completed\r\n")
def _cmd_search(self, tag: str, uid_mode: bool = False) -> None: def _cmd_search(self, tag: str, uid_mode: bool = False) -> None:
if self._state != "SELECTED": if self._state != "SELECTED":
self._w(f"{tag} BAD Not in selected state\r\n") self._w(f"{tag} BAD Not in selected state\r\n")
return return
nums = " ".join(str(i) for i in range(1, len(_BAIT_EMAILS) + 1)) nums = " ".join(str(i) for i in range(1, len(_get_emails()) + 1))
self._w(f"* SEARCH {nums}\r\n") self._w(f"* SEARCH {nums}\r\n")
self._w(f"{tag} OK SEARCH completed\r\n") self._w(f"{tag} OK SEARCH completed\r\n")

View File

@@ -11,6 +11,8 @@ Credentials via IMAP_USERS env var (shared with IMAP service).
import asyncio import asyncio
import os import os
import time
from pathlib import Path
from syslog_bridge import ( from syslog_bridge import (
SEVERITY_WARNING, SEVERITY_WARNING,
encode_secret, encode_secret,
@@ -30,9 +32,13 @@ VALID_USERS: dict[str, str] = {
u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)] u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)]
} }
# DEBT-026: path to a JSON file with custom email definitions. # Path to a directory of ``*.eml`` files dropped by the orchestrator
# Wiring (service_cfg["email_seed"] → compose_fragment → env var → here) is deferred. # emailgen worker (``/var/spool/decnet-emails/`` by convention). When
_EMAIL_SEED_PATH = os.environ.get("POP3_EMAIL_SEED", "") # stub — currently unused # set and populated, those EMLs replace the hardcoded fallback list
# below — same semantics as the IMAP template. Empty / missing falls
# back so a fresh deployment is never silent.
_EMAIL_SEED_PATH = os.environ.get("POP3_EMAIL_SEED", "")
_SEED_RESCAN_INTERVAL = float(os.environ.get("POP3_EMAIL_SEED_RESCAN", "5"))
# ── Bait emails ─────────────────────────────────────────────────────────────── # ── Bait emails ───────────────────────────────────────────────────────────────
@@ -163,6 +169,64 @@ _BAIT_EMAILS: list[str] = [
), ),
] ]
# ── Spool-backed email loader ─────────────────────────────────────────────────
# POP3 stores each message as a single str (full RFC 822 text); when the
# emailgen spool is configured, we read every *.eml in it and serve the
# raw bytes as the corpus. Same caching strategy as the IMAP template.
_seed_cache: list[str] | None = None
_seed_cache_dir_mtime: float = 0.0
_seed_cache_loaded_at: float = 0.0
def _scan_seed_dir(path: Path) -> list[str]:
"""Walk *path* recursively and return each .eml's raw text content,
sorted by mtime so older threads get lower indices."""
eml_paths: list[Path] = []
try:
for p in path.rglob("*.eml"):
if p.is_file():
eml_paths.append(p)
except OSError:
return []
eml_paths.sort(key=lambda p: p.stat().st_mtime)
out: list[str] = []
for p in eml_paths:
try:
out.append(p.read_text(encoding="utf-8", errors="replace"))
except OSError:
continue
return out
def _get_emails() -> list[str]:
"""Return the active corpus. Same fallback rules as IMAP template."""
global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at
if not _EMAIL_SEED_PATH:
return _BAIT_EMAILS
seed_dir = Path(_EMAIL_SEED_PATH)
try:
dir_stat = seed_dir.stat()
except OSError:
return _BAIT_EMAILS
now = time.monotonic()
fresh_enough = (
_seed_cache is not None
and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL
and dir_stat.st_mtime == _seed_cache_dir_mtime
)
if fresh_enough:
return _seed_cache or _BAIT_EMAILS
scanned = _scan_seed_dir(seed_dir)
if not scanned:
return _BAIT_EMAILS
_seed_cache = scanned
_seed_cache_dir_mtime = dir_stat.st_mtime
_seed_cache_loaded_at = now
return scanned
# ── Logging ─────────────────────────────────────────────────────────────────── # ── Logging ───────────────────────────────────────────────────────────────────
def _log(event_type: str, severity: int = 6, **kwargs) -> None: def _log(event_type: str, severity: int = 6, **kwargs) -> None:
@@ -287,7 +351,7 @@ class POP3Protocol(asyncio.Protocol):
"""Return [(1-based-num, body), ...] excluding DELE'd messages.""" """Return [(1-based-num, body), ...] excluding DELE'd messages."""
return [ return [
(i + 1, body) (i + 1, body)
for i, body in enumerate(_BAIT_EMAILS) for i, body in enumerate(_get_emails())
if i not in self._deleted if i not in self._deleted
] ]
@@ -301,14 +365,15 @@ class POP3Protocol(asyncio.Protocol):
def _cmd_list(self, args: str) -> None: def _cmd_list(self, args: str) -> None:
if not self._require_transaction(): if not self._require_transaction():
return return
emails = _get_emails()
if args: if args:
try: try:
n = int(args) n = int(args)
idx = n - 1 idx = n - 1
if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): if idx in self._deleted or not (0 <= idx < len(emails)):
self._transport.write(b"-ERR No such message\r\n") self._transport.write(b"-ERR No such message\r\n")
else: else:
size = len(_BAIT_EMAILS[idx].encode()) size = len(emails[idx].encode())
self._transport.write(f"+OK {n} {size}\r\n".encode()) self._transport.write(f"+OK {n} {size}\r\n".encode())
except ValueError: except ValueError:
self._transport.write(b"-ERR Invalid argument\r\n") self._transport.write(b"-ERR Invalid argument\r\n")
@@ -326,10 +391,11 @@ class POP3Protocol(asyncio.Protocol):
try: try:
n = int(args) n = int(args)
idx = n - 1 idx = n - 1
if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): emails = _get_emails()
if idx in self._deleted or not (0 <= idx < len(emails)):
self._transport.write(b"-ERR No such message\r\n") self._transport.write(b"-ERR No such message\r\n")
return return
body = _BAIT_EMAILS[idx] body = emails[idx]
raw = body.encode() raw = body.encode()
_log("retr", src=self._peer[0], message_num=n) _log("retr", src=self._peer[0], message_num=n)
self._transport.write(f"+OK {len(raw)} octets\r\n".encode()) self._transport.write(f"+OK {len(raw)} octets\r\n".encode())
@@ -348,10 +414,11 @@ class POP3Protocol(asyncio.Protocol):
n = int(parts[0]) n = int(parts[0])
line_count = int(parts[1]) if len(parts) > 1 else 0 line_count = int(parts[1]) if len(parts) > 1 else 0
idx = n - 1 idx = n - 1
if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): emails = _get_emails()
if idx in self._deleted or not (0 <= idx < len(emails)):
self._transport.write(b"-ERR No such message\r\n") self._transport.write(b"-ERR No such message\r\n")
return return
body = _BAIT_EMAILS[idx] body = emails[idx]
sep = "\r\n\r\n" sep = "\r\n\r\n"
if sep in body: if sep in body:
headers, rest = body.split(sep, 1) headers, rest = body.split(sep, 1)
@@ -375,7 +442,7 @@ class POP3Protocol(asyncio.Protocol):
try: try:
n = int(args) n = int(args)
idx = n - 1 idx = n - 1
if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): if idx in self._deleted or not (0 <= idx < len(_get_emails())):
self._transport.write(b"-ERR No such message\r\n") self._transport.write(b"-ERR No such message\r\n")
else: else:
self._transport.write(f"+OK {n} msg-{n}\r\n".encode()) self._transport.write(f"+OK {n} msg-{n}\r\n".encode())
@@ -393,7 +460,7 @@ class POP3Protocol(asyncio.Protocol):
try: try:
n = int(args) n = int(args)
idx = n - 1 idx = n - 1
if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): if idx in self._deleted or not (0 <= idx < len(_get_emails())):
self._transport.write(b"-ERR No such message\r\n") self._transport.write(b"-ERR No such message\r\n")
else: else:
self._deleted.add(idx) self._deleted.add(idx)

View File

@@ -0,0 +1,148 @@
"""Spool-backed email loading for the IMAP template.
Verifies that when ``IMAP_EMAIL_SEED`` points at a directory of .eml
files, the IMAP server serves those (replacing the hardcoded
``_BAIT_EMAILS`` fallback). Empty / missing dir falls back gracefully.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
_EML_TEMPLATE = (
"From: {from_name} <{from_addr}>\r\n"
"To: Sarah <sarah@corp.com>\r\n"
"Subject: {subject}\r\n"
"Message-ID: <{mid}@corp.com>\r\n"
"Date: Mon, 26 Apr 2026 10:00:00 +0000\r\n"
"\r\n"
"{body}\r\n"
)
def _make_fake_syslog_bridge() -> ModuleType:
mod = ModuleType("syslog_bridge")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""})
mod.classify_authorization = MagicMock(return_value=None)
return mod
def _load_imap(env_overrides: dict[str, str]):
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123",
"IMAP_BANNER": "* OK Dovecot ready.",
**env_overrides,
}
for key in list(sys.modules):
if key in ("imap_server", "syslog_bridge"):
del sys.modules[key]
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location(
"imap_server", "decnet/templates/imap/server.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _seed(tmp_path: Path, n: int = 3) -> Path:
spool = tmp_path / "spool"
spool.mkdir()
thread = spool / "thr1"
thread.mkdir()
for i in range(n):
eml = thread / f"msg{i}.eml"
eml.write_text(_EML_TEMPLATE.format(
from_name=f"Sender {i}",
from_addr=f"sender{i}@corp.com",
subject=f"Topic {i}",
mid=f"m{i}",
body=f"Body of message {i}.",
))
return spool
def test_falls_back_to_hardcoded_when_seed_unset(tmp_path):
mod = _load_imap({})
emails = mod._get_emails()
# The shipped fallback ships exactly 10 entries.
assert len(emails) == 10
assert emails[0]["from_addr"] == "devops@company.internal"
def test_falls_back_when_seed_dir_missing(tmp_path):
mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "does-not-exist")})
emails = mod._get_emails()
assert len(emails) == 10 # fallback
def test_falls_back_when_seed_dir_empty(tmp_path):
(tmp_path / "spool").mkdir()
mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "spool")})
assert len(mod._get_emails()) == 10 # fallback (no .eml files)
def test_loads_eml_files_from_spool(tmp_path):
spool = _seed(tmp_path, n=3)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 3
senders = {e["from_addr"] for e in emails}
assert senders == {"sender0@corp.com", "sender1@corp.com", "sender2@corp.com"}
# UIDs are 1-based and unique.
assert {e["uid"] for e in emails} == {1, 2, 3}
def test_loaded_eml_carries_full_rfc822_body(tmp_path):
spool = _seed(tmp_path, n=1)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert "From:" in emails[0]["body"]
assert "Subject: Topic 0" in emails[0]["body"]
assert "Body of message 0." in emails[0]["body"]
def test_corrupt_eml_skipped_not_fatal(tmp_path):
spool = tmp_path / "spool"
spool.mkdir()
(spool / "good.eml").write_text(_EML_TEMPLATE.format(
from_name="Good", from_addr="good@corp.com",
subject="ok", mid="g", body="ok",
))
# Make a directory with a .eml extension to provoke an OSError on
# read_bytes — the loader should skip it without crashing.
(spool / "broken.eml").mkdir()
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 1
assert emails[0]["from_addr"] == "good@corp.com"
def test_select_inbox_reflects_spool_count(tmp_path):
spool = _seed(tmp_path, n=4)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
proto = mod.IMAPProtocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
proto.data_received(b"A0 LOGIN admin admin123\r\n")
written.clear()
proto.data_received(b"B0 SELECT INBOX\r\n")
out = b"".join(written)
assert b"* 4 EXISTS" in out
assert b"[UIDNEXT 5]" in out

View File

@@ -0,0 +1,96 @@
"""Spool-backed email loading for the POP3 template."""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
from types import ModuleType
from unittest.mock import MagicMock, patch
_EML_TEMPLATE = (
"From: Sender <sender@corp.com>\r\n"
"To: Sarah <sarah@corp.com>\r\n"
"Subject: {subject}\r\n"
"Message-ID: <{mid}@corp.com>\r\n"
"\r\n"
"{body}\r\n"
)
def _make_fake_syslog_bridge() -> ModuleType:
mod = ModuleType("syslog_bridge")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""})
mod.classify_authorization = MagicMock(return_value=None)
return mod
def _load_pop3(env_overrides):
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123",
**env_overrides,
}
for key in list(sys.modules):
if key in ("pop3_server", "syslog_bridge"):
del sys.modules[key]
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location(
"pop3_server", "decnet/templates/pop3/server.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _seed(tmp_path: Path, n: int) -> Path:
spool = tmp_path / "spool"
spool.mkdir()
for i in range(n):
(spool / f"m{i}.eml").write_text(_EML_TEMPLATE.format(
subject=f"Topic {i}", mid=f"m{i}", body=f"Body {i}",
))
return spool
def test_falls_back_when_seed_unset(tmp_path):
mod = _load_pop3({})
assert len(mod._get_emails()) == 10 # hardcoded fallback
def test_falls_back_when_seed_dir_missing(tmp_path):
mod = _load_pop3({"POP3_EMAIL_SEED": str(tmp_path / "nope")})
assert len(mod._get_emails()) == 10
def test_loads_emls_from_spool(tmp_path):
spool = _seed(tmp_path, n=3)
mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 3
# POP3 stores raw RFC 822 strings; verify content round-trips.
assert any("Topic 0" in e for e in emails)
assert all(e.startswith("From:") for e in emails)
def test_stat_reflects_spool_size(tmp_path):
spool = _seed(tmp_path, n=2)
mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)})
proto = mod.POP3Protocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
proto.data_received(b"USER admin\r\n")
proto.data_received(b"PASS admin123\r\n")
written.clear()
proto.data_received(b"STAT\r\n")
out = b"".join(written)
assert out.startswith(b"+OK 2 ")