feat(imap,pop3): full IMAP4rev1 + POP3 bait mailbox implementation

IMAP: extended to full IMAP4rev1 — 10 bait emails (AWS keys, DB creds,
tokens, VPN config, root pw etc.), LIST/LSUB/STATUS/FETCH/UID FETCH/
SEARCH/CLOSE/NOOP, proper SELECT untagged responses (EXISTS, UIDNEXT,
FLAGS, PERMANENTFLAGS), CAPABILITY with IDLE/LITERAL+/AUTH=PLAIN.
FETCH correctly handles sequence sets (1:*, 1:3, *), item dispatch
(FLAGS, ENVELOPE, BODY[], RFC822, RFC822.SIZE), and places body literals
last per RFC 3501.

POP3: extended with same 10 bait emails, fixed banner env var key
(POP3_BANNER not IMAP_BANNER), CAPA fully populated (TOP/UIDL/USER/
RESP-CODES/SASL), TOP (headers + N body lines), UIDL (msg-N format),
DELE/RSET with _deleted set tracking, NOOP. _active_messages() helper
excludes DELE'd messages from STAT/LIST/UIDL.

Both: DEBT-026 stub added (_EMAIL_SEED_PATH env var, documented in
DEBT.md for next-session JSON seed file wiring).

Tests: test_imap.py expanded to 27 cases, test_pop3.py to 22 cases —
860 total tests passing.
This commit is contained in:
2026-04-11 03:12:32 -04:00
parent 1196363d0b
commit c7713c6228
5 changed files with 1326 additions and 234 deletions

View File

