Files
DECNET/tests/service_testing/test_pop3.py
anti 3404e3b3a6 feat(creds): Phase 1 — Authorization header + SNMP community capture
Closes the cred-coverage gap for 7 services that already had the data
on the wire but never landed it in the Credential table:

- SNMP — community string lands as secret_kind="snmp_community",
  principal=None (v1/v2c has no per-user identity, the community IS
  the auth).
- SIP — Digest response hash, previously buried in the auth= header
  dump, now classify_authorization()-extracted.
- HTTP / HTTPS — Authorization header was in the headers JSON but
  never extracted. Now Basic decodes to plaintext, Bearer →
  http_bearer (principal=None), Digest → http_digest_md5.
- K8s — already extracted Authorization but didn't normalize. Service-
  account JWTs flow through as Bearer.
- Docker API — headers absent entirely. Adds the headers JSON dump
  and runs Authorization through the classifier.
- Elasticsearch — five distinct request handlers; each gains a
  per-handler _cred_fields() helper.

Adds canonical templates/syslog_bridge.py:classify_authorization().
Recognised: Basic / Bearer / Token / Digest. Unknown schemes (NTLM,
AWS4-HMAC, Negotiate) return None; the header still rides in the
ambient SD-block but isn't normalized as a credential. The SD shape
on the wire collapses sip_digest_md5 into http_digest_md5 — same
algorithm, so cross-protocol credential reuse still correlates in the
(rare) cases where the nonces collide.

Drive-by repair of tests/core/test_fingerprinting.py:

- The pre-existing `test_http_useragent_extracted` asserted both that
  add_bounty was called exactly once AND that the UA payload carried
  `path` and `method` fields. Both wrong since this session opened:
  the http_quirks fingerprint added later fires too, and the UA
  payload never actually included path/method despite the assertion.
- Adds `path`/`method` to the UA fingerprint payload (real operator
  value: "Nikto hit /admin" beats "Nikto seen on this decky").
- Replaces `assert_awaited_once` with a `_find_ua_bounty()` helper
  that filters add_bounty calls by `fingerprint_type`. New fingerprint
  families landing later won't retroactively break old tests.
- Updates the two credential-bearing tests to use the post-DEBT-039
  native shape (`secret_b64` / `principal`) and `upsert_credential`,
  not the deleted legacy `username+password` adapter.

Also rebuilds the per-service fake `syslog_bridge` modules in
tests/service_testing/{conftest,test_imap,test_pop3,test_snmp,test_mqtt,test_smtp}.py
to expose `encode_secret` + `classify_authorization`. Service templates
that import either now no longer fail at test collection.

173 tests pass in the touched scope. Phases 2-7 still pending.
2026-04-25 07:04:10 -04:00

289 lines
8.5 KiB
Python

"""
Tests for decnet/templates/pop3/server.py
Exercises the full POP3 state machine:
AUTHORIZATION → TRANSACTION
Uses asyncio Protocol directly — no network socket needed.
"""
import importlib.util
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_syslog_bridge() -> ModuleType:
mod = ModuleType("syslog_bridge")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""})
mod.classify_authorization = MagicMock(return_value=None)
return mod
def _load_pop3():
    """Load a fresh copy of the POP3 server template and return the module.

    Any cached ``pop3_server`` / ``syslog_bridge`` entries are evicted from
    ``sys.modules`` first, a fake ``syslog_bridge`` is installed in their
    place, and the template is executed with the environment overrides
    below applied for the duration of the module import only.
    """
    # NOTE(review): these keys are IMAP_*-prefixed although the POP3 template
    # is being loaded — confirm the template really reads these names.
    overrides = {
        "NODE_NAME": "testhost",
        "IMAP_USERS": "admin:admin123,root:toor",
        "IMAP_BANNER": "+OK [testhost] Dovecot ready.",
    }
    # Evict stale copies so the template is re-executed on every call.
    for cached_name in ("pop3_server", "syslog_bridge"):
        sys.modules.pop(cached_name, None)
    sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
    # The path is relative, so it is resolved against the current working
    # directory.
    spec = importlib.util.spec_from_file_location(
        "pop3_server", "decnet/templates/pop3/server.py"
    )
    module = importlib.util.module_from_spec(spec)
    with patch.dict("os.environ", overrides, clear=False):
        spec.loader.exec_module(module)
    return module
def _make_protocol(mod):
"""Return (protocol, transport, written). Banner already cleared."""
proto = mod.POP3Protocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
return proto, transport, written
def _send(proto, data: str) -> None:
proto.data_received(data.encode() + b"\r\n")
def _replies(written: list[bytes]) -> bytes:
return b"".join(written)
def _login(proto, written):
    """Send the admin USER/PASS pair, then discard the captured replies."""
    for command in ("USER admin", "PASS admin123"):
        _send(proto, command)
    written.clear()
@pytest.fixture
def pop3_mod():
    """Provide a newly loaded POP3 server module to each test."""
    module = _load_pop3()
    return module
# ── Tests: banner & unauthenticated ──────────────────────────────────────────
def test_pop3_banner_starts_with_ok(pop3_mod):
    """Whatever is written on connect must begin with ``+OK``."""
    # Wired up by hand: _make_protocol would throw the banner away.
    chunks: list[bytes] = []
    fake_transport = MagicMock()
    fake_transport.write.side_effect = chunks.append
    protocol = pop3_mod.POP3Protocol()
    protocol.connection_made(fake_transport)
    greeting = b"".join(chunks)
    assert greeting.startswith(b"+OK")
def test_pop3_capa_contains_top_uidl_user(pop3_mod):
    """The CAPA reply must mention TOP, UIDL and USER."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "CAPA")
    capabilities = _replies(written)
    for keyword in (b"TOP", b"UIDL", b"USER"):
        assert keyword in capabilities
def test_pop3_login_success(pop3_mod):
    """Valid USER/PASS are both acknowledged and move the session to TRANSACTION."""
    proto, _, written = _make_protocol(pop3_mod)

    _send(proto, "USER admin")
    user_reply = _replies(written)
    assert b"+OK" in user_reply

    written.clear()
    _send(proto, "PASS admin123")
    pass_reply = _replies(written)
    assert b"+OK Logged in" in pass_reply
    assert proto._state == "TRANSACTION"
def test_pop3_login_fail(pop3_mod):
    """A wrong password gets ``-ERR`` and leaves the session in AUTHORIZATION."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "USER admin")
    # Only the reply to PASS is of interest.
    written.clear()
    _send(proto, "PASS wrongpass")
    pass_reply = _replies(written)
    assert b"-ERR" in pass_reply
    assert proto._state == "AUTHORIZATION"
def test_pop3_bad_pass_connection_stays_open(pop3_mod):
    """A failed login must not close the transport."""
    proto, transport, _ = _make_protocol(pop3_mod)
    for command in ("USER admin", "PASS wrongpass"):
        _send(proto, command)
    transport.close.assert_not_called()
def test_pop3_retry_after_bad_pass_succeeds(pop3_mod):
    """After a rejected password, a second correct attempt logs in."""
    proto, _, written = _make_protocol(pop3_mod)

    # First attempt: wrong password; its replies are discarded.
    for command in ("USER admin", "PASS wrongpass"):
        _send(proto, command)
    written.clear()

    # Second attempt on the same connection with the right password.
    for command in ("USER admin", "PASS admin123"):
        _send(proto, command)
    second_attempt = _replies(written)
    assert b"+OK Logged in" in second_attempt
def test_pop3_pass_before_user(pop3_mod):
    """PASS sent without a preceding USER is answered with ``-ERR``."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "PASS admin123")
    reply = _replies(written)
    assert b"-ERR" in reply
def test_pop3_stat_before_auth(pop3_mod):
    """STAT on an unauthenticated session is answered with ``-ERR``."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "STAT")
    reply = _replies(written)
    assert b"-ERR" in reply
def test_pop3_retr_before_auth(pop3_mod):
    """RETR on an unauthenticated session is answered with ``-ERR``."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "RETR 1")
    reply = _replies(written)
    assert b"-ERR" in reply
def test_pop3_invalid_command(pop3_mod):
    """An unrecognised command is answered with ``-ERR``."""
    proto, _, written = _make_protocol(pop3_mod)
    _send(proto, "INVALID")
    reply = _replies(written)
    assert b"-ERR" in reply
# ── Tests: TRANSACTION state ──────────────────────────────────────────────────
def test_pop3_stat_10_messages(pop3_mod):
    """After login, STAT reports a message count of 10."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "STAT")
    stat_reply = _replies(written).decode()
    assert stat_reply.startswith("+OK 10 ")
def test_pop3_list_returns_10_entries(pop3_mod):
    """LIST answers ``+OK 10`` followed by ten digit-led entry lines."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "LIST")
    listing = _replies(written).decode()
    assert listing.startswith("+OK 10")
    # Per-message lines look like "N size"; the status line starts with "+"
    # and the terminator with ".", so a leading digit singles them out.
    numbered = [line for line in listing.split("\r\n") if line[:1].isdigit()]
    assert len(numbered) == 10
def test_pop3_retr_after_auth_msg1(pop3_mod):
    """RETR 1 after login returns ``+OK`` and the AWS access key text."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "RETR 1")
    message = _replies(written)
    assert b"+OK" in message
    assert b"AKIAIOSFODNN7EXAMPLE" in message
def test_pop3_retr_msg5_root_password(pop3_mod):
    """RETR 5 after login returns ``+OK`` and the bait root password."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "RETR 5")
    message = _replies(written)
    assert b"+OK" in message
    assert b"r00tM3T00!" in message
def test_pop3_top_returns_headers_plus_lines(pop3_mod):
    """TOP 1 3 answers ``+OK`` with headers and a terminating dot line."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "TOP 1 3")
    top_reply = _replies(written).decode(errors="replace")
    assert top_reply.startswith("+OK")
    # The header block must be included.
    for header in ("From:", "Subject:"):
        assert header in top_reply
    # Only the multi-line terminator is checked here; the number of body
    # lines returned is covered by a separate test.
    assert ".\r\n" in top_reply
def test_pop3_top_3_body_lines_count(pop3_mod):
    """TOP 1 3 returns at most three body lines after the header block."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    # Message 1 body after blank line:
    # "Team,\r\n", "\r\n", "New AWS credentials...\r\n", ...
    _send(proto, "TOP 1 3")
    top_reply = _replies(written).decode(errors="replace")
    # Everything after the first blank line is treated as the body.
    _, blank_line, after_headers = top_reply.partition("\r\n\r\n")
    assert blank_line
    # Trim trailing CR / LF / "." characters (the terminator) before counting.
    trimmed = after_headers.rstrip("\r\n.")
    body_lines = [line for line in trimmed.split("\r\n") if line != "."]
    assert len(body_lines) <= 3
def test_pop3_uidl_returns_10_entries(pop3_mod):
    """UIDL answers ``+OK`` followed by ten digit-led entry lines."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "UIDL")
    listing = _replies(written).decode()
    assert listing.startswith("+OK")
    # Entry lines are the ones that begin with a message number.
    numbered = [line for line in listing.split("\r\n") if line[:1].isdigit()]
    assert len(numbered) == 10
def test_pop3_uidl_format_msg_n(pop3_mod):
    """UIDL entries pair message number N with the id ``msg-N``."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "UIDL")
    listing = _replies(written).decode()
    for expected in ("1 msg-1", "5 msg-5"):
        assert expected in listing
def test_pop3_dele_removes_message(pop3_mod):
    """DELE 3 answers ``+OK`` and records index 2 in ``_deleted``."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "DELE 3")
    reply = _replies(written)
    assert b"+OK" in reply
    # Message number 3 is tracked under its 0-based index.
    assert 2 in proto._deleted
def test_pop3_rset_clears_deletions(pop3_mod):
    """RSET answers ``+OK`` and empties the set of pending deletions."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    for command in ("DELE 1", "DELE 2"):
        _send(proto, command)
    # Only the RSET reply is checked.
    written.clear()
    _send(proto, "RSET")
    reply = _replies(written)
    assert b"+OK" in reply
    assert len(proto._deleted) == 0
def test_pop3_dele_then_stat_decrements_count(pop3_mod):
    """After one DELE, STAT reports 9 messages."""
    proto, _, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "DELE 1")
    # Discard the DELE acknowledgement so the STAT reply comes first.
    written.clear()
    _send(proto, "STAT")
    stat_reply = _replies(written).decode()
    assert stat_reply.startswith("+OK 9 ")
def test_pop3_quit_closes_connection(pop3_mod):
    """QUIT after login closes the transport exactly once."""
    proto, transport, written = _make_protocol(pop3_mod)
    _login(proto, written)
    _send(proto, "QUIT")
    transport.close.assert_called_once()