Files
DECNET/decnet/prober/hassh.py
anti 2d65d74069 chore: fix ruff lint errors, bandit suppressions, and pin pip>=26.0
Remove unused imports (ruff F401), suppress B324 false positives on
spec-mandated MD5 in HASSH/JA3/JA3S fingerprinting, drop unused
record_version assignment in JARM parser, and pin pip>=26.0 in dev
deps to address CVE-2025-8869 and CVE-2026-1703.
2026-04-14 17:32:18 -04:00

249 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HASSHServer — SSH server fingerprinting via KEX_INIT algorithm ordering.
Connects to an SSH server, completes the version exchange, captures the
server's SSH_MSG_KEXINIT message, and hashes the server-to-client algorithm
fields (kex, encryption, MAC, compression) into a 32-character MD5 digest.
This is the *server* variant of HASSH (HASSHServer). It fingerprints what
the server *offers*, which identifies the SSH implementation (OpenSSH,
Paramiko, libssh, Cobalt Strike SSH, etc.).
Stdlib only (socket, struct, hashlib). No DECNET imports.
"""
from __future__ import annotations
import hashlib
import socket
import struct
from typing import Any
# SSH protocol constants
# Message number of SSH_MSG_KEXINIT; compared against the first payload byte
# of the server's first binary packet.
_SSH_MSG_KEXINIT = 20
# Size in bytes of the cookie that follows the message-type byte in KEX_INIT.
_KEX_INIT_COOKIE_LEN = 16
# NOTE(review): not referenced by the code visible in this file; the parser
# iterates an explicit list of field names instead. Confirm before removing.
_KEX_INIT_NAME_LISTS = 10  # 10 name-list fields in KEX_INIT
# Blend in as a normal OpenSSH client
_CLIENT_BANNER = b"SSH-2.0-OpenSSH_9.6\r\n"
# Max bytes to read for server banner
_MAX_BANNER_LEN = 256
# Max bytes for a single SSH packet (KEX_INIT is typically < 2KB).
# Used as the upper bound accepted for the packet_length field.
_MAX_PACKET_LEN = 35000
# ─── SSH connection + KEX_INIT capture ──────────────────────────────────────
def _ssh_connect(
    host: str,
    port: int,
    timeout: float,
) -> tuple[str, bytes] | None:
    """
    TCP connect, exchange version strings, read server's KEX_INIT.

    A server is allowed to send free-form text lines before its version
    string (RFC 4253, section 4.2). A bounded number of such lines is
    skipped so those servers are still fingerprinted instead of being
    rejected on their first line.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port to connect to.
        timeout: Timeout in seconds, applied to the connect and to every
            individual socket read.

    Returns:
        (server_banner, kex_init_payload), or None on any failure: connect
        or I/O error, no "SSH-" version line, or a first binary packet that
        is not SSH_MSG_KEXINIT. The kex_init_payload starts at the
        SSH_MSG_KEXINIT type byte.
    """
    # Cap on text lines tolerated before the version string, so a peer that
    # never sends "SSH-" cannot keep the probe reading indefinitely.
    max_pre_banner_lines = 8

    sock = None
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.settimeout(timeout)

        # 1. Read lines until the version string ("SSH-...") shows up.
        for _ in range(max_pre_banner_lines + 1):
            banner = _read_banner(sock)
            if banner is None:
                return None
            if banner.startswith("SSH-"):
                break
        else:
            # Only non-version lines were received: not (usable) SSH.
            return None

        # 2. Send our client version string.
        sock.sendall(_CLIENT_BANNER)

        # 3. Read the server's first binary packet (should be KEX_INIT).
        payload = _read_ssh_packet(sock)
        if not payload or payload[0] != _SSH_MSG_KEXINIT:
            return None
        return (banner, payload)
    except OSError:
        # socket.timeout, TimeoutError and ConnectionError are all OSError
        # subclasses, so this single clause covers every network failure.
        return None
    finally:
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # Best-effort cleanup; a close failure must not mask the result.
                pass
def _read_banner(sock: socket.socket) -> str | None:
    """
    Read one LF-terminated text line from the socket.

    Bytes are read one at a time on purpose: anything after the line
    terminator belongs to whatever follows on the stream and must stay
    unread in the socket.

    Returns:
        The line decoded as UTF-8 (undecodable bytes replaced) with the
        trailing CR/LF removed, or None if the read fails, the peer closes
        the connection, or no line terminator arrives within
        _MAX_BANNER_LEN bytes.
    """
    line = bytearray()
    while len(line) < _MAX_BANNER_LEN:
        try:
            byte = sock.recv(1)
        except OSError:
            # Covers timeouts too: socket.timeout/TimeoutError are OSErrors.
            return None
        if not byte:
            # Peer closed the connection before finishing the line.
            return None
        line += byte
        if byte == b"\n":
            # errors="replace" guarantees decoding cannot raise.
            return line.decode("utf-8", errors="replace").rstrip("\r\n")
    # Length cap hit without a terminator: returning the partial buffer would
    # hand the caller a truncated line and leave the stream mid-line.
    return None
def _read_ssh_packet(sock: socket.socket) -> bytes | None:
    """
    Read one unencrypted SSH binary packet and return only its payload.

    Wire layout:
        uint32  packet_length   (excludes this field itself and any MAC)
        byte    padding_length
        byte[]  payload         (packet_length - padding_length - 1 bytes)
        byte[]  padding

    Returns:
        The payload bytes, or None if the socket read fails, the declared
        packet_length is outside [2, _MAX_PACKET_LEN], or the padding
        length leaves no room for at least one payload byte.
    """
    length_field = _recv_exact(sock, 4)
    if length_field is None:
        return None

    (packet_length,) = struct.unpack("!I", length_field)
    if not 2 <= packet_length <= _MAX_PACKET_LEN:
        return None

    body = _recv_exact(sock, packet_length)
    if body is None:
        return None

    # body[0] is padding_length; the payload follows immediately after it.
    padding_length = body[0]
    payload_length = packet_length - padding_length - 1
    if payload_length < 1:
        return None
    if payload_length > len(body) - 1:
        return None

    payload_end = 1 + payload_length
    return body[1:payload_end]
def _recv_exact(sock: socket.socket, n: int) -> bytes | None:
    """
    Receive exactly ``n`` bytes from ``sock``.

    Returns:
        The ``n`` bytes read (``b""`` when ``n`` is not positive), or None
        if a socket error or timeout occurs or the peer closes the
        connection before ``n`` bytes have arrived.
    """
    pieces = []
    remaining = n
    while remaining > 0:
        try:
            piece = sock.recv(remaining)
        except OSError:
            # Timeouts are OSError subclasses as well.
            return None
        if not piece:
            # Empty read means the peer closed the connection early.
            return None
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)
# ─── KEX_INIT parsing ──────────────────────────────────────────────────────
def _parse_kex_init(payload: bytes) -> dict[str, str] | None:
    """
    Extract the ten name-list fields from an SSH_MSG_KEXINIT payload.

    Expected layout of ``payload``:
        byte      SSH_MSG_KEXINIT (20)
        byte[16]  cookie
        10 x name-list, each:
            uint32  length
            byte[]  comma-separated algorithm names
        bool      first_kex_packet_follows
        uint32    reserved

    The type byte is skipped without being checked, and the two trailing
    fields are not read.

    Returns:
        A dict mapping, in wire order, kex_algorithms,
        server_host_key_algorithms, encryption_client_to_server,
        encryption_server_to_client, mac_client_to_server,
        mac_server_to_client, compression_client_to_server,
        compression_server_to_client, languages_client_to_server and
        languages_server_to_client to their decoded strings (UTF-8,
        undecodable bytes replaced). None if the payload is too short or a
        name-list runs past its end.
    """
    total = len(payload)
    header_len = 1 + _KEX_INIT_COOKIE_LEN  # type byte + cookie
    if total < header_len + 4:
        return None

    wire_order = (
        "kex_algorithms",
        "server_host_key_algorithms",
        "encryption_client_to_server",
        "encryption_server_to_client",
        "mac_client_to_server",
        "mac_server_to_client",
        "compression_client_to_server",
        "compression_server_to_client",
        "languages_client_to_server",
        "languages_server_to_client",
    )

    parsed: dict[str, str] = {}
    cursor = header_len
    for field in wire_order:
        body_start = cursor + 4
        if body_start > total:
            # Not enough bytes left for the uint32 length prefix.
            return None
        (size,) = struct.unpack_from("!I", payload, cursor)

        body_end = body_start + size
        if body_end > total:
            # Declared length overruns the payload.
            return None

        raw_list = payload[body_start:body_end]
        parsed[field] = raw_list.decode("utf-8", errors="replace")
        cursor = body_end

    return parsed
# ─── HASSH computation ──────────────────────────────────────────────────────
def _compute_hassh(kex: str, enc: str, mac: str, comp: str) -> str:
    """
    Build the HASSHServer fingerprint from four algorithm lists.

    The inputs are joined with ";" in the order kex, encryption, MAC,
    compression, encoded as UTF-8 and hashed with MD5.

    Returns:
        The digest as a 32-character lowercase hex string.
    """
    joined = ";".join((kex, enc, mac, comp))
    # MD5 is mandated by the fingerprint format; it is not used for security.
    digest = hashlib.md5(joined.encode("utf-8"))  # nosec B324
    return digest.hexdigest()
# ─── Public API ─────────────────────────────────────────────────────────────
def hassh_server(
    host: str,
    port: int,
    timeout: float = 5.0,
) -> dict[str, Any] | None:
    """
    Probe an SSH server and compute its HASSHServer fingerprint.

    Args:
        host: Hostname or IP address of the target.
        port: TCP port of the target.
        timeout: Timeout in seconds passed through to the connection step.

    Returns:
        A dict with keys ``hassh_server`` (the hash), ``banner``,
        ``kex_algorithms``, ``encryption_s2c``, ``mac_s2c`` and
        ``compression_s2c``; or None when the connection/handshake step
        fails or the KEX_INIT payload cannot be parsed.
    """
    connection = _ssh_connect(host, port, timeout)
    if connection is None:
        return None
    server_banner, kex_payload = connection

    algorithms = _parse_kex_init(kex_payload)
    if algorithms is None:
        return None

    # Only the server-to-client lists (plus kex) feed the server fingerprint.
    kex_list = algorithms["kex_algorithms"]
    enc_list = algorithms["encryption_server_to_client"]
    mac_list = algorithms["mac_server_to_client"]
    comp_list = algorithms["compression_server_to_client"]

    fingerprint = _compute_hassh(kex_list, enc_list, mac_list, comp_list)
    return {
        "hassh_server": fingerprint,
        "banner": server_banner,
        "kex_algorithms": kex_list,
        "encryption_s2c": enc_list,
        "mac_s2c": mac_list,
        "compression_s2c": comp_list,
    }