diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 9427b90..21dd3c0 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -143,6 +143,8 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non "fingerprint_type": "ja3", "ja3": _ja3, "ja3s": _fields.get("ja3s"), + "ja4": _fields.get("ja4"), + "ja4s": _fields.get("ja4s"), "tls_version": _fields.get("tls_version"), "sni": _fields.get("sni") or None, "alpn": _fields.get("alpn") or None, @@ -151,3 +153,52 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non "raw_extensions": _fields.get("raw_extensions"), }, }) + + # 6. JA4L latency fingerprint from sniffer + _ja4l_rtt = _fields.get("ja4l_rtt_ms") + if _ja4l_rtt and log_data.get("service") == "sniffer": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "sniffer", + "attacker_ip": log_data.get("attacker_ip"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "ja4l", + "rtt_ms": _ja4l_rtt, + "client_ttl": _fields.get("ja4l_client_ttl"), + }, + }) + + # 7. TLS session resumption behavior + _resumption = _fields.get("resumption") + if _resumption and log_data.get("service") == "sniffer": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "sniffer", + "attacker_ip": log_data.get("attacker_ip"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "tls_resumption", + "mechanisms": _resumption, + }, + }) + + # 8. TLS certificate details (TLS 1.2 only — passive extraction) + _subject_cn = _fields.get("subject_cn") + if _subject_cn and log_data.get("service") == "sniffer": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "sniffer", + "attacker_ip": log_data.get("attacker_ip"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "tls_certificate", + "subject_cn": _subject_cn, + "issuer": _fields.get("issuer"), + "self_signed": _fields.get("self_signed"), + "not_before": _fields.get("not_before"), + "not_after": _fields.get("not_after"), + "sans": _fields.get("sans"), + "sni": _fields.get("sni") or None, + }, + }) diff --git a/templates/sniffer/server.py b/templates/sniffer/server.py index 53c3b79..bc9ccd9 100644 --- a/templates/sniffer/server.py +++ b/templates/sniffer/server.py @@ -3,14 +3,20 @@ DECNET passive TLS sniffer. Captures TLS handshakes on the MACVLAN interface (shared network namespace -with the decky base container). Extracts JA3/JA3S fingerprints and connection +with the decky base container). Extracts fingerprints and connection metadata, then emits structured RFC 5424 log lines to stdout for the host-side collector to ingest. Requires: NET_RAW + NET_ADMIN capabilities (set in compose fragment). -JA3 — MD5(SSLVersion,Ciphers,Extensions,EllipticCurves,ECPointFormats) -JA3S — MD5(SSLVersion,Cipher,Extensions) +Supported fingerprints: + JA3 — MD5(SSLVersion,Ciphers,Extensions,EllipticCurves,ECPointFormats) + JA3S — MD5(SSLVersion,Cipher,Extensions) + JA4 — {proto}{ver}{sni}{#cs}{#ext}{alpn}_{sha256_12(sorted_cs)}_{sha256_12(sorted_ext,sigalgs)} + JA4S — {proto}{ver}{#ext}{alpn}_{sha256_12(cipher,sorted_ext)} + JA4L — TCP RTT latency measurement (client_ttl, server_rtt_ms) + TLS session resumption detection (session tickets, PSK, 0-RTT) + Certificate extraction (TLS ≤1.2 only — 1.3 encrypts certs) GREASE values (RFC 8701) are excluded from all lists before hashing. """ @@ -43,13 +49,22 @@ _GREASE: frozenset[int] = frozenset(0x0A0A + i * 0x1010 for i in range(16)) _TLS_RECORD_HANDSHAKE: int = 0x16 _TLS_HT_CLIENT_HELLO: int = 0x01 _TLS_HT_SERVER_HELLO: int = 0x02 +_TLS_HT_CERTIFICATE: int = 0x0B # TLS extension types we extract for metadata _EXT_SNI: int = 0x0000 _EXT_SUPPORTED_GROUPS: int = 0x000A _EXT_EC_POINT_FORMATS: int = 0x000B +_EXT_SIGNATURE_ALGORITHMS: int = 0x000D _EXT_ALPN: int = 0x0010 _EXT_SESSION_TICKET: int = 0x0023 +_EXT_SUPPORTED_VERSIONS: int = 0x002B +_EXT_PRE_SHARED_KEY: int = 0x0029 +_EXT_EARLY_DATA: int = 0x002A + +# TCP flags +_TCP_SYN: int = 0x02 +_TCP_ACK: int = 0x10 # ─── Session tracking ───────────────────────────────────────────────────────── @@ -58,6 +73,12 @@ _EXT_SESSION_TICKET: int = 0x0023 _sessions: dict[tuple[str, int, str, int], dict[str, Any]] = {} _session_ts: dict[tuple[str, int, str, int], float] = {} +# TCP RTT tracking for JA4L: key = (client_ip, client_port, server_ip, server_port) +# Value: {"syn_time": float, "ttl": int} +_tcp_syn: dict[tuple[str, int, str, int], dict[str, Any]] = {} +# Completed RTT measurements: key = same 4-tuple, value = {"rtt_ms": float, "client_ttl": int} +_tcp_rtt: dict[tuple[str, int, str, int], dict[str, Any]] = {} + # ─── GREASE helpers ─────────────────────────────────────────────────────────── @@ -106,6 +127,7 @@ def _parse_client_hello(data: bytes) -> dict[str, Any] | None: # Session ID session_id_len = body[pos] + session_id = body[pos + 1: pos + 1 + session_id_len] pos += 1 + session_id_len # Cipher Suites @@ -125,8 +147,13 @@ def _parse_client_hello(data: bytes) -> dict[str, Any] | None: extensions: list[int] = [] supported_groups: list[int] = [] ec_point_formats: list[int] = [] + signature_algorithms: list[int] = [] + supported_versions: list[int] = [] sni: str = "" alpn: list[str] = [] + has_session_ticket_data: bool = False + has_pre_shared_key: bool = False + has_early_data: bool = False if pos + 2 <= len(body): ext_total = struct.unpack_from("!H", body, pos)[0] @@ -165,8 +192,33 @@ def _parse_client_hello(data: bytes) -> dict[str, Any] | None: alpn.append(ext_data[ap + 1: ap + 1 + plen].decode("ascii", errors="replace")) ap += 1 + plen + elif ext_type == _EXT_SIGNATURE_ALGORITHMS and len(ext_data) >= 2: + sa_len = struct.unpack_from("!H", ext_data, 0)[0] + signature_algorithms = [ + struct.unpack_from("!H", ext_data, 2 + i * 2)[0] + for i in range(sa_len // 2) + ] + + elif ext_type == _EXT_SUPPORTED_VERSIONS and len(ext_data) >= 1: + sv_len = ext_data[0] + supported_versions = [ + struct.unpack_from("!H", ext_data, 1 + i * 2)[0] + for i in range(sv_len // 2) + ] + + elif ext_type == _EXT_SESSION_TICKET: + has_session_ticket_data = len(ext_data) > 0 + + elif ext_type == _EXT_PRE_SHARED_KEY: + has_pre_shared_key = True + + elif ext_type == _EXT_EARLY_DATA: + has_early_data = True + filtered_ciphers = _filter_grease(cipher_suites) filtered_groups = _filter_grease(supported_groups) + filtered_sig_algs = _filter_grease(signature_algorithms) + filtered_versions = _filter_grease(supported_versions) return { "tls_version": tls_version, @@ -174,8 +226,14 @@ def _parse_client_hello(data: bytes) -> dict[str, Any] | None: "extensions": extensions, "supported_groups": filtered_groups, "ec_point_formats": ec_point_formats, + "signature_algorithms": filtered_sig_algs, + "supported_versions": filtered_versions, "sni": sni, "alpn": alpn, + "session_id": session_id, + "has_session_ticket_data": has_session_ticket_data, + "has_pre_shared_key": has_pre_shared_key, + "has_early_data": has_early_data, } except Exception: @@ -221,6 +279,9 @@ def _parse_server_hello(data: bytes) -> dict[str, Any] | None: pos += 1 extensions: list[int] = [] + selected_version: int | None = None + alpn: str = "" + if pos + 2 <= len(body): ext_total = struct.unpack_from("!H", body, pos)[0] pos += 2 @@ -228,20 +289,329 @@ def _parse_server_hello(data: bytes) -> dict[str, Any] | None: while pos + 4 <= ext_end: ext_type = struct.unpack_from("!H", body, pos)[0] ext_len = struct.unpack_from("!H", body, pos + 2)[0] + ext_data = body[pos + 4: pos + 4 + ext_len] pos += 4 + ext_len if not _is_grease(ext_type): extensions.append(ext_type) + if ext_type == _EXT_SUPPORTED_VERSIONS and len(ext_data) >= 2: + selected_version = struct.unpack_from("!H", ext_data, 0)[0] + + elif ext_type == _EXT_ALPN and len(ext_data) >= 2: + proto_list_len = struct.unpack_from("!H", ext_data, 0)[0] + if proto_list_len > 0 and len(ext_data) >= 4: + plen = ext_data[2] + alpn = ext_data[3: 3 + plen].decode("ascii", errors="replace") + return { "tls_version": tls_version, "cipher_suite": cipher_suite, "extensions": extensions, + "selected_version": selected_version, + "alpn": alpn, } except Exception: return None +def _parse_certificate(data: bytes) -> dict[str, Any] | None: + """ + Parse a TLS Certificate handshake message from raw bytes. + + Only works for TLS 1.2 and below — TLS 1.3 encrypts the Certificate + message. Extracts basic details from the first (leaf) certificate + using minimal DER/ASN.1 parsing. + """ + try: + if len(data) < 6 or data[0] != _TLS_RECORD_HANDSHAKE: + return None + + hs = data[5:] + if hs[0] != _TLS_HT_CERTIFICATE: + return None + + hs_len = struct.unpack_from("!I", b"\x00" + hs[1:4])[0] + body = hs[4: 4 + hs_len] + if len(body) < 3: + return None + + # Certificate list total length (3 bytes) + certs_len = struct.unpack_from("!I", b"\x00" + body[0:3])[0] + if certs_len == 0: + return None + + pos = 3 + # First certificate length (3 bytes) + if pos + 3 > len(body): + return None + cert_len = struct.unpack_from("!I", b"\x00" + body[pos:pos + 3])[0] + pos += 3 + if pos + cert_len > len(body): + return None + + cert_der = body[pos: pos + cert_len] + return _parse_x509_der(cert_der) + + except Exception: + return None + + +# ─── Minimal DER/ASN.1 X.509 parser ───────────────────────────────────────── + +def _der_read_tag_len(data: bytes, pos: int) -> tuple[int, int, int]: + """Read a DER tag and length. Returns (tag, content_start, content_length).""" + tag = data[pos] + pos += 1 + length_byte = data[pos] + pos += 1 + if length_byte & 0x80: + num_bytes = length_byte & 0x7F + length = int.from_bytes(data[pos: pos + num_bytes], "big") + pos += num_bytes + else: + length = length_byte + return tag, pos, length + + +def _der_read_sequence(data: bytes, pos: int) -> tuple[int, int]: + """Read a SEQUENCE tag, return (content_start, content_length).""" + tag, content_start, length = _der_read_tag_len(data, pos) + return content_start, length + + +def _der_read_oid(data: bytes, pos: int, length: int) -> str: + """Decode a DER OID to dotted string.""" + if length < 1: + return "" + first = data[pos] + oid_parts = [str(first // 40), str(first % 40)] + val = 0 + for i in range(1, length): + b = data[pos + i] + val = (val << 7) | (b & 0x7F) + if not (b & 0x80): + oid_parts.append(str(val)) + val = 0 + return ".".join(oid_parts) + + +def _der_extract_cn(data: bytes, start: int, length: int) -> str: + """Walk an X.501 Name (SEQUENCE of SETs of SEQUENCE of OID+value) to find CN.""" + pos = start + end = start + length + while pos < end: + # Each RDN is a SET + set_tag, set_start, set_len = _der_read_tag_len(data, pos) + if set_tag != 0x31: # SET + break + set_end = set_start + set_len + + # Inside the SET, each attribute is a SEQUENCE + attr_pos = set_start + while attr_pos < set_end: + seq_tag, seq_start, seq_len = _der_read_tag_len(data, attr_pos) + if seq_tag != 0x30: # SEQUENCE + break + # OID + oid_tag, oid_start, oid_len = _der_read_tag_len(data, seq_start) + if oid_tag == 0x06: + oid = _der_read_oid(data, oid_start, oid_len) + # CN OID = 2.5.4.3 + if oid == "2.5.4.3": + val_tag, val_start, val_len = _der_read_tag_len(data, oid_start + oid_len) + return data[val_start: val_start + val_len].decode("utf-8", errors="replace") + attr_pos = seq_start + seq_len + + pos = set_end + return "" + + +def _der_extract_name_str(data: bytes, start: int, length: int) -> str: + """Extract a human-readable summary of an X.501 Name (all RDN values joined).""" + parts: list[str] = [] + pos = start + end = start + length + oid_names = { + "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + } + while pos < end: + set_tag, set_start, set_len = _der_read_tag_len(data, pos) + if set_tag != 0x31: + break + set_end = set_start + set_len + attr_pos = set_start + while attr_pos < set_end: + seq_tag, seq_start, seq_len = _der_read_tag_len(data, attr_pos) + if seq_tag != 0x30: + break + oid_tag, oid_start, oid_len = _der_read_tag_len(data, seq_start) + if oid_tag == 0x06: + oid = _der_read_oid(data, oid_start, oid_len) + val_tag, val_start, val_len = _der_read_tag_len(data, oid_start + oid_len) + val = data[val_start: val_start + val_len].decode("utf-8", errors="replace") + name = oid_names.get(oid, oid) + parts.append(f"{name}={val}") + attr_pos = seq_start + seq_len + pos = set_end + return ", ".join(parts) + + +def _parse_x509_der(cert_der: bytes) -> dict[str, Any] | None: + """ + Minimal X.509 DER parser. Extracts subject CN, issuer string, + validity period, and self-signed flag. + + Structure: SEQUENCE { tbsCertificate, signatureAlgorithm, signatureValue } + tbsCertificate: SEQUENCE { + version [0] EXPLICIT, serialNumber, signature, + issuer, validity { notBefore, notAfter }, + subject, subjectPublicKeyInfo, ...extensions + } + """ + try: + # Outer SEQUENCE + outer_start, outer_len = _der_read_sequence(cert_der, 0) + # tbsCertificate SEQUENCE + tbs_tag, tbs_start, tbs_len = _der_read_tag_len(cert_der, outer_start) + tbs_end = tbs_start + tbs_len + pos = tbs_start + + # version [0] EXPLICIT — optional, skip if present + if cert_der[pos] == 0xA0: + _, v_start, v_len = _der_read_tag_len(cert_der, pos) + pos = v_start + v_len + + # serialNumber (INTEGER) + _, sn_start, sn_len = _der_read_tag_len(cert_der, pos) + pos = sn_start + sn_len + + # signature algorithm (SEQUENCE) + _, sa_start, sa_len = _der_read_tag_len(cert_der, pos) + pos = sa_start + sa_len + + # issuer (SEQUENCE) + issuer_tag, issuer_start, issuer_len = _der_read_tag_len(cert_der, pos) + issuer_str = _der_extract_name_str(cert_der, issuer_start, issuer_len) + issuer_cn = _der_extract_cn(cert_der, issuer_start, issuer_len) + pos = issuer_start + issuer_len + + # validity (SEQUENCE of two times) + val_tag, val_start, val_len = _der_read_tag_len(cert_der, pos) + # notBefore + nb_tag, nb_start, nb_len = _der_read_tag_len(cert_der, val_start) + not_before = cert_der[nb_start: nb_start + nb_len].decode("ascii", errors="replace") + # notAfter + na_tag, na_start, na_len = _der_read_tag_len(cert_der, nb_start + nb_len) + not_after = cert_der[na_start: na_start + na_len].decode("ascii", errors="replace") + pos = val_start + val_len + + # subject (SEQUENCE) + subj_tag, subj_start, subj_len = _der_read_tag_len(cert_der, pos) + subject_cn = _der_extract_cn(cert_der, subj_start, subj_len) + subject_str = _der_extract_name_str(cert_der, subj_start, subj_len) + + # Self-signed: issuer CN matches subject CN (basic check) + self_signed = (issuer_cn == subject_cn) and subject_cn != "" + + # SANs are in extensions — attempt to find them + pos = subj_start + subj_len + sans: list[str] = _extract_sans(cert_der, pos, tbs_end) + + return { + "subject_cn": subject_cn, + "subject": subject_str, + "issuer": issuer_str, + "issuer_cn": issuer_cn, + "not_before": not_before, + "not_after": not_after, + "self_signed": self_signed, + "sans": sans, + } + + except Exception: + return None + + +def _extract_sans(cert_der: bytes, pos: int, end: int) -> list[str]: + """ + Attempt to extract Subject Alternative Names from X.509v3 extensions. + SAN OID = 2.5.29.17 + """ + sans: list[str] = [] + try: + # Skip subjectPublicKeyInfo SEQUENCE + if pos >= end: + return sans + spki_tag, spki_start, spki_len = _der_read_tag_len(cert_der, pos) + pos = spki_start + spki_len + + # Extensions are wrapped in [3] EXPLICIT + while pos < end: + tag = cert_der[pos] + if tag == 0xA3: # [3] EXPLICIT — extensions wrapper + _, ext_wrap_start, ext_wrap_len = _der_read_tag_len(cert_der, pos) + # Inner SEQUENCE of extensions + _, exts_start, exts_len = _der_read_tag_len(cert_der, ext_wrap_start) + epos = exts_start + eend = exts_start + exts_len + while epos < eend: + # Each extension is a SEQUENCE { OID, [critical], value } + ext_tag, ext_start, ext_len = _der_read_tag_len(cert_der, epos) + ext_end = ext_start + ext_len + + oid_tag, oid_start, oid_len = _der_read_tag_len(cert_der, ext_start) + if oid_tag == 0x06: + oid = _der_read_oid(cert_der, oid_start, oid_len) + if oid == "2.5.29.17": # SAN + # Find the OCTET STRING containing the SAN value + vpos = oid_start + oid_len + # Skip optional BOOLEAN (critical) + if vpos < ext_end and cert_der[vpos] == 0x01: + _, bs, bl = _der_read_tag_len(cert_der, vpos) + vpos = bs + bl + # OCTET STRING wrapping the SAN SEQUENCE + if vpos < ext_end: + os_tag, os_start, os_len = _der_read_tag_len(cert_der, vpos) + if os_tag == 0x04: + sans = _parse_san_sequence(cert_der, os_start, os_len) + epos = ext_end + break + else: + _, skip_start, skip_len = _der_read_tag_len(cert_der, pos) + pos = skip_start + skip_len + except Exception: + pass + return sans + + +def _parse_san_sequence(data: bytes, start: int, length: int) -> list[str]: + """Parse a GeneralNames SEQUENCE to extract DNS names and IPs.""" + names: list[str] = [] + try: + # The SAN value is itself a SEQUENCE of GeneralName + seq_tag, seq_start, seq_len = _der_read_tag_len(data, start) + pos = seq_start + end = seq_start + seq_len + while pos < end: + tag = data[pos] + _, val_start, val_len = _der_read_tag_len(data, pos) + context_tag = tag & 0x1F + if context_tag == 2: # dNSName + names.append(data[val_start: val_start + val_len].decode("ascii", errors="replace")) + elif context_tag == 7 and val_len == 4: # iPAddress (IPv4) + names.append(".".join(str(b) for b in data[val_start: val_start + val_len])) + pos = val_start + val_len + except Exception: + pass + return names + + # ─── JA3 / JA3S computation ─────────────────────────────────────────────────── def _tls_version_str(version: int) -> str: @@ -279,6 +649,161 @@ def _ja3s(sh: dict[str, Any]) -> tuple[str, str]: return ja3s_str, hashlib.md5(ja3s_str.encode()).hexdigest() +# ─── JA4 / JA4S computation ────────────────────────────────────────────────── + +def _ja4_version(ch: dict[str, Any]) -> str: + """ + Determine JA4 TLS version string (2 chars). + Uses supported_versions extension if present (TLS 1.3 advertises 0x0303 in + ClientHello.version but 0x0304 in supported_versions). + """ + versions = ch.get("supported_versions", []) + if versions: + best = max(versions) + else: + best = ch["tls_version"] + return { + 0x0304: "13", + 0x0303: "12", + 0x0302: "11", + 0x0301: "10", + 0x0300: "s3", + 0x0200: "s2", + }.get(best, "00") + + +def _ja4_alpn_tag(alpn_list: list[str] | str) -> str: + """ + JA4 ALPN tag: first and last character of the first ALPN protocol. + No ALPN → "00". + """ + if isinstance(alpn_list, str): + proto = alpn_list + elif alpn_list: + proto = alpn_list[0] + else: + return "00" + + if not proto: + return "00" + if len(proto) == 1: + return proto[0] + proto[0] + return proto[0] + proto[-1] + + +def _sha256_12(text: str) -> str: + """First 12 hex chars of SHA-256.""" + return hashlib.sha256(text.encode()).hexdigest()[:12] + + +def _ja4(ch: dict[str, Any]) -> str: + """ + Compute JA4 fingerprint from a parsed ClientHello. + + Format: a_b_c where + a = {t|q}{version:2}{d|i}{cipher_count:02d}{ext_count:02d}{alpn_tag:2} + b = sha256_12(sorted_cipher_suites, comma-separated) + c = sha256_12(sorted_extensions,sorted_signature_algorithms) + + Protocol is always 't' (TCP) since we capture on a TCP socket. + SNI present → 'd' (domain), absent → 'i' (IP). + """ + proto = "t" + ver = _ja4_version(ch) + sni_flag = "d" if ch.get("sni") else "i" + + # Counts — GREASE already filtered, but also exclude SNI (0x0000) and ALPN (0x0010) + # from extension count per JA4 spec? No — JA4 counts all non-GREASE extensions. + cs_count = min(len(ch["cipher_suites"]), 99) + ext_count = min(len(ch["extensions"]), 99) + alpn_tag = _ja4_alpn_tag(ch.get("alpn", [])) + + section_a = f"{proto}{ver}{sni_flag}{cs_count:02d}{ext_count:02d}{alpn_tag}" + + # Section b: sorted cipher suites as decimal, comma-separated + sorted_cs = sorted(ch["cipher_suites"]) + section_b = _sha256_12(",".join(str(c) for c in sorted_cs)) + + # Section c: sorted extensions + sorted signature algorithms + sorted_ext = sorted(ch["extensions"]) + sorted_sa = sorted(ch.get("signature_algorithms", [])) + ext_str = ",".join(str(e) for e in sorted_ext) + sa_str = ",".join(str(s) for s in sorted_sa) + combined = f"{ext_str}_{sa_str}" if sa_str else ext_str + section_c = _sha256_12(combined) + + return f"{section_a}_{section_b}_{section_c}" + + +def _ja4s(sh: dict[str, Any]) -> str: + """ + Compute JA4S fingerprint from a parsed ServerHello. + + Format: a_b where + a = {t|q}{version:2}{ext_count:02d}{alpn_tag:2} + b = sha256_12({cipher_suite},{sorted_extensions comma-separated}) + """ + proto = "t" + # Use selected_version from supported_versions ext if available + selected = sh.get("selected_version") + if selected: + ver = {0x0304: "13", 0x0303: "12", 0x0302: "11", 0x0301: "10", + 0x0300: "s3", 0x0200: "s2"}.get(selected, "00") + else: + ver = {0x0304: "13", 0x0303: "12", 0x0302: "11", 0x0301: "10", + 0x0300: "s3", 0x0200: "s2"}.get(sh["tls_version"], "00") + + ext_count = min(len(sh["extensions"]), 99) + alpn_tag = _ja4_alpn_tag(sh.get("alpn", "")) + + section_a = f"{proto}{ver}{ext_count:02d}{alpn_tag}" + + sorted_ext = sorted(sh["extensions"]) + inner = f"{sh['cipher_suite']},{','.join(str(e) for e in sorted_ext)}" + section_b = _sha256_12(inner) + + return f"{section_a}_{section_b}" + + +# ─── JA4L (latency) ────────────────────────────────────────────────────────── + +def _ja4l(key: tuple[str, int, str, int]) -> dict[str, Any] | None: + """ + Retrieve JA4L data for a connection. + + JA4L measures the TCP handshake RTT: time from SYN to SYN-ACK. + Returns {"rtt_ms": float, "client_ttl": int} or None. + """ + return _tcp_rtt.get(key) + + +# ─── Session resumption ────────────────────────────────────────────────────── + +def _session_resumption_info(ch: dict[str, Any]) -> dict[str, Any]: + """ + Analyze ClientHello for TLS session resumption behavior. + Returns a dict describing what resumption mechanisms the client uses. + """ + mechanisms: list[str] = [] + + if ch.get("has_session_ticket_data"): + mechanisms.append("session_ticket") + + if ch.get("has_pre_shared_key"): + mechanisms.append("psk") + + if ch.get("has_early_data"): + mechanisms.append("early_data_0rtt") + + if ch.get("session_id") and len(ch["session_id"]) > 0: + mechanisms.append("session_id") + + return { + "resumption_attempted": len(mechanisms) > 0, + "mechanisms": mechanisms, + } + + # ─── Session cleanup ───────────────────────────────────────────────────────── def _cleanup_sessions() -> None: @@ -287,6 +812,15 @@ def _cleanup_sessions() -> None: for k in stale: _sessions.pop(k, None) _session_ts.pop(k, None) + # Also clean up TCP RTT tracking + stale_syn = [k for k, v in _tcp_syn.items() + if now - v.get("time", 0) > _SESSION_TTL] + for k in stale_syn: + _tcp_syn.pop(k, None) + stale_rtt = [k for k, _ in _tcp_rtt.items() + if k not in _sessions and k not in _session_ts] + for k in stale_rtt: + _tcp_rtt.pop(k, None) # ─── Logging helpers ───────────────────────────────────────────────────────── @@ -305,14 +839,32 @@ def _on_packet(pkt: Any) -> None: ip = pkt[IP] tcp = pkt[TCP] - payload = bytes(tcp.payload) - if not payload: - return - src_ip: str = ip.src dst_ip: str = ip.dst src_port: int = tcp.sport dst_port: int = tcp.dport + flags: int = tcp.flags.value if hasattr(tcp.flags, 'value') else int(tcp.flags) + + # ── TCP SYN tracking for JA4L ── + if flags & _TCP_SYN and not (flags & _TCP_ACK): + # Pure SYN — record timestamp and TTL + key = (src_ip, src_port, dst_ip, dst_port) + _tcp_syn[key] = {"time": time.monotonic(), "ttl": ip.ttl} + + elif flags & _TCP_SYN and flags & _TCP_ACK: + # SYN-ACK — calculate RTT for the original SYN sender + rev_key = (dst_ip, dst_port, src_ip, src_port) + syn_data = _tcp_syn.pop(rev_key, None) + if syn_data: + rtt_ms = round((time.monotonic() - syn_data["time"]) * 1000, 2) + _tcp_rtt[rev_key] = { + "rtt_ms": rtt_ms, + "client_ttl": syn_data["ttl"], + } + + payload = bytes(tcp.payload) + if not payload: + return # TLS record check if payload[0] != _TLS_RECORD_HANDSHAKE: @@ -325,31 +877,47 @@ def _on_packet(pkt: Any) -> None: key = (src_ip, src_port, dst_ip, dst_port) ja3_str, ja3_hash = _ja3(ch) + ja4_hash = _ja4(ch) + resumption = _session_resumption_info(ch) + rtt_data = _ja4l(key) _sessions[key] = { "ja3": ja3_hash, "ja3_str": ja3_str, + "ja4": ja4_hash, "tls_version": ch["tls_version"], "cipher_suites": ch["cipher_suites"], "extensions": ch["extensions"], + "signature_algorithms": ch.get("signature_algorithms", []), + "supported_versions": ch.get("supported_versions", []), "sni": ch["sni"], "alpn": ch["alpn"], + "resumption": resumption, } _session_ts[key] = time.monotonic() - _log( - "tls_client_hello", - src_ip=src_ip, - src_port=str(src_port), - dst_ip=dst_ip, - dst_port=str(dst_port), - ja3=ja3_hash, - tls_version=_tls_version_str(ch["tls_version"]), - sni=ch["sni"] or "", - alpn=",".join(ch["alpn"]), - raw_ciphers="-".join(str(c) for c in ch["cipher_suites"]), - raw_extensions="-".join(str(e) for e in ch["extensions"]), - ) + log_fields: dict[str, Any] = { + "src_ip": src_ip, + "src_port": str(src_port), + "dst_ip": dst_ip, + "dst_port": str(dst_port), + "ja3": ja3_hash, + "ja4": ja4_hash, + "tls_version": _tls_version_str(ch["tls_version"]), + "sni": ch["sni"] or "", + "alpn": ",".join(ch["alpn"]), + "raw_ciphers": "-".join(str(c) for c in ch["cipher_suites"]), + "raw_extensions": "-".join(str(e) for e in ch["extensions"]), + } + + if resumption["resumption_attempted"]: + log_fields["resumption"] = ",".join(resumption["mechanisms"]) + + if rtt_data: + log_fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) + log_fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) + + _log("tls_client_hello", **log_fields) return # Attempt ServerHello parse @@ -361,6 +929,7 @@ def _on_packet(pkt: Any) -> None: _session_ts.pop(rev_key, None) ja3s_str, ja3s_hash = _ja3s(sh) + ja4s_hash = _ja4s(sh) fields: dict[str, Any] = { "src_ip": dst_ip, # original attacker is now the destination @@ -368,17 +937,52 @@ def _on_packet(pkt: Any) -> None: "dst_ip": src_ip, "dst_port": str(src_port), "ja3s": ja3s_hash, + "ja4s": ja4s_hash, "tls_version": _tls_version_str(sh["tls_version"]), } if ch_data: fields["ja3"] = ch_data["ja3"] + fields["ja4"] = ch_data.get("ja4", "") fields["sni"] = ch_data["sni"] or "" fields["alpn"] = ",".join(ch_data["alpn"]) fields["raw_ciphers"] = "-".join(str(c) for c in ch_data["cipher_suites"]) fields["raw_extensions"] = "-".join(str(e) for e in ch_data["extensions"]) + if ch_data.get("resumption", {}).get("resumption_attempted"): + fields["resumption"] = ",".join(ch_data["resumption"]["mechanisms"]) + + rtt_data = _tcp_rtt.pop(rev_key, None) + if rtt_data: + fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) + fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) _log("tls_session", severity=SEVERITY_WARNING, **fields) + return + + # Attempt Certificate parse (TLS 1.2 only — 1.3 encrypts it) + cert = _parse_certificate(payload) + if cert is not None: + # Match to a session — the cert comes from the server side + rev_key = (dst_ip, dst_port, src_ip, src_port) + ch_data = _sessions.get(rev_key) + + cert_fields: dict[str, Any] = { + "src_ip": dst_ip, + "src_port": str(dst_port), + "dst_ip": src_ip, + "dst_port": str(src_port), + "subject_cn": cert["subject_cn"], + "issuer": cert["issuer"], + "self_signed": str(cert["self_signed"]).lower(), + "not_before": cert["not_before"], + "not_after": cert["not_after"], + } + if cert["sans"]: + cert_fields["sans"] = ",".join(cert["sans"]) + if ch_data: + cert_fields["sni"] = ch_data.get("sni", "") + + _log("tls_certificate", **cert_fields) # ─── Entry point ───────────────────────────────────────────────────────────── diff --git a/tests/test_fingerprinting.py b/tests/test_fingerprinting.py index 544efe6..4b90ad2 100644 --- a/tests/test_fingerprinting.py +++ b/tests/test_fingerprinting.py @@ -206,3 +206,199 @@ async def test_fields_missing_entirely_no_crash(): } await _extract_bounty(repo, log_data) repo.add_bounty.assert_not_awaited() + + +# --------------------------------------------------------------------------- +# JA4/JA4S extraction (sniffer) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_ja4_included_in_ja3_bounty(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.20", + "event_type": "tls_session", + "fields": { + "ja3": "abc123", + "ja3s": "def456", + "ja4": "t13d0203h2_aabbccddee00_112233445566", + "ja4s": "t1302h2_ffeeddccbbaa", + "tls_version": "TLS 1.3", + "dst_port": "443", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + ja3_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja3"] + assert len(ja3_calls) == 1 + payload = ja3_calls[0][0][0]["payload"] + assert payload["ja4"] == "t13d0203h2_aabbccddee00_112233445566" + assert payload["ja4s"] == "t1302h2_ffeeddccbbaa" + + +# --------------------------------------------------------------------------- +# JA4L latency extraction +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_ja4l_bounty_extracted(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.21", + "event_type": "tls_session", + "fields": { + "ja4l_rtt_ms": "12.5", + "ja4l_client_ttl": "64", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + ja4l_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja4l"] + assert len(ja4l_calls) == 1 + payload = ja4l_calls[0][0][0]["payload"] + assert payload["rtt_ms"] == "12.5" + assert payload["client_ttl"] == "64" + + +@pytest.mark.asyncio +async def test_ja4l_not_extracted_without_rtt(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.22", + "event_type": "tls_session", + "fields": { + "ja4l_client_ttl": "64", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + ja4l_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "ja4l"] + assert len(ja4l_calls) == 0 + + +# --------------------------------------------------------------------------- +# TLS session resumption extraction +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tls_resumption_bounty_extracted(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.23", + "event_type": "tls_client_hello", + "fields": { + "resumption": "session_ticket,psk", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"] + assert len(res_calls) == 1 + assert res_calls[0][0][0]["payload"]["mechanisms"] == "session_ticket,psk" + + +@pytest.mark.asyncio +async def test_no_resumption_no_bounty(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.24", + "event_type": "tls_client_hello", + "fields": { + "ja3": "abc123", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"] + assert len(res_calls) == 0 + + +# --------------------------------------------------------------------------- +# TLS certificate extraction +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tls_certificate_bounty_extracted(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.25", + "event_type": "tls_certificate", + "fields": { + "subject_cn": "evil.c2.local", + "issuer": "CN=Evil CA", + "self_signed": "true", + "not_before": "230101000000Z", + "not_after": "260101000000Z", + "sans": "evil.c2.local,*.evil.c2.local", + "sni": "evil.c2.local", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + cert_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_certificate"] + assert len(cert_calls) == 1 + payload = cert_calls[0][0][0]["payload"] + assert payload["subject_cn"] == "evil.c2.local" + assert payload["self_signed"] == "true" + assert payload["issuer"] == "CN=Evil CA" + + +@pytest.mark.asyncio +async def test_tls_certificate_not_extracted_from_non_sniffer(): + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "http", + "attacker_ip": "10.0.0.26", + "event_type": "tls_certificate", + "fields": { + "subject_cn": "not-from-sniffer.local", + }, + } + await _extract_bounty(repo, log_data) + calls = repo.add_bounty.call_args_list + cert_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate"] + assert len(cert_calls) == 0 + + +# --------------------------------------------------------------------------- +# Multiple fingerprints from single sniffer log +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_sniffer_log_yields_multiple_fingerprint_types(): + """A complete TLS session log with JA3 + JA4L + resumption yields 3 bounties.""" + repo = _make_repo() + log_data = { + "decky": "decky-05", + "service": "sniffer", + "attacker_ip": "10.0.0.30", + "event_type": "tls_session", + "fields": { + "ja3": "abc123", + "ja3s": "def456", + "ja4": "t13d0203h2_aabb_ccdd", + "ja4s": "t1302h2_eeff", + "ja4l_rtt_ms": "5.2", + "ja4l_client_ttl": "128", + "resumption": "session_ticket", + "tls_version": "TLS 1.3", + "dst_port": "443", + }, + } + await _extract_bounty(repo, log_data) + assert repo.add_bounty.await_count == 3 + types = {c[0][0]["payload"]["fingerprint_type"] for c in repo.add_bounty.call_args_list} + assert types == {"ja3", "ja4l", "tls_resumption"} diff --git a/tests/test_sniffer_ja3.py b/tests/test_sniffer_ja3.py index b0e053b..e854544 100644 --- a/tests/test_sniffer_ja3.py +++ b/tests/test_sniffer_ja3.py @@ -43,11 +43,20 @@ _srv = _load_sniffer() _parse_client_hello = _srv._parse_client_hello _parse_server_hello = _srv._parse_server_hello +_parse_certificate = _srv._parse_certificate _ja3 = _srv._ja3 _ja3s = _srv._ja3s +_ja4 = _srv._ja4 +_ja4s = _srv._ja4s +_ja4_version = _srv._ja4_version +_ja4_alpn_tag = _srv._ja4_alpn_tag +_sha256_12 = _srv._sha256_12 +_session_resumption_info = _srv._session_resumption_info _is_grease = _srv._is_grease _filter_grease = _srv._filter_grease _tls_version_str = _srv._tls_version_str +_parse_x509_der = _srv._parse_x509_der +_der_read_oid = _srv._der_read_oid # ─── TLS byte builder helpers ───────────────────────────────────────────────── @@ -435,3 +444,744 @@ class TestRoundTrip: _, ja3_hash_clean = _ja3(ch_clean) assert ja3_hash == ja3_hash_clean + + +# ─── Extension builder helpers for new tests ───────────────────────────────── + +def _build_signature_algorithms_extension(sig_algs: list[int]) -> bytes: + sa_bytes = b"".join(struct.pack("!H", s) for s in sig_algs) + data = struct.pack("!H", len(sa_bytes)) + sa_bytes + return _build_extension(0x000D, data) + + +def _build_supported_versions_extension(versions: list[int]) -> bytes: + v_bytes = b"".join(struct.pack("!H", v) for v in versions) + data = bytes([len(v_bytes)]) + v_bytes + return _build_extension(0x002B, data) + + +def _build_session_ticket_extension(ticket_data: bytes = b"") -> bytes: + return _build_extension(0x0023, ticket_data) + + +def _build_psk_extension() -> bytes: + return _build_extension(0x0029, b"\x00\x01\x00") + + +def _build_early_data_extension() -> bytes: + return _build_extension(0x002A, b"") + + +def _build_server_hello_with_exts( + version: int = 0x0303, + cipher_suite: int = 0x002F, + extensions_bytes: bytes = b"", + selected_version: int | None = None, + alpn: str | None = None, +) -> bytes: + """Build a ServerHello with optional supported_versions and ALPN extensions.""" + ext_parts = b"" + if selected_version is not None: + ext_parts += _build_extension(0x002B, struct.pack("!H", selected_version)) + if alpn is not None: + proto = alpn.encode() + proto_data = bytes([len(proto)]) + proto + alpn_data = struct.pack("!H", len(proto_data)) + proto_data + ext_parts += _build_extension(0x0010, alpn_data) + if extensions_bytes: + ext_parts += extensions_bytes + return _build_server_hello(version=version, cipher_suite=cipher_suite, extensions_bytes=ext_parts) + + +# ─── ClientHello extended field tests ──────────────────────────────────────── + +class TestClientHelloExtendedFields: + def test_signature_algorithms_extracted(self): + ext = _build_signature_algorithms_extension([0x0401, 0x0501, 0x0601]) + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["signature_algorithms"] == [0x0401, 0x0501, 0x0601] + + def test_supported_versions_extracted(self): + ext = _build_supported_versions_extension([0x0304, 0x0303]) + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["supported_versions"] == [0x0304, 0x0303] + + def test_grease_filtered_from_supported_versions(self): + ext = _build_supported_versions_extension([0x0A0A, 0x0304, 0x0303]) + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert 0x0A0A not in result["supported_versions"] + assert 0x0304 in result["supported_versions"] + + def test_session_ticket_empty_no_resumption(self): + ext = _build_session_ticket_extension(b"") + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["has_session_ticket_data"] is False + + def test_session_ticket_with_data_resumption(self): + ext = _build_session_ticket_extension(b"\x01\x02\x03\x04") + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["has_session_ticket_data"] is True + + def test_psk_extension_detected(self): + ext = _build_psk_extension() + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["has_pre_shared_key"] is True + + def test_early_data_extension_detected(self): + ext = _build_early_data_extension() + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["has_early_data"] is True + + def test_no_resumption_by_default(self): + data = _build_client_hello() + result = _parse_client_hello(data) + assert result is not None + assert result["has_session_ticket_data"] is False + assert result["has_pre_shared_key"] is False + assert result["has_early_data"] is False + + def test_combined_extensions_all_parsed(self): + """All new extensions should be parsed alongside existing ones.""" + ext = ( + _build_sni_extension("evil.c2.io") + + _build_supported_groups_extension([0x001D]) + + _build_signature_algorithms_extension([0x0401]) + + _build_supported_versions_extension([0x0304, 0x0303]) + + _build_alpn_extension(["h2"]) + ) + data = _build_client_hello(extensions_bytes=ext) + result = _parse_client_hello(data) + assert result is not None + assert result["sni"] == "evil.c2.io" + assert result["supported_groups"] == [0x001D] + assert result["signature_algorithms"] == [0x0401] + assert result["supported_versions"] == [0x0304, 0x0303] + assert result["alpn"] == ["h2"] + + +# ─── ServerHello extended field tests ──────────────────────────────────────── + +class TestServerHelloExtendedFields: + def test_selected_version_extracted(self): + data = _build_server_hello_with_exts(selected_version=0x0304) + result = _parse_server_hello(data) + assert result is not None + assert result["selected_version"] == 0x0304 + + def test_no_selected_version_returns_none(self): + data = _build_server_hello() + result = _parse_server_hello(data) + assert result is not None + assert result["selected_version"] is None + + def test_alpn_extracted_from_server_hello(self): + data = _build_server_hello_with_exts(alpn="h2") + result = _parse_server_hello(data) + assert result is not None + assert result["alpn"] == "h2" + + +# ─── JA4 tests ─────────────────────────────────────────────────────────────── + +class TestJA4: + def test_ja4_format_three_sections(self): + """JA4 must have format: section_a_section_b_section_c""" + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x002F, 0x0035], + "extensions": [0x000A, 0x000D], + "supported_groups": [0x001D], + "ec_point_formats": [0x00], + "signature_algorithms": [0x0401], + "supported_versions": [], + "sni": "test.com", + "alpn": ["h2"], + } + result = _ja4(ch) + parts = result.split("_") + assert len(parts) == 3 + + def test_ja4_section_a_format(self): + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x002F, 0x0035], + "extensions": [0x000A, 0x000D, 0x0010], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [0x0401], + "supported_versions": [0x0304, 0x0303], + "sni": "target.local", + "alpn": ["h2", "http/1.1"], + } + result = _ja4(ch) + section_a = result.split("_")[0] + # t = TCP, 13 = TLS 1.3 (from supported_versions), d = has SNI + # 02 = 2 ciphers, 03 = 3 extensions, h2 = ALPN first proto + assert section_a == "t13d0203h2" + + def test_ja4_no_sni_uses_i(self): + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x002F], + "extensions": [], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [], + "supported_versions": [], + "sni": "", + "alpn": [], + } + result = _ja4(ch) + section_a = result.split("_")[0] + assert section_a[3] == "i" # no SNI → 'i' + + def test_ja4_no_alpn_uses_00(self): + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x002F], + "extensions": [], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [], + "supported_versions": [], + "sni": "", + "alpn": [], + } + result = _ja4(ch) + section_a = result.split("_")[0] + assert section_a.endswith("00") + + def test_ja4_section_b_is_sha256_12(self): + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x0035, 0x002F], # unsorted + "extensions": [], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [], + "supported_versions": [], + "sni": "", + "alpn": [], + } + result = _ja4(ch) + section_b = result.split("_")[1] + assert len(section_b) == 12 + # Should be SHA256 of sorted ciphers: "47,53" + expected = hashlib.sha256(b"47,53").hexdigest()[:12] + assert section_b == expected + + def test_ja4_section_c_includes_signature_algorithms(self): + ch = { + "tls_version": 0x0303, + "cipher_suites": [0x002F], + "extensions": [0x000D], # sig_algs extension type + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [0x0601, 0x0401], + "supported_versions": [], + "sni": "", + "alpn": [], + } + result = _ja4(ch) + section_c = result.split("_")[2] + assert len(section_c) == 12 + # combined = "13_1025,1537" (sorted ext=13, sorted sig_algs=0x0401=1025, 0x0601=1537) + expected = hashlib.sha256(b"13_1025,1537").hexdigest()[:12] + assert section_c == expected + + def test_ja4_same_ciphers_different_order_same_hash(self): + base = { + "tls_version": 0x0303, + "extensions": [], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [], + "supported_versions": [], + "sni": "", + "alpn": [], + } + ch1 = {**base, "cipher_suites": [0x002F, 0x0035]} + ch2 = {**base, "cipher_suites": [0x0035, 0x002F]} + assert _ja4(ch1) == _ja4(ch2) + + def test_ja4_different_ciphers_different_hash(self): + base = { + "tls_version": 0x0303, + "extensions": [], + "supported_groups": [], + "ec_point_formats": [], + "signature_algorithms": [], + "supported_versions": [], + "sni": "", + "alpn": [], + } + ch1 = {**base, "cipher_suites": [0x002F]} + ch2 = {**base, "cipher_suites": [0x0035]} + assert _ja4(ch1) != _ja4(ch2) + + def test_ja4_roundtrip_from_bytes(self): + """Build a ClientHello from bytes and compute JA4.""" + ext = ( + _build_sni_extension("c2.attacker.net") + + _build_signature_algorithms_extension([0x0401, 0x0501]) + + _build_supported_versions_extension([0x0304, 0x0303]) + + _build_alpn_extension(["h2"]) + ) + data = _build_client_hello( + cipher_suites=[0x1301, 0x1302, 0x002F], + extensions_bytes=ext, + ) + ch = _parse_client_hello(data) + assert ch is not None + result = _ja4(ch) + parts = result.split("_") + assert len(parts) == 3 + section_a = parts[0] + assert section_a.startswith("t13") # TLS 1.3 via supported_versions + assert "d" in section_a # has SNI + assert section_a.endswith("h2") # ALPN = h2 + + +# ─── JA4S tests ────────────────────────────────────────────────────────────── + +class TestJA4S: + def test_ja4s_format_two_sections(self): + sh = { + "tls_version": 0x0303, + "cipher_suite": 0x002F, + "extensions": [0xFF01], + "selected_version": None, + "alpn": "", + } + result = _ja4s(sh) + parts = result.split("_") + assert len(parts) == 2 + + def test_ja4s_section_a_format(self): + sh = { + "tls_version": 0x0303, + "cipher_suite": 0x1301, + "extensions": [0xFF01, 0x002B], + "selected_version": 0x0304, + "alpn": "h2", + } + result = _ja4s(sh) + section_a = result.split("_")[0] + # t = TCP, 13 = TLS 1.3 (selected_version), 02 = 2 extensions, h2 = ALPN + assert section_a == "t1302h2" + + def test_ja4s_uses_selected_version_when_available(self): + sh = { + "tls_version": 0x0303, + "cipher_suite": 0x1301, + "extensions": [], + "selected_version": 0x0304, + "alpn": "", + } + result = _ja4s(sh) + section_a = result.split("_")[0] + assert "13" in section_a # TLS 1.3 + + def test_ja4s_falls_back_to_tls_version(self): + sh = { + "tls_version": 0x0303, + "cipher_suite": 0x002F, + "extensions": [], + "selected_version": None, + "alpn": "", + } + result = _ja4s(sh) + section_a = result.split("_")[0] + assert section_a.startswith("t12") # TLS 1.2 + + def test_ja4s_section_b_is_sha256_12(self): + sh = { + "tls_version": 0x0303, + "cipher_suite": 0x002F, # 47 + "extensions": [0xFF01], # 65281 + "selected_version": None, + "alpn": "", + } + result = _ja4s(sh) + section_b = result.split("_")[1] + assert len(section_b) == 12 + expected = hashlib.sha256(b"47,65281").hexdigest()[:12] + assert section_b == expected + + def test_ja4s_roundtrip_from_bytes(self): + data = _build_server_hello_with_exts( + cipher_suite=0x1301, + selected_version=0x0304, + alpn="h2", + ) + sh = _parse_server_hello(data) + assert sh is not None + result = _ja4s(sh) + parts = result.split("_") + assert len(parts) == 2 + assert parts[0].startswith("t13") + + +# ─── JA4 version detection tests ───────────────────────────────────────────── + +class TestJA4Version: + def test_tls13_from_supported_versions(self): + ch = {"supported_versions": [0x0304, 0x0303], "tls_version": 0x0303} + assert _ja4_version(ch) == "13" + + def test_tls12_no_supported_versions(self): + ch = {"supported_versions": [], "tls_version": 0x0303} + assert _ja4_version(ch) == "12" + + def test_tls10(self): + ch = {"supported_versions": [], "tls_version": 0x0301} + assert _ja4_version(ch) == "10" + + def test_ssl30(self): + ch = {"supported_versions": [], "tls_version": 0x0300} + assert _ja4_version(ch) == "s3" + + def test_unknown_version(self): + ch = {"supported_versions": [], "tls_version": 0xFFFF} + assert _ja4_version(ch) == "00" + + +# ─── JA4 ALPN tag tests ────────────────────────────────────────────────────── + +class TestJA4AlpnTag: + def test_h2(self): + assert _ja4_alpn_tag(["h2"]) == "h2" + + def test_http11(self): + assert _ja4_alpn_tag(["http/1.1"]) == "h1" + + def test_no_alpn(self): + assert _ja4_alpn_tag([]) == "00" + + def test_single_char_protocol(self): + assert _ja4_alpn_tag(["x"]) == "xx" + + def test_string_input(self): + assert _ja4_alpn_tag("h2") == "h2" + + def test_empty_string(self): + assert _ja4_alpn_tag("") == "00" + + +# ─── SHA256-12 tests ───────────────────────────────────────────────────────── + +class TestSha256_12: + def test_returns_12_hex_chars(self): + result = _sha256_12("test") + assert len(result) == 12 + assert all(c in "0123456789abcdef" for c in result) + + def test_deterministic(self): + assert _sha256_12("hello") == _sha256_12("hello") + + def test_different_input_different_output(self): + assert _sha256_12("a") != _sha256_12("b") + + def test_matches_hashlib(self): + expected = hashlib.sha256(b"test_input").hexdigest()[:12] + assert _sha256_12("test_input") == expected + + +# ─── Session resumption tests ──────────────────────────────────────────────── + +class TestSessionResumption: + def test_no_resumption_by_default(self): + ch = { + "has_session_ticket_data": False, + "has_pre_shared_key": False, + "has_early_data": False, + "session_id": b"", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is False + assert info["mechanisms"] == [] + + def test_session_ticket_resumption(self): + ch = { + "has_session_ticket_data": True, + "has_pre_shared_key": False, + "has_early_data": False, + "session_id": b"", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert "session_ticket" in info["mechanisms"] + + def test_psk_resumption(self): + ch = { + "has_session_ticket_data": False, + "has_pre_shared_key": True, + "has_early_data": False, + "session_id": b"", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert "psk" in info["mechanisms"] + + def test_early_data_0rtt(self): + ch = { + "has_session_ticket_data": False, + "has_pre_shared_key": False, + "has_early_data": True, + "session_id": b"", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert "early_data_0rtt" in info["mechanisms"] + + def test_session_id_resumption(self): + ch = { + "has_session_ticket_data": False, + "has_pre_shared_key": False, + "has_early_data": False, + "session_id": b"\x01\x02\x03", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert "session_id" in info["mechanisms"] + + def test_multiple_mechanisms(self): + ch = { + "has_session_ticket_data": True, + "has_pre_shared_key": True, + "has_early_data": True, + "session_id": b"\x01", + } + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert len(info["mechanisms"]) == 4 + + def test_resumption_from_parsed_client_hello(self): + ext = _build_session_ticket_extension(b"\xDE\xAD\xBE\xEF") + data = _build_client_hello(extensions_bytes=ext) + ch = _parse_client_hello(data) + assert ch is not None + info = _session_resumption_info(ch) + assert info["resumption_attempted"] is True + assert "session_ticket" in info["mechanisms"] + + +# ─── Certificate parsing tests ─────────────────────────────────────────────── + +def _build_der_length(length: int) -> bytes: + """Encode a DER length.""" + if length < 0x80: + return bytes([length]) + elif length < 0x100: + return bytes([0x81, length]) + else: + return bytes([0x82]) + struct.pack("!H", length) + + +def _build_der_sequence(content: bytes) -> bytes: + return b"\x30" + _build_der_length(len(content)) + content + + +def _build_der_set(content: bytes) -> bytes: + return b"\x31" + _build_der_length(len(content)) + content + + +def _build_der_oid_bytes(oid_str: str) -> bytes: + """Encode a dotted OID string to DER OID bytes.""" + parts = [int(x) for x in oid_str.split(".")] + first_byte = parts[0] * 40 + parts[1] + encoded = bytes([first_byte]) + for val in parts[2:]: + if val < 0x80: + encoded += bytes([val]) + else: + octets = [] + while val > 0: + octets.append(val & 0x7F) + val >>= 7 + octets.reverse() + for i in range(len(octets) - 1): + octets[i] |= 0x80 + encoded += bytes(octets) + return b"\x06" + _build_der_length(len(encoded)) + encoded + + +def _build_der_utf8string(text: str) -> bytes: + encoded = text.encode("utf-8") + return b"\x0C" + _build_der_length(len(encoded)) + encoded + + +def _build_der_utctime(time_str: str) -> bytes: + encoded = time_str.encode("ascii") + return b"\x17" + _build_der_length(len(encoded)) + encoded + + +def _build_rdn(oid: str, value: str) -> bytes: + """Build a single RDN SET { SEQUENCE { OID, UTF8String } }.""" + attr = _build_der_sequence(_build_der_oid_bytes(oid) + _build_der_utf8string(value)) + return _build_der_set(attr) + + +def _build_x509_name(cn: str, o: str = "", c: str = "") -> bytes: + """Build an X.501 Name with optional CN, O, C.""" + rdns = b"" + if c: + rdns += _build_rdn("2.5.4.6", c) + if o: + rdns += _build_rdn("2.5.4.10", o) + if cn: + rdns += _build_rdn("2.5.4.3", cn) + return _build_der_sequence(rdns) + + +def _build_minimal_tbs_certificate( + subject_cn: str = "evil.c2.local", + issuer_cn: str = "Evil CA", + not_before: str = "230101000000Z", + not_after: str = "260101000000Z", + self_signed: bool = False, +) -> bytes: + """Build a minimal tbsCertificate DER structure.""" + if self_signed: + issuer_cn = subject_cn + + # version [0] EXPLICIT INTEGER 2 (v3) + version = b"\xa0\x03\x02\x01\x02" + # serialNumber INTEGER + serial = b"\x02\x01\x01" + # signature algorithm (sha256WithRSAEncryption = 1.2.840.113549.1.1.11) + sig_alg = _build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.11") + b"\x05\x00") + # issuer + issuer = _build_x509_name(issuer_cn) + # validity + validity = _build_der_sequence( + _build_der_utctime(not_before) + _build_der_utctime(not_after) + ) + # subject + subject = _build_x509_name(subject_cn) + # subjectPublicKeyInfo (minimal RSA placeholder) + spki = _build_der_sequence( + _build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.1") + b"\x05\x00") + + b"\x03\x03\x00\x00\x01" + ) + + tbs = version + serial + sig_alg + issuer + validity + subject + spki + return _build_der_sequence(tbs) + + +def _build_certificate_der( + subject_cn: str = "evil.c2.local", + issuer_cn: str = "Evil CA", + self_signed: bool = False, + not_before: str = "230101000000Z", + not_after: str = "260101000000Z", +) -> bytes: + """Build a complete X.509 DER certificate (minimal).""" + tbs = _build_minimal_tbs_certificate( + subject_cn=subject_cn, issuer_cn=issuer_cn, + self_signed=self_signed, not_before=not_before, not_after=not_after, + ) + # signatureAlgorithm + sig_alg = _build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.11") + b"\x05\x00") + # signatureValue (BIT STRING, minimal placeholder) + sig_val = b"\x03\x03\x00\x00\x01" + return _build_der_sequence(tbs + sig_alg + sig_val) + + +def _build_tls_certificate_message(cert_der: bytes) -> bytes: + """Wrap a DER certificate in a TLS Certificate handshake message.""" + # Certificate entry: 3-byte length + cert + cert_entry = struct.pack("!I", len(cert_der))[1:] + cert_der + # Certificates list: 3-byte total length + entries + certs_list = struct.pack("!I", len(cert_entry))[1:] + cert_entry + # Handshake header: type(1=0x0B) + 3-byte length + hs = bytes([0x0B]) + struct.pack("!I", len(certs_list))[1:] + certs_list + # TLS record header + return b"\x16\x03\x03" + struct.pack("!H", len(hs)) + hs + + +class TestCertificateParsing: + def test_basic_certificate_parsed(self): + cert_der = _build_certificate_der(subject_cn="pwned.local", issuer_cn="Fake CA") + tls_msg = _build_tls_certificate_message(cert_der) + result = _parse_certificate(tls_msg) + assert result is not None + assert result["subject_cn"] == "pwned.local" + assert "Fake CA" in result["issuer_cn"] + + def test_self_signed_detected(self): + cert_der = _build_certificate_der(subject_cn="selfsigned.evil", self_signed=True) + tls_msg = _build_tls_certificate_message(cert_der) + result = _parse_certificate(tls_msg) + assert result is not None + assert result["self_signed"] is True + assert result["subject_cn"] == "selfsigned.evil" + + def test_not_self_signed(self): + cert_der = _build_certificate_der(subject_cn="legit.com", issuer_cn="DigiCert") + tls_msg = _build_tls_certificate_message(cert_der) + result = _parse_certificate(tls_msg) + assert result is not None + assert result["self_signed"] is False + + def test_validity_period_extracted(self): + cert_der = _build_certificate_der( + not_before="240601120000Z", not_after="250601120000Z" + ) + tls_msg = _build_tls_certificate_message(cert_der) + result = _parse_certificate(tls_msg) + assert result is not None + assert "240601" in result["not_before"] + assert "250601" in result["not_after"] + + def test_non_certificate_message_returns_none(self): + # Build a ClientHello instead + data = _build_client_hello() + assert _parse_certificate(data) is None + + def test_empty_cert_list_returns_none(self): + # Handshake with 0-length certificate list + hs = bytes([0x0B, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00]) + tls = b"\x16\x03\x03" + struct.pack("!H", len(hs)) + hs + assert _parse_certificate(tls) is None + + def test_too_short_returns_none(self): + assert _parse_certificate(b"") is None + assert _parse_certificate(b"\x16\x03\x03") is None + + def test_x509_der_direct(self): + cert_der = _build_certificate_der(subject_cn="direct.test") + result = _parse_x509_der(cert_der) + assert result is not None + assert result["subject_cn"] == "direct.test" + + +# ─── DER OID tests ─────────────────────────────────────────────────────────── + +class TestDerOid: + def test_cn_oid(self): + raw = _build_der_oid_bytes("2.5.4.3") + # Skip tag+length + _, start, length = _srv._der_read_tag_len(raw, 0) + oid = _der_read_oid(raw, start, length) + assert oid == "2.5.4.3" + + def test_sha256_rsa_oid(self): + raw = _build_der_oid_bytes("1.2.840.113549.1.1.11") + _, start, length = _srv._der_read_tag_len(raw, 0) + oid = _der_read_oid(raw, start, length) + assert oid == "1.2.840.113549.1.1.11"