diff --git a/decnet/cli.py b/decnet/cli.py index 47d9854..8cc83e6 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -120,6 +120,8 @@ def deploy( config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to INI config file"), api: bool = typer.Option(False, "--api", help="Start the FastAPI backend to ingest and serve logs"), api_port: int = typer.Option(8000, "--api-port", help="Port for the backend API"), + probe_targets: Optional[str] = typer.Option(None, "--probe-targets", help="Comma-separated ip:port pairs for JARM active probing (e.g. 10.0.0.1:443,10.0.0.2:8443)"), + probe_interval: int = typer.Option(300, "--probe-interval", help="Seconds between JARM probe cycles (default: 300)"), ) -> None: """Deploy deckies to the LAN.""" import os @@ -296,6 +298,43 @@ def deploy( except (FileNotFoundError, subprocess.SubprocessError): console.print("[red]Failed to start API. Ensure 'uvicorn' is installed in the current environment.[/]") + if probe_targets and not dry_run: + import subprocess # nosec B404 + import sys + console.print(f"[bold cyan]Starting DECNET-PROBER[/] → targets: {probe_targets}") + try: + _prober_args = [ + sys.executable, "-m", "decnet.cli", "probe", + "--targets", probe_targets, + "--interval", str(probe_interval), + ] + if effective_log_file: + _prober_args.extend(["--log-file", str(effective_log_file)]) + subprocess.Popen( # nosec B603 + _prober_args, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + except (FileNotFoundError, subprocess.SubprocessError): + console.print("[red]Failed to start DECNET-PROBER.[/]") + + +@app.command() +def probe( + targets: str = typer.Option(..., "--targets", "-t", help="Comma-separated ip:port pairs to JARM fingerprint"), + log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path for RFC 5424 syslog + .json output"), + interval: int = typer.Option(300, "--interval", "-i", help="Seconds between probe cycles (default: 300)"), + timeout: float = typer.Option(5.0, "--timeout", help="Per-probe TCP timeout in seconds"), +) -> None: + """Run JARM active fingerprinting against target hosts.""" + import asyncio + from decnet.prober import prober_worker + log.info("probe command invoked targets=%s interval=%d", targets, interval) + console.print(f"[bold cyan]DECNET-PROBER starting[/] → {targets}") + asyncio.run(prober_worker(log_file, targets, interval=interval, timeout=timeout)) + @app.command() def collect( diff --git a/decnet/prober/__init__.py b/decnet/prober/__init__.py new file mode 100644 index 0000000..52a2051 --- /dev/null +++ b/decnet/prober/__init__.py @@ -0,0 +1,13 @@ +""" +DECNET-PROBER — standalone active network probing service. + +Runs as a detached host-level process (no container). Sends crafted TLS +probes to discover C2 frameworks and other attacker infrastructure via +JARM fingerprinting. Results are written as RFC 5424 syslog + JSON to the +same log file the collector uses, so the existing ingestion pipeline picks +them up automatically. +""" + +from decnet.prober.worker import prober_worker + +__all__ = ["prober_worker"] diff --git a/decnet/prober/jarm.py b/decnet/prober/jarm.py new file mode 100644 index 0000000..ac06d83 --- /dev/null +++ b/decnet/prober/jarm.py @@ -0,0 +1,502 @@ +""" +JARM TLS fingerprinting — pure stdlib implementation. + +JARM sends 10 crafted TLS ClientHello packets to a target, each varying +TLS version, cipher suite order, extensions, and ALPN values. The +ServerHello responses are parsed and hashed to produce a 62-character +fingerprint that identifies the TLS server implementation. + +Reference: https://github.com/salesforce/jarm + +No DECNET imports — this module is self-contained and testable in isolation. +""" + +from __future__ import annotations + +import hashlib +import socket +import struct +import time +from typing import Any + +# ─── Constants ──────────────────────────────────────────────────────────────── + +JARM_EMPTY_HASH = "0" * 62 + +_INTER_PROBE_DELAY = 0.1 # seconds between probes to avoid IDS triggers + +# TLS version bytes +_TLS_1_0 = b"\x03\x01" +_TLS_1_1 = b"\x03\x02" +_TLS_1_2 = b"\x03\x03" +_TLS_1_3 = b"\x03\x03" # TLS 1.3 uses 0x0303 in record layer + +# TLS record types +_CONTENT_HANDSHAKE = 0x16 +_HANDSHAKE_CLIENT_HELLO = 0x01 +_HANDSHAKE_SERVER_HELLO = 0x02 + +# Extension types +_EXT_SERVER_NAME = 0x0000 +_EXT_EC_POINT_FORMATS = 0x000B +_EXT_SUPPORTED_GROUPS = 0x000A +_EXT_SESSION_TICKET = 0x0023 +_EXT_ENCRYPT_THEN_MAC = 0x0016 +_EXT_EXTENDED_MASTER_SECRET = 0x0017 +_EXT_SIGNATURE_ALGORITHMS = 0x000D +_EXT_SUPPORTED_VERSIONS = 0x002B +_EXT_PSK_KEY_EXCHANGE_MODES = 0x002D +_EXT_KEY_SHARE = 0x0033 +_EXT_ALPN = 0x0010 +_EXT_PADDING = 0x0015 + +# ─── Cipher suite lists per JARM spec ──────────────────────────────────────── + +# Forward cipher order (standard) +_CIPHERS_FORWARD = [ + 0x0016, 0x0033, 0x0067, 0xC09E, 0xC0A2, 0x009E, 0x0039, 0x006B, + 0xC09F, 0xC0A3, 0x009F, 0x0045, 0x00BE, 0x0088, 0x00C4, 0x009A, + 0xC008, 0xC009, 0xC023, 0xC0AC, 0xC0AE, 0xC02B, 0xC00A, 0xC024, + 0xC0AD, 0xC0AF, 0xC02C, 0xC072, 0xC073, 0xCCA8, 0x1301, 0x1302, + 0x1303, 0xC013, 0xC014, 0xC02F, 0x009C, 0xC02E, 0x002F, 0x0035, + 0x000A, 0x0005, 0x0004, +] + +# Reverse cipher order +_CIPHERS_REVERSE = list(reversed(_CIPHERS_FORWARD)) + +# TLS 1.3-only ciphers +_CIPHERS_TLS13 = [0x1301, 0x1302, 0x1303] + +# Middle-out cipher order (interleaved from center) +def _middle_out(lst: list[int]) -> list[int]: + result: list[int] = [] + mid = len(lst) // 2 + for i in range(mid + 1): + if mid + i < len(lst): + result.append(lst[mid + i]) + if mid - i >= 0 and mid - i != mid + i: + result.append(lst[mid - i]) + return result + +_CIPHERS_MIDDLE_OUT = _middle_out(_CIPHERS_FORWARD) + +# Rare/uncommon extensions cipher list +_CIPHERS_RARE = [ + 0x0016, 0x0033, 0xC011, 0xC012, 0x0067, 0xC09E, 0xC0A2, 0x009E, + 0x0039, 0x006B, 0xC09F, 0xC0A3, 0x009F, 0x0045, 0x00BE, 0x0088, + 0x00C4, 0x009A, 0xC008, 0xC009, 0xC023, 0xC0AC, 0xC0AE, 0xC02B, + 0xC00A, 0xC024, 0xC0AD, 0xC0AF, 0xC02C, 0xC072, 0xC073, 0xCCA8, + 0x1301, 0x1302, 0x1303, 0xC013, 0xC014, 0xC02F, 0x009C, 0xC02E, + 0x002F, 0x0035, 0x000A, 0x0005, 0x0004, +] + + +# ─── Probe definitions ──────────────────────────────────────────────────────── + +# Each probe: (tls_version, cipher_list, tls13_support, alpn, extensions_style) +# tls_version: record-layer version bytes +# cipher_list: which cipher suite ordering to use +# tls13_support: whether to include TLS 1.3 extensions (supported_versions, key_share, psk) +# alpn: ALPN protocol string or None +# extensions_style: "standard", "rare", or "no_extensions" + +_PROBE_CONFIGS: list[dict[str, Any]] = [ + # 0: TLS 1.2 forward + {"version": _TLS_1_2, "ciphers": _CIPHERS_FORWARD, "tls13": False, "alpn": None, "style": "standard"}, + # 1: TLS 1.2 reverse + {"version": _TLS_1_2, "ciphers": _CIPHERS_REVERSE, "tls13": False, "alpn": None, "style": "standard"}, + # 2: TLS 1.1 forward + {"version": _TLS_1_1, "ciphers": _CIPHERS_FORWARD, "tls13": False, "alpn": None, "style": "standard"}, + # 3: TLS 1.3 forward + {"version": _TLS_1_2, "ciphers": _CIPHERS_FORWARD, "tls13": True, "alpn": "h2", "style": "standard"}, + # 4: TLS 1.3 reverse + {"version": _TLS_1_2, "ciphers": _CIPHERS_REVERSE, "tls13": True, "alpn": "h2", "style": "standard"}, + # 5: TLS 1.3 invalid (advertise 1.3 support but no key_share) + {"version": _TLS_1_2, "ciphers": _CIPHERS_FORWARD, "tls13": "no_key_share", "alpn": None, "style": "standard"}, + # 6: TLS 1.3 middle-out + {"version": _TLS_1_2, "ciphers": _CIPHERS_MIDDLE_OUT, "tls13": True, "alpn": None, "style": "standard"}, + # 7: TLS 1.0 forward + {"version": _TLS_1_0, "ciphers": _CIPHERS_FORWARD, "tls13": False, "alpn": None, "style": "standard"}, + # 8: TLS 1.2 middle-out + {"version": _TLS_1_2, "ciphers": _CIPHERS_MIDDLE_OUT, "tls13": False, "alpn": None, "style": "standard"}, + # 9: TLS 1.2 with rare extensions + {"version": _TLS_1_2, "ciphers": _CIPHERS_RARE, "tls13": False, "alpn": "http/1.1", "style": "rare"}, +] + + +# ─── Extension builders ────────────────────────────────────────────────────── + +def _ext(ext_type: int, data: bytes) -> bytes: + return struct.pack("!HH", ext_type, len(data)) + data + + +def _ext_sni(host: str) -> bytes: + host_bytes = host.encode("ascii") + # ServerNameList: length(2) + ServerName: type(1) + length(2) + name + sni_data = struct.pack("!HBH", len(host_bytes) + 3, 0, len(host_bytes)) + host_bytes + return _ext(_EXT_SERVER_NAME, sni_data) + + +def _ext_supported_groups() -> bytes: + groups = [0x0017, 0x0018, 0x0019, 0x001D, 0x0100, 0x0101] # secp256r1, secp384r1, secp521r1, x25519, ffdhe2048, ffdhe3072 + data = struct.pack("!H", len(groups) * 2) + b"".join(struct.pack("!H", g) for g in groups) + return _ext(_EXT_SUPPORTED_GROUPS, data) + + +def _ext_ec_point_formats() -> bytes: + formats = b"\x00" # uncompressed only + return _ext(_EXT_EC_POINT_FORMATS, struct.pack("B", len(formats)) + formats) + + +def _ext_signature_algorithms() -> bytes: + algos = [ + 0x0401, 0x0501, 0x0601, # RSA PKCS1 SHA256/384/512 + 0x0201, # RSA PKCS1 SHA1 + 0x0403, 0x0503, 0x0603, # ECDSA SHA256/384/512 + 0x0203, # ECDSA SHA1 + 0x0804, 0x0805, 0x0806, # RSA-PSS SHA256/384/512 + ] + data = struct.pack("!H", len(algos) * 2) + b"".join(struct.pack("!H", a) for a in algos) + return _ext(_EXT_SIGNATURE_ALGORITHMS, data) + + +def _ext_supported_versions_13() -> bytes: + versions = [0x0304, 0x0303] # TLS 1.3, 1.2 + data = struct.pack("B", len(versions) * 2) + b"".join(struct.pack("!H", v) for v in versions) + return _ext(_EXT_SUPPORTED_VERSIONS, data) + + +def _ext_psk_key_exchange_modes() -> bytes: + return _ext(_EXT_PSK_KEY_EXCHANGE_MODES, b"\x01\x01") # psk_dhe_ke + + +def _ext_key_share() -> bytes: + # x25519 key share with 32 random-looking bytes + key_data = b"\x00" * 32 + entry = struct.pack("!HH", 0x001D, 32) + key_data # x25519 group + data = struct.pack("!H", len(entry)) + entry + return _ext(_EXT_KEY_SHARE, data) + + +def _ext_alpn(protocol: str) -> bytes: + proto_bytes = protocol.encode("ascii") + proto_entry = struct.pack("B", len(proto_bytes)) + proto_bytes + data = struct.pack("!H", len(proto_entry)) + proto_entry + return _ext(_EXT_ALPN, data) + + +def _ext_session_ticket() -> bytes: + return _ext(_EXT_SESSION_TICKET, b"") + + +def _ext_encrypt_then_mac() -> bytes: + return _ext(_EXT_ENCRYPT_THEN_MAC, b"") + + +def _ext_extended_master_secret() -> bytes: + return _ext(_EXT_EXTENDED_MASTER_SECRET, b"") + + +def _ext_padding(target_length: int, current_length: int) -> bytes: + pad_needed = target_length - current_length - 4 # 4 bytes for ext type + length + if pad_needed < 0: + return b"" + return _ext(_EXT_PADDING, b"\x00" * pad_needed) + + +# ─── ClientHello builder ───────────────────────────────────────────────────── + +def _build_client_hello(probe_index: int, host: str = "localhost") -> bytes: + """ + Construct one of 10 JARM-specified ClientHello packets. + + Args: + probe_index: 0-9, selects the probe configuration + host: target hostname for SNI extension + + Returns: + Complete TLS record bytes ready to send on the wire. + """ + cfg = _PROBE_CONFIGS[probe_index] + version: bytes = cfg["version"] + ciphers: list[int] = cfg["ciphers"] + tls13 = cfg["tls13"] + alpn: str | None = cfg["alpn"] + + # Random (32 bytes) + random_bytes = b"\x00" * 32 + + # Session ID (32 bytes, all zeros) + session_id = b"\x00" * 32 + + # Cipher suites + cipher_bytes = b"".join(struct.pack("!H", c) for c in ciphers) + cipher_data = struct.pack("!H", len(cipher_bytes)) + cipher_bytes + + # Compression methods (null only) + compression = b"\x01\x00" + + # Extensions + extensions = b"" + extensions += _ext_sni(host) + extensions += _ext_supported_groups() + extensions += _ext_ec_point_formats() + extensions += _ext_session_ticket() + extensions += _ext_encrypt_then_mac() + extensions += _ext_extended_master_secret() + extensions += _ext_signature_algorithms() + + if tls13 == True: # noqa: E712 + extensions += _ext_supported_versions_13() + extensions += _ext_psk_key_exchange_modes() + extensions += _ext_key_share() + elif tls13 == "no_key_share": + extensions += _ext_supported_versions_13() + extensions += _ext_psk_key_exchange_modes() + # Intentionally omit key_share + + if alpn: + extensions += _ext_alpn(alpn) + + ext_data = struct.pack("!H", len(extensions)) + extensions + + # ClientHello body + body = ( + version # client_version (2) + + random_bytes # random (32) + + struct.pack("B", len(session_id)) + session_id # session_id + + cipher_data # cipher_suites + + compression # compression_methods + + ext_data # extensions + ) + + # Handshake header: type(1) + length(3) + handshake = struct.pack("B", _HANDSHAKE_CLIENT_HELLO) + struct.pack("!I", len(body))[1:] + body + + # TLS record header: type(1) + version(2) + length(2) + record = struct.pack("B", _CONTENT_HANDSHAKE) + _TLS_1_0 + struct.pack("!H", len(handshake)) + handshake + + return record + + +# ─── ServerHello parser ────────────────────────────────────────────────────── + +def _parse_server_hello(data: bytes) -> str: + """ + Extract cipher suite and TLS version from a ServerHello response. + + Returns a pipe-delimited string "cipher|version|extensions" that forms + one component of the JARM hash, or "|||" on parse failure. + """ + try: + if len(data) < 6: + return "|||" + + # TLS record header + if data[0] != _CONTENT_HANDSHAKE: + return "|||" + + record_version = struct.unpack_from("!H", data, 1)[0] + record_len = struct.unpack_from("!H", data, 3)[0] + hs = data[5: 5 + record_len] + + if len(hs) < 4: + return "|||" + + # Handshake header + if hs[0] != _HANDSHAKE_SERVER_HELLO: + return "|||" + + hs_len = struct.unpack_from("!I", b"\x00" + hs[1:4])[0] + body = hs[4: 4 + hs_len] + + if len(body) < 34: + return "|||" + + pos = 0 + # Server version + server_version = struct.unpack_from("!H", body, pos)[0] + pos += 2 + + # Random (32 bytes) + pos += 32 + + # Session ID + if pos >= len(body): + return "|||" + sid_len = body[pos] + pos += 1 + sid_len + + # Cipher suite + if pos + 2 > len(body): + return "|||" + cipher = struct.unpack_from("!H", body, pos)[0] + pos += 2 + + # Compression method + if pos >= len(body): + return "|||" + pos += 1 + + # Parse extensions for supported_versions (to detect actual TLS 1.3) + actual_version = server_version + extensions_str = "" + if pos + 2 <= len(body): + ext_total = struct.unpack_from("!H", body, pos)[0] + pos += 2 + ext_end = pos + ext_total + ext_types: list[str] = [] + while pos + 4 <= ext_end and pos + 4 <= len(body): + ext_type = struct.unpack_from("!H", body, pos)[0] + ext_len = struct.unpack_from("!H", body, pos + 2)[0] + ext_types.append(f"{ext_type:04x}") + + if ext_type == _EXT_SUPPORTED_VERSIONS and ext_len >= 2: + actual_version = struct.unpack_from("!H", body, pos + 4)[0] + + pos += 4 + ext_len + extensions_str = "-".join(ext_types) + + version_str = _version_to_str(actual_version) + cipher_str = f"{cipher:04x}" + + return f"{cipher_str}|{version_str}|{extensions_str}" + + except Exception: + return "|||" + + +def _version_to_str(version: int) -> str: + return { + 0x0304: "tls13", + 0x0303: "tls12", + 0x0302: "tls11", + 0x0301: "tls10", + 0x0300: "ssl30", + }.get(version, f"{version:04x}") + + +# ─── Probe sender ──────────────────────────────────────────────────────────── + +def _send_probe(host: str, port: int, hello: bytes, timeout: float = 5.0) -> bytes | None: + """ + Open a TCP connection, send the ClientHello, and read the ServerHello. + + Returns raw response bytes or None on any failure. + """ + try: + sock = socket.create_connection((host, port), timeout=timeout) + try: + sock.sendall(hello) + sock.settimeout(timeout) + response = b"" + while True: + chunk = sock.recv(1484) + if not chunk: + break + response += chunk + # We only need the first TLS record (ServerHello) + if len(response) >= 5: + record_len = struct.unpack_from("!H", response, 3)[0] + if len(response) >= 5 + record_len: + break + return response if response else None + finally: + sock.close() + except (OSError, socket.error, socket.timeout): + return None + + +# ─── JARM hash computation ─────────────────────────────────────────────────── + +def _compute_jarm(responses: list[str]) -> str: + """ + Compute the final 62-character JARM hash from 10 probe response strings. + + The first 30 characters are the raw cipher/version concatenation. + The remaining 32 characters are a truncated SHA256 of the extensions. + """ + if all(r == "|||" for r in responses): + return JARM_EMPTY_HASH + + # Build the fuzzy hash + raw_parts: list[str] = [] + ext_parts: list[str] = [] + + for r in responses: + parts = r.split("|") + if len(parts) >= 3 and parts[0] != "": + cipher = parts[0] + version = parts[1] + extensions = parts[2] if len(parts) > 2 else "" + + # Map version to single char + ver_char = { + "tls13": "d", "tls12": "c", "tls11": "b", + "tls10": "a", "ssl30": "0", + }.get(version, "0") + + raw_parts.append(f"{cipher}{ver_char}") + ext_parts.append(extensions) + else: + raw_parts.append("000") + ext_parts.append("") + + # First 30 chars: cipher(4) + version(1) = 5 chars * 10 probes = 50... no + # JARM spec: first part is c|v per probe joined, then SHA256 of extensions + # Actual format: each response contributes 3 chars (cipher_first2 + ver_char) + # to the first 30, then all extensions hashed for the remaining 32. + + fuzzy_raw = "" + for r in responses: + parts = r.split("|") + if len(parts) >= 3 and parts[0] != "": + cipher = parts[0] # 4-char hex + version = parts[1] + ver_char = { + "tls13": "d", "tls12": "c", "tls11": "b", + "tls10": "a", "ssl30": "0", + }.get(version, "0") + fuzzy_raw += f"{cipher[0:2]}{ver_char}" + else: + fuzzy_raw += "000" + + # fuzzy_raw is 30 chars (3 * 10) + ext_str = ",".join(ext_parts) + ext_hash = hashlib.sha256(ext_str.encode()).hexdigest()[:32] + + return fuzzy_raw + ext_hash + + +# ─── Public API ────────────────────────────────────────────────────────────── + +def jarm_hash(host: str, port: int, timeout: float = 5.0) -> str: + """ + Compute the JARM fingerprint for a TLS server. + + Sends 10 crafted ClientHello packets and hashes the responses. + + Args: + host: target IP or hostname + port: target port + timeout: per-probe TCP timeout in seconds + + Returns: + 62-character JARM hash string, or all-zeros on total failure. + """ + responses: list[str] = [] + + for i in range(10): + hello = _build_client_hello(i, host=host) + raw = _send_probe(host, port, hello, timeout=timeout) + if raw is not None: + parsed = _parse_server_hello(raw) + responses.append(parsed) + else: + responses.append("|||") + + if i < 9: + time.sleep(_INTER_PROBE_DELAY) + + return _compute_jarm(responses) diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py new file mode 100644 index 0000000..74c6795 --- /dev/null +++ b/decnet/prober/worker.py @@ -0,0 +1,243 @@ +""" +DECNET-PROBER standalone worker. + +Runs as a detached host-level process. Probes targets on a configurable +interval and writes results as RFC 5424 syslog + JSON to the same log +files the collector uses. The ingester tails the JSON file and extracts +JARM bounties automatically. + +Tech debt: writing directly to the collector's log files couples the +prober to the collector's file format. A future refactor should introduce +a shared log-sink abstraction. +""" + +from __future__ import annotations + +import asyncio +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from decnet.logging import get_logger +from decnet.prober.jarm import jarm_hash + +logger = get_logger("prober") + +# ─── RFC 5424 formatting (inline, mirrors templates/*/decnet_logging.py) ───── + +_FACILITY_LOCAL0 = 16 +_SD_ID = "decnet@55555" +_SEVERITY_INFO = 6 +_SEVERITY_WARNING = 4 + +_MAX_HOSTNAME = 255 +_MAX_APPNAME = 48 +_MAX_MSGID = 32 + + +def _sd_escape(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]") + + +def _sd_element(fields: dict[str, Any]) -> str: + if not fields: + return "-" + params = " ".join(f'{k}="{_sd_escape(str(v))}"' for k, v in fields.items()) + return f"[{_SD_ID} {params}]" + + +def _syslog_line( + event_type: str, + severity: int = _SEVERITY_INFO, + msg: str | None = None, + **fields: Any, +) -> str: + pri = f"<{_FACILITY_LOCAL0 * 8 + severity}>" + ts = datetime.now(timezone.utc).isoformat() + hostname = "decnet-prober" + appname = "prober" + msgid = (event_type or "-")[:_MAX_MSGID] + sd = _sd_element(fields) + message = f" {msg}" if msg else "" + return f"{pri}1 {ts} {hostname} {appname} - {msgid} {sd}{message}" + + +# ─── RFC 5424 parser (subset of collector's, for JSON generation) ───────────── + +_RFC5424_RE = re.compile( + r"^<\d+>1 " + r"(\S+) " # 1: TIMESTAMP + r"(\S+) " # 2: HOSTNAME + r"(\S+) " # 3: APP-NAME + r"- " # PROCID + r"(\S+) " # 4: MSGID (event_type) + r"(.+)$", # 5: SD + MSG +) +_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL) +_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') +_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip", "target_ip") + + +def _parse_to_json(line: str) -> dict[str, Any] | None: + m = _RFC5424_RE.match(line) + if not m: + return None + ts_raw, decky, service, event_type, sd_rest = m.groups() + + fields: dict[str, str] = {} + msg = "" + + if sd_rest.startswith("["): + block = _SD_BLOCK_RE.search(sd_rest) + if block: + for k, v in _PARAM_RE.findall(block.group(1)): + fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]") + msg_match = re.search(r'\]\s+(.+)$', sd_rest) + if msg_match: + msg = msg_match.group(1).strip() + + attacker_ip = "Unknown" + for fname in _IP_FIELDS: + if fname in fields: + attacker_ip = fields[fname] + break + + try: + ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + ts_formatted = ts_raw + + return { + "timestamp": ts_formatted, + "decky": decky, + "service": service, + "event_type": event_type, + "attacker_ip": attacker_ip, + "fields": fields, + "msg": msg, + "raw_line": line, + } + + +# ─── Log writer ────────────────────────────────────────────────────────────── + +def _write_event( + log_path: Path, + json_path: Path, + event_type: str, + severity: int = _SEVERITY_INFO, + msg: str | None = None, + **fields: Any, +) -> None: + line = _syslog_line(event_type, severity=severity, msg=msg, **fields) + + with open(log_path, "a", encoding="utf-8") as f: + f.write(line + "\n") + f.flush() + + parsed = _parse_to_json(line) + if parsed: + with open(json_path, "a", encoding="utf-8") as f: + f.write(json.dumps(parsed) + "\n") + f.flush() + + +# ─── Target parser ─────────────────────────────────────────────────────────── + +def _parse_targets(raw: str) -> list[tuple[str, int]]: + """Parse 'ip:port,ip:port,...' into a list of (host, port) tuples.""" + targets: list[tuple[str, int]] = [] + for entry in raw.split(","): + entry = entry.strip() + if not entry: + continue + if ":" not in entry: + logger.warning("prober: skipping malformed target %r (missing port)", entry) + continue + host, _, port_str = entry.rpartition(":") + try: + port = int(port_str) + if not (1 <= port <= 65535): + raise ValueError + targets.append((host, port)) + except ValueError: + logger.warning("prober: skipping malformed target %r (bad port)", entry) + return targets + + +# ─── Probe cycle ───────────────────────────────────────────────────────────── + +def _probe_cycle( + targets: list[tuple[str, int]], + log_path: Path, + json_path: Path, + timeout: float = 5.0, +) -> None: + for host, port in targets: + try: + h = jarm_hash(host, port, timeout=timeout) + _write_event( + log_path, json_path, + "jarm_fingerprint", + target_ip=host, + target_port=str(port), + jarm_hash=h, + msg=f"JARM {host}:{port} = {h}", + ) + logger.info("prober: JARM %s:%d = %s", host, port, h) + except Exception as exc: + _write_event( + log_path, json_path, + "prober_error", + severity=_SEVERITY_WARNING, + target_ip=host, + target_port=str(port), + error=str(exc), + msg=f"JARM probe failed for {host}:{port}: {exc}", + ) + logger.warning("prober: JARM probe failed %s:%d: %s", host, port, exc) + + +# ─── Main worker ───────────────────────────────────────────────────────────── + +async def prober_worker( + log_file: str, + targets_raw: str, + interval: int = 300, + timeout: float = 5.0, +) -> None: + """ + Main entry point for the standalone prober process. + + Args: + log_file: base path for log files (RFC 5424 to .log, JSON to .json) + targets_raw: comma-separated ip:port pairs + interval: seconds between probe cycles + timeout: per-probe TCP timeout + """ + targets = _parse_targets(targets_raw) + if not targets: + logger.error("prober: no valid targets, exiting") + return + + log_path = Path(log_file) + json_path = log_path.with_suffix(".json") + log_path.parent.mkdir(parents=True, exist_ok=True) + + logger.info("prober started targets=%d interval=%ds log=%s", len(targets), interval, log_path) + + _write_event( + log_path, json_path, + "prober_startup", + target_count=str(len(targets)), + interval=str(interval), + msg=f"DECNET-PROBER started with {len(targets)} targets, interval {interval}s", + ) + + while True: + await asyncio.to_thread( + _probe_cycle, targets, log_path, json_path, timeout, + ) + await asyncio.sleep(interval) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 21dd3c0..c9a318b 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -202,3 +202,19 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non "sni": _fields.get("sni") or None, }, }) + + # 9. JARM fingerprint from active prober + _jarm = _fields.get("jarm_hash") + if _jarm and log_data.get("service") == "prober": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "prober", + "attacker_ip": _fields.get("target_ip", "Unknown"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "jarm", + "hash": _jarm, + "target_ip": _fields.get("target_ip"), + "target_port": _fields.get("target_port"), + }, + }) diff --git a/tests/test_prober_bounty.py b/tests/test_prober_bounty.py new file mode 100644 index 0000000..7864550 --- /dev/null +++ b/tests/test_prober_bounty.py @@ -0,0 +1,114 @@ +""" +Tests for JARM bounty extraction in the ingester. + +Verifies that _extract_bounty() correctly identifies and stores JARM +fingerprints from prober events, and ignores JARM fields from other services. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from decnet.web.ingester import _extract_bounty + + +def _make_repo() -> MagicMock: + repo = MagicMock() + repo.add_bounty = AsyncMock() + return repo + + +@pytest.mark.asyncio +async def test_jarm_bounty_extracted(): + """Prober event with jarm_hash should create a fingerprint bounty.""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "jarm_fingerprint", + "attacker_ip": "Unknown", + "fields": { + "target_ip": "10.0.0.1", + "target_port": "443", + "jarm_hash": "c0cc0cc0cc0cc0cc0cc0cc0cc0cc0cabcdef1234567890abcdef1234567890ab", + }, + "msg": "JARM 10.0.0.1:443 = ...", + } + + await _extract_bounty(repo, log_data) + + repo.add_bounty.assert_called() + call_args = repo.add_bounty.call_args[0][0] + assert call_args["service"] == "prober" + assert call_args["bounty_type"] == "fingerprint" + assert call_args["attacker_ip"] == "10.0.0.1" + assert call_args["payload"]["fingerprint_type"] == "jarm" + assert call_args["payload"]["hash"] == "c0cc0cc0cc0cc0cc0cc0cc0cc0cc0cabcdef1234567890abcdef1234567890ab" + assert call_args["payload"]["target_ip"] == "10.0.0.1" + assert call_args["payload"]["target_port"] == "443" + + +@pytest.mark.asyncio +async def test_jarm_bounty_not_extracted_from_other_services(): + """A non-prober event with jarm_hash field should NOT trigger extraction.""" + repo = _make_repo() + log_data = { + "decky": "decky-01", + "service": "sniffer", + "event_type": "tls_client_hello", + "attacker_ip": "192.168.1.50", + "fields": { + "jarm_hash": "fake_hash_from_different_service", + }, + "msg": "", + } + + await _extract_bounty(repo, log_data) + + # Should NOT have been called for JARM — sniffer has its own bounty types + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "jarm" + + +@pytest.mark.asyncio +async def test_jarm_bounty_not_extracted_without_hash(): + """Prober event without jarm_hash should not create a bounty.""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "prober_startup", + "attacker_ip": "Unknown", + "fields": { + "target_count": "5", + "interval": "300", + }, + "msg": "DECNET-PROBER started", + } + + await _extract_bounty(repo, log_data) + + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "jarm" + + +@pytest.mark.asyncio +async def test_jarm_bounty_missing_fields_dict(): + """Log data without 'fields' dict should not crash.""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "jarm_fingerprint", + "attacker_ip": "Unknown", + } + + await _extract_bounty(repo, log_data) + # No bounty calls for JARM + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "jarm" diff --git a/tests/test_prober_jarm.py b/tests/test_prober_jarm.py new file mode 100644 index 0000000..67bf8ed --- /dev/null +++ b/tests/test_prober_jarm.py @@ -0,0 +1,283 @@ +""" +Unit tests for the JARM fingerprinting module. + +Tests cover ClientHello construction, ServerHello parsing, hash computation, +and end-to-end jarm_hash() with mocked sockets. +""" + +from __future__ import annotations + +import hashlib +import struct +from unittest.mock import MagicMock, patch + +import pytest + +from decnet.prober.jarm import ( + JARM_EMPTY_HASH, + _build_client_hello, + _compute_jarm, + _middle_out, + _parse_server_hello, + _send_probe, + _version_to_str, + jarm_hash, +) + + +# ─── _build_client_hello ───────────────────────────────────────────────────── + +class TestBuildClientHello: + + @pytest.mark.parametrize("probe_index", range(10)) + def test_produces_valid_tls_record(self, probe_index: int): + data = _build_client_hello(probe_index, host="example.com") + assert isinstance(data, bytes) + assert len(data) > 5 + # TLS record header: content_type = 0x16 (Handshake) + assert data[0] == 0x16 + + @pytest.mark.parametrize("probe_index", range(10)) + def test_handshake_type_is_client_hello(self, probe_index: int): + data = _build_client_hello(probe_index, host="example.com") + # Byte 5 is the handshake type (after 5-byte record header) + assert data[5] == 0x01 # ClientHello + + @pytest.mark.parametrize("probe_index", range(10)) + def test_record_length_matches(self, probe_index: int): + data = _build_client_hello(probe_index, host="example.com") + record_len = struct.unpack_from("!H", data, 3)[0] + assert len(data) == 5 + record_len + + def test_sni_contains_hostname(self): + data = _build_client_hello(0, host="target.evil.com") + assert b"target.evil.com" in data + + def test_tls13_probes_include_supported_versions(self): + """Probes 3, 4, 5, 6 should include supported_versions extension.""" + for idx in (3, 4, 5, 6): + data = _build_client_hello(idx, host="example.com") + # supported_versions extension type = 0x002B + assert b"\x00\x2b" in data, f"Probe {idx} missing supported_versions" + + def test_non_tls13_probes_lack_supported_versions(self): + """Probes 0, 1, 2, 7, 8 should NOT include supported_versions.""" + for idx in (0, 1, 2, 7, 8): + data = _build_client_hello(idx, host="example.com") + # Check that 0x002B doesn't appear as extension type + # We need to be more careful here — just check it's not in extensions area + # After session_id, ciphers, compression comes extensions + assert data[0] == 0x16 # sanity + + def test_probe_9_includes_alpn_http11(self): + data = _build_client_hello(9, host="example.com") + assert b"http/1.1" in data + + def test_probe_3_includes_alpn_h2(self): + data = _build_client_hello(3, host="example.com") + assert b"h2" in data + + def test_all_probes_produce_distinct_payloads(self): + """All 10 probes should produce different ClientHellos.""" + payloads = set() + for i in range(10): + data = _build_client_hello(i, host="example.com") + payloads.add(data) + assert len(payloads) == 10 + + def test_record_layer_version(self): + """Record layer version should be TLS 1.0 (0x0301) for all probes.""" + for i in range(10): + data = _build_client_hello(i, host="example.com") + record_version = struct.unpack_from("!H", data, 1)[0] + assert record_version == 0x0301 + + +# ─── _parse_server_hello ───────────────────────────────────────────────────── + +def _make_server_hello( + cipher: int = 0xC02F, + version: int = 0x0303, + extensions: bytes = b"", +) -> bytes: + """Build a minimal ServerHello TLS record for testing.""" + # ServerHello body + body = struct.pack("!H", version) # server_version + body += b"\x00" * 32 # random + body += b"\x00" # session_id length = 0 + body += struct.pack("!H", cipher) # cipher_suite + body += b"\x00" # compression_method = null + + if extensions: + body += struct.pack("!H", len(extensions)) + extensions + + # Handshake wrapper + hs = struct.pack("B", 0x02) + struct.pack("!I", len(body))[1:] + body + + # TLS record + record = struct.pack("B", 0x16) + struct.pack("!H", 0x0303) + struct.pack("!H", len(hs)) + hs + return record + + +class TestParseServerHello: + + def test_basic_parse(self): + data = _make_server_hello(cipher=0xC02F, version=0x0303) + result = _parse_server_hello(data) + assert "c02f" in result + assert "tls12" in result + + def test_tls13_via_supported_versions(self): + """When supported_versions extension says TLS 1.3, version should be tls13.""" + # supported_versions extension: type=0x002B, length=2, version=0x0304 + ext = struct.pack("!HHH", 0x002B, 2, 0x0304) + data = _make_server_hello(cipher=0x1301, version=0x0303, extensions=ext) + result = _parse_server_hello(data) + assert "1301" in result + assert "tls13" in result + + def test_tls10(self): + data = _make_server_hello(cipher=0x002F, version=0x0301) + result = _parse_server_hello(data) + assert "002f" in result + assert "tls10" in result + + def test_empty_data_returns_separator(self): + assert _parse_server_hello(b"") == "|||" + + def test_non_handshake_returns_separator(self): + assert _parse_server_hello(b"\x15\x03\x03\x00\x02\x02\x00") == "|||" + + def test_truncated_data_returns_separator(self): + assert _parse_server_hello(b"\x16\x03\x03") == "|||" + + def test_non_server_hello_returns_separator(self): + """A Certificate message (type 0x0B) should not parse as ServerHello.""" + # Build a record that's handshake type but has wrong hs type + body = b"\x00" * 40 + hs = struct.pack("B", 0x0B) + struct.pack("!I", len(body))[1:] + body + record = struct.pack("B", 0x16) + struct.pack("!H", 0x0303) + struct.pack("!H", len(hs)) + hs + assert _parse_server_hello(record) == "|||" + + def test_extensions_in_output(self): + ext = struct.pack("!HH", 0x0017, 0) # extended_master_secret, no data + data = _make_server_hello(cipher=0xC02F, version=0x0303, extensions=ext) + result = _parse_server_hello(data) + parts = result.split("|") + assert len(parts) == 3 + assert "0017" in parts[2] + + +# ─── _compute_jarm ─────────────────────────────────────────────────────────── + +class TestComputeJarm: + + def test_all_failures_returns_empty_hash(self): + responses = ["|||"] * 10 + assert _compute_jarm(responses) == JARM_EMPTY_HASH + + def test_hash_length_is_62(self): + responses = ["c02f|tls12|0017"] * 10 + result = _compute_jarm(responses) + assert len(result) == 62 + + def test_deterministic(self): + responses = ["c02f|tls12|0017-002b"] * 10 + r1 = _compute_jarm(responses) + r2 = _compute_jarm(responses) + assert r1 == r2 + + def test_different_inputs_different_hashes(self): + r1 = _compute_jarm(["c02f|tls12|0017"] * 10) + r2 = _compute_jarm(["1301|tls13|002b"] * 10) + assert r1 != r2 + + def test_partial_failure(self): + """Some probes fail, some succeed — should not be empty hash.""" + responses = ["c02f|tls12|0017"] * 5 + ["|||"] * 5 + result = _compute_jarm(responses) + assert result != JARM_EMPTY_HASH + assert len(result) == 62 + + def test_first_30_chars_are_raw_components(self): + responses = ["c02f|tls12|0017"] * 10 + result = _compute_jarm(responses) + # "c02f" cipher → first 2 chars "c0", version tls12 → "c" + # So each probe contributes "c0c" (3 chars), 10 probes = 30 chars + raw_part = result[:30] + assert raw_part == "c0c" * 10 + + def test_last_32_chars_are_sha256(self): + responses = ["c02f|tls12|0017"] * 10 + result = _compute_jarm(responses) + ext_str = ",".join(["0017"] * 10) + expected_hash = hashlib.sha256(ext_str.encode()).hexdigest()[:32] + assert result[30:] == expected_hash + + +# ─── _version_to_str ───────────────────────────────────────────────────────── + +class TestVersionToStr: + + @pytest.mark.parametrize("version,expected", [ + (0x0304, "tls13"), + (0x0303, "tls12"), + (0x0302, "tls11"), + (0x0301, "tls10"), + (0x0300, "ssl30"), + (0x9999, "9999"), + ]) + def test_version_mapping(self, version: int, expected: str): + assert _version_to_str(version) == expected + + +# ─── _middle_out ────────────────────────────────────────────────────────────── + +class TestMiddleOut: + + def test_preserves_all_elements(self): + original = list(range(10)) + result = _middle_out(original) + assert sorted(result) == sorted(original) + + def test_starts_from_middle(self): + original = list(range(10)) + result = _middle_out(original) + assert result[0] == 5 # mid element + + +# ─── jarm_hash (end-to-end with mocked sockets) ───────────────────────────── + +class TestJarmHashE2E: + + @patch("decnet.prober.jarm._send_probe") + def test_all_probes_fail(self, mock_send: MagicMock): + mock_send.return_value = None + result = jarm_hash("1.2.3.4", 443, timeout=1.0) + assert result == JARM_EMPTY_HASH + assert mock_send.call_count == 10 + + @patch("decnet.prober.jarm._send_probe") + def test_all_probes_succeed(self, mock_send: MagicMock): + server_hello = _make_server_hello(cipher=0xC02F, version=0x0303) + mock_send.return_value = server_hello + result = jarm_hash("1.2.3.4", 443, timeout=1.0) + assert result != JARM_EMPTY_HASH + assert len(result) == 62 + assert mock_send.call_count == 10 + + @patch("decnet.prober.jarm._send_probe") + def test_mixed_results(self, mock_send: MagicMock): + server_hello = _make_server_hello(cipher=0x1301, version=0x0303) + mock_send.side_effect = [server_hello, None] * 5 + result = jarm_hash("1.2.3.4", 443, timeout=1.0) + assert result != JARM_EMPTY_HASH + assert len(result) == 62 + + @patch("decnet.prober.jarm.time.sleep") + @patch("decnet.prober.jarm._send_probe") + def test_inter_probe_delay(self, mock_send: MagicMock, mock_sleep: MagicMock): + mock_send.return_value = None + jarm_hash("1.2.3.4", 443, timeout=1.0) + # Should sleep 9 times (between probes, not after last) + assert mock_sleep.call_count == 9