#!/usr/bin/env python3 """ MSSQL (TDS)server. Reads TDS pre-login and login7 packets, extracts username, responds with a login failed error. Logs auth attempts as JSON. """ import asyncio import base64 import os import struct import instance_seed as _seed from syslog_bridge import syslog_line, write_syslog_file, forward_syslog NODE_NAME = os.environ.get("NODE_NAME", "dbserver") SERVICE_NAME = "mssql" LOG_TARGET = os.environ.get("LOG_TARGET", "") # Real SQL Server release families. Pairing (major, minor, build) makes a # subsequent OSQL/sqlcmd version probe line up with what MS published. # Builds are taken from publicly documented latest-CU numbers. _MSSQL_RELEASES = [ # (name, major, minor, build, subbuild) ("SQL Server 2016", 13, 0, 6419, 0), ("SQL Server 2017", 14, 0, 2000, 0), ("SQL Server 2017", 14, 0, 3460, 0), ("SQL Server 2019", 15, 0, 2000, 0), ("SQL Server 2019", 15, 0, 4335, 1), ("SQL Server 2022", 16, 0, 1000, 0), ("SQL Server 2022", 16, 0, 4115, 2), ] _MSSQL_NAME, _VER_MAJ, _VER_MIN, _VER_BUILD, _VER_SUB = _seed.pick(_MSSQL_RELEASES) def _build_prelogin_response() -> bytes: """TDS PRELOGIN response. Version option carries major(1) minor(1) build(2, network order) subbuild(2, network order).""" version_data = ( bytes([_VER_MAJ & 0xff, _VER_MIN & 0xff]) + struct.pack(">H", _VER_BUILD & 0xffff) + struct.pack(">H", _VER_SUB & 0xffff) ) # Option directory + data. Offsets are from start of directory. # Five options: VERSION, ENCRYPTION, INSTOPT, THREADID, MARS. # Data fields, in order: encryption = b"\x02" # NOT_SUP instopt = b"\x00" threadid = struct.pack(" None: nonlocal directory, data, running_offset directory += bytes([token]) + struct.pack(">H", running_offset) + struct.pack(">H", len(chunk)) data += chunk running_offset += len(chunk) add_option(0x00, version_data) add_option(0x01, encryption) add_option(0x02, instopt) add_option(0x03, threadid) add_option(0x04, mars) directory += b"\xff" payload = directory + data total_len = 8 + len(payload) header = struct.pack(">BBHBBBB", 0x04, 0x01, total_len, 0x00, 0x00, 0x01, 0x00) return header + payload _PRELOGIN_RESP = _build_prelogin_response() def _log(event_type: str, severity: int = 6, **kwargs) -> None: line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs) write_syslog_file(line) forward_syslog(line, LOG_TARGET) def _tds_error_packet(message: str) -> bytes: msg_enc = message.encode("utf-16-le") # Token type 0xAA = ERROR, followed by length, error number, state, class, msg_len, msg token = ( b"\xaa" + struct.pack("BBHBBBB", 0x04, 0x01, len(payload) + 8, 0x00, 0x00, 0x01, 0x00) return header + payload class MSSQLProtocol(asyncio.Protocol): def __init__(self): self._transport = None self._peer = None self._buf = b"" self._prelogin_done = False def connection_made(self, transport): self._transport = transport self._peer = transport.get_extra_info("peername", ("?", 0)) _log("connect", src=self._peer[0], src_port=self._peer[1]) def data_received(self, data): self._buf += data while len(self._buf) >= 8: pkt_type = self._buf[0] pkt_len = struct.unpack(">H", self._buf[2:4])[0] if pkt_len < 8: _log("unknown_packet", src=self._peer[0], pkt_type=hex(pkt_type)) self._transport.close() self._buf = b"" return if len(self._buf) < pkt_len: break payload = self._buf[8:pkt_len] self._buf = self._buf[pkt_len:] self._handle_packet(pkt_type, payload) if self._transport.is_closing(): self._buf = b"" break def _handle_packet(self, pkt_type: int, payload: bytes): if pkt_type == 0x12: # Pre-login self._transport.write(_PRELOGIN_RESP) self._prelogin_done = True elif pkt_type == 0x10: # Login7 username, password = self._parse_login7_creds(payload) extra: dict = {} if password: pw_bytes = password.encode("utf-8") extra = { "principal": username, "secret_kind": "plaintext", "secret_printable": password, "secret_b64": base64.b64encode(pw_bytes).decode("ascii"), } _log("auth", src=self._peer[0], username=username, **extra) self._transport.write(_tds_error_packet("Login failed for user.")) self._transport.close() else: _log("unknown_packet", src=self._peer[0], pkt_type=hex(pkt_type)) self._transport.close() @staticmethod def _deobfuscate_login7_password(blob: bytes) -> str: """MS-TDS Login7 password obfuscation: each byte was rotated-right 4 bits then XOR'd with 0xa5. Inverse is XOR 0xa5 then rotate-left 4 bits (== nibble swap). Plaintext-recoverable. After deobfuscation the bytes are UTF-16-LE encoded characters.""" out = bytearray(len(blob)) for i, b in enumerate(blob): x = b ^ 0xa5 out[i] = ((x & 0x0f) << 4) | ((x & 0xf0) >> 4) return bytes(out).decode("utf-16-le", errors="replace") def _parse_login7_creds(self, payload: bytes) -> tuple[str, str]: """Login7 offset table starts at payload offset 36: 36-37 ibHostName 38-39 cchHostName 40-41 ibUserName 42-43 cchUserName 44-45 ibPassword 46-47 cchPassword Both lengths are CHARACTER counts; multiply by 2 for byte length. The password field is XOR/swap-obfuscated — see :meth:`_deobfuscate_login7_password`. Plaintext-recoverable. """ try: if len(payload) < 48: return "", "" user_off, user_len = struct.unpack("", "" def connection_lost(self, exc): _log("disconnect", src=self._peer[0] if self._peer else "?") async def main(): _log("startup", msg=f"MSSQL server starting as {NODE_NAME}") loop = asyncio.get_running_loop() server = await loop.create_server(MSSQLProtocol, "0.0.0.0", 1433) # nosec B104 async with server: await server.serve_forever() if __name__ == "__main__": asyncio.run(main())