"""Tests for decnet/templates/smb/server.py — hand-rolled SMB2 framer. Drives the asyncio handler with an in-memory StreamReader and a mocked StreamWriter. Exercises the full Negotiate → SessionSetup(Type1) → SessionSetup(Type3) flow and asserts that an NTLMSSP Type 3 lands in the universal credential SD shape. """ from __future__ import annotations import asyncio import importlib.util import struct import sys from unittest.mock import MagicMock import pytest from .conftest import load_real_instance_seed, make_fake_syslog_bridge # ── Module loader ───────────────────────────────────────────────────────────── def _load_real_ntlmssp(): spec = importlib.util.spec_from_file_location( "ntlmssp", "decnet/templates/_shared/ntlmssp.py" ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod def _load_smb(): for key in ("smb_server", "syslog_bridge", "instance_seed", "ntlmssp"): sys.modules.pop(key, None) sys.modules["syslog_bridge"] = make_fake_syslog_bridge() sys.modules["instance_seed"] = load_real_instance_seed() sys.modules["ntlmssp"] = _load_real_ntlmssp() spec = importlib.util.spec_from_file_location( "smb_server", "decnet/templates/smb/server.py" ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def smb_mod(): return _load_smb() def _make_streams(): """Return (reader, writer, written) — writer.write() collects bytes. Must be called from inside a running event loop because asyncio.StreamReader's __init__ needs one in Python 3.11. """ reader = asyncio.StreamReader() writer = MagicMock() written: list[bytes] = [] writer.write.side_effect = written.append writer.get_extra_info.return_value = ("198.51.100.7", 51234) async def _wait_closed(): return None writer.wait_closed = _wait_closed return reader, writer, written # ── PDU builders ────────────────────────────────────────────────────────────── def _nbss(payload: bytes) -> bytes: return bytes([0x00]) + len(payload).to_bytes(3, "big") + payload def _smb2_header(command: int, message_id: int, session_id: int = 0) -> bytes: return ( b"\xfeSMB" + struct.pack(" bytes: # SMB2 NEGOTIATE Request (MS-SMB2 §2.2.3) — minimal, 1 dialect body = ( struct.pack(" bytes: body = ( struct.pack(" bytes: return b"NTLMSSP\x00" + struct.pack(" bytes: """Build a minimal valid NTLMSSP Type 3 with NEGOTIATE_UNICODE.""" user_b = username.encode("utf-16-le") dom_b = domain.encode("utf-16-le") workstation = b"" payload = nt_response + dom_b + user_b + workstation # 64-byte header + 8-byte version nt_off = 72 dom_off = nt_off + len(nt_response) user_off = dom_off + len(dom_b) ws_off = user_off + len(user_b) flags = 0x00000001 # NEGOTIATE_UNICODE return ( b"NTLMSSP\x00" + struct.pack("= 2 smb = written[1][4:] status = struct.unpack_from("= 3 and c.args[2] == "auth_attempt" ] assert auth_calls, f"no auth_attempt logged; got: {log_mock.syslog_line.call_args_list}" kwargs = auth_calls[0].kwargs assert kwargs["principal"] == "ACME\\alice" assert kwargs["secret_kind"] == "ntlmssp_v2" assert kwargs["username"] == "alice" assert kwargs["domain"] == "ACME" assert "secret_b64" in kwargs and kwargs["secret_b64"] def test_second_session_setup_returns_logon_failure(smb_mod): nt_response = b"\xbb" * 32 type3 = _ntlmssp_type3("bob", "", nt_response) pkts = ( _nbss(_negotiate_request()) + _nbss(_session_setup_request(1, _ntlmssp_type1())) + _nbss(_session_setup_request(2, type3)) ) _, written = _drive(smb_mod, pkts) smb = written[-1][4:] status = struct.unpack_from(" MAX_NBSS_LEN; framer should bail before allocating bad = bytes([0x00]) + (8 * 1024 * 1024).to_bytes(3, "big") _, written = _drive(smb_mod, bad) assert written == [] def test_smb1_negotiate_drops_connection(smb_mod): # 0xff 'SMB' is the SMB1 magic — our framer doesn't speak it pdu = b"\xffSMB" + b"\x00" * 60 _, written = _drive(smb_mod, _nbss(pdu)) assert written == [] def test_short_pdu_below_64_drops(smb_mod): # NBSS length < 64 should be rejected bad = bytes([0x00]) + (32).to_bytes(3, "big") + b"\x00" * 32 _, written = _drive(smb_mod, bad) assert written == []