Files
DECNET/tests/service_testing/test_rdp_basic.py
anti a8b9c82c97 feat(creds): DEBT-040 Phase 2 — RDP X.224 cookie capture
Replace Twisted-based connection logger with an asyncio handler that
parses the X.224 Connection Request, extracts the mstshash routing
cookie (universal across mstsc / FreeRDP / Hydra / ncrack / MSF
rdp_login), records the rdpNegRequest.requestedProtocols flags, and
answers with a well-formed X.224 Connection Confirm selecting
PROTOCOL_RDP.

Scope-down vs. the original DEBT-040 plan: full TS_INFO_PACKET
extraction would require either Standard-RDP-Security RC4 stream-
cipher implementation (with our own RSA pair + MS-RDPBCGR signing) or
a complete MCS+GCC ASN.1/BER stack for the SSL path — both far
exceed the 150 LoC budget the DEBT cited. The mstshash cookie is the
only piece of credential information that flows in plaintext on the
wire when the attacker speaks RDP, so capturing it is the highest-
value-per-byte signal available without going down either rabbit
hole. Phase 3 (CredSSP/NLA, next commit) is where actual NTLMv2
hashes land.

- Drops Twisted dependency from rdp/Dockerfile; adds ntlmssp.py copy
  ahead of the NLA path that consumes it.
- 7 unit tests cover cookie capture, requestedProtocols recording,
  CC framing, no-cookie path, and oversized/non-TPKT drops.
2026-04-25 07:34:42 -04:00

165 lines
5.6 KiB
Python

"""Tests for decnet/templates/rdp/server.py — X.224 CR cookie capture.
Drives the asyncio handler with an in-memory StreamReader, asserts:
* mstshash cookie in CR is captured as principal/username.
* rdpNegRequest.requestedProtocols is recorded.
* X.224 Connection Confirm is well-formed and selects PROTOCOL_RDP.
* Malformed / oversized TPKT does not crash the handler.
"""
from __future__ import annotations
import asyncio
import importlib.util
import struct
import sys
from unittest.mock import MagicMock
import pytest
from .conftest import load_real_instance_seed, make_fake_syslog_bridge
# ── Module loader ─────────────────────────────────────────────────────────────
def _load_rdp():
for key in ("rdp_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location(
"rdp_server", "decnet/templates/rdp/server.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@pytest.fixture
def rdp_mod():
return _load_rdp()
# ── PDU builders ──────────────────────────────────────────────────────────────
def _x224_connection_request(cookie: str | None = None, requested_protocols: int | None = None) -> bytes:
"""Build TPKT(X.224 CR [+ Cookie] [+ rdpNegRequest])."""
var = b""
if cookie is not None:
var += f"Cookie: mstshash={cookie}\r\n".encode("ascii")
if requested_protocols is not None:
var += (
bytes([0x01, 0x00])
+ (8).to_bytes(2, "little")
+ requested_protocols.to_bytes(4, "little")
)
li = 6 + len(var) # length indicator covers bytes after itself
x224 = bytes([li, 0xE0, 0x00, 0x00, 0x00, 0x00, 0x00]) + var
tpkt = bytes([0x03, 0x00]) + (4 + len(x224)).to_bytes(2, "big")
return tpkt + x224
def _make_streams():
reader = asyncio.StreamReader()
writer = MagicMock()
written: list[bytes] = []
writer.write.side_effect = written.append
writer.get_extra_info.return_value = ("203.0.113.42", 49152)
async def _drained():
return None
async def _wait_closed():
return None
writer.drain = _drained
writer.wait_closed = _wait_closed
return reader, writer, written
def _drive(rdp_mod, request_bytes: bytes):
async def _run():
reader, writer, written = _make_streams()
reader.feed_data(request_bytes)
reader.feed_eof()
await asyncio.wait_for(rdp_mod._handle_client(reader, writer), timeout=2.0)
return writer, written
return asyncio.run(_run())
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_cookie_is_captured_as_principal():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie="alice"))
cookie_calls = [
c for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3 and c.args[2] == "rdp_cookie"
]
assert cookie_calls, "expected an rdp_cookie event"
kwargs = cookie_calls[0].kwargs
assert kwargs["principal"] == "alice"
assert kwargs["username"] == "alice"
def test_requested_protocols_recorded():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie="bob", requested_protocols=0x03)) # SSL|HYBRID
cookie_calls = [
c for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3 and c.args[2] == "rdp_cookie"
]
assert cookie_calls
assert cookie_calls[0].kwargs["requested_protocols"] == 0x03
def test_connection_confirm_well_formed(rdp_mod):
_, written = _drive(rdp_mod, _x224_connection_request(cookie="charlie"))
blob = b"".join(written)
assert blob[0] == 0x03 # TPKT version
total = int.from_bytes(blob[2:4], "big")
assert total == len(blob)
# X.224 CC type byte at offset 5
assert blob[5] == 0xD0
# rdpNegRsp begins at offset 11; SelectedProtocol at offset 15 (4 bytes LE)
selected = int.from_bytes(blob[15:19], "little")
assert selected == 0x00000000 # PROTOCOL_RDP
def test_no_cookie_still_replies(rdp_mod):
_, written = _drive(rdp_mod, _x224_connection_request(cookie=None, requested_protocols=0x00))
assert written, "server must still reply with X.224 CC even without cookie"
blob = b"".join(written)
assert blob[5] == 0xD0 # CC
def test_no_cookie_emits_connection_request_event():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie=None))
types = [
c.args[2] for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3
]
assert "connection_request" in types
assert "rdp_cookie" not in types
def test_oversized_tpkt_is_dropped(rdp_mod):
# TPKT len = 65535 → above MAX_TPKT_LEN; handler must reject without
# waiting for the full body.
bad = bytes([0x03, 0x00, 0xFF, 0xFF])
_, written = _drive(rdp_mod, bad)
assert written == []
def test_non_tpkt_first_byte_is_dropped(rdp_mod):
bad = b"\x16\x03\x01\x00\x10" + b"\x00" * 11 # looks like TLS ClientHello
_, written = _drive(rdp_mod, bad)
assert written == []