- Remove unused Optional import (F401) in telemetry.py - Move imports above module-level code (E402) in web/db/models.py - Default API/web hosts to 127.0.0.1 instead of 0.0.0.0 (B104) - Add usedforsecurity=False to MD5 calls in JA3/HASSH fingerprinting (B324) - Annotate intentional try/except/pass blocks with nosec (B110) - Remove stale nosec comments that no longer suppress anything
228 lines
6.2 KiB
Python
228 lines
6.2 KiB
Python
"""
|
|
TCP/IP stack fingerprinting via SYN-ACK analysis.
|
|
|
|
Sends a crafted TCP SYN packet to a target host:port, captures the
|
|
SYN-ACK response, and extracts OS/tool-identifying characteristics:
|
|
TTL, window size, DF bit, MSS, window scale, SACK support, timestamps,
|
|
and TCP options ordering.
|
|
|
|
Uses scapy for packet crafting and parsing. Requires root/CAP_NET_RAW.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import random
|
|
from typing import Any
|
|
|
|
from decnet.telemetry import traced as _traced
|
|
|
|
# Lazy-import scapy to avoid breaking non-root usage of HASSH/JARM.
|
|
# The actual import happens inside functions that need it.
|
|
|
|
# ─── TCP option short codes ─────────────────────────────────────────────────
|
|
|
|
_OPT_CODES: dict[str, str] = {
|
|
"MSS": "M",
|
|
"WScale": "W",
|
|
"SAckOK": "S",
|
|
"SAck": "S",
|
|
"Timestamp": "T",
|
|
"NOP": "N",
|
|
"EOL": "E",
|
|
"AltChkSum": "A",
|
|
"AltChkSumOpt": "A",
|
|
"UTO": "U",
|
|
}
|
|
|
|
|
|
# ─── Packet construction ───────────────────────────────────────────────────
|
|
|
|
@_traced("prober.tcpfp_send_syn")
def _send_syn(
    host: str,
    port: int,
    timeout: float,
) -> Any | None:
    """
    Probe ``host:port`` with a single TCP SYN carrying a fixed option list.

    Args:
        host: Destination address handed to scapy's ``IP(dst=...)``.
        port: Destination TCP port.
        timeout: Value forwarded to ``sr1`` as its ``timeout`` argument.

    Returns:
        The reply packet when it contains a TCP layer whose flags equal
        0x12 (SYN-ACK). None when ``sr1`` returns nothing, when sending
        raises OSError (PermissionError included), or when the reply is
        anything other than a SYN-ACK.

    After a SYN-ACK is received, a RST is sent via ``_send_rst`` before the
    reply is returned.
    """
    # scapy is imported lazily so that merely importing this module does
    # not require it.
    from scapy.all import IP, TCP, conf, sr1

    # Suppress scapy's noisy output (module-wide scapy setting).
    conf.verb = 0

    src_port = random.randint(49152, 65535)  # nosec B311 — ephemeral port, not crypto

    # Options advertised in the probe, in the order they are placed in the SYN.
    syn_options = [
        ("MSS", 1460),
        ("NOP", None),
        ("WScale", 7),
        ("NOP", None),
        ("NOP", None),
        ("Timestamp", (0, 0)),
        ("SAckOK", b""),
        ("EOL", None),
    ]
    ip_hdr = IP(dst=host)
    tcp_syn = TCP(sport=src_port, dport=port, flags="S", options=syn_options)
    probe = ip_hdr / tcp_syn

    try:
        reply = sr1(probe, timeout=timeout, verbose=0)
    except OSError:  # PermissionError is a subclass of OSError
        return None

    # Accept only a reply that carries TCP with flags exactly SYN+ACK (0x12).
    if reply is None or not reply.haslayer(TCP) or reply[TCP].flags != 0x12:
        return None

    # Send RST to clean up the half-open connection.
    _send_rst(host, port, src_port, reply)
    return reply
|
|
|
|
|
|
def _send_rst(
    host: str,
    dport: int,
    sport: int,
    resp: Any,
) -> None:
    """
    Best-effort teardown of the half-open connection left by the SYN probe.

    Args:
        host: Destination address for the RST.
        dport: Destination port (the port that was probed).
        sport: Source port that was used for the original SYN.
        resp: The SYN-ACK reply; its ``ack`` value becomes the RST's
            sequence number.

    Any exception (including a failed scapy import) is swallowed on
    purpose: failing to send the RST must not fail the fingerprint.
    """
    try:
        from scapy.all import IP, TCP, send

        ip_hdr = IP(dst=host)
        tcp_rst = TCP(sport=sport, dport=dport, flags="R", seq=resp.ack)
        send(ip_hdr / tcp_rst, verbose=0)
    except Exception:  # nosec B110 — best-effort RST cleanup
        pass
|
|
|
|
|
|
# ─── Response parsing ───────────────────────────────────────────────────────
|
|
|
|
def _parse_synack(resp: Any) -> dict[str, Any]:
    """
    Pull the fingerprint-relevant IP and TCP fields out of a SYN-ACK.

    Args:
        resp: A scapy packet that is indexed here by the ``IP`` and ``TCP``
            layer classes.

    Returns:
        A dict with keys ``ttl``, ``window_size``, ``df_bit``, ``ip_id``,
        ``mss``, ``window_scale``, ``sack_ok``, ``timestamp`` and
        ``options_order``. Option-derived defaults when the option is
        absent: ``mss`` 0, ``window_scale`` -1, ``sack_ok`` 0,
        ``timestamp`` 0. If an option appears more than once, the last
        MSS / WScale value wins.
    """
    from scapy.all import IP, TCP

    ip_hdr = resp[IP]
    tcp_hdr = resp[TCP]

    # Start from the header fields plus "option absent" defaults, then let
    # the option walk below overwrite whatever is actually present.
    parsed: dict[str, Any] = {
        "ttl": ip_hdr.ttl,
        "window_size": tcp_hdr.window,
        "df_bit": 1 if (ip_hdr.flags & 0x2) else 0,  # DF = bit 1
        "ip_id": ip_hdr.id,
        "mss": 0,
        "window_scale": -1,
        "sack_ok": 0,
        "timestamp": 0,
        "options_order": _extract_options_order(tcp_hdr.options),
    }

    for name, value in tcp_hdr.options:
        if name == "MSS":
            parsed["mss"] = value
        elif name == "WScale":
            parsed["window_scale"] = value
        elif name in ("SAckOK", "SAck"):
            parsed["sack_ok"] = 1
        elif name == "Timestamp":
            # Only presence is recorded, not the timestamp values.
            parsed["timestamp"] = 1

    return parsed
|
|
|
|
|
|
def _extract_options_order(options: list[tuple[str, Any]]) -> str:
|
|
"""
|
|
Map scapy TCP option tuples to a short-code string.
|
|
|
|
E.g. [("MSS", 1460), ("NOP", None), ("WScale", 7)] → "M,N,W"
|
|
"""
|
|
codes = []
|
|
for opt_name, _ in options:
|
|
code = _OPT_CODES.get(opt_name, "?")
|
|
codes.append(code)
|
|
return ",".join(codes)
|
|
|
|
|
|
# ─── Fingerprint computation ───────────────────────────────────────────────
|
|
|
|
def _compute_fingerprint(fields: dict[str, Any]) -> tuple[str, str]:
|
|
"""
|
|
Compute fingerprint raw string and SHA256 hash from parsed fields.
|
|
|
|
Returns (raw_string, hash_hex_32).
|
|
"""
|
|
raw = (
|
|
f"{fields['ttl']}:{fields['window_size']}:{fields['df_bit']}:"
|
|
f"{fields['mss']}:{fields['window_scale']}:{fields['sack_ok']}:"
|
|
f"{fields['timestamp']}:{fields['options_order']}"
|
|
)
|
|
h = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
|
|
return raw, h
|
|
|
|
|
|
# ─── Public API ─────────────────────────────────────────────────────────────
|
|
|
|
@_traced("prober.tcp_fingerprint")
def tcp_fingerprint(
    host: str,
    port: int,
    timeout: float = 5.0,
) -> dict[str, Any] | None:
    """
    Fingerprint the TCP/IP stack behind ``host:port`` from its SYN-ACK.

    Args:
        host: Target address.
        port: Target TCP port.
        timeout: Passed through to the SYN probe (default 5.0).

    Returns:
        None when no SYN-ACK was obtained. Otherwise a dict holding
        ``tcpfp_hash`` and ``tcpfp_raw`` followed by every individual
        field produced by ``_parse_synack``.

    Requires root/CAP_NET_RAW.
    """
    synack = _send_syn(host, port, timeout)
    if synack is None:
        return None

    parsed = _parse_synack(synack)
    raw_fp, fp_hash = _compute_fingerprint(parsed)

    # Hash and raw string go first; the parsed fields are merged in after.
    result: dict[str, Any] = {"tcpfp_hash": fp_hash, "tcpfp_raw": raw_fp}
    result.update(parsed)
    return result
|