feat: JA4/JA4S/JA4L fingerprints, TLS session resumption, certificate extraction
Extend the passive TLS sniffer with next-gen attacker fingerprinting: - JA4 (ClientHello) and JA4S (ServerHello) computation with supported_versions, signature_algorithms, and ALPN parsing - JA4L latency measurement via TCP SYN→SYN-ACK RTT tracking - TLS session resumption detection (session tickets, PSK, 0-RTT early data) - Certificate extraction for TLS ≤1.2 with minimal DER/ASN.1 parser (subject CN, issuer, SANs, validity period, self-signed flag) - Ingester bounty extraction for all new fingerprint types - 116 tests covering all new functionality (1255 total passing)
This commit is contained in:
@@ -206,3 +206,199 @@ async def test_fields_missing_entirely_no_crash():
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
repo.add_bounty.assert_not_awaited()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JA4/JA4S extraction (sniffer)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ja4_included_in_ja3_bounty():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.20",
|
||||
"event_type": "tls_session",
|
||||
"fields": {
|
||||
"ja3": "abc123",
|
||||
"ja3s": "def456",
|
||||
"ja4": "t13d0203h2_aabbccddee00_112233445566",
|
||||
"ja4s": "t1302h2_ffeeddccbbaa",
|
||||
"tls_version": "TLS 1.3",
|
||||
"dst_port": "443",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
ja3_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja3"]
|
||||
assert len(ja3_calls) == 1
|
||||
payload = ja3_calls[0][0][0]["payload"]
|
||||
assert payload["ja4"] == "t13d0203h2_aabbccddee00_112233445566"
|
||||
assert payload["ja4s"] == "t1302h2_ffeeddccbbaa"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JA4L latency extraction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ja4l_bounty_extracted():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.21",
|
||||
"event_type": "tls_session",
|
||||
"fields": {
|
||||
"ja4l_rtt_ms": "12.5",
|
||||
"ja4l_client_ttl": "64",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
ja4l_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja4l"]
|
||||
assert len(ja4l_calls) == 1
|
||||
payload = ja4l_calls[0][0][0]["payload"]
|
||||
assert payload["rtt_ms"] == "12.5"
|
||||
assert payload["client_ttl"] == "64"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ja4l_not_extracted_without_rtt():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.22",
|
||||
"event_type": "tls_session",
|
||||
"fields": {
|
||||
"ja4l_client_ttl": "64",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
ja4l_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "ja4l"]
|
||||
assert len(ja4l_calls) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TLS session resumption extraction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tls_resumption_bounty_extracted():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.23",
|
||||
"event_type": "tls_client_hello",
|
||||
"fields": {
|
||||
"resumption": "session_ticket,psk",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"]
|
||||
assert len(res_calls) == 1
|
||||
assert res_calls[0][0][0]["payload"]["mechanisms"] == "session_ticket,psk"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_resumption_no_bounty():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.24",
|
||||
"event_type": "tls_client_hello",
|
||||
"fields": {
|
||||
"ja3": "abc123",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"]
|
||||
assert len(res_calls) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TLS certificate extraction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tls_certificate_bounty_extracted():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.25",
|
||||
"event_type": "tls_certificate",
|
||||
"fields": {
|
||||
"subject_cn": "evil.c2.local",
|
||||
"issuer": "CN=Evil CA",
|
||||
"self_signed": "true",
|
||||
"not_before": "230101000000Z",
|
||||
"not_after": "260101000000Z",
|
||||
"sans": "evil.c2.local,*.evil.c2.local",
|
||||
"sni": "evil.c2.local",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
cert_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_certificate"]
|
||||
assert len(cert_calls) == 1
|
||||
payload = cert_calls[0][0][0]["payload"]
|
||||
assert payload["subject_cn"] == "evil.c2.local"
|
||||
assert payload["self_signed"] == "true"
|
||||
assert payload["issuer"] == "CN=Evil CA"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tls_certificate_not_extracted_from_non_sniffer():
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "http",
|
||||
"attacker_ip": "10.0.0.26",
|
||||
"event_type": "tls_certificate",
|
||||
"fields": {
|
||||
"subject_cn": "not-from-sniffer.local",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
calls = repo.add_bounty.call_args_list
|
||||
cert_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate"]
|
||||
assert len(cert_calls) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Multiple fingerprints from single sniffer log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_log_yields_multiple_fingerprint_types():
|
||||
"""A complete TLS session log with JA3 + JA4L + resumption yields 3 bounties."""
|
||||
repo = _make_repo()
|
||||
log_data = {
|
||||
"decky": "decky-05",
|
||||
"service": "sniffer",
|
||||
"attacker_ip": "10.0.0.30",
|
||||
"event_type": "tls_session",
|
||||
"fields": {
|
||||
"ja3": "abc123",
|
||||
"ja3s": "def456",
|
||||
"ja4": "t13d0203h2_aabb_ccdd",
|
||||
"ja4s": "t1302h2_eeff",
|
||||
"ja4l_rtt_ms": "5.2",
|
||||
"ja4l_client_ttl": "128",
|
||||
"resumption": "session_ticket",
|
||||
"tls_version": "TLS 1.3",
|
||||
"dst_port": "443",
|
||||
},
|
||||
}
|
||||
await _extract_bounty(repo, log_data)
|
||||
assert repo.add_bounty.await_count == 3
|
||||
types = {c[0][0]["payload"]["fingerprint_type"] for c in repo.add_bounty.call_args_list}
|
||||
assert types == {"ja3", "ja4l", "tls_resumption"}
|
||||
|
||||
@@ -43,11 +43,20 @@ _srv = _load_sniffer()
|
||||
|
||||
_parse_client_hello = _srv._parse_client_hello
|
||||
_parse_server_hello = _srv._parse_server_hello
|
||||
_parse_certificate = _srv._parse_certificate
|
||||
_ja3 = _srv._ja3
|
||||
_ja3s = _srv._ja3s
|
||||
_ja4 = _srv._ja4
|
||||
_ja4s = _srv._ja4s
|
||||
_ja4_version = _srv._ja4_version
|
||||
_ja4_alpn_tag = _srv._ja4_alpn_tag
|
||||
_sha256_12 = _srv._sha256_12
|
||||
_session_resumption_info = _srv._session_resumption_info
|
||||
_is_grease = _srv._is_grease
|
||||
_filter_grease = _srv._filter_grease
|
||||
_tls_version_str = _srv._tls_version_str
|
||||
_parse_x509_der = _srv._parse_x509_der
|
||||
_der_read_oid = _srv._der_read_oid
|
||||
|
||||
|
||||
# ─── TLS byte builder helpers ─────────────────────────────────────────────────
|
||||
@@ -435,3 +444,744 @@ class TestRoundTrip:
|
||||
_, ja3_hash_clean = _ja3(ch_clean)
|
||||
|
||||
assert ja3_hash == ja3_hash_clean
|
||||
|
||||
|
||||
# ─── Extension builder helpers for new tests ─────────────────────────────────
|
||||
|
||||
def _build_signature_algorithms_extension(sig_algs: list[int]) -> bytes:
|
||||
sa_bytes = b"".join(struct.pack("!H", s) for s in sig_algs)
|
||||
data = struct.pack("!H", len(sa_bytes)) + sa_bytes
|
||||
return _build_extension(0x000D, data)
|
||||
|
||||
|
||||
def _build_supported_versions_extension(versions: list[int]) -> bytes:
|
||||
v_bytes = b"".join(struct.pack("!H", v) for v in versions)
|
||||
data = bytes([len(v_bytes)]) + v_bytes
|
||||
return _build_extension(0x002B, data)
|
||||
|
||||
|
||||
def _build_session_ticket_extension(ticket_data: bytes = b"") -> bytes:
|
||||
return _build_extension(0x0023, ticket_data)
|
||||
|
||||
|
||||
def _build_psk_extension() -> bytes:
|
||||
return _build_extension(0x0029, b"\x00\x01\x00")
|
||||
|
||||
|
||||
def _build_early_data_extension() -> bytes:
|
||||
return _build_extension(0x002A, b"")
|
||||
|
||||
|
||||
def _build_server_hello_with_exts(
|
||||
version: int = 0x0303,
|
||||
cipher_suite: int = 0x002F,
|
||||
extensions_bytes: bytes = b"",
|
||||
selected_version: int | None = None,
|
||||
alpn: str | None = None,
|
||||
) -> bytes:
|
||||
"""Build a ServerHello with optional supported_versions and ALPN extensions."""
|
||||
ext_parts = b""
|
||||
if selected_version is not None:
|
||||
ext_parts += _build_extension(0x002B, struct.pack("!H", selected_version))
|
||||
if alpn is not None:
|
||||
proto = alpn.encode()
|
||||
proto_data = bytes([len(proto)]) + proto
|
||||
alpn_data = struct.pack("!H", len(proto_data)) + proto_data
|
||||
ext_parts += _build_extension(0x0010, alpn_data)
|
||||
if extensions_bytes:
|
||||
ext_parts += extensions_bytes
|
||||
return _build_server_hello(version=version, cipher_suite=cipher_suite, extensions_bytes=ext_parts)
|
||||
|
||||
|
||||
# ─── ClientHello extended field tests ────────────────────────────────────────
|
||||
|
||||
class TestClientHelloExtendedFields:
|
||||
def test_signature_algorithms_extracted(self):
|
||||
ext = _build_signature_algorithms_extension([0x0401, 0x0501, 0x0601])
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["signature_algorithms"] == [0x0401, 0x0501, 0x0601]
|
||||
|
||||
def test_supported_versions_extracted(self):
|
||||
ext = _build_supported_versions_extension([0x0304, 0x0303])
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["supported_versions"] == [0x0304, 0x0303]
|
||||
|
||||
def test_grease_filtered_from_supported_versions(self):
|
||||
ext = _build_supported_versions_extension([0x0A0A, 0x0304, 0x0303])
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert 0x0A0A not in result["supported_versions"]
|
||||
assert 0x0304 in result["supported_versions"]
|
||||
|
||||
def test_session_ticket_empty_no_resumption(self):
|
||||
ext = _build_session_ticket_extension(b"")
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["has_session_ticket_data"] is False
|
||||
|
||||
def test_session_ticket_with_data_resumption(self):
|
||||
ext = _build_session_ticket_extension(b"\x01\x02\x03\x04")
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["has_session_ticket_data"] is True
|
||||
|
||||
def test_psk_extension_detected(self):
|
||||
ext = _build_psk_extension()
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["has_pre_shared_key"] is True
|
||||
|
||||
def test_early_data_extension_detected(self):
|
||||
ext = _build_early_data_extension()
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["has_early_data"] is True
|
||||
|
||||
def test_no_resumption_by_default(self):
|
||||
data = _build_client_hello()
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["has_session_ticket_data"] is False
|
||||
assert result["has_pre_shared_key"] is False
|
||||
assert result["has_early_data"] is False
|
||||
|
||||
def test_combined_extensions_all_parsed(self):
|
||||
"""All new extensions should be parsed alongside existing ones."""
|
||||
ext = (
|
||||
_build_sni_extension("evil.c2.io")
|
||||
+ _build_supported_groups_extension([0x001D])
|
||||
+ _build_signature_algorithms_extension([0x0401])
|
||||
+ _build_supported_versions_extension([0x0304, 0x0303])
|
||||
+ _build_alpn_extension(["h2"])
|
||||
)
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
result = _parse_client_hello(data)
|
||||
assert result is not None
|
||||
assert result["sni"] == "evil.c2.io"
|
||||
assert result["supported_groups"] == [0x001D]
|
||||
assert result["signature_algorithms"] == [0x0401]
|
||||
assert result["supported_versions"] == [0x0304, 0x0303]
|
||||
assert result["alpn"] == ["h2"]
|
||||
|
||||
|
||||
# ─── ServerHello extended field tests ────────────────────────────────────────
|
||||
|
||||
class TestServerHelloExtendedFields:
|
||||
def test_selected_version_extracted(self):
|
||||
data = _build_server_hello_with_exts(selected_version=0x0304)
|
||||
result = _parse_server_hello(data)
|
||||
assert result is not None
|
||||
assert result["selected_version"] == 0x0304
|
||||
|
||||
def test_no_selected_version_returns_none(self):
|
||||
data = _build_server_hello()
|
||||
result = _parse_server_hello(data)
|
||||
assert result is not None
|
||||
assert result["selected_version"] is None
|
||||
|
||||
def test_alpn_extracted_from_server_hello(self):
|
||||
data = _build_server_hello_with_exts(alpn="h2")
|
||||
result = _parse_server_hello(data)
|
||||
assert result is not None
|
||||
assert result["alpn"] == "h2"
|
||||
|
||||
|
||||
# ─── JA4 tests ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestJA4:
|
||||
def test_ja4_format_three_sections(self):
|
||||
"""JA4 must have format: section_a_section_b_section_c"""
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x002F, 0x0035],
|
||||
"extensions": [0x000A, 0x000D],
|
||||
"supported_groups": [0x001D],
|
||||
"ec_point_formats": [0x00],
|
||||
"signature_algorithms": [0x0401],
|
||||
"supported_versions": [],
|
||||
"sni": "test.com",
|
||||
"alpn": ["h2"],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
parts = result.split("_")
|
||||
assert len(parts) == 3
|
||||
|
||||
def test_ja4_section_a_format(self):
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x002F, 0x0035],
|
||||
"extensions": [0x000A, 0x000D, 0x0010],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [0x0401],
|
||||
"supported_versions": [0x0304, 0x0303],
|
||||
"sni": "target.local",
|
||||
"alpn": ["h2", "http/1.1"],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
section_a = result.split("_")[0]
|
||||
# t = TCP, 13 = TLS 1.3 (from supported_versions), d = has SNI
|
||||
# 02 = 2 ciphers, 03 = 3 extensions, h2 = ALPN first proto
|
||||
assert section_a == "t13d0203h2"
|
||||
|
||||
def test_ja4_no_sni_uses_i(self):
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x002F],
|
||||
"extensions": [],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
section_a = result.split("_")[0]
|
||||
assert section_a[3] == "i" # no SNI → 'i'
|
||||
|
||||
def test_ja4_no_alpn_uses_00(self):
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x002F],
|
||||
"extensions": [],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
section_a = result.split("_")[0]
|
||||
assert section_a.endswith("00")
|
||||
|
||||
def test_ja4_section_b_is_sha256_12(self):
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x0035, 0x002F], # unsorted
|
||||
"extensions": [],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
section_b = result.split("_")[1]
|
||||
assert len(section_b) == 12
|
||||
# Should be SHA256 of sorted ciphers: "47,53"
|
||||
expected = hashlib.sha256(b"47,53").hexdigest()[:12]
|
||||
assert section_b == expected
|
||||
|
||||
def test_ja4_section_c_includes_signature_algorithms(self):
|
||||
ch = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suites": [0x002F],
|
||||
"extensions": [0x000D], # sig_algs extension type
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [0x0601, 0x0401],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
result = _ja4(ch)
|
||||
section_c = result.split("_")[2]
|
||||
assert len(section_c) == 12
|
||||
# combined = "13_1025,1537" (sorted ext=13, sorted sig_algs=0x0401=1025, 0x0601=1537)
|
||||
expected = hashlib.sha256(b"13_1025,1537").hexdigest()[:12]
|
||||
assert section_c == expected
|
||||
|
||||
def test_ja4_same_ciphers_different_order_same_hash(self):
|
||||
base = {
|
||||
"tls_version": 0x0303,
|
||||
"extensions": [],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
ch1 = {**base, "cipher_suites": [0x002F, 0x0035]}
|
||||
ch2 = {**base, "cipher_suites": [0x0035, 0x002F]}
|
||||
assert _ja4(ch1) == _ja4(ch2)
|
||||
|
||||
def test_ja4_different_ciphers_different_hash(self):
|
||||
base = {
|
||||
"tls_version": 0x0303,
|
||||
"extensions": [],
|
||||
"supported_groups": [],
|
||||
"ec_point_formats": [],
|
||||
"signature_algorithms": [],
|
||||
"supported_versions": [],
|
||||
"sni": "",
|
||||
"alpn": [],
|
||||
}
|
||||
ch1 = {**base, "cipher_suites": [0x002F]}
|
||||
ch2 = {**base, "cipher_suites": [0x0035]}
|
||||
assert _ja4(ch1) != _ja4(ch2)
|
||||
|
||||
def test_ja4_roundtrip_from_bytes(self):
|
||||
"""Build a ClientHello from bytes and compute JA4."""
|
||||
ext = (
|
||||
_build_sni_extension("c2.attacker.net")
|
||||
+ _build_signature_algorithms_extension([0x0401, 0x0501])
|
||||
+ _build_supported_versions_extension([0x0304, 0x0303])
|
||||
+ _build_alpn_extension(["h2"])
|
||||
)
|
||||
data = _build_client_hello(
|
||||
cipher_suites=[0x1301, 0x1302, 0x002F],
|
||||
extensions_bytes=ext,
|
||||
)
|
||||
ch = _parse_client_hello(data)
|
||||
assert ch is not None
|
||||
result = _ja4(ch)
|
||||
parts = result.split("_")
|
||||
assert len(parts) == 3
|
||||
section_a = parts[0]
|
||||
assert section_a.startswith("t13") # TLS 1.3 via supported_versions
|
||||
assert "d" in section_a # has SNI
|
||||
assert section_a.endswith("h2") # ALPN = h2
|
||||
|
||||
|
||||
# ─── JA4S tests ──────────────────────────────────────────────────────────────
|
||||
|
||||
class TestJA4S:
|
||||
def test_ja4s_format_two_sections(self):
|
||||
sh = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suite": 0x002F,
|
||||
"extensions": [0xFF01],
|
||||
"selected_version": None,
|
||||
"alpn": "",
|
||||
}
|
||||
result = _ja4s(sh)
|
||||
parts = result.split("_")
|
||||
assert len(parts) == 2
|
||||
|
||||
def test_ja4s_section_a_format(self):
|
||||
sh = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suite": 0x1301,
|
||||
"extensions": [0xFF01, 0x002B],
|
||||
"selected_version": 0x0304,
|
||||
"alpn": "h2",
|
||||
}
|
||||
result = _ja4s(sh)
|
||||
section_a = result.split("_")[0]
|
||||
# t = TCP, 13 = TLS 1.3 (selected_version), 02 = 2 extensions, h2 = ALPN
|
||||
assert section_a == "t1302h2"
|
||||
|
||||
def test_ja4s_uses_selected_version_when_available(self):
|
||||
sh = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suite": 0x1301,
|
||||
"extensions": [],
|
||||
"selected_version": 0x0304,
|
||||
"alpn": "",
|
||||
}
|
||||
result = _ja4s(sh)
|
||||
section_a = result.split("_")[0]
|
||||
assert "13" in section_a # TLS 1.3
|
||||
|
||||
def test_ja4s_falls_back_to_tls_version(self):
|
||||
sh = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suite": 0x002F,
|
||||
"extensions": [],
|
||||
"selected_version": None,
|
||||
"alpn": "",
|
||||
}
|
||||
result = _ja4s(sh)
|
||||
section_a = result.split("_")[0]
|
||||
assert section_a.startswith("t12") # TLS 1.2
|
||||
|
||||
def test_ja4s_section_b_is_sha256_12(self):
|
||||
sh = {
|
||||
"tls_version": 0x0303,
|
||||
"cipher_suite": 0x002F, # 47
|
||||
"extensions": [0xFF01], # 65281
|
||||
"selected_version": None,
|
||||
"alpn": "",
|
||||
}
|
||||
result = _ja4s(sh)
|
||||
section_b = result.split("_")[1]
|
||||
assert len(section_b) == 12
|
||||
expected = hashlib.sha256(b"47,65281").hexdigest()[:12]
|
||||
assert section_b == expected
|
||||
|
||||
def test_ja4s_roundtrip_from_bytes(self):
|
||||
data = _build_server_hello_with_exts(
|
||||
cipher_suite=0x1301,
|
||||
selected_version=0x0304,
|
||||
alpn="h2",
|
||||
)
|
||||
sh = _parse_server_hello(data)
|
||||
assert sh is not None
|
||||
result = _ja4s(sh)
|
||||
parts = result.split("_")
|
||||
assert len(parts) == 2
|
||||
assert parts[0].startswith("t13")
|
||||
|
||||
|
||||
# ─── JA4 version detection tests ─────────────────────────────────────────────
|
||||
|
||||
class TestJA4Version:
|
||||
def test_tls13_from_supported_versions(self):
|
||||
ch = {"supported_versions": [0x0304, 0x0303], "tls_version": 0x0303}
|
||||
assert _ja4_version(ch) == "13"
|
||||
|
||||
def test_tls12_no_supported_versions(self):
|
||||
ch = {"supported_versions": [], "tls_version": 0x0303}
|
||||
assert _ja4_version(ch) == "12"
|
||||
|
||||
def test_tls10(self):
|
||||
ch = {"supported_versions": [], "tls_version": 0x0301}
|
||||
assert _ja4_version(ch) == "10"
|
||||
|
||||
def test_ssl30(self):
|
||||
ch = {"supported_versions": [], "tls_version": 0x0300}
|
||||
assert _ja4_version(ch) == "s3"
|
||||
|
||||
def test_unknown_version(self):
|
||||
ch = {"supported_versions": [], "tls_version": 0xFFFF}
|
||||
assert _ja4_version(ch) == "00"
|
||||
|
||||
|
||||
# ─── JA4 ALPN tag tests ──────────────────────────────────────────────────────
|
||||
|
||||
class TestJA4AlpnTag:
|
||||
def test_h2(self):
|
||||
assert _ja4_alpn_tag(["h2"]) == "h2"
|
||||
|
||||
def test_http11(self):
|
||||
assert _ja4_alpn_tag(["http/1.1"]) == "h1"
|
||||
|
||||
def test_no_alpn(self):
|
||||
assert _ja4_alpn_tag([]) == "00"
|
||||
|
||||
def test_single_char_protocol(self):
|
||||
assert _ja4_alpn_tag(["x"]) == "xx"
|
||||
|
||||
def test_string_input(self):
|
||||
assert _ja4_alpn_tag("h2") == "h2"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert _ja4_alpn_tag("") == "00"
|
||||
|
||||
|
||||
# ─── SHA256-12 tests ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestSha256_12:
|
||||
def test_returns_12_hex_chars(self):
|
||||
result = _sha256_12("test")
|
||||
assert len(result) == 12
|
||||
assert all(c in "0123456789abcdef" for c in result)
|
||||
|
||||
def test_deterministic(self):
|
||||
assert _sha256_12("hello") == _sha256_12("hello")
|
||||
|
||||
def test_different_input_different_output(self):
|
||||
assert _sha256_12("a") != _sha256_12("b")
|
||||
|
||||
def test_matches_hashlib(self):
|
||||
expected = hashlib.sha256(b"test_input").hexdigest()[:12]
|
||||
assert _sha256_12("test_input") == expected
|
||||
|
||||
|
||||
# ─── Session resumption tests ────────────────────────────────────────────────
|
||||
|
||||
class TestSessionResumption:
|
||||
def test_no_resumption_by_default(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": False,
|
||||
"has_pre_shared_key": False,
|
||||
"has_early_data": False,
|
||||
"session_id": b"",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is False
|
||||
assert info["mechanisms"] == []
|
||||
|
||||
def test_session_ticket_resumption(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": True,
|
||||
"has_pre_shared_key": False,
|
||||
"has_early_data": False,
|
||||
"session_id": b"",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert "session_ticket" in info["mechanisms"]
|
||||
|
||||
def test_psk_resumption(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": False,
|
||||
"has_pre_shared_key": True,
|
||||
"has_early_data": False,
|
||||
"session_id": b"",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert "psk" in info["mechanisms"]
|
||||
|
||||
def test_early_data_0rtt(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": False,
|
||||
"has_pre_shared_key": False,
|
||||
"has_early_data": True,
|
||||
"session_id": b"",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert "early_data_0rtt" in info["mechanisms"]
|
||||
|
||||
def test_session_id_resumption(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": False,
|
||||
"has_pre_shared_key": False,
|
||||
"has_early_data": False,
|
||||
"session_id": b"\x01\x02\x03",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert "session_id" in info["mechanisms"]
|
||||
|
||||
def test_multiple_mechanisms(self):
|
||||
ch = {
|
||||
"has_session_ticket_data": True,
|
||||
"has_pre_shared_key": True,
|
||||
"has_early_data": True,
|
||||
"session_id": b"\x01",
|
||||
}
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert len(info["mechanisms"]) == 4
|
||||
|
||||
def test_resumption_from_parsed_client_hello(self):
|
||||
ext = _build_session_ticket_extension(b"\xDE\xAD\xBE\xEF")
|
||||
data = _build_client_hello(extensions_bytes=ext)
|
||||
ch = _parse_client_hello(data)
|
||||
assert ch is not None
|
||||
info = _session_resumption_info(ch)
|
||||
assert info["resumption_attempted"] is True
|
||||
assert "session_ticket" in info["mechanisms"]
|
||||
|
||||
|
||||
# ─── Certificate parsing tests ───────────────────────────────────────────────
|
||||
|
||||
def _build_der_length(length: int) -> bytes:
|
||||
"""Encode a DER length."""
|
||||
if length < 0x80:
|
||||
return bytes([length])
|
||||
elif length < 0x100:
|
||||
return bytes([0x81, length])
|
||||
else:
|
||||
return bytes([0x82]) + struct.pack("!H", length)
|
||||
|
||||
|
||||
def _build_der_sequence(content: bytes) -> bytes:
|
||||
return b"\x30" + _build_der_length(len(content)) + content
|
||||
|
||||
|
||||
def _build_der_set(content: bytes) -> bytes:
|
||||
return b"\x31" + _build_der_length(len(content)) + content
|
||||
|
||||
|
||||
def _build_der_oid_bytes(oid_str: str) -> bytes:
|
||||
"""Encode a dotted OID string to DER OID bytes."""
|
||||
parts = [int(x) for x in oid_str.split(".")]
|
||||
first_byte = parts[0] * 40 + parts[1]
|
||||
encoded = bytes([first_byte])
|
||||
for val in parts[2:]:
|
||||
if val < 0x80:
|
||||
encoded += bytes([val])
|
||||
else:
|
||||
octets = []
|
||||
while val > 0:
|
||||
octets.append(val & 0x7F)
|
||||
val >>= 7
|
||||
octets.reverse()
|
||||
for i in range(len(octets) - 1):
|
||||
octets[i] |= 0x80
|
||||
encoded += bytes(octets)
|
||||
return b"\x06" + _build_der_length(len(encoded)) + encoded
|
||||
|
||||
|
||||
def _build_der_utf8string(text: str) -> bytes:
|
||||
encoded = text.encode("utf-8")
|
||||
return b"\x0C" + _build_der_length(len(encoded)) + encoded
|
||||
|
||||
|
||||
def _build_der_utctime(time_str: str) -> bytes:
|
||||
encoded = time_str.encode("ascii")
|
||||
return b"\x17" + _build_der_length(len(encoded)) + encoded
|
||||
|
||||
|
||||
def _build_rdn(oid: str, value: str) -> bytes:
|
||||
"""Build a single RDN SET { SEQUENCE { OID, UTF8String } }."""
|
||||
attr = _build_der_sequence(_build_der_oid_bytes(oid) + _build_der_utf8string(value))
|
||||
return _build_der_set(attr)
|
||||
|
||||
|
||||
def _build_x509_name(cn: str, o: str = "", c: str = "") -> bytes:
|
||||
"""Build an X.501 Name with optional CN, O, C."""
|
||||
rdns = b""
|
||||
if c:
|
||||
rdns += _build_rdn("2.5.4.6", c)
|
||||
if o:
|
||||
rdns += _build_rdn("2.5.4.10", o)
|
||||
if cn:
|
||||
rdns += _build_rdn("2.5.4.3", cn)
|
||||
return _build_der_sequence(rdns)
|
||||
|
||||
|
||||
def _build_minimal_tbs_certificate(
|
||||
subject_cn: str = "evil.c2.local",
|
||||
issuer_cn: str = "Evil CA",
|
||||
not_before: str = "230101000000Z",
|
||||
not_after: str = "260101000000Z",
|
||||
self_signed: bool = False,
|
||||
) -> bytes:
|
||||
"""Build a minimal tbsCertificate DER structure."""
|
||||
if self_signed:
|
||||
issuer_cn = subject_cn
|
||||
|
||||
# version [0] EXPLICIT INTEGER 2 (v3)
|
||||
version = b"\xa0\x03\x02\x01\x02"
|
||||
# serialNumber INTEGER
|
||||
serial = b"\x02\x01\x01"
|
||||
# signature algorithm (sha256WithRSAEncryption = 1.2.840.113549.1.1.11)
|
||||
sig_alg = _build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.11") + b"\x05\x00")
|
||||
# issuer
|
||||
issuer = _build_x509_name(issuer_cn)
|
||||
# validity
|
||||
validity = _build_der_sequence(
|
||||
_build_der_utctime(not_before) + _build_der_utctime(not_after)
|
||||
)
|
||||
# subject
|
||||
subject = _build_x509_name(subject_cn)
|
||||
# subjectPublicKeyInfo (minimal RSA placeholder)
|
||||
spki = _build_der_sequence(
|
||||
_build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.1") + b"\x05\x00")
|
||||
+ b"\x03\x03\x00\x00\x01"
|
||||
)
|
||||
|
||||
tbs = version + serial + sig_alg + issuer + validity + subject + spki
|
||||
return _build_der_sequence(tbs)
|
||||
|
||||
|
||||
def _build_certificate_der(
|
||||
subject_cn: str = "evil.c2.local",
|
||||
issuer_cn: str = "Evil CA",
|
||||
self_signed: bool = False,
|
||||
not_before: str = "230101000000Z",
|
||||
not_after: str = "260101000000Z",
|
||||
) -> bytes:
|
||||
"""Build a complete X.509 DER certificate (minimal)."""
|
||||
tbs = _build_minimal_tbs_certificate(
|
||||
subject_cn=subject_cn, issuer_cn=issuer_cn,
|
||||
self_signed=self_signed, not_before=not_before, not_after=not_after,
|
||||
)
|
||||
# signatureAlgorithm
|
||||
sig_alg = _build_der_sequence(_build_der_oid_bytes("1.2.840.113549.1.1.11") + b"\x05\x00")
|
||||
# signatureValue (BIT STRING, minimal placeholder)
|
||||
sig_val = b"\x03\x03\x00\x00\x01"
|
||||
return _build_der_sequence(tbs + sig_alg + sig_val)
|
||||
|
||||
|
||||
def _build_tls_certificate_message(cert_der: bytes) -> bytes:
|
||||
"""Wrap a DER certificate in a TLS Certificate handshake message."""
|
||||
# Certificate entry: 3-byte length + cert
|
||||
cert_entry = struct.pack("!I", len(cert_der))[1:] + cert_der
|
||||
# Certificates list: 3-byte total length + entries
|
||||
certs_list = struct.pack("!I", len(cert_entry))[1:] + cert_entry
|
||||
# Handshake header: type(1=0x0B) + 3-byte length
|
||||
hs = bytes([0x0B]) + struct.pack("!I", len(certs_list))[1:] + certs_list
|
||||
# TLS record header
|
||||
return b"\x16\x03\x03" + struct.pack("!H", len(hs)) + hs
|
||||
|
||||
|
||||
class TestCertificateParsing:
|
||||
def test_basic_certificate_parsed(self):
|
||||
cert_der = _build_certificate_der(subject_cn="pwned.local", issuer_cn="Fake CA")
|
||||
tls_msg = _build_tls_certificate_message(cert_der)
|
||||
result = _parse_certificate(tls_msg)
|
||||
assert result is not None
|
||||
assert result["subject_cn"] == "pwned.local"
|
||||
assert "Fake CA" in result["issuer_cn"]
|
||||
|
||||
def test_self_signed_detected(self):
|
||||
cert_der = _build_certificate_der(subject_cn="selfsigned.evil", self_signed=True)
|
||||
tls_msg = _build_tls_certificate_message(cert_der)
|
||||
result = _parse_certificate(tls_msg)
|
||||
assert result is not None
|
||||
assert result["self_signed"] is True
|
||||
assert result["subject_cn"] == "selfsigned.evil"
|
||||
|
||||
def test_not_self_signed(self):
|
||||
cert_der = _build_certificate_der(subject_cn="legit.com", issuer_cn="DigiCert")
|
||||
tls_msg = _build_tls_certificate_message(cert_der)
|
||||
result = _parse_certificate(tls_msg)
|
||||
assert result is not None
|
||||
assert result["self_signed"] is False
|
||||
|
||||
def test_validity_period_extracted(self):
|
||||
cert_der = _build_certificate_der(
|
||||
not_before="240601120000Z", not_after="250601120000Z"
|
||||
)
|
||||
tls_msg = _build_tls_certificate_message(cert_der)
|
||||
result = _parse_certificate(tls_msg)
|
||||
assert result is not None
|
||||
assert "240601" in result["not_before"]
|
||||
assert "250601" in result["not_after"]
|
||||
|
||||
def test_non_certificate_message_returns_none(self):
|
||||
# Build a ClientHello instead
|
||||
data = _build_client_hello()
|
||||
assert _parse_certificate(data) is None
|
||||
|
||||
def test_empty_cert_list_returns_none(self):
|
||||
# Handshake with 0-length certificate list
|
||||
hs = bytes([0x0B, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00])
|
||||
tls = b"\x16\x03\x03" + struct.pack("!H", len(hs)) + hs
|
||||
assert _parse_certificate(tls) is None
|
||||
|
||||
def test_too_short_returns_none(self):
|
||||
assert _parse_certificate(b"") is None
|
||||
assert _parse_certificate(b"\x16\x03\x03") is None
|
||||
|
||||
def test_x509_der_direct(self):
|
||||
cert_der = _build_certificate_der(subject_cn="direct.test")
|
||||
result = _parse_x509_der(cert_der)
|
||||
assert result is not None
|
||||
assert result["subject_cn"] == "direct.test"
|
||||
|
||||
|
||||
# ─── DER OID tests ───────────────────────────────────────────────────────────
|
||||
|
||||
class TestDerOid:
|
||||
def test_cn_oid(self):
|
||||
raw = _build_der_oid_bytes("2.5.4.3")
|
||||
# Skip tag+length
|
||||
_, start, length = _srv._der_read_tag_len(raw, 0)
|
||||
oid = _der_read_oid(raw, start, length)
|
||||
assert oid == "2.5.4.3"
|
||||
|
||||
def test_sha256_rsa_oid(self):
|
||||
raw = _build_der_oid_bytes("1.2.840.113549.1.1.11")
|
||||
_, start, length = _srv._der_read_tag_len(raw, 0)
|
||||
oid = _der_read_oid(raw, start, length)
|
||||
assert oid == "1.2.840.113549.1.1.11"
|
||||
|
||||
Reference in New Issue
Block a user