From 297999744264c3c3bb3f2a8dcb4832fc0d6827a7 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 22:21:01 -0400 Subject: [PATCH] feat(templates): IMAP/POP3 servers read EML spool from emailgen When IMAP_EMAIL_SEED / POP3_EMAIL_SEED points at a directory of .eml files (the orchestrator emailgen worker's drop path, /var/spool/decnet-emails/ by convention), the bait mailbox is replaced with those LLM-generated, persona-driven, threaded messages. Empty / missing dir keeps the hardcoded fallback so a fresh deployment is never silent. Cached with mtime invalidation + a short TTL so a hot mailbox doesn't pay the parse cost on every IMAP/POP3 command. Replaces the DEBT-026 stub on both templates that named the env var but never wired it through. --- decnet/templates/imap/server.py | 149 +++++++++++++++++++++-- decnet/templates/pop3/server.py | 91 ++++++++++++-- tests/service_testing/test_imap_spool.py | 148 ++++++++++++++++++++++ tests/service_testing/test_pop3_spool.py | 96 +++++++++++++++ 4 files changed, 463 insertions(+), 21 deletions(-) create mode 100644 tests/service_testing/test_imap_spool.py create mode 100644 tests/service_testing/test_pop3_spool.py diff --git a/decnet/templates/imap/server.py b/decnet/templates/imap/server.py index 6c5433a4..02e19164 100644 --- a/decnet/templates/imap/server.py +++ b/decnet/templates/imap/server.py @@ -11,7 +11,12 @@ Banner advertises Dovecot so nmap fingerprints correctly. """ import asyncio +import email +import email.policy import os +import time +from email.utils import getaddresses +from pathlib import Path from syslog_bridge import ( SEVERITY_WARNING, encode_secret, @@ -31,10 +36,20 @@ VALID_USERS: dict[str, str] = { u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)] } -# DEBT-026: path to a JSON file with custom email definitions. -# When set, _BAIT_EMAILS should be replaced/extended from that file. -# Wiring (service_cfg["email_seed"] → compose_fragment → env var → here) is deferred. -_EMAIL_SEED_PATH = os.environ.get("IMAP_EMAIL_SEED", "") # stub — currently unused +# Path to a directory of ``*.eml`` files that the orchestrator emailgen +# worker drops into the container (``/var/spool/decnet-emails/`` by +# convention). When set AND the directory contains parseable EMLs, +# they replace the hardcoded ``_BAIT_EMAILS`` fallback below — meaning +# every mail an attacker reads is the LLM-generated, persona-driven, +# language-aware version, not the static credential-stuffed bait list. +# Empty / missing / unparseable: the fallback list still serves so a +# fresh deployment is never silent. +_EMAIL_SEED_PATH = os.environ.get("IMAP_EMAIL_SEED", "") +# Re-scan the seed directory at most this often. Cheap: walking a few +# dozen .eml files is sub-millisecond, but caching keeps an attacker's +# rapid LIST/FETCH burst from re-parsing the same files on every +# command. Mtime check still triggers a re-load on real changes. +_SEED_RESCAN_INTERVAL = float(os.environ.get("IMAP_EMAIL_SEED_RESCAN", "5")) # ── Bait emails ─────────────────────────────────────────────────────────────── # All 10 live in INBOX. UID == sequence number. @@ -238,6 +253,119 @@ _BAIT_EMAILS: list[dict] = [ _MAILBOXES = ["INBOX", "Sent", "Drafts", "Archive"] + +# ── Spool-backed email loader ───────────────────────────────────────────────── +# When IMAP_EMAIL_SEED points at a directory of .eml files the +# orchestrator emailgen worker has dropped into the container, parse +# them on demand and serve them as the INBOX. Cached between requests +# with a short TTL + mtime check so a hot mailbox doesn't pay the parse +# cost on every IMAP command. +# +# Failure modes (missing dir, unparseable EMLs, empty dir) all return +# the hardcoded fallback rather than 0 messages — a silent INBOX is a +# stronger tell than a slightly-stale one. + +_seed_cache: list[dict] | None = None +_seed_cache_dir_mtime: float = 0.0 +_seed_cache_loaded_at: float = 0.0 + + +def _split_addr(value: str) -> tuple[str, str]: + """Return (display_name, email) from a header value, falling back to + the raw string when the parse fails. Worker side; we don't need + real RFC 5322 — just enough to populate the IMAP envelope tuple.""" + if not value: + return "", "" + pairs = getaddresses([value]) + if not pairs: + return "", value + name, addr = pairs[0] + return (name or "").strip(), (addr or value).strip() + + +def _eml_to_dict(path: Path, uid: int) -> dict | None: + """Parse one .eml into the dict shape the rest of this server uses. + + Returns None when the file isn't parseable; callers skip + continue + so one corrupt EML does not kill the whole INBOX listing. + """ + try: + raw = path.read_bytes() + msg = email.message_from_bytes(raw, policy=email.policy.compat32) + except Exception: # noqa: BLE001 + return None + from_name, from_addr = _split_addr(msg.get("From", "")) + _to_name, to_addr = _split_addr(msg.get("To", "")) + subject = (msg.get("Subject") or "").strip() + date = msg.get("Date") or "" + return { + "uid": uid, + "flags": [], # never \Seen for spool emails — fresh delivery + "from_name": from_name or from_addr.split("@", 1)[0] if from_addr else "Unknown", + "from_addr": from_addr or "unknown@localhost", + "to_addr": to_addr or "unknown@localhost", + "subject": subject or "(no subject)", + "date": date, + # The body field carries the full RFC 822 message — headers + body. + # That mirrors how the hardcoded _BAIT_EMAILS entries are shaped. + "body": raw.decode("utf-8", errors="replace"), + } + + +def _scan_seed_dir(path: Path) -> list[dict]: + """Walk *path* recursively, parse every ``*.eml``, sort by mtime.""" + eml_paths: list[Path] = [] + try: + for p in path.rglob("*.eml"): + if p.is_file(): + eml_paths.append(p) + except OSError: + return [] + eml_paths.sort(key=lambda p: p.stat().st_mtime) + out: list[dict] = [] + for i, p in enumerate(eml_paths, start=1): + d = _eml_to_dict(p, uid=i) + if d is not None: + out.append(d) + return out + + +def _get_emails() -> list[dict]: + """Return the active mailbox list. + + Resolution order: + 1. ``IMAP_EMAIL_SEED`` set + dir exists + at least one parseable EML + → that list (rescan-throttled). + 2. Else → the hardcoded ``_BAIT_EMAILS`` fallback. + """ + global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at + if not _EMAIL_SEED_PATH: + return _BAIT_EMAILS + seed_dir = Path(_EMAIL_SEED_PATH) + try: + dir_stat = seed_dir.stat() + except OSError: + return _BAIT_EMAILS + now = time.monotonic() + fresh_enough = ( + _seed_cache is not None + and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL + and dir_stat.st_mtime == _seed_cache_dir_mtime + ) + if fresh_enough: + return _seed_cache or _BAIT_EMAILS + scanned = _scan_seed_dir(seed_dir) + if not scanned: + # Don't poison the cache with an empty list; a single early + # FETCH before emailgen has run would otherwise stick the + # mailbox at 0 for _SEED_RESCAN_INTERVAL seconds. + return _BAIT_EMAILS + _seed_cache = scanned + _seed_cache_dir_mtime = dir_stat.st_mtime + _seed_cache_loaded_at = now + return scanned + + # ── Logging ─────────────────────────────────────────────────────────────────── def _log(event_type: str, severity: int = 6, **kwargs) -> None: @@ -464,7 +592,8 @@ class IMAPProtocol(asyncio.Protocol): mailbox = parts[0].strip('"') if parts else "INBOX" attr_str = parts[1].strip("()").upper() if len(parts) > 1 else "MESSAGES" - counts = {"MESSAGES": 10, "RECENT": 0, "UNSEEN": 10} if mailbox == "INBOX" \ + n = len(_get_emails()) if mailbox == "INBOX" else 0 + counts = {"MESSAGES": n, "RECENT": 0, "UNSEEN": n} if mailbox == "INBOX" \ else {"MESSAGES": 0, "RECENT": 0, "UNSEEN": 0} result_parts = [] @@ -479,7 +608,8 @@ class IMAPProtocol(asyncio.Protocol): self._w(f"{tag} BAD Not authenticated\r\n") return mailbox = args.strip('"') - total = len(_BAIT_EMAILS) if mailbox == "INBOX" else 0 + emails = _get_emails() + total = len(emails) if mailbox == "INBOX" else 0 self._selected = mailbox self._state = "SELECTED" self._w(f"* {total} EXISTS\r\n") @@ -500,7 +630,8 @@ class IMAPProtocol(asyncio.Protocol): range_str = parts[0] if parts else "1:*" items_str = parts[1] if len(parts) > 1 else "FLAGS" - total = len(_BAIT_EMAILS) + emails = _get_emails() + total = len(emails) indices = _parse_seq_range(range_str, total) items = _parse_fetch_items(items_str) # Ensure UID is included when using UID FETCH @@ -509,14 +640,14 @@ class IMAPProtocol(asyncio.Protocol): for seq in indices: if 1 <= seq <= total: - self._transport.write(_build_fetch_response(seq, _BAIT_EMAILS[seq - 1], items)) + self._transport.write(_build_fetch_response(seq, emails[seq - 1], items)) self._w(f"{tag} OK FETCH completed\r\n") def _cmd_search(self, tag: str, uid_mode: bool = False) -> None: if self._state != "SELECTED": self._w(f"{tag} BAD Not in selected state\r\n") return - nums = " ".join(str(i) for i in range(1, len(_BAIT_EMAILS) + 1)) + nums = " ".join(str(i) for i in range(1, len(_get_emails()) + 1)) self._w(f"* SEARCH {nums}\r\n") self._w(f"{tag} OK SEARCH completed\r\n") diff --git a/decnet/templates/pop3/server.py b/decnet/templates/pop3/server.py index 2363ce8a..b7c82a22 100644 --- a/decnet/templates/pop3/server.py +++ b/decnet/templates/pop3/server.py @@ -11,6 +11,8 @@ Credentials via IMAP_USERS env var (shared with IMAP service). import asyncio import os +import time +from pathlib import Path from syslog_bridge import ( SEVERITY_WARNING, encode_secret, @@ -30,9 +32,13 @@ VALID_USERS: dict[str, str] = { u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)] } -# DEBT-026: path to a JSON file with custom email definitions. -# Wiring (service_cfg["email_seed"] → compose_fragment → env var → here) is deferred. -_EMAIL_SEED_PATH = os.environ.get("POP3_EMAIL_SEED", "") # stub — currently unused +# Path to a directory of ``*.eml`` files dropped by the orchestrator +# emailgen worker (``/var/spool/decnet-emails/`` by convention). When +# set and populated, those EMLs replace the hardcoded fallback list +# below — same semantics as the IMAP template. Empty / missing falls +# back so a fresh deployment is never silent. +_EMAIL_SEED_PATH = os.environ.get("POP3_EMAIL_SEED", "") +_SEED_RESCAN_INTERVAL = float(os.environ.get("POP3_EMAIL_SEED_RESCAN", "5")) # ── Bait emails ─────────────────────────────────────────────────────────────── @@ -163,6 +169,64 @@ _BAIT_EMAILS: list[str] = [ ), ] + +# ── Spool-backed email loader ───────────────────────────────────────────────── +# POP3 stores each message as a single str (full RFC 822 text); when the +# emailgen spool is configured, we read every *.eml in it and serve the +# raw bytes as the corpus. Same caching strategy as the IMAP template. + +_seed_cache: list[str] | None = None +_seed_cache_dir_mtime: float = 0.0 +_seed_cache_loaded_at: float = 0.0 + + +def _scan_seed_dir(path: Path) -> list[str]: + """Walk *path* recursively and return each .eml's raw text content, + sorted by mtime so older threads get lower indices.""" + eml_paths: list[Path] = [] + try: + for p in path.rglob("*.eml"): + if p.is_file(): + eml_paths.append(p) + except OSError: + return [] + eml_paths.sort(key=lambda p: p.stat().st_mtime) + out: list[str] = [] + for p in eml_paths: + try: + out.append(p.read_text(encoding="utf-8", errors="replace")) + except OSError: + continue + return out + + +def _get_emails() -> list[str]: + """Return the active corpus. Same fallback rules as IMAP template.""" + global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at + if not _EMAIL_SEED_PATH: + return _BAIT_EMAILS + seed_dir = Path(_EMAIL_SEED_PATH) + try: + dir_stat = seed_dir.stat() + except OSError: + return _BAIT_EMAILS + now = time.monotonic() + fresh_enough = ( + _seed_cache is not None + and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL + and dir_stat.st_mtime == _seed_cache_dir_mtime + ) + if fresh_enough: + return _seed_cache or _BAIT_EMAILS + scanned = _scan_seed_dir(seed_dir) + if not scanned: + return _BAIT_EMAILS + _seed_cache = scanned + _seed_cache_dir_mtime = dir_stat.st_mtime + _seed_cache_loaded_at = now + return scanned + + # ── Logging ─────────────────────────────────────────────────────────────────── def _log(event_type: str, severity: int = 6, **kwargs) -> None: @@ -287,7 +351,7 @@ class POP3Protocol(asyncio.Protocol): """Return [(1-based-num, body), ...] excluding DELE'd messages.""" return [ (i + 1, body) - for i, body in enumerate(_BAIT_EMAILS) + for i, body in enumerate(_get_emails()) if i not in self._deleted ] @@ -301,14 +365,15 @@ class POP3Protocol(asyncio.Protocol): def _cmd_list(self, args: str) -> None: if not self._require_transaction(): return + emails = _get_emails() if args: try: n = int(args) idx = n - 1 - if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): + if idx in self._deleted or not (0 <= idx < len(emails)): self._transport.write(b"-ERR No such message\r\n") else: - size = len(_BAIT_EMAILS[idx].encode()) + size = len(emails[idx].encode()) self._transport.write(f"+OK {n} {size}\r\n".encode()) except ValueError: self._transport.write(b"-ERR Invalid argument\r\n") @@ -326,10 +391,11 @@ class POP3Protocol(asyncio.Protocol): try: n = int(args) idx = n - 1 - if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): + emails = _get_emails() + if idx in self._deleted or not (0 <= idx < len(emails)): self._transport.write(b"-ERR No such message\r\n") return - body = _BAIT_EMAILS[idx] + body = emails[idx] raw = body.encode() _log("retr", src=self._peer[0], message_num=n) self._transport.write(f"+OK {len(raw)} octets\r\n".encode()) @@ -348,10 +414,11 @@ class POP3Protocol(asyncio.Protocol): n = int(parts[0]) line_count = int(parts[1]) if len(parts) > 1 else 0 idx = n - 1 - if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): + emails = _get_emails() + if idx in self._deleted or not (0 <= idx < len(emails)): self._transport.write(b"-ERR No such message\r\n") return - body = _BAIT_EMAILS[idx] + body = emails[idx] sep = "\r\n\r\n" if sep in body: headers, rest = body.split(sep, 1) @@ -375,7 +442,7 @@ class POP3Protocol(asyncio.Protocol): try: n = int(args) idx = n - 1 - if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): + if idx in self._deleted or not (0 <= idx < len(_get_emails())): self._transport.write(b"-ERR No such message\r\n") else: self._transport.write(f"+OK {n} msg-{n}\r\n".encode()) @@ -393,7 +460,7 @@ class POP3Protocol(asyncio.Protocol): try: n = int(args) idx = n - 1 - if idx in self._deleted or not (0 <= idx < len(_BAIT_EMAILS)): + if idx in self._deleted or not (0 <= idx < len(_get_emails())): self._transport.write(b"-ERR No such message\r\n") else: self._deleted.add(idx) diff --git a/tests/service_testing/test_imap_spool.py b/tests/service_testing/test_imap_spool.py new file mode 100644 index 00000000..c5475206 --- /dev/null +++ b/tests/service_testing/test_imap_spool.py @@ -0,0 +1,148 @@ +"""Spool-backed email loading for the IMAP template. + +Verifies that when ``IMAP_EMAIL_SEED`` points at a directory of .eml +files, the IMAP server serves those (replacing the hardcoded +``_BAIT_EMAILS`` fallback). Empty / missing dir falls back gracefully. +""" +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path +from types import ModuleType +from unittest.mock import MagicMock, patch + +import pytest + + +_EML_TEMPLATE = ( + "From: {from_name} <{from_addr}>\r\n" + "To: Sarah \r\n" + "Subject: {subject}\r\n" + "Message-ID: <{mid}@corp.com>\r\n" + "Date: Mon, 26 Apr 2026 10:00:00 +0000\r\n" + "\r\n" + "{body}\r\n" +) + + +def _make_fake_syslog_bridge() -> ModuleType: + mod = ModuleType("syslog_bridge") + mod.syslog_line = MagicMock(return_value="") + mod.write_syslog_file = MagicMock() + mod.forward_syslog = MagicMock() + mod.SEVERITY_WARNING = 4 + mod.SEVERITY_INFO = 6 + mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""}) + mod.classify_authorization = MagicMock(return_value=None) + return mod + + +def _load_imap(env_overrides: dict[str, str]): + env = { + "NODE_NAME": "testhost", + "IMAP_USERS": "admin:admin123", + "IMAP_BANNER": "* OK Dovecot ready.", + **env_overrides, + } + for key in list(sys.modules): + if key in ("imap_server", "syslog_bridge"): + del sys.modules[key] + sys.modules["syslog_bridge"] = _make_fake_syslog_bridge() + spec = importlib.util.spec_from_file_location( + "imap_server", "decnet/templates/imap/server.py" + ) + mod = importlib.util.module_from_spec(spec) + with patch.dict("os.environ", env, clear=False): + spec.loader.exec_module(mod) + return mod + + +def _seed(tmp_path: Path, n: int = 3) -> Path: + spool = tmp_path / "spool" + spool.mkdir() + thread = spool / "thr1" + thread.mkdir() + for i in range(n): + eml = thread / f"msg{i}.eml" + eml.write_text(_EML_TEMPLATE.format( + from_name=f"Sender {i}", + from_addr=f"sender{i}@corp.com", + subject=f"Topic {i}", + mid=f"m{i}", + body=f"Body of message {i}.", + )) + return spool + + +def test_falls_back_to_hardcoded_when_seed_unset(tmp_path): + mod = _load_imap({}) + emails = mod._get_emails() + # The shipped fallback ships exactly 10 entries. + assert len(emails) == 10 + assert emails[0]["from_addr"] == "devops@company.internal" + + +def test_falls_back_when_seed_dir_missing(tmp_path): + mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "does-not-exist")}) + emails = mod._get_emails() + assert len(emails) == 10 # fallback + + +def test_falls_back_when_seed_dir_empty(tmp_path): + (tmp_path / "spool").mkdir() + mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "spool")}) + assert len(mod._get_emails()) == 10 # fallback (no .eml files) + + +def test_loads_eml_files_from_spool(tmp_path): + spool = _seed(tmp_path, n=3) + mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)}) + emails = mod._get_emails() + assert len(emails) == 3 + senders = {e["from_addr"] for e in emails} + assert senders == {"sender0@corp.com", "sender1@corp.com", "sender2@corp.com"} + # UIDs are 1-based and unique. + assert {e["uid"] for e in emails} == {1, 2, 3} + + +def test_loaded_eml_carries_full_rfc822_body(tmp_path): + spool = _seed(tmp_path, n=1) + mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)}) + emails = mod._get_emails() + assert "From:" in emails[0]["body"] + assert "Subject: Topic 0" in emails[0]["body"] + assert "Body of message 0." in emails[0]["body"] + + +def test_corrupt_eml_skipped_not_fatal(tmp_path): + spool = tmp_path / "spool" + spool.mkdir() + (spool / "good.eml").write_text(_EML_TEMPLATE.format( + from_name="Good", from_addr="good@corp.com", + subject="ok", mid="g", body="ok", + )) + # Make a directory with a .eml extension to provoke an OSError on + # read_bytes — the loader should skip it without crashing. + (spool / "broken.eml").mkdir() + mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)}) + emails = mod._get_emails() + assert len(emails) == 1 + assert emails[0]["from_addr"] == "good@corp.com" + + +def test_select_inbox_reflects_spool_count(tmp_path): + spool = _seed(tmp_path, n=4) + mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)}) + proto = mod.IMAPProtocol() + transport = MagicMock() + written: list[bytes] = [] + transport.write.side_effect = written.append + proto.connection_made(transport) + written.clear() + proto.data_received(b"A0 LOGIN admin admin123\r\n") + written.clear() + proto.data_received(b"B0 SELECT INBOX\r\n") + out = b"".join(written) + assert b"* 4 EXISTS" in out + assert b"[UIDNEXT 5]" in out diff --git a/tests/service_testing/test_pop3_spool.py b/tests/service_testing/test_pop3_spool.py new file mode 100644 index 00000000..43f3590c --- /dev/null +++ b/tests/service_testing/test_pop3_spool.py @@ -0,0 +1,96 @@ +"""Spool-backed email loading for the POP3 template.""" +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path +from types import ModuleType +from unittest.mock import MagicMock, patch + + +_EML_TEMPLATE = ( + "From: Sender \r\n" + "To: Sarah \r\n" + "Subject: {subject}\r\n" + "Message-ID: <{mid}@corp.com>\r\n" + "\r\n" + "{body}\r\n" +) + + +def _make_fake_syslog_bridge() -> ModuleType: + mod = ModuleType("syslog_bridge") + mod.syslog_line = MagicMock(return_value="") + mod.write_syslog_file = MagicMock() + mod.forward_syslog = MagicMock() + mod.SEVERITY_WARNING = 4 + mod.SEVERITY_INFO = 6 + mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""}) + mod.classify_authorization = MagicMock(return_value=None) + return mod + + +def _load_pop3(env_overrides): + env = { + "NODE_NAME": "testhost", + "IMAP_USERS": "admin:admin123", + **env_overrides, + } + for key in list(sys.modules): + if key in ("pop3_server", "syslog_bridge"): + del sys.modules[key] + sys.modules["syslog_bridge"] = _make_fake_syslog_bridge() + spec = importlib.util.spec_from_file_location( + "pop3_server", "decnet/templates/pop3/server.py" + ) + mod = importlib.util.module_from_spec(spec) + with patch.dict("os.environ", env, clear=False): + spec.loader.exec_module(mod) + return mod + + +def _seed(tmp_path: Path, n: int) -> Path: + spool = tmp_path / "spool" + spool.mkdir() + for i in range(n): + (spool / f"m{i}.eml").write_text(_EML_TEMPLATE.format( + subject=f"Topic {i}", mid=f"m{i}", body=f"Body {i}", + )) + return spool + + +def test_falls_back_when_seed_unset(tmp_path): + mod = _load_pop3({}) + assert len(mod._get_emails()) == 10 # hardcoded fallback + + +def test_falls_back_when_seed_dir_missing(tmp_path): + mod = _load_pop3({"POP3_EMAIL_SEED": str(tmp_path / "nope")}) + assert len(mod._get_emails()) == 10 + + +def test_loads_emls_from_spool(tmp_path): + spool = _seed(tmp_path, n=3) + mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)}) + emails = mod._get_emails() + assert len(emails) == 3 + # POP3 stores raw RFC 822 strings; verify content round-trips. + assert any("Topic 0" in e for e in emails) + assert all(e.startswith("From:") for e in emails) + + +def test_stat_reflects_spool_size(tmp_path): + spool = _seed(tmp_path, n=2) + mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)}) + proto = mod.POP3Protocol() + transport = MagicMock() + written: list[bytes] = [] + transport.write.side_effect = written.append + proto.connection_made(transport) + written.clear() + proto.data_received(b"USER admin\r\n") + proto.data_received(b"PASS admin123\r\n") + written.clear() + proto.data_received(b"STAT\r\n") + out = b"".join(written) + assert out.startswith(b"+OK 2 ")