merge testing->tomerge/main #7
@@ -87,9 +87,10 @@ class SMTPProtocol(asyncio.Protocol):
|
||||
|
||||
def data_received(self, data):
|
||||
self._buf += data
|
||||
while b"\r\n" in self._buf:
|
||||
line, self._buf = self._buf.split(b"\r\n", 1)
|
||||
self._handle_line(line.decode(errors="replace"))
|
||||
while b"\n" in self._buf:
|
||||
line, self._buf = self._buf.split(b"\n", 1)
|
||||
# Strip trailing \r so both CRLF and bare LF work
|
||||
self._handle_line(line.rstrip(b"\r").decode(errors="replace"))
|
||||
|
||||
def connection_lost(self, exc):
|
||||
_log("disconnect", src=self._peer[0] if self._peer else "?")
|
||||
@@ -118,7 +119,12 @@ class SMTPProtocol(asyncio.Protocol):
|
||||
self._data_buf.append(line[1:] if line.startswith(".") else line)
|
||||
return
|
||||
|
||||
# ── AUTH multi-step (LOGIN mechanism) ─────────────────────────────────
|
||||
# ── AUTH multi-step (LOGIN / PLAIN continuation) ─────────────────────
|
||||
if self._auth_state == "await_plain":
|
||||
user, password = _decode_auth_plain(line)
|
||||
self._finish_auth(user, password)
|
||||
self._auth_state = ""
|
||||
return
|
||||
if self._auth_state == "await_user":
|
||||
self._auth_user = base64.b64decode(line + "==").decode(errors="replace")
|
||||
self._auth_state = "await_pass"
|
||||
|
||||
@@ -301,3 +301,78 @@ def test_decode_auth_plain_garbage_no_raise(relay_mod):
|
||||
user, pw = relay_mod._decode_auth_plain("!!!notbase64!!!")
|
||||
assert isinstance(user, str)
|
||||
assert isinstance(pw, str)
|
||||
|
||||
|
||||
# ── Bare LF line endings ────────────────────────────────────────────────────
|
||||
|
||||
def _send_bare_lf(proto, *lines: str) -> None:
|
||||
"""Feed LF-only terminated lines to the protocol (simulates telnet/nc)."""
|
||||
for line in lines:
|
||||
proto.data_received((line + "\n").encode())
|
||||
|
||||
|
||||
def test_ehlo_works_with_bare_lf(relay_mod):
|
||||
"""Clients sending bare LF (telnet, nc) must get EHLO responses."""
|
||||
proto, _, written = _make_protocol(relay_mod)
|
||||
_send_bare_lf(proto, "EHLO attacker.com")
|
||||
combined = b"".join(written).decode()
|
||||
assert "250" in combined
|
||||
assert "AUTH" in combined
|
||||
|
||||
|
||||
def test_full_session_with_bare_lf(relay_mod):
|
||||
"""A complete relay session using bare LF line endings."""
|
||||
proto, _, written = _make_protocol(relay_mod)
|
||||
_send_bare_lf(
|
||||
proto,
|
||||
"EHLO attacker.com",
|
||||
"MAIL FROM:<hacker@evil.com>",
|
||||
"RCPT TO:<admin@target.com>",
|
||||
"DATA",
|
||||
"Subject: test",
|
||||
"",
|
||||
"body",
|
||||
".",
|
||||
"QUIT",
|
||||
)
|
||||
replies = _replies(written)
|
||||
assert any("queued as" in r for r in replies)
|
||||
assert any(r.startswith("221") for r in replies)
|
||||
|
||||
|
||||
def test_mixed_line_endings(relay_mod):
|
||||
"""A single data_received call containing a mix of CRLF and bare LF."""
|
||||
proto, _, written = _make_protocol(relay_mod)
|
||||
proto.data_received(b"EHLO test.com\r\nMAIL FROM:<a@b.com>\nRCPT TO:<c@d.com>\r\n")
|
||||
replies = _replies(written)
|
||||
assert any("250" in r for r in replies)
|
||||
assert any(r.startswith("250 2.1.0") for r in replies)
|
||||
assert any(r.startswith("250 2.1.5") for r in replies)
|
||||
|
||||
|
||||
# ── AUTH PLAIN continuation (no inline credentials) ──────────────────────────
|
||||
|
||||
def test_auth_plain_continuation_relay(relay_mod):
|
||||
"""AUTH PLAIN without inline creds should prompt then accept on next line."""
|
||||
proto, _, written = _make_protocol(relay_mod)
|
||||
_send(proto, "AUTH PLAIN")
|
||||
replies = _replies(written)
|
||||
assert any(r.startswith("334") for r in replies), "Expected 334 continuation"
|
||||
written.clear()
|
||||
creds = base64.b64encode(b"\x00admin\x00password").decode()
|
||||
_send(proto, creds)
|
||||
replies = _replies(written)
|
||||
assert any(r.startswith("235") for r in replies), "Expected 235 auth success"
|
||||
|
||||
|
||||
def test_auth_plain_continuation_harvester(harvester_mod):
|
||||
"""AUTH PLAIN continuation in harvester mode should reject with 535."""
|
||||
proto, _, written = _make_protocol(harvester_mod)
|
||||
_send(proto, "AUTH PLAIN")
|
||||
replies = _replies(written)
|
||||
assert any(r.startswith("334") for r in replies)
|
||||
written.clear()
|
||||
creds = base64.b64encode(b"\x00admin\x00password").decode()
|
||||
_send(proto, creds)
|
||||
replies = _replies(written)
|
||||
assert any(r.startswith("535") for r in replies)
|
||||
|
||||
Reference in New Issue
Block a user