diff --git a/decnet/cli.py b/decnet/cli.py index 0e503e6..947781b 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -324,7 +324,7 @@ def probe( timeout: float = typer.Option(5.0, "--timeout", help="Per-probe TCP timeout in seconds"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background (used by deploy, no console output)"), ) -> None: - """JARM-fingerprint all attackers discovered in the log stream.""" + """Fingerprint attackers (JARM + HASSH + TCP/IP stack) discovered in the log stream.""" import asyncio from decnet.prober import prober_worker diff --git a/decnet/prober/hassh.py b/decnet/prober/hassh.py new file mode 100644 index 0000000..9068e07 --- /dev/null +++ b/decnet/prober/hassh.py @@ -0,0 +1,248 @@ +""" +HASSHServer — SSH server fingerprinting via KEX_INIT algorithm ordering. + +Connects to an SSH server, completes the version exchange, captures the +server's SSH_MSG_KEXINIT message, and hashes the server-to-client algorithm +fields (kex, encryption, MAC, compression) into a 32-character MD5 digest. + +This is the *server* variant of HASSH (HASSHServer). It fingerprints what +the server *offers*, which identifies the SSH implementation (OpenSSH, +Paramiko, libssh, Cobalt Strike SSH, etc.). + +Stdlib only (socket, struct, hashlib). No DECNET imports. +""" + +from __future__ import annotations + +import hashlib +import socket +import struct +from typing import Any + +# SSH protocol constants +_SSH_MSG_KEXINIT = 20 +_KEX_INIT_COOKIE_LEN = 16 +_KEX_INIT_NAME_LISTS = 10 # 10 name-list fields in KEX_INIT + +# Blend in as a normal OpenSSH client +_CLIENT_BANNER = b"SSH-2.0-OpenSSH_9.6\r\n" + +# Max bytes to read for server banner +_MAX_BANNER_LEN = 256 + +# Max bytes for a single SSH packet (KEX_INIT is typically < 2KB) +_MAX_PACKET_LEN = 35000 + + +# ─── SSH connection + KEX_INIT capture ────────────────────────────────────── + +def _ssh_connect( + host: str, + port: int, + timeout: float, +) -> tuple[str, bytes] | None: + """ + TCP connect, exchange version strings, read server's KEX_INIT. + + Returns (server_banner, kex_init_payload) or None on failure. + The kex_init_payload starts at the SSH_MSG_KEXINIT type byte. + """ + sock = None + try: + sock = socket.create_connection((host, port), timeout=timeout) + sock.settimeout(timeout) + + # 1. Read server banner (line ending \r\n or \n) + banner = _read_banner(sock) + if banner is None or not banner.startswith("SSH-"): + return None + + # 2. Send our client version string + sock.sendall(_CLIENT_BANNER) + + # 3. Read the server's first binary packet (should be KEX_INIT) + payload = _read_ssh_packet(sock) + if payload is None or len(payload) < 1: + return None + + if payload[0] != _SSH_MSG_KEXINIT: + return None + + return (banner, payload) + + except (OSError, socket.timeout, TimeoutError, ConnectionError): + return None + finally: + if sock is not None: + try: + sock.close() + except OSError: + pass + + +def _read_banner(sock: socket.socket) -> str | None: + """Read the SSH version banner line from the socket.""" + buf = b"" + while len(buf) < _MAX_BANNER_LEN: + try: + byte = sock.recv(1) + except (OSError, socket.timeout, TimeoutError): + return None + if not byte: + return None + buf += byte + if buf.endswith(b"\n"): + break + + try: + return buf.decode("utf-8", errors="replace").rstrip("\r\n") + except Exception: + return None + + +def _read_ssh_packet(sock: socket.socket) -> bytes | None: + """ + Read a single SSH binary packet and return its payload. + + SSH binary packet format: + uint32 packet_length (not including itself or MAC) + byte padding_length + byte[] payload (packet_length - padding_length - 1) + byte[] padding + """ + header = _recv_exact(sock, 4) + if header is None: + return None + + packet_length = struct.unpack("!I", header)[0] + if packet_length < 2 or packet_length > _MAX_PACKET_LEN: + return None + + rest = _recv_exact(sock, packet_length) + if rest is None: + return None + + padding_length = rest[0] + payload_length = packet_length - padding_length - 1 + if payload_length < 1 or payload_length > len(rest) - 1: + return None + + return rest[1 : 1 + payload_length] + + +def _recv_exact(sock: socket.socket, n: int) -> bytes | None: + """Read exactly n bytes from socket, or None on failure.""" + buf = b"" + while len(buf) < n: + try: + chunk = sock.recv(n - len(buf)) + except (OSError, socket.timeout, TimeoutError): + return None + if not chunk: + return None + buf += chunk + return buf + + +# ─── KEX_INIT parsing ────────────────────────────────────────────────────── + +def _parse_kex_init(payload: bytes) -> dict[str, str] | None: + """ + Parse SSH_MSG_KEXINIT payload and extract the 10 name-list fields. + + Payload layout: + byte SSH_MSG_KEXINIT (20) + byte[16] cookie + 10 × name-list: + uint32 length + byte[] utf-8 string (comma-separated algorithm names) + bool first_kex_packet_follows + uint32 reserved + + Returns dict with keys: kex_algorithms, server_host_key_algorithms, + encryption_client_to_server, encryption_server_to_client, + mac_client_to_server, mac_server_to_client, + compression_client_to_server, compression_server_to_client, + languages_client_to_server, languages_server_to_client. + """ + if len(payload) < 1 + _KEX_INIT_COOKIE_LEN + 4: + return None + + offset = 1 + _KEX_INIT_COOKIE_LEN # skip type byte + cookie + + field_names = [ + "kex_algorithms", + "server_host_key_algorithms", + "encryption_client_to_server", + "encryption_server_to_client", + "mac_client_to_server", + "mac_server_to_client", + "compression_client_to_server", + "compression_server_to_client", + "languages_client_to_server", + "languages_server_to_client", + ] + + fields: dict[str, str] = {} + for name in field_names: + if offset + 4 > len(payload): + return None + length = struct.unpack("!I", payload[offset : offset + 4])[0] + offset += 4 + if offset + length > len(payload): + return None + fields[name] = payload[offset : offset + length].decode( + "utf-8", errors="replace" + ) + offset += length + + return fields + + +# ─── HASSH computation ────────────────────────────────────────────────────── + +def _compute_hassh(kex: str, enc: str, mac: str, comp: str) -> str: + """ + Compute HASSHServer hash: MD5 of "kex;enc_s2c;mac_s2c;comp_s2c". + + Returns 32-character lowercase hex digest. + """ + raw = f"{kex};{enc};{mac};{comp}" + return hashlib.md5(raw.encode("utf-8")).hexdigest() + + +# ─── Public API ───────────────────────────────────────────────────────────── + +def hassh_server( + host: str, + port: int, + timeout: float = 5.0, +) -> dict[str, Any] | None: + """ + Connect to an SSH server and compute its HASSHServer fingerprint. + + Returns a dict with the hash, banner, and raw algorithm fields, + or None if the host is not running an SSH server on the given port. + """ + result = _ssh_connect(host, port, timeout) + if result is None: + return None + + banner, payload = result + fields = _parse_kex_init(payload) + if fields is None: + return None + + kex = fields["kex_algorithms"] + enc = fields["encryption_server_to_client"] + mac = fields["mac_server_to_client"] + comp = fields["compression_server_to_client"] + + return { + "hassh_server": _compute_hassh(kex, enc, mac, comp), + "banner": banner, + "kex_algorithms": kex, + "encryption_s2c": enc, + "mac_s2c": mac, + "compression_s2c": comp, + } diff --git a/decnet/prober/tcpfp.py b/decnet/prober/tcpfp.py new file mode 100644 index 0000000..8044c63 --- /dev/null +++ b/decnet/prober/tcpfp.py @@ -0,0 +1,223 @@ +""" +TCP/IP stack fingerprinting via SYN-ACK analysis. + +Sends a crafted TCP SYN packet to a target host:port, captures the +SYN-ACK response, and extracts OS/tool-identifying characteristics: +TTL, window size, DF bit, MSS, window scale, SACK support, timestamps, +and TCP options ordering. + +Uses scapy for packet crafting and parsing. Requires root/CAP_NET_RAW. +""" + +from __future__ import annotations + +import hashlib +import random +from typing import Any + +# Lazy-import scapy to avoid breaking non-root usage of HASSH/JARM. +# The actual import happens inside functions that need it. + +# ─── TCP option short codes ───────────────────────────────────────────────── + +_OPT_CODES: dict[str, str] = { + "MSS": "M", + "WScale": "W", + "SAckOK": "S", + "SAck": "S", + "Timestamp": "T", + "NOP": "N", + "EOL": "E", + "AltChkSum": "A", + "AltChkSumOpt": "A", + "UTO": "U", +} + + +# ─── Packet construction ─────────────────────────────────────────────────── + +def _send_syn( + host: str, + port: int, + timeout: float, +) -> Any | None: + """ + Craft a TCP SYN with common options and send it. Returns the + SYN-ACK response packet or None on timeout/failure. + """ + from scapy.all import IP, TCP, conf, sr1 + + # Suppress scapy's noisy output + conf.verb = 0 + + src_port = random.randint(49152, 65535) + + pkt = ( + IP(dst=host) + / TCP( + sport=src_port, + dport=port, + flags="S", + options=[ + ("MSS", 1460), + ("NOP", None), + ("WScale", 7), + ("NOP", None), + ("NOP", None), + ("Timestamp", (0, 0)), + ("SAckOK", b""), + ("EOL", None), + ], + ) + ) + + try: + resp = sr1(pkt, timeout=timeout, verbose=0) + except (OSError, PermissionError): + return None + + if resp is None: + return None + + # Verify it's a SYN-ACK (flags == 0x12) + from scapy.all import TCP as TCPLayer + if not resp.haslayer(TCPLayer): + return None + if resp[TCPLayer].flags != 0x12: # SYN-ACK + return None + + # Send RST to clean up half-open connection + _send_rst(host, port, src_port, resp) + + return resp + + +def _send_rst( + host: str, + dport: int, + sport: int, + resp: Any, +) -> None: + """Send RST to clean up the half-open connection.""" + try: + from scapy.all import IP, TCP, send + rst = ( + IP(dst=host) + / TCP( + sport=sport, + dport=dport, + flags="R", + seq=resp.ack, + ) + ) + send(rst, verbose=0) + except Exception: + pass # Best-effort cleanup + + +# ─── Response parsing ─────────────────────────────────────────────────────── + +def _parse_synack(resp: Any) -> dict[str, Any]: + """ + Extract fingerprint fields from a scapy SYN-ACK response packet. + """ + from scapy.all import IP, TCP + + ip_layer = resp[IP] + tcp_layer = resp[TCP] + + # IP fields + ttl = ip_layer.ttl + df_bit = 1 if (ip_layer.flags & 0x2) else 0 # DF = bit 1 + ip_id = ip_layer.id + + # TCP fields + window_size = tcp_layer.window + + # Parse TCP options + mss = 0 + window_scale = -1 + sack_ok = 0 + timestamp = 0 + options_order = _extract_options_order(tcp_layer.options) + + for opt_name, opt_value in tcp_layer.options: + if opt_name == "MSS": + mss = opt_value + elif opt_name == "WScale": + window_scale = opt_value + elif opt_name in ("SAckOK", "SAck"): + sack_ok = 1 + elif opt_name == "Timestamp": + timestamp = 1 + + return { + "ttl": ttl, + "window_size": window_size, + "df_bit": df_bit, + "ip_id": ip_id, + "mss": mss, + "window_scale": window_scale, + "sack_ok": sack_ok, + "timestamp": timestamp, + "options_order": options_order, + } + + +def _extract_options_order(options: list[tuple[str, Any]]) -> str: + """ + Map scapy TCP option tuples to a short-code string. + + E.g. [("MSS", 1460), ("NOP", None), ("WScale", 7)] → "M,N,W" + """ + codes = [] + for opt_name, _ in options: + code = _OPT_CODES.get(opt_name, "?") + codes.append(code) + return ",".join(codes) + + +# ─── Fingerprint computation ─────────────────────────────────────────────── + +def _compute_fingerprint(fields: dict[str, Any]) -> tuple[str, str]: + """ + Compute fingerprint raw string and SHA256 hash from parsed fields. + + Returns (raw_string, hash_hex_32). + """ + raw = ( + f"{fields['ttl']}:{fields['window_size']}:{fields['df_bit']}:" + f"{fields['mss']}:{fields['window_scale']}:{fields['sack_ok']}:" + f"{fields['timestamp']}:{fields['options_order']}" + ) + h = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32] + return raw, h + + +# ─── Public API ───────────────────────────────────────────────────────────── + +def tcp_fingerprint( + host: str, + port: int, + timeout: float = 5.0, +) -> dict[str, Any] | None: + """ + Send a TCP SYN to host:port and fingerprint the SYN-ACK response. + + Returns a dict with the hash, raw fingerprint string, and individual + fields, or None if no SYN-ACK was received. + + Requires root/CAP_NET_RAW. + """ + resp = _send_syn(host, port, timeout) + if resp is None: + return None + + fields = _parse_synack(resp) + raw, h = _compute_fingerprint(fields) + + return { + "tcpfp_hash": h, + "tcpfp_raw": raw, + **fields, + } diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index ba133cb..e5769f0 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -2,7 +2,11 @@ DECNET-PROBER standalone worker. Runs as a detached host-level process. Discovers attacker IPs by tailing the -collector's JSON log file, then JARM-probes them on common C2/TLS ports. +collector's JSON log file, then fingerprints them via multiple active probes: +- JARM (TLS server fingerprinting) +- HASSHServer (SSH server fingerprinting) +- TCP/IP stack fingerprinting (OS/tool identification) + Results are written as RFC 5424 syslog + JSON to the same log files. Target discovery is fully automatic — every unique attacker IP seen in the @@ -23,17 +27,25 @@ from pathlib import Path from typing import Any from decnet.logging import get_logger +from decnet.prober.hassh import hassh_server from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash +from decnet.prober.tcpfp import tcp_fingerprint logger = get_logger("prober") -# ─── Default ports to JARM-probe on each attacker IP ───────────────────────── -# Common C2 callback / TLS server ports (Cobalt Strike, Sliver, Metasploit, etc.) +# ─── Default ports per probe type ─────────────────────────────────────────── +# JARM: common C2 callback / TLS server ports DEFAULT_PROBE_PORTS: list[int] = [ 443, 8443, 8080, 4443, 50050, 2222, 993, 995, 8888, 9001, ] +# HASSHServer: common SSH server ports +DEFAULT_SSH_PORTS: list[int] = [22, 2222, 22222, 2022] + +# TCP/IP stack: probe on common service ports +DEFAULT_TCPFP_PORTS: list[int] = [80, 443] + # ─── RFC 5424 formatting (inline, mirrors templates/*/decnet_logging.py) ───── _FACILITY_LOCAL0 = 16 @@ -208,62 +220,175 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]: def _probe_cycle( targets: set[str], - probed: dict[str, set[int]], - ports: list[int], + probed: dict[str, dict[str, set[int]]], + jarm_ports: list[int], + ssh_ports: list[int], + tcpfp_ports: list[int], log_path: Path, json_path: Path, timeout: float = 5.0, ) -> None: """ - Probe all known attacker IPs on the configured ports. + Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting. Args: targets: set of attacker IPs to probe - probed: dict mapping IP -> set of ports already successfully probed - ports: list of ports to probe on each IP + probed: dict mapping IP -> {probe_type -> set of ports already probed} + jarm_ports: TLS ports for JARM fingerprinting + ssh_ports: SSH ports for HASSHServer fingerprinting + tcpfp_ports: ports for TCP/IP stack fingerprinting log_path: RFC 5424 log file json_path: JSON log file timeout: per-probe TCP timeout """ for ip in sorted(targets): - already_done = probed.get(ip, set()) - ports_to_probe = [p for p in ports if p not in already_done] + ip_probed = probed.setdefault(ip, {}) - if not ports_to_probe: + # Phase 1: JARM (TLS fingerprinting) + _jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout) + + # Phase 2: HASSHServer (SSH fingerprinting) + _hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout) + + # Phase 3: TCP/IP stack fingerprinting + _tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout) + + +def _jarm_phase( + ip: str, + ip_probed: dict[str, set[int]], + ports: list[int], + log_path: Path, + json_path: Path, + timeout: float, +) -> None: + """JARM-fingerprint an IP on the given TLS ports.""" + done = ip_probed.setdefault("jarm", set()) + for port in ports: + if port in done: continue + try: + h = jarm_hash(ip, port, timeout=timeout) + done.add(port) + if h == JARM_EMPTY_HASH: + continue + _write_event( + log_path, json_path, + "jarm_fingerprint", + target_ip=ip, + target_port=str(port), + jarm_hash=h, + msg=f"JARM {ip}:{port} = {h}", + ) + logger.info("prober: JARM %s:%d = %s", ip, port, h) + except Exception as exc: + done.add(port) + _write_event( + log_path, json_path, + "prober_error", + severity=_SEVERITY_WARNING, + target_ip=ip, + target_port=str(port), + error=str(exc), + msg=f"JARM probe failed for {ip}:{port}: {exc}", + ) + logger.warning("prober: JARM probe failed %s:%d: %s", ip, port, exc) - for port in ports_to_probe: - try: - h = jarm_hash(ip, port, timeout=timeout) - if h == JARM_EMPTY_HASH: - # No TLS server on this port — don't log, don't reprobed - probed.setdefault(ip, set()).add(port) - continue - _write_event( - log_path, json_path, - "jarm_fingerprint", - target_ip=ip, - target_port=str(port), - jarm_hash=h, - msg=f"JARM {ip}:{port} = {h}", - ) - logger.info("prober: JARM %s:%d = %s", ip, port, h) - probed.setdefault(ip, set()).add(port) +def _hassh_phase( + ip: str, + ip_probed: dict[str, set[int]], + ports: list[int], + log_path: Path, + json_path: Path, + timeout: float, +) -> None: + """HASSHServer-fingerprint an IP on the given SSH ports.""" + done = ip_probed.setdefault("hassh", set()) + for port in ports: + if port in done: + continue + try: + result = hassh_server(ip, port, timeout=timeout) + done.add(port) + if result is None: + continue + _write_event( + log_path, json_path, + "hassh_fingerprint", + target_ip=ip, + target_port=str(port), + hassh_server_hash=result["hassh_server"], + ssh_banner=result["banner"], + kex_algorithms=result["kex_algorithms"], + encryption_s2c=result["encryption_s2c"], + mac_s2c=result["mac_s2c"], + compression_s2c=result["compression_s2c"], + msg=f"HASSH {ip}:{port} = {result['hassh_server']}", + ) + logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"]) + except Exception as exc: + done.add(port) + _write_event( + log_path, json_path, + "prober_error", + severity=_SEVERITY_WARNING, + target_ip=ip, + target_port=str(port), + error=str(exc), + msg=f"HASSH probe failed for {ip}:{port}: {exc}", + ) + logger.warning("prober: HASSH probe failed %s:%d: %s", ip, port, exc) - except Exception as exc: - _write_event( - log_path, json_path, - "prober_error", - severity=_SEVERITY_WARNING, - target_ip=ip, - target_port=str(port), - error=str(exc), - msg=f"JARM probe failed for {ip}:{port}: {exc}", - ) - logger.warning("prober: JARM probe failed %s:%d: %s", ip, port, exc) - # Mark as probed to avoid infinite retries - probed.setdefault(ip, set()).add(port) + +def _tcpfp_phase( + ip: str, + ip_probed: dict[str, set[int]], + ports: list[int], + log_path: Path, + json_path: Path, + timeout: float, +) -> None: + """TCP/IP stack fingerprint an IP on the given ports.""" + done = ip_probed.setdefault("tcpfp", set()) + for port in ports: + if port in done: + continue + try: + result = tcp_fingerprint(ip, port, timeout=timeout) + done.add(port) + if result is None: + continue + _write_event( + log_path, json_path, + "tcpfp_fingerprint", + target_ip=ip, + target_port=str(port), + tcpfp_hash=result["tcpfp_hash"], + tcpfp_raw=result["tcpfp_raw"], + ttl=str(result["ttl"]), + window_size=str(result["window_size"]), + df_bit=str(result["df_bit"]), + mss=str(result["mss"]), + window_scale=str(result["window_scale"]), + sack_ok=str(result["sack_ok"]), + timestamp=str(result["timestamp"]), + options_order=result["options_order"], + msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}", + ) + logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"]) + except Exception as exc: + done.add(port) + _write_event( + log_path, json_path, + "prober_error", + severity=_SEVERITY_WARNING, + target_ip=ip, + target_port=str(port), + error=str(exc), + msg=f"TCPFP probe failed for {ip}:{port}: {exc}", + ) + logger.warning("prober: TCPFP probe failed %s:%d: %s", ip, port, exc) # ─── Main worker ───────────────────────────────────────────────────────────── @@ -273,41 +398,52 @@ async def prober_worker( interval: int = 300, timeout: float = 5.0, ports: list[int] | None = None, + ssh_ports: list[int] | None = None, + tcpfp_ports: list[int] | None = None, ) -> None: """ Main entry point for the standalone prober process. Discovers attacker IPs automatically by tailing the JSON log file, - then JARM-probes each IP on common C2 ports. + then fingerprints each IP via JARM, HASSH, and TCP/IP stack probes. Args: log_file: base path for log files (RFC 5424 to .log, JSON to .json) interval: seconds between probe cycles timeout: per-probe TCP timeout - ports: list of ports to probe (defaults to DEFAULT_PROBE_PORTS) + ports: JARM TLS ports (defaults to DEFAULT_PROBE_PORTS) + ssh_ports: HASSH SSH ports (defaults to DEFAULT_SSH_PORTS) + tcpfp_ports: TCP fingerprint ports (defaults to DEFAULT_TCPFP_PORTS) """ - probe_ports = ports or DEFAULT_PROBE_PORTS + jarm_ports = ports or DEFAULT_PROBE_PORTS + hassh_ports = ssh_ports or DEFAULT_SSH_PORTS + tcp_ports = tcpfp_ports or DEFAULT_TCPFP_PORTS + + all_ports_str = ( + f"jarm={','.join(str(p) for p in jarm_ports)} " + f"ssh={','.join(str(p) for p in hassh_ports)} " + f"tcpfp={','.join(str(p) for p in tcp_ports)}" + ) log_path = Path(log_file) json_path = log_path.with_suffix(".json") log_path.parent.mkdir(parents=True, exist_ok=True) logger.info( - "prober started interval=%ds ports=%s log=%s", - interval, ",".join(str(p) for p in probe_ports), log_path, + "prober started interval=%ds %s log=%s", + interval, all_ports_str, log_path, ) _write_event( log_path, json_path, "prober_startup", interval=str(interval), - probe_ports=",".join(str(p) for p in probe_ports), - msg=f"DECNET-PROBER started, interval {interval}s, " - f"ports {','.join(str(p) for p in probe_ports)}", + probe_ports=all_ports_str, + msg=f"DECNET-PROBER started, interval {interval}s, {all_ports_str}", ) known_attackers: set[str] = set() - probed: dict[str, set[int]] = {} # IP -> set of ports already probed + probed: dict[str, dict[str, set[int]]] = {} # IP -> {type -> ports} log_position: int = 0 while True: @@ -326,7 +462,8 @@ async def prober_worker( if known_attackers: await asyncio.to_thread( - _probe_cycle, known_attackers, probed, probe_ports, + _probe_cycle, known_attackers, probed, + jarm_ports, hassh_ports, tcp_ports, log_path, json_path, timeout, ) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index c9a318b..513e958 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -218,3 +218,49 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non "target_port": _fields.get("target_port"), }, }) + + # 10. HASSHServer fingerprint from active prober + _hassh = _fields.get("hassh_server_hash") + if _hassh and log_data.get("service") == "prober": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "prober", + "attacker_ip": _fields.get("target_ip", "Unknown"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "hassh_server", + "hash": _hassh, + "target_ip": _fields.get("target_ip"), + "target_port": _fields.get("target_port"), + "ssh_banner": _fields.get("ssh_banner"), + "kex_algorithms": _fields.get("kex_algorithms"), + "encryption_s2c": _fields.get("encryption_s2c"), + "mac_s2c": _fields.get("mac_s2c"), + "compression_s2c": _fields.get("compression_s2c"), + }, + }) + + # 11. TCP/IP stack fingerprint from active prober + _tcpfp = _fields.get("tcpfp_hash") + if _tcpfp and log_data.get("service") == "prober": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "prober", + "attacker_ip": _fields.get("target_ip", "Unknown"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "tcpfp", + "hash": _tcpfp, + "raw": _fields.get("tcpfp_raw"), + "target_ip": _fields.get("target_ip"), + "target_port": _fields.get("target_port"), + "ttl": _fields.get("ttl"), + "window_size": _fields.get("window_size"), + "df_bit": _fields.get("df_bit"), + "mss": _fields.get("mss"), + "window_scale": _fields.get("window_scale"), + "sack_ok": _fields.get("sack_ok"), + "timestamp": _fields.get("timestamp"), + "options_order": _fields.get("options_order"), + }, + }) diff --git a/tests/test_prober_bounty.py b/tests/test_prober_bounty.py index 7864550..e09a46a 100644 --- a/tests/test_prober_bounty.py +++ b/tests/test_prober_bounty.py @@ -1,8 +1,9 @@ """ -Tests for JARM bounty extraction in the ingester. +Tests for prober bounty extraction in the ingester. -Verifies that _extract_bounty() correctly identifies and stores JARM -fingerprints from prober events, and ignores JARM fields from other services. +Verifies that _extract_bounty() correctly identifies and stores JARM, +HASSH, and TCP/IP fingerprints from prober events, and ignores these +fields when they come from other services. """ from __future__ import annotations @@ -112,3 +113,131 @@ async def test_jarm_bounty_missing_fields_dict(): for call in repo.add_bounty.call_args_list: payload = call[0][0].get("payload", {}) assert payload.get("fingerprint_type") != "jarm" + + +# ─── HASSH bounty extraction ─────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_hassh_bounty_extracted(): + """Prober event with hassh_server_hash should create a fingerprint bounty.""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "hassh_fingerprint", + "attacker_ip": "Unknown", + "fields": { + "target_ip": "10.0.0.1", + "target_port": "22", + "hassh_server_hash": "a" * 32, + "ssh_banner": "SSH-2.0-OpenSSH_8.9p1", + "kex_algorithms": "curve25519-sha256", + "encryption_s2c": "aes256-gcm@openssh.com", + "mac_s2c": "hmac-sha2-256-etm@openssh.com", + "compression_s2c": "none", + }, + "msg": "HASSH 10.0.0.1:22 = ...", + } + + await _extract_bounty(repo, log_data) + + # Find the HASSH bounty call + hassh_calls = [ + c for c in repo.add_bounty.call_args_list + if c[0][0].get("payload", {}).get("fingerprint_type") == "hassh_server" + ] + assert len(hassh_calls) == 1 + payload = hassh_calls[0][0][0]["payload"] + assert payload["hash"] == "a" * 32 + assert payload["ssh_banner"] == "SSH-2.0-OpenSSH_8.9p1" + assert payload["kex_algorithms"] == "curve25519-sha256" + assert payload["encryption_s2c"] == "aes256-gcm@openssh.com" + assert payload["mac_s2c"] == "hmac-sha2-256-etm@openssh.com" + assert payload["compression_s2c"] == "none" + + +@pytest.mark.asyncio +async def test_hassh_bounty_not_extracted_from_other_services(): + """A non-prober event with hassh_server_hash should NOT trigger extraction.""" + repo = _make_repo() + log_data = { + "decky": "decky-01", + "service": "ssh", + "event_type": "login_attempt", + "attacker_ip": "192.168.1.50", + "fields": { + "hassh_server_hash": "fake_hash", + }, + "msg": "", + } + + await _extract_bounty(repo, log_data) + + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "hassh_server" + + +# ─── TCP/IP fingerprint bounty extraction ────────────────────────────────── + +@pytest.mark.asyncio +async def test_tcpfp_bounty_extracted(): + """Prober event with tcpfp_hash should create a fingerprint bounty.""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "tcpfp_fingerprint", + "attacker_ip": "Unknown", + "fields": { + "target_ip": "10.0.0.1", + "target_port": "443", + "tcpfp_hash": "d" * 32, + "tcpfp_raw": "64:65535:1:1460:7:1:1:M,N,W,N,N,T,S,E", + "ttl": "64", + "window_size": "65535", + "df_bit": "1", + "mss": "1460", + "window_scale": "7", + "sack_ok": "1", + "timestamp": "1", + "options_order": "M,N,W,N,N,T,S,E", + }, + "msg": "TCPFP 10.0.0.1:443 = ...", + } + + await _extract_bounty(repo, log_data) + + tcpfp_calls = [ + c for c in repo.add_bounty.call_args_list + if c[0][0].get("payload", {}).get("fingerprint_type") == "tcpfp" + ] + assert len(tcpfp_calls) == 1 + payload = tcpfp_calls[0][0][0]["payload"] + assert payload["hash"] == "d" * 32 + assert payload["raw"] == "64:65535:1:1460:7:1:1:M,N,W,N,N,T,S,E" + assert payload["ttl"] == "64" + assert payload["window_size"] == "65535" + assert payload["options_order"] == "M,N,W,N,N,T,S,E" + + +@pytest.mark.asyncio +async def test_tcpfp_bounty_not_extracted_from_other_services(): + """A non-prober event with tcpfp_hash should NOT trigger extraction.""" + repo = _make_repo() + log_data = { + "decky": "decky-01", + "service": "sniffer", + "event_type": "something", + "attacker_ip": "192.168.1.50", + "fields": { + "tcpfp_hash": "fake_hash", + }, + "msg": "", + } + + await _extract_bounty(repo, log_data) + + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "tcpfp" diff --git a/tests/test_prober_hassh.py b/tests/test_prober_hassh.py new file mode 100644 index 0000000..0252ba6 --- /dev/null +++ b/tests/test_prober_hassh.py @@ -0,0 +1,357 @@ +""" +Unit tests for the HASSHServer SSH fingerprinting module. + +Tests cover KEX_INIT parsing, HASSH hash computation, SSH connection +handling, and end-to-end hassh_server() with mocked sockets. +""" + +from __future__ import annotations + +import hashlib +import socket +import struct +from unittest.mock import MagicMock, patch + +import pytest + +from decnet.prober.hassh import ( + _CLIENT_BANNER, + _SSH_MSG_KEXINIT, + _compute_hassh, + _parse_kex_init, + _read_banner, + _read_ssh_packet, + hassh_server, +) + + +# ─── Helpers ──────────────────────────────────────────────────────────────── + +def _build_name_list(value: str) -> bytes: + """Encode a single SSH name-list (uint32 length + utf-8 string).""" + encoded = value.encode("utf-8") + return struct.pack("!I", len(encoded)) + encoded + + +def _build_kex_init( + kex: str = "curve25519-sha256,diffie-hellman-group14-sha256", + host_key: str = "ssh-ed25519,rsa-sha2-512", + enc_c2s: str = "aes256-gcm@openssh.com,aes128-gcm@openssh.com", + enc_s2c: str = "aes256-gcm@openssh.com,chacha20-poly1305@openssh.com", + mac_c2s: str = "hmac-sha2-256-etm@openssh.com", + mac_s2c: str = "hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com", + comp_c2s: str = "none,zlib@openssh.com", + comp_s2c: str = "none,zlib@openssh.com", + lang_c2s: str = "", + lang_s2c: str = "", + cookie: bytes | None = None, +) -> bytes: + """Build a complete SSH_MSG_KEXINIT payload for testing.""" + if cookie is None: + cookie = b"\x00" * 16 + + payload = struct.pack("B", _SSH_MSG_KEXINIT) + cookie + for value in [kex, host_key, enc_c2s, enc_s2c, mac_c2s, mac_s2c, + comp_c2s, comp_s2c, lang_c2s, lang_s2c]: + payload += _build_name_list(value) + # first_kex_packet_follows (bool) + reserved (uint32) + payload += struct.pack("!BI", 0, 0) + return payload + + +def _wrap_ssh_packet(payload: bytes) -> bytes: + """Wrap payload into an SSH binary packet (header only, no MAC).""" + # Padding to 8-byte boundary (minimum 4 bytes) + block_size = 8 + padding_needed = block_size - ((1 + len(payload)) % block_size) + if padding_needed < 4: + padding_needed += block_size + padding = b"\x00" * padding_needed + packet_length = 1 + len(payload) + len(padding) # padding_length(1) + payload + padding + return struct.pack("!IB", packet_length, padding_needed) + payload + padding + + +def _make_socket_with_data(data: bytes) -> MagicMock: + """Create a mock socket that yields data byte-by-byte or in chunks.""" + sock = MagicMock() + pos = [0] + + def recv(n): + if pos[0] >= len(data): + return b"" + chunk = data[pos[0] : pos[0] + n] + pos[0] += n + return chunk + + sock.recv = recv + return sock + + +# ─── _parse_kex_init ──────────────────────────────────────────────────────── + +class TestParseKexInit: + + def test_parses_all_ten_fields(self): + payload = _build_kex_init() + result = _parse_kex_init(payload) + assert result is not None + assert len(result) == 10 + + def test_extracts_correct_field_values(self): + payload = _build_kex_init( + kex="curve25519-sha256", + enc_s2c="chacha20-poly1305@openssh.com", + mac_s2c="hmac-sha2-512-etm@openssh.com", + comp_s2c="none", + ) + result = _parse_kex_init(payload) + assert result["kex_algorithms"] == "curve25519-sha256" + assert result["encryption_server_to_client"] == "chacha20-poly1305@openssh.com" + assert result["mac_server_to_client"] == "hmac-sha2-512-etm@openssh.com" + assert result["compression_server_to_client"] == "none" + + def test_extracts_hassh_server_fields_at_correct_indices(self): + """HASSHServer uses indices 0(kex), 3(enc_s2c), 5(mac_s2c), 7(comp_s2c).""" + payload = _build_kex_init( + kex="KEX_FIELD", + host_key="HOSTKEY_FIELD", + enc_c2s="ENC_C2S_FIELD", + enc_s2c="ENC_S2C_FIELD", + mac_c2s="MAC_C2S_FIELD", + mac_s2c="MAC_S2C_FIELD", + comp_c2s="COMP_C2S_FIELD", + comp_s2c="COMP_S2C_FIELD", + ) + result = _parse_kex_init(payload) + # Indices used by HASSHServer + assert result["kex_algorithms"] == "KEX_FIELD" # index 0 + assert result["encryption_server_to_client"] == "ENC_S2C_FIELD" # index 3 + assert result["mac_server_to_client"] == "MAC_S2C_FIELD" # index 5 + assert result["compression_server_to_client"] == "COMP_S2C_FIELD" # index 7 + + def test_empty_name_lists(self): + payload = _build_kex_init( + kex="", host_key="", enc_c2s="", enc_s2c="", + mac_c2s="", mac_s2c="", comp_c2s="", comp_s2c="", + ) + result = _parse_kex_init(payload) + assert result is not None + assert result["kex_algorithms"] == "" + + def test_truncated_payload_returns_none(self): + # Just the type byte and cookie, no name-lists + payload = struct.pack("B", _SSH_MSG_KEXINIT) + b"\x00" * 16 + assert _parse_kex_init(payload) is None + + def test_truncated_name_list_returns_none(self): + # Type + cookie + length says 100 but only 2 bytes follow + payload = struct.pack("B", _SSH_MSG_KEXINIT) + b"\x00" * 16 + payload += struct.pack("!I", 100) + b"ab" + assert _parse_kex_init(payload) is None + + def test_too_short_returns_none(self): + assert _parse_kex_init(b"") is None + assert _parse_kex_init(b"\x14") is None + + def test_large_algorithm_lists(self): + long_kex = ",".join(f"algo-{i}" for i in range(50)) + payload = _build_kex_init(kex=long_kex) + result = _parse_kex_init(payload) + assert result is not None + assert result["kex_algorithms"] == long_kex + + +# ─── _compute_hassh ───────────────────────────────────────────────────────── + +class TestComputeHashh: + + def test_md5_correctness(self): + kex = "curve25519-sha256" + enc = "aes256-gcm@openssh.com" + mac = "hmac-sha2-256-etm@openssh.com" + comp = "none" + raw = f"{kex};{enc};{mac};{comp}" + expected = hashlib.md5(raw.encode("utf-8")).hexdigest() + assert _compute_hassh(kex, enc, mac, comp) == expected + + def test_hash_length_is_32(self): + result = _compute_hassh("a", "b", "c", "d") + assert len(result) == 32 + + def test_deterministic(self): + r1 = _compute_hassh("kex1", "enc1", "mac1", "comp1") + r2 = _compute_hassh("kex1", "enc1", "mac1", "comp1") + assert r1 == r2 + + def test_different_inputs_different_hashes(self): + r1 = _compute_hassh("kex1", "enc1", "mac1", "comp1") + r2 = _compute_hassh("kex2", "enc2", "mac2", "comp2") + assert r1 != r2 + + def test_empty_fields(self): + result = _compute_hassh("", "", "", "") + expected = hashlib.md5(b";;;").hexdigest() + assert result == expected + + def test_semicolon_delimiter(self): + """The delimiter is semicolon, not comma.""" + result = _compute_hassh("a", "b", "c", "d") + expected = hashlib.md5(b"a;b;c;d").hexdigest() + assert result == expected + + +# ─── _read_banner ─────────────────────────────────────────────────────────── + +class TestReadBanner: + + def test_reads_banner_with_crlf(self): + sock = _make_socket_with_data(b"SSH-2.0-OpenSSH_8.9p1\r\n") + result = _read_banner(sock) + assert result == "SSH-2.0-OpenSSH_8.9p1" + + def test_reads_banner_with_lf(self): + sock = _make_socket_with_data(b"SSH-2.0-OpenSSH_8.9p1\n") + result = _read_banner(sock) + assert result == "SSH-2.0-OpenSSH_8.9p1" + + def test_empty_data_returns_none(self): + sock = _make_socket_with_data(b"") + result = _read_banner(sock) + assert result is None + + def test_no_newline_within_limit(self): + # 256 bytes with no newline — should stop at limit + sock = _make_socket_with_data(b"A" * 256) + result = _read_banner(sock) + assert result == "A" * 256 + + +# ─── _read_ssh_packet ─────────────────────────────────────────────────────── + +class TestReadSSHPacket: + + def test_reads_valid_packet(self): + payload = b"\x14" + b"\x00" * 20 # type 20 + some data + packet_data = _wrap_ssh_packet(payload) + sock = _make_socket_with_data(packet_data) + result = _read_ssh_packet(sock) + assert result is not None + assert result[0] == 0x14 # SSH_MSG_KEXINIT + + def test_empty_socket_returns_none(self): + sock = _make_socket_with_data(b"") + assert _read_ssh_packet(sock) is None + + def test_truncated_header_returns_none(self): + sock = _make_socket_with_data(b"\x00\x00") + assert _read_ssh_packet(sock) is None + + def test_oversized_packet_returns_none(self): + # packet_length = 40000 (over limit) + sock = _make_socket_with_data(struct.pack("!I", 40000)) + assert _read_ssh_packet(sock) is None + + def test_zero_length_returns_none(self): + sock = _make_socket_with_data(struct.pack("!I", 0)) + assert _read_ssh_packet(sock) is None + + +# ─── hassh_server (end-to-end with mocked sockets) ───────────────────────── + +class TestHasshServerE2E: + + @patch("decnet.prober.hassh._ssh_connect") + def test_success(self, mock_connect: MagicMock): + payload = _build_kex_init( + kex="curve25519-sha256", + enc_s2c="aes256-gcm@openssh.com", + mac_s2c="hmac-sha2-256-etm@openssh.com", + comp_s2c="none", + ) + mock_connect.return_value = ("SSH-2.0-OpenSSH_8.9p1", payload) + + result = hassh_server("10.0.0.1", 22, timeout=1.0) + assert result is not None + assert len(result["hassh_server"]) == 32 + assert result["banner"] == "SSH-2.0-OpenSSH_8.9p1" + assert result["kex_algorithms"] == "curve25519-sha256" + assert result["encryption_s2c"] == "aes256-gcm@openssh.com" + assert result["mac_s2c"] == "hmac-sha2-256-etm@openssh.com" + assert result["compression_s2c"] == "none" + + @patch("decnet.prober.hassh._ssh_connect") + def test_connection_failure_returns_none(self, mock_connect: MagicMock): + mock_connect.return_value = None + assert hassh_server("10.0.0.1", 22, timeout=1.0) is None + + @patch("decnet.prober.hassh._ssh_connect") + def test_truncated_kex_init_returns_none(self, mock_connect: MagicMock): + # Payload too short to parse + payload = struct.pack("B", _SSH_MSG_KEXINIT) + b"\x00" * 16 + mock_connect.return_value = ("SSH-2.0-OpenSSH_8.9p1", payload) + assert hassh_server("10.0.0.1", 22, timeout=1.0) is None + + @patch("decnet.prober.hassh._ssh_connect") + def test_hash_is_deterministic(self, mock_connect: MagicMock): + payload = _build_kex_init() + mock_connect.return_value = ("SSH-2.0-OpenSSH_8.9p1", payload) + + r1 = hassh_server("10.0.0.1", 22) + r2 = hassh_server("10.0.0.1", 22) + assert r1["hassh_server"] == r2["hassh_server"] + + @patch("decnet.prober.hassh._ssh_connect") + def test_different_servers_different_hashes(self, mock_connect: MagicMock): + p1 = _build_kex_init(kex="curve25519-sha256", enc_s2c="aes256-gcm@openssh.com") + p2 = _build_kex_init(kex="diffie-hellman-group14-sha1", enc_s2c="aes128-cbc") + + mock_connect.return_value = ("SSH-2.0-OpenSSH_8.9p1", p1) + r1 = hassh_server("10.0.0.1", 22) + + mock_connect.return_value = ("SSH-2.0-Paramiko_3.0", p2) + r2 = hassh_server("10.0.0.2", 22) + + assert r1["hassh_server"] != r2["hassh_server"] + + @patch("decnet.prober.hassh.socket.create_connection") + def test_full_socket_mock(self, mock_create: MagicMock): + """Full integration: mock at socket level, verify banner exchange.""" + kex_payload = _build_kex_init() + kex_packet = _wrap_ssh_packet(kex_payload) + + banner_bytes = b"SSH-2.0-OpenSSH_8.9p1\r\n" + all_data = banner_bytes + kex_packet + + mock_sock = _make_socket_with_data(all_data) + mock_sock.sendall = MagicMock() + mock_sock.settimeout = MagicMock() + mock_sock.close = MagicMock() + mock_create.return_value = mock_sock + + result = hassh_server("10.0.0.1", 22, timeout=2.0) + assert result is not None + assert result["banner"] == "SSH-2.0-OpenSSH_8.9p1" + assert len(result["hassh_server"]) == 32 + + # Verify we sent our client banner + mock_sock.sendall.assert_called_once_with(_CLIENT_BANNER) + + @patch("decnet.prober.hassh.socket.create_connection") + def test_non_ssh_banner_returns_none(self, mock_create: MagicMock): + mock_sock = _make_socket_with_data(b"HTTP/1.1 200 OK\r\n") + mock_sock.sendall = MagicMock() + mock_sock.settimeout = MagicMock() + mock_sock.close = MagicMock() + mock_create.return_value = mock_sock + + assert hassh_server("10.0.0.1", 80, timeout=1.0) is None + + @patch("decnet.prober.hassh.socket.create_connection") + def test_connection_refused(self, mock_create: MagicMock): + mock_create.side_effect = ConnectionRefusedError + assert hassh_server("10.0.0.1", 22, timeout=1.0) is None + + @patch("decnet.prober.hassh.socket.create_connection") + def test_timeout(self, mock_create: MagicMock): + mock_create.side_effect = socket.timeout("timed out") + assert hassh_server("10.0.0.1", 22, timeout=1.0) is None diff --git a/tests/test_prober_tcpfp.py b/tests/test_prober_tcpfp.py new file mode 100644 index 0000000..32f2a5e --- /dev/null +++ b/tests/test_prober_tcpfp.py @@ -0,0 +1,349 @@ +""" +Unit tests for the TCP/IP stack fingerprinting module. + +Tests cover SYN-ACK parsing, options extraction, fingerprint computation, +and end-to-end tcp_fingerprint() with mocked scapy packets. +""" + +from __future__ import annotations + +import hashlib +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +from decnet.prober.tcpfp import ( + _compute_fingerprint, + _extract_options_order, + _parse_synack, + tcp_fingerprint, +) + + +# ─── Helpers ──────────────────────────────────────────────────────────────── + +def _make_synack( + ttl: int = 64, + flags: int = 0x02, # IP flags (DF = 0x02) + ip_id: int = 0, + window: int = 65535, + tcp_flags: int = 0x12, # SYN-ACK + options: list | None = None, + ack: int = 1, +) -> SimpleNamespace: + """Build a fake scapy-like SYN-ACK packet for testing.""" + if options is None: + options = [ + ("MSS", 1460), + ("NOP", None), + ("WScale", 7), + ("NOP", None), + ("NOP", None), + ("Timestamp", (12345, 0)), + ("SAckOK", b""), + ("EOL", None), + ] + + tcp_layer = SimpleNamespace( + flags=tcp_flags, + window=window, + options=options, + dport=12345, + ack=ack, + ) + ip_layer = SimpleNamespace( + ttl=ttl, + flags=flags, + id=ip_id, + ) + + class FakePacket: + def __init__(self): + self._layers = {"IP": ip_layer, "TCP": tcp_layer} + self.ack = ack + + def __getitem__(self, key): + # Support both class and string access + name = key.__name__ if hasattr(key, "__name__") else str(key) + return self._layers[name] + + def haslayer(self, key): + name = key.__name__ if hasattr(key, "__name__") else str(key) + return name in self._layers + + return FakePacket() + + +# ─── _extract_options_order ───────────────────────────────────────────────── + +class TestExtractOptionsOrder: + + def test_standard_linux_options(self): + options = [ + ("MSS", 1460), ("NOP", None), ("WScale", 7), + ("NOP", None), ("NOP", None), ("Timestamp", (0, 0)), + ("SAckOK", b""), ("EOL", None), + ] + assert _extract_options_order(options) == "M,N,W,N,N,T,S,E" + + def test_windows_options(self): + options = [ + ("MSS", 1460), ("NOP", None), ("WScale", 8), + ("NOP", None), ("NOP", None), ("SAckOK", b""), + ] + assert _extract_options_order(options) == "M,N,W,N,N,S" + + def test_empty_options(self): + assert _extract_options_order([]) == "" + + def test_mss_only(self): + assert _extract_options_order([("MSS", 536)]) == "M" + + def test_unknown_option(self): + options = [("MSS", 1460), ("UnknownOpt", 42)] + assert _extract_options_order(options) == "M,?" + + def test_sack_variant(self): + options = [("SAck", (100, 200))] + assert _extract_options_order(options) == "S" + + +# ─── _parse_synack ────────────────────────────────────────────────────────── + +class TestParseSynack: + + def test_linux_64_ttl(self): + resp = _make_synack(ttl=64) + result = _parse_synack(resp) + assert result["ttl"] == 64 + + def test_windows_128_ttl(self): + resp = _make_synack(ttl=128) + result = _parse_synack(resp) + assert result["ttl"] == 128 + + def test_df_bit_set(self): + resp = _make_synack(flags=0x02) # DF set + result = _parse_synack(resp) + assert result["df_bit"] == 1 + + def test_df_bit_unset(self): + resp = _make_synack(flags=0x00) + result = _parse_synack(resp) + assert result["df_bit"] == 0 + + def test_window_size(self): + resp = _make_synack(window=29200) + result = _parse_synack(resp) + assert result["window_size"] == 29200 + + def test_mss_extraction(self): + resp = _make_synack(options=[("MSS", 1460)]) + result = _parse_synack(resp) + assert result["mss"] == 1460 + + def test_window_scale(self): + resp = _make_synack(options=[("WScale", 7)]) + result = _parse_synack(resp) + assert result["window_scale"] == 7 + + def test_sack_ok(self): + resp = _make_synack(options=[("SAckOK", b"")]) + result = _parse_synack(resp) + assert result["sack_ok"] == 1 + + def test_no_sack(self): + resp = _make_synack(options=[("MSS", 1460)]) + result = _parse_synack(resp) + assert result["sack_ok"] == 0 + + def test_timestamp_present(self): + resp = _make_synack(options=[("Timestamp", (12345, 0))]) + result = _parse_synack(resp) + assert result["timestamp"] == 1 + + def test_no_timestamp(self): + resp = _make_synack(options=[("MSS", 1460)]) + result = _parse_synack(resp) + assert result["timestamp"] == 0 + + def test_options_order(self): + resp = _make_synack(options=[ + ("MSS", 1460), ("NOP", None), ("WScale", 7), + ("SAckOK", b""), ("Timestamp", (0, 0)), + ]) + result = _parse_synack(resp) + assert result["options_order"] == "M,N,W,S,T" + + def test_ip_id(self): + resp = _make_synack(ip_id=12345) + result = _parse_synack(resp) + assert result["ip_id"] == 12345 + + def test_empty_options(self): + resp = _make_synack(options=[]) + result = _parse_synack(resp) + assert result["mss"] == 0 + assert result["window_scale"] == -1 + assert result["sack_ok"] == 0 + assert result["timestamp"] == 0 + assert result["options_order"] == "" + + def test_full_linux_fingerprint(self): + """Typical Linux 5.x+ SYN-ACK.""" + resp = _make_synack( + ttl=64, flags=0x02, window=65535, + options=[ + ("MSS", 1460), ("NOP", None), ("WScale", 7), + ("NOP", None), ("NOP", None), ("Timestamp", (0, 0)), + ("SAckOK", b""), ("EOL", None), + ], + ) + result = _parse_synack(resp) + assert result["ttl"] == 64 + assert result["df_bit"] == 1 + assert result["window_size"] == 65535 + assert result["mss"] == 1460 + assert result["window_scale"] == 7 + assert result["sack_ok"] == 1 + assert result["timestamp"] == 1 + assert result["options_order"] == "M,N,W,N,N,T,S,E" + + +# ─── _compute_fingerprint ────────────────────────────────────────────────── + +class TestComputeFingerprint: + + def test_hash_length_is_32(self): + fields = { + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W,N,N,T,S,E", + } + raw, h = _compute_fingerprint(fields) + assert len(h) == 32 + + def test_deterministic(self): + fields = { + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W,S,T", + } + _, h1 = _compute_fingerprint(fields) + _, h2 = _compute_fingerprint(fields) + assert h1 == h2 + + def test_different_inputs_different_hashes(self): + f1 = { + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W,S,T", + } + f2 = { + "ttl": 128, "window_size": 8192, "df_bit": 1, + "mss": 1460, "window_scale": 8, "sack_ok": 1, + "timestamp": 0, "options_order": "M,N,W,N,N,S", + } + _, h1 = _compute_fingerprint(f1) + _, h2 = _compute_fingerprint(f2) + assert h1 != h2 + + def test_raw_format(self): + fields = { + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W", + } + raw, _ = _compute_fingerprint(fields) + assert raw == "64:65535:1:1460:7:1:1:M,N,W" + + def test_sha256_correctness(self): + fields = { + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W", + } + raw, h = _compute_fingerprint(fields) + expected = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32] + assert h == expected + + +# ─── tcp_fingerprint (end-to-end with mocked scapy) ──────────────────────── + +class TestTcpFingerprintE2E: + + @patch("decnet.prober.tcpfp._send_syn") + def test_success(self, mock_send: MagicMock): + mock_send.return_value = _make_synack( + ttl=64, flags=0x02, window=65535, + options=[ + ("MSS", 1460), ("NOP", None), ("WScale", 7), + ("SAckOK", b""), ("Timestamp", (0, 0)), + ], + ) + result = tcp_fingerprint("10.0.0.1", 443, timeout=1.0) + assert result is not None + assert len(result["tcpfp_hash"]) == 32 + assert result["ttl"] == 64 + assert result["window_size"] == 65535 + assert result["df_bit"] == 1 + assert result["mss"] == 1460 + assert result["window_scale"] == 7 + assert result["sack_ok"] == 1 + assert result["timestamp"] == 1 + assert result["options_order"] == "M,N,W,S,T" + + @patch("decnet.prober.tcpfp._send_syn") + def test_no_response_returns_none(self, mock_send: MagicMock): + mock_send.return_value = None + assert tcp_fingerprint("10.0.0.1", 443, timeout=1.0) is None + + @patch("decnet.prober.tcpfp._send_syn") + def test_windows_fingerprint(self, mock_send: MagicMock): + mock_send.return_value = _make_synack( + ttl=128, flags=0x02, window=8192, + options=[ + ("MSS", 1460), ("NOP", None), ("WScale", 8), + ("NOP", None), ("NOP", None), ("SAckOK", b""), + ], + ) + result = tcp_fingerprint("10.0.0.1", 443, timeout=1.0) + assert result is not None + assert result["ttl"] == 128 + assert result["window_size"] == 8192 + assert result["window_scale"] == 8 + + @patch("decnet.prober.tcpfp._send_syn") + def test_embedded_device_fingerprint(self, mock_send: MagicMock): + """Embedded devices often have TTL=255, small window, no options.""" + mock_send.return_value = _make_synack( + ttl=255, flags=0x00, window=4096, + options=[("MSS", 536)], + ) + result = tcp_fingerprint("10.0.0.1", 80, timeout=1.0) + assert result is not None + assert result["ttl"] == 255 + assert result["df_bit"] == 0 + assert result["window_size"] == 4096 + assert result["mss"] == 536 + assert result["window_scale"] == -1 + assert result["sack_ok"] == 0 + + @patch("decnet.prober.tcpfp._send_syn") + def test_result_contains_raw_and_hash(self, mock_send: MagicMock): + mock_send.return_value = _make_synack() + result = tcp_fingerprint("10.0.0.1", 443) + assert "tcpfp_hash" in result + assert "tcpfp_raw" in result + assert ":" in result["tcpfp_raw"] + + @patch("decnet.prober.tcpfp._send_syn") + def test_deterministic(self, mock_send: MagicMock): + pkt = _make_synack(ttl=64, window=65535) + mock_send.return_value = pkt + + r1 = tcp_fingerprint("10.0.0.1", 443) + r2 = tcp_fingerprint("10.0.0.1", 443) + assert r1["tcpfp_hash"] == r2["tcpfp_hash"] + assert r1["tcpfp_raw"] == r2["tcpfp_raw"] diff --git a/tests/test_prober_worker.py b/tests/test_prober_worker.py index 208907a..95b882f 100644 --- a/tests/test_prober_worker.py +++ b/tests/test_prober_worker.py @@ -1,6 +1,6 @@ """ Tests for the prober worker — target discovery from the log stream and -probe cycle behavior. +probe cycle behavior (JARM, HASSH, TCP/IP fingerprinting). """ from __future__ import annotations @@ -14,6 +14,8 @@ import pytest from decnet.prober.jarm import JARM_EMPTY_HASH from decnet.prober.worker import ( DEFAULT_PROBE_PORTS, + DEFAULT_SSH_PORTS, + DEFAULT_TCPFP_PORTS, _discover_attackers, _probe_cycle, _write_event, @@ -103,86 +105,357 @@ class TestDiscoverAttackers: assert "10.0.0.1" in ips -# ─── _probe_cycle ──────────────────────────────────────────────────────────── +# ─── _probe_cycle: JARM phase ────────────────────────────────────────────── -class TestProbeCycle: +class TestProbeCycleJARM: + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") - def test_probes_new_ips(self, mock_jarm: MagicMock, tmp_path: Path): + def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): mock_jarm.return_value = "c0c" * 10 + "a" * 32 # fake 62-char hash + mock_hassh.return_value = None + mock_tcpfp.return_value = None log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" targets = {"10.0.0.1"} - probed: dict[str, set[int]] = {} + probed: dict[str, dict[str, set[int]]] = {} - _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + _probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0) assert mock_jarm.call_count == 2 # two ports - assert 443 in probed["10.0.0.1"] - assert 8443 in probed["10.0.0.1"] + assert 443 in probed["10.0.0.1"]["jarm"] + assert 8443 in probed["10.0.0.1"]["jarm"] + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") - def test_skips_already_probed_ports(self, mock_jarm: MagicMock, tmp_path: Path): + def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): mock_jarm.return_value = "c0c" * 10 + "a" * 32 + mock_hassh.return_value = None + mock_tcpfp.return_value = None log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" targets = {"10.0.0.1"} - probed: dict[str, set[int]] = {"10.0.0.1": {443}} + probed: dict[str, dict[str, set[int]]] = {"10.0.0.1": {"jarm": {443}}} - _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + _probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0) # Should only probe 8443 (443 already done) assert mock_jarm.call_count == 1 mock_jarm.assert_called_once_with("10.0.0.1", 8443, timeout=1.0) + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") - def test_empty_hash_not_logged(self, mock_jarm: MagicMock, tmp_path: Path): - """All-zeros JARM hash (no TLS server) should not be written as a jarm_fingerprint event.""" + def test_empty_hash_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = None log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" targets = {"10.0.0.1"} - probed: dict[str, set[int]] = {} + probed: dict[str, dict[str, set[int]]] = {} - _probe_cycle(targets, probed, [443], log_path, json_path, timeout=1.0) + _probe_cycle(targets, probed, [443], [], [], log_path, json_path, timeout=1.0) - # Port should be marked as probed - assert 443 in probed["10.0.0.1"] - # But no jarm_fingerprint event should be written + assert 443 in probed["10.0.0.1"]["jarm"] if json_path.exists(): content = json_path.read_text() assert "jarm_fingerprint" not in content + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") - def test_exception_marks_port_probed(self, mock_jarm: MagicMock, tmp_path: Path): + def test_exception_marks_port_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): mock_jarm.side_effect = OSError("Connection refused") + mock_hassh.return_value = None + mock_tcpfp.return_value = None log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" targets = {"10.0.0.1"} - probed: dict[str, set[int]] = {} + probed: dict[str, dict[str, set[int]]] = {} - _probe_cycle(targets, probed, [443], log_path, json_path, timeout=1.0) + _probe_cycle(targets, probed, [443], [], [], log_path, json_path, timeout=1.0) - # Port marked as probed to avoid infinite retries - assert 443 in probed["10.0.0.1"] + assert 443 in probed["10.0.0.1"]["jarm"] + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") - def test_skips_ip_with_all_ports_done(self, mock_jarm: MagicMock, tmp_path: Path): + def test_skips_ip_with_all_ports_done(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" targets = {"10.0.0.1"} - probed: dict[str, set[int]] = {"10.0.0.1": {443, 8443}} + probed: dict[str, dict[str, set[int]]] = { + "10.0.0.1": {"jarm": {443, 8443}, "hassh": set(), "tcpfp": set()}, + } - _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + _probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0) assert mock_jarm.call_count == 0 +# ─── _probe_cycle: HASSH phase ───────────────────────────────────────────── + +class TestProbeCycleHASSH: + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_probes_ssh_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = { + "hassh_server": "a" * 32, + "banner": "SSH-2.0-OpenSSH_8.9p1", + "kex_algorithms": "curve25519-sha256", + "encryption_s2c": "aes256-gcm@openssh.com", + "mac_s2c": "hmac-sha2-256-etm@openssh.com", + "compression_s2c": "none", + } + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [22, 2222], [], log_path, json_path, timeout=1.0) + + assert mock_hassh.call_count == 2 + assert 22 in probed["10.0.0.1"]["hassh"] + assert 2222 in probed["10.0.0.1"]["hassh"] + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_hassh_writes_event(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = { + "hassh_server": "b" * 32, + "banner": "SSH-2.0-Paramiko_3.0", + "kex_algorithms": "diffie-hellman-group14-sha1", + "encryption_s2c": "aes128-cbc", + "mac_s2c": "hmac-sha1", + "compression_s2c": "none", + } + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0) + + assert json_path.exists() + content = json_path.read_text() + assert "hassh_fingerprint" in content + record = json.loads(content.strip()) + assert record["fields"]["hassh_server_hash"] == "b" * 32 + assert record["fields"]["ssh_banner"] == "SSH-2.0-Paramiko_3.0" + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_hassh_none_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None # No SSH server + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0) + + assert 22 in probed["10.0.0.1"]["hassh"] + if json_path.exists(): + content = json_path.read_text() + assert "hassh_fingerprint" not in content + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_hassh_skips_already_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {"10.0.0.1": {"hassh": {22}}} + + _probe_cycle(targets, probed, [], [22, 2222], [], log_path, json_path, timeout=1.0) + + assert mock_hassh.call_count == 1 # only 2222 + mock_hassh.assert_called_once_with("10.0.0.1", 2222, timeout=1.0) + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_hassh_exception_marks_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.side_effect = OSError("Connection refused") + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0) + + assert 22 in probed["10.0.0.1"]["hassh"] + + +# ─── _probe_cycle: TCPFP phase ───────────────────────────────────────────── + +class TestProbeCycleTCPFP: + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_probes_tcpfp_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = { + "tcpfp_hash": "d" * 32, + "tcpfp_raw": "64:65535:1:1460:7:1:1:M,N,W,N,N,T,S,E", + "ttl": 64, "window_size": 65535, "df_bit": 1, + "mss": 1460, "window_scale": 7, "sack_ok": 1, + "timestamp": 1, "options_order": "M,N,W,N,N,T,S,E", + } + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [], [80, 443], log_path, json_path, timeout=1.0) + + assert mock_tcpfp.call_count == 2 + assert 80 in probed["10.0.0.1"]["tcpfp"] + assert 443 in probed["10.0.0.1"]["tcpfp"] + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_tcpfp_writes_event_with_all_fields(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = { + "tcpfp_hash": "e" * 32, + "tcpfp_raw": "128:8192:1:1460:8:1:0:M,N,W,N,N,S", + "ttl": 128, "window_size": 8192, "df_bit": 1, + "mss": 1460, "window_scale": 8, "sack_ok": 1, + "timestamp": 0, "options_order": "M,N,W,N,N,S", + } + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [], [443], log_path, json_path, timeout=1.0) + + content = json_path.read_text() + assert "tcpfp_fingerprint" in content + record = json.loads(content.strip()) + assert record["fields"]["tcpfp_hash"] == "e" * 32 + assert record["fields"]["ttl"] == "128" + assert record["fields"]["window_size"] == "8192" + assert record["fields"]["options_order"] == "M,N,W,N,N,S" + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_tcpfp_none_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [], [], [443], log_path, json_path, timeout=1.0) + + assert 443 in probed["10.0.0.1"]["tcpfp"] + if json_path.exists(): + content = json_path.read_text() + assert "tcpfp_fingerprint" not in content + + +# ─── Probe type isolation ─────────────────────────────────────────────────── + +class TestProbeTypeIsolation: + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_jarm_does_not_mark_hassh(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + """JARM probing port 2222 should not mark HASSH port 2222 as done.""" + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + # Probe with JARM on 2222 and HASSH on 2222 + _probe_cycle(targets, probed, [2222], [2222], [], log_path, json_path, timeout=1.0) + + # Both should be called + assert mock_jarm.call_count == 1 + assert mock_hassh.call_count == 1 + assert 2222 in probed["10.0.0.1"]["jarm"] + assert 2222 in probed["10.0.0.1"]["hassh"] + + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_all_three_probes_run(self, mock_jarm: MagicMock, mock_hassh: MagicMock, + mock_tcpfp: MagicMock, tmp_path: Path): + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, dict[str, set[int]]] = {} + + _probe_cycle(targets, probed, [443], [22], [80], log_path, json_path, timeout=1.0) + + assert mock_jarm.call_count == 1 + assert mock_hassh.call_count == 1 + assert mock_tcpfp.call_count == 1 + + # ─── _write_event ──────────────────────────────────────────────────────────── class TestWriteEvent: