Files
DECNET/tests/service_testing/test_rdp_basic.py
anti b3d1301925 feat(creds): DEBT-040 Phase 3 — RDP NLA / CredSSP NTLMv2 capture
When RDP_ENABLE_NLA=true (service_cfg.nla=true on the topology side),
confirm PROTOCOL_HYBRID on the X.224 Connection Confirm, upgrade the
socket to TLS using a self-signed cert generated at first start by
the entrypoint, then drive a tiny CredSSP loop:

- Read inbound TSRequest DER (bounded to MAX_TSREQUEST_LEN).
- Scan for the NTLMSSP signature, dispatch on message type:
  Type 1 -> respond with a hand-built TSRequest carrying our Type 2
  challenge. Type 3 -> parse_type3() and emit auth_attempt with the
  universal credential SD shape (secret_kind = ntlmssp_v2).
- Hand-built DER: no pyasn1 dependency.

Also folds in a small fix-up to commit 1: SMB SERVER_CHALLENGE was
hardcoded to 0x11..0x88 across the fleet, which would let a scanner
fingerprint every DECNET decky by its NTLM challenge. Both SMB and
RDP now derive the 8-byte challenge from
instance_seed.random_bytes(8, "ntlm_challenge"), giving each decky a
deterministic-but-distinct value. SMB Dockerfile gets the
instance_seed.py copy too (was synced into the build context but not
COPYed into the image).

- decnet/services/rdp.py: optional service_cfg.nla bool flips
  RDP_ENABLE_NLA in the compose env.
- decnet/templates/rdp/Dockerfile + entrypoint.sh: openssl install +
  per-decky cert generation gated on RDP_ENABLE_NLA.
- 9 NLA unit tests cover the DER reader/builder, _handle_nla round-
  trip with Type 1 / Type 3, oversized-DER rejection, and per-
  NODE_NAME challenge divergence.
- DEBT.md: DEBT-040 closed; full TS_INFO_PACKET capture documented as
  a follow-up if attacker telemetry justifies it.
2026-04-25 07:42:52 -04:00

175 lines
5.9 KiB
Python

"""Tests for decnet/templates/rdp/server.py — X.224 CR cookie capture.
Drives the asyncio handler with an in-memory StreamReader, asserts:
* mstshash cookie in CR is captured as principal/username.
* rdpNegRequest.requestedProtocols is recorded.
* X.224 Connection Confirm is well-formed and selects PROTOCOL_RDP.
* Malformed / oversized TPKT does not crash the handler.
"""
from __future__ import annotations
import asyncio
import importlib.util
import struct
import sys
from unittest.mock import MagicMock
import pytest
from .conftest import load_real_instance_seed, make_fake_syslog_bridge
# ── Module loader ─────────────────────────────────────────────────────────────
def _load_real_ntlmssp():
spec = importlib.util.spec_from_file_location(
"ntlmssp", "decnet/templates/_shared/ntlmssp.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _load_rdp():
for key in ("rdp_server", "syslog_bridge", "instance_seed", "ntlmssp"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
sys.modules["ntlmssp"] = _load_real_ntlmssp()
spec = importlib.util.spec_from_file_location(
"rdp_server", "decnet/templates/rdp/server.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@pytest.fixture
def rdp_mod():
return _load_rdp()
# ── PDU builders ──────────────────────────────────────────────────────────────
def _x224_connection_request(cookie: str | None = None, requested_protocols: int | None = None) -> bytes:
"""Build TPKT(X.224 CR [+ Cookie] [+ rdpNegRequest])."""
var = b""
if cookie is not None:
var += f"Cookie: mstshash={cookie}\r\n".encode("ascii")
if requested_protocols is not None:
var += (
bytes([0x01, 0x00])
+ (8).to_bytes(2, "little")
+ requested_protocols.to_bytes(4, "little")
)
li = 6 + len(var) # length indicator covers bytes after itself
x224 = bytes([li, 0xE0, 0x00, 0x00, 0x00, 0x00, 0x00]) + var
tpkt = bytes([0x03, 0x00]) + (4 + len(x224)).to_bytes(2, "big")
return tpkt + x224
def _make_streams():
reader = asyncio.StreamReader()
writer = MagicMock()
written: list[bytes] = []
writer.write.side_effect = written.append
writer.get_extra_info.return_value = ("203.0.113.42", 49152)
async def _drained():
return None
async def _wait_closed():
return None
writer.drain = _drained
writer.wait_closed = _wait_closed
return reader, writer, written
def _drive(rdp_mod, request_bytes: bytes):
async def _run():
reader, writer, written = _make_streams()
reader.feed_data(request_bytes)
reader.feed_eof()
await asyncio.wait_for(rdp_mod._handle_client(reader, writer), timeout=2.0)
return writer, written
return asyncio.run(_run())
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_cookie_is_captured_as_principal():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie="alice"))
cookie_calls = [
c for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3 and c.args[2] == "rdp_cookie"
]
assert cookie_calls, "expected an rdp_cookie event"
kwargs = cookie_calls[0].kwargs
assert kwargs["principal"] == "alice"
assert kwargs["username"] == "alice"
def test_requested_protocols_recorded():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie="bob", requested_protocols=0x03)) # SSL|HYBRID
cookie_calls = [
c for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3 and c.args[2] == "rdp_cookie"
]
assert cookie_calls
assert cookie_calls[0].kwargs["requested_protocols"] == 0x03
def test_connection_confirm_well_formed(rdp_mod):
_, written = _drive(rdp_mod, _x224_connection_request(cookie="charlie"))
blob = b"".join(written)
assert blob[0] == 0x03 # TPKT version
total = int.from_bytes(blob[2:4], "big")
assert total == len(blob)
# X.224 CC type byte at offset 5
assert blob[5] == 0xD0
# rdpNegRsp begins at offset 11; SelectedProtocol at offset 15 (4 bytes LE)
selected = int.from_bytes(blob[15:19], "little")
assert selected == 0x00000000 # PROTOCOL_RDP
def test_no_cookie_still_replies(rdp_mod):
_, written = _drive(rdp_mod, _x224_connection_request(cookie=None, requested_protocols=0x00))
assert written, "server must still reply with X.224 CC even without cookie"
blob = b"".join(written)
assert blob[5] == 0xD0 # CC
def test_no_cookie_emits_connection_request_event():
mod = _load_rdp()
log_mock = sys.modules["syslog_bridge"]
_drive(mod, _x224_connection_request(cookie=None))
types = [
c.args[2] for c in log_mock.syslog_line.call_args_list
if len(c.args) >= 3
]
assert "connection_request" in types
assert "rdp_cookie" not in types
def test_oversized_tpkt_is_dropped(rdp_mod):
# TPKT len = 65535 → above MAX_TPKT_LEN; handler must reject without
# waiting for the full body.
bad = bytes([0x03, 0x00, 0xFF, 0xFF])
_, written = _drive(rdp_mod, bad)
assert written == []
def test_non_tpkt_first_byte_is_dropped(rdp_mod):
bad = b"\x16\x03\x01\x00\x10" + b"\x00" * 11 # looks like TLS ClientHello
_, written = _drive(rdp_mod, bad)
assert written == []