@@ -1,7 +1,10 @@
"""
Tests for templates/imap/server.py
Exercises IMAP state machine, auth, and negative tests.
Exercises the full IMAP4rev1 state machine:
NOT_AUTHENTICATED → AUTHENTICATED → SELECTED
Uses asyncio Protocol directly — no network socket needed.
"""
import importlib.util
@@ -12,6 +15,8 @@ from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
@@ -21,11 +26,13 @@ def _make_fake_decnet_logging() -> ModuleType:
mod.SEVERITY_INFO = 6
return mod
def _load_imap():
"""Import imap server module, injecting a stub decnet_logging."""
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123,root:toor",
"IMAP_BANNER": "* OK [testhost] Dovecot ready."
"IMAP_BANNER": "* OK [testhost] Dovecot ready.",
}
for key in list(sys.modules):
if key in ("imap_server", "decnet_logging"):
@@ -33,13 +40,17 @@ def _load_imap():
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("imap_server", "templates/imap/server.py")
spec = importlib.util.spec_from_file_location(
"imap_server", "templates/imap/server.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
"""Return (protocol, transport, written). Banner already cleared."""
proto = mod.IMAPProtocol()
transport = MagicMock()
written: list[bytes] = []
@@ -48,42 +59,270 @@ def _make_protocol(mod):
written.clear()
return proto, transport, written
def _send(proto, data: str) -> None:
proto.data_received(data.encode() + b"\r\n")
def _replies(written: list[bytes]) -> bytes:
return b"".join(written)
def _login(proto, written):
_send(proto, "A0 LOGIN admin admin123")
written.clear()
def _select_inbox(proto, written):
_send(proto, "B0 SELECT INBOX")
written.clear()
@pytest.fixture
def imap_mod():
return _load_imap()
# ── Tests: banner & unauthenticated ──────────────────────────────────────────
def test_imap_banner_on_connect(imap_mod):
proto = imap_mod.IMAPProtocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
banner = b"".join(written)
assert banner.startswith(b"* OK")
def test_imap_capability_contains_idle_and_literal_plus(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "C1 CAPABILITY")
resp = _replies(written)
assert b"IMAP4rev1" in resp
assert b"IDLE" in resp
assert b"LITERAL+" in resp
assert b"AUTH=PLAIN" in resp
def test_imap_login_success(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin admin123')
assert b"A1 OK" in b"".join(written)
proto, _, written = _make_protocol(imap_mod)
_send(proto, "A1 LOGIN admin admin123")
assert b"A1 OK" in _replies(written)
assert proto._state == "AUTHENTICATED"
def test_imap_login_fail(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin wrongpass')
assert b"A1 NO" in b"".join(written)
proto, _, written = _make_protocol(imap_mod)
_send(proto, "A1 LOGIN admin wrongpass")
resp = _replies(written)
assert b"A1 NO" in resp
assert b"AUTHENTICATIONFAILED" in resp
assert proto._state == "NOT_AUTHENTICATED"
def test_imap_select_before_auth(imap_mod):
def test_imap_bad_creds_connection_stays_open(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A2 SELECT INBOX')
assert b"A2 BAD" in b"".join(written)
_send(proto, "T1 LOGIN admin wrongpass")
transport.close.assert_not_called()
def test_imap_retry_after_bad_credentials_succeeds(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "T1 LOGIN admin wrongpass")
written.clear()
_send(proto, "T2 LOGIN admin admin123")
assert b"T2 OK" in _replies(written)
assert proto._state == "AUTHENTICATED"
def test_imap_select_before_auth_returns_bad(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "A2 SELECT INBOX")
assert b"A2 BAD" in _replies(written)
def test_imap_noop_unauthenticated_returns_ok(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "N1 NOOP")
assert b"N1 OK" in _replies(written)
def test_imap_unknown_command_returns_bad(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "X1 INVALID_COMMAND")
assert b"X1 BAD" in _replies(written)
# ── Tests: authenticated state ────────────────────────────────────────────────
def test_imap_list_returns_four_mailboxes(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, 'L1 LIST "" "*"')
resp = _replies(written)
assert b"INBOX" in resp
assert b"Sent" in resp
assert b"Drafts" in resp
assert b"Archive" in resp
assert b"LIST completed" in resp
def test_imap_lsub_mirrors_list(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, 'L2 LSUB "" "*"')
resp = _replies(written)
assert b"INBOX" in resp
assert b"LSUB completed" in resp
def test_imap_status_inbox_messages(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, "S0 STATUS INBOX (MESSAGES)")
resp = _replies(written)
assert b"STATUS INBOX" in resp
assert b"MESSAGES 10" in resp
# ── Tests: SELECTED state ─────────────────────────────────────────────────────
def test_imap_select_inbox_exists_count(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, "S1 SELECT INBOX")
resp = _replies(written)
assert b"* 10 EXISTS" in resp
def test_imap_select_inbox_uidnext(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, "S1 SELECT INBOX")
resp = _replies(written)
assert b"UIDNEXT 11" in resp
def test_imap_select_inbox_read_write(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, "S1 SELECT INBOX")
resp = _replies(written)
assert b"READ-WRITE" in resp
def test_imap_examine_inbox_read_only(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_send(proto, "S2 EXAMINE INBOX")
resp = _replies(written)
assert b"READ-ONLY" in resp
def test_imap_search_all_returns_all_seqs(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "Q1 SEARCH ALL")
resp = _replies(written)
assert b"* SEARCH 1 2 3 4 5 6 7 8 9 10" in resp
def test_imap_fetch_single_body_aws_key(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "F1 FETCH 1 BODY[]")
resp = _replies(written)
assert b"AKIAIOSFODNN7EXAMPLE" in resp
assert b"F1 OK" in resp
def test_imap_fetch_after_select(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin admin123')
proto, _, written = _make_protocol(imap_mod)
_send(proto, "A1 LOGIN admin admin123")
written.clear()
_send(proto, 'A2 SELECT INBOX')
_send(proto, "A2 SELECT INBOX")
written.clear()
_send(proto, 'A3 FETCH 1 RFC822')
combined = b"".join(written)
_send(proto, "A3 FETCH 1 RFC822")
combined = _replies(written)
assert b"A3 OK" in combined
assert b"AKIAIOSFODNN7EXAMPLE" in combined
def test_imap_invalid_command(imap_mod):
def test_imap_fetch_msg5_root_password(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "F2 FETCH 5 BODY[]")
resp = _replies(written)
assert b"r00tM3T00!" in resp
def test_imap_fetch_range_flags_envelope_count(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "F3 FETCH 1:3 (FLAGS ENVELOPE)")
resp = _replies(written)
assert b"* 1 FETCH" in resp
assert b"* 2 FETCH" in resp
assert b"* 3 FETCH" in resp
assert b"FETCH completed" in resp
def test_imap_fetch_star_rfc822size_10_responses(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "F4 FETCH 1:* RFC822.SIZE")
resp = _replies(written).decode(errors="replace")
assert resp.count(" FETCH ") >= 10
assert "F4 OK" in resp
def test_imap_uid_fetch_includes_uid_field(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "U1 UID FETCH 1:10 (FLAGS)")
resp = _replies(written)
assert b"UID 1" in resp
assert b"FETCH completed" in resp
def test_imap_close_returns_to_authenticated(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "C1 CLOSE")
resp = _replies(written)
assert b"CLOSE completed" in resp
assert proto._state == "AUTHENTICATED"
def test_imap_fetch_after_close_returns_bad(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_login(proto, written)
_select_inbox(proto, written)
_send(proto, "C1 CLOSE")
written.clear()
_send(proto, "C2 FETCH 1 FLAGS")
assert b"C2 BAD" in _replies(written)
def test_imap_logout_sends_bye_and_closes(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 INVALID')
assert b"A1 BAD" in b"".join(written)
_login(proto, written)
_send(proto, "L1 LOGOUT")
resp = _replies(written)
assert b"* BYE" in resp
assert b"LOGOUT completed" in resp
transport.close.assert_called_once()
def test_imap_invalid_command(imap_mod):
proto, _, written = _make_protocol(imap_mod)
_send(proto, "A1 INVALID")
assert b"A1 BAD" in _replies(written)

View File

@@ -1,7 +1,10 @@
"""
Tests for templates/pop3/server.py
Exercises POP3 state machine, auth, and negative tests.
Exercises the full POP3 state machine:
AUTHORIZATION → TRANSACTION
Uses asyncio Protocol directly — no network socket needed.
"""
import importlib.util
@@ -12,6 +15,8 @@ from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
@@ -21,11 +26,12 @@ def _make_fake_decnet_logging() -> ModuleType:
mod.SEVERITY_INFO = 6
return mod
def _load_pop3():
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123,root:toor",
"IMAP_BANNER": "+OK [testhost] Dovecot ready."
"IMAP_BANNER": "+OK [testhost] Dovecot ready.",
}
for key in list(sys.modules):
if key in ("pop3_server", "decnet_logging"):
@@ -33,13 +39,17 @@ def _load_pop3():
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("pop3_server", "templates/pop3/server.py")
spec = importlib.util.spec_from_file_location(
"pop3_server", "templates/pop3/server.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
"""Return (protocol, transport, written). Banner already cleared."""
proto = mod.POP3Protocol()
transport = MagicMock()
written: list[bytes] = []
@@ -48,51 +58,229 @@ def _make_protocol(mod):
written.clear()
return proto, transport, written
def _send(proto, data: str) -> None:
proto.data_received(data.encode() + b"\r\n")
def _replies(written: list[bytes]) -> bytes:
return b"".join(written)
def _login(proto, written):
_send(proto, "USER admin")
_send(proto, "PASS admin123")
written.clear()
@pytest.fixture
def pop3_mod():
return _load_pop3()
# ── Tests: banner & unauthenticated ──────────────────────────────────────────
def test_pop3_banner_starts_with_ok(pop3_mod):
proto = pop3_mod.POP3Protocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
banner = b"".join(written)
assert banner.startswith(b"+OK")
def test_pop3_capa_contains_top_uidl_user(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "CAPA")
resp = _replies(written)
assert b"TOP" in resp
assert b"UIDL" in resp
assert b"USER" in resp
def test_pop3_login_success(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
assert b"+OK" in b"".join(written)
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "USER admin")
assert b"+OK" in _replies(written)
written.clear()
_send(proto, 'PASS admin123')
assert b"+OK Logged in" in b"".join(written)
_send(proto, "PASS admin123")
assert b"+OK Logged in" in _replies(written)
assert proto._state == "TRANSACTION"
def test_pop3_login_fail(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "USER admin")
written.clear()
_send(proto, 'PASS wrongpass')
assert b"-ERR" in b"".join(written)
_send(proto, "PASS wrongpass")
assert b"-ERR" in _replies(written)
assert proto._state == "AUTHORIZATION"
def test_pop3_pass_before_user(pop3_mod):
def test_pop3_bad_pass_connection_stays_open(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'PASS admin123')
assert b"-ERR" in b"".join(written)
_send(proto, "USER admin")
_send(proto, "PASS wrongpass")
transport.close.assert_not_called()
def test_pop3_retry_after_bad_pass_succeeds(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "USER admin")
_send(proto, "PASS wrongpass")
written.clear()
_send(proto, "USER admin")
_send(proto, "PASS admin123")
assert b"+OK Logged in" in _replies(written)
def test_pop3_pass_before_user(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "PASS admin123")
assert b"-ERR" in _replies(written)
def test_pop3_stat_before_auth(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'STAT')
assert b"-ERR" in b"".join(written)
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "STAT")
assert b"-ERR" in _replies(written)
def test_pop3_retr_after_auth(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
_send(proto, 'PASS admin123')
def test_pop3_retr_before_auth(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "RETR 1")
assert b"-ERR" in _replies(written)
def test_pop3_invalid_command(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "INVALID")
assert b"-ERR" in _replies(written)
# ── Tests: TRANSACTION state ──────────────────────────────────────────────────
def test_pop3_stat_10_messages(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "STAT")
resp = _replies(written).decode()
assert resp.startswith("+OK 10 ")
def test_pop3_list_returns_10_entries(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "LIST")
resp = _replies(written).decode()
assert resp.startswith("+OK 10")
# Count individual message lines: "N size\r\n"
entries = [l for l in resp.split("\r\n") if l and l[0].isdigit()]
assert len(entries) == 10
def test_pop3_retr_after_auth_msg1(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_send(proto, "USER admin")
_send(proto, "PASS admin123")
written.clear()
_send(proto, 'RETR 1')
combined = b"".join(written)
_send(proto, "RETR 1")
combined = _replies(written)
assert b"+OK" in combined
assert b"AKIAIOSFODNN7EXAMPLE" in combined
def test_pop3_invalid_command(pop3_mod):
def test_pop3_retr_msg5_root_password(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "RETR 5")
resp = _replies(written)
assert b"+OK" in resp
assert b"r00tM3T00!" in resp
def test_pop3_top_returns_headers_plus_lines(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "TOP 1 3")
resp = _replies(written).decode(errors="replace")
assert resp.startswith("+OK")
# Headers must be present
assert "From:" in resp
assert "Subject:" in resp
# Should NOT contain body content beyond 3 lines — but 3 lines of the
# AWS email body are enough to include the access key
assert ".\r\n" in resp
def test_pop3_top_3_body_lines_count(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
# Message 1 body after blank line:
# "Team,\r\n", "\r\n", "New AWS credentials...\r\n", ...
_send(proto, "TOP 1 3")
resp = _replies(written).decode(errors="replace")
# Strip headers up to blank line
parts = resp.split("\r\n\r\n", 1)
assert len(parts) == 2
body_section = parts[1].rstrip("\r\n.")
body_lines = [l for l in body_section.split("\r\n") if l != "."]
assert len(body_lines) <= 3
def test_pop3_uidl_returns_10_entries(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "UIDL")
resp = _replies(written).decode()
assert resp.startswith("+OK")
entries = [l for l in resp.split("\r\n") if l and l[0].isdigit()]
assert len(entries) == 10
def test_pop3_uidl_format_msg_n(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "UIDL")
resp = _replies(written).decode()
assert "1 msg-1" in resp
assert "5 msg-5" in resp
def test_pop3_dele_removes_message(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "DELE 3")
resp = _replies(written)
assert b"+OK" in resp
assert 2 in proto._deleted # 0-based
def test_pop3_rset_clears_deletions(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "DELE 1")
_send(proto, "DELE 2")
written.clear()
_send(proto, "RSET")
resp = _replies(written)
assert b"+OK" in resp
assert len(proto._deleted) == 0
def test_pop3_dele_then_stat_decrements_count(pop3_mod):
proto, _, written = _make_protocol(pop3_mod)
_login(proto, written)
_send(proto, "DELE 1")
written.clear()
_send(proto, "STAT")
resp = _replies(written).decode()
assert resp.startswith("+OK 9 ")
def test_pop3_quit_closes_connection(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'INVALID')
assert b"-ERR" in b"".join(written)
_login(proto, written)
_send(proto, "QUIT")
transport.close.assert_called_once()