feat: add advanced OS fingerprinting via p0f integration
- decnet/sniffer/fingerprint.py: enhance TCP/IP fingerprinting pipeline - decnet/sniffer/p0f.py: integrate p0f for passive OS classification - Improves attacker profiling accuracy in honeypot interaction analysis
This commit is contained in:
@@ -14,6 +14,8 @@ import struct
|
||||
import time
|
||||
from typing import Any, Callable
|
||||
|
||||
from decnet.prober.tcpfp import _extract_options_order
|
||||
from decnet.sniffer.p0f import guess_os, hop_distance, initial_ttl
|
||||
from decnet.sniffer.syslog import SEVERITY_INFO, SEVERITY_WARNING, syslog_line
|
||||
|
||||
# ─── Constants ───────────────────────────────────────────────────────────────
|
||||
@@ -23,6 +25,10 @@ SERVICE_NAME: str = "sniffer"
|
||||
_SESSION_TTL: float = 60.0
|
||||
_DEDUP_TTL: float = 300.0
|
||||
|
||||
# Inactivity after which a TCP flow is considered closed and its timing
|
||||
# summary is flushed as an event.
|
||||
_FLOW_IDLE_TIMEOUT: float = 120.0
|
||||
|
||||
_GREASE: frozenset[int] = frozenset(0x0A0A + i * 0x1010 for i in range(16))
|
||||
|
||||
_TLS_RECORD_HANDSHAKE: int = 0x16
|
||||
@@ -42,6 +48,38 @@ _EXT_EARLY_DATA: int = 0x002A
|
||||
|
||||
_TCP_SYN: int = 0x02
|
||||
_TCP_ACK: int = 0x10
|
||||
_TCP_FIN: int = 0x01
|
||||
_TCP_RST: int = 0x04
|
||||
|
||||
|
||||
# ─── TCP option extraction for passive fingerprinting ───────────────────────
|
||||
|
||||
def _extract_tcp_fingerprint(tcp_options: list) -> dict[str, Any]:
|
||||
"""
|
||||
Extract MSS, window-scale, SACK, timestamp flags, and the options order
|
||||
signature from a scapy TCP options list.
|
||||
"""
|
||||
mss = 0
|
||||
wscale: int | None = None
|
||||
sack_ok = False
|
||||
has_ts = False
|
||||
for opt_name, opt_value in tcp_options or []:
|
||||
if opt_name == "MSS":
|
||||
mss = opt_value
|
||||
elif opt_name == "WScale":
|
||||
wscale = opt_value
|
||||
elif opt_name in ("SAckOK", "SAck"):
|
||||
sack_ok = True
|
||||
elif opt_name == "Timestamp":
|
||||
has_ts = True
|
||||
options_sig = _extract_options_order(tcp_options or [])
|
||||
return {
|
||||
"mss": mss,
|
||||
"wscale": wscale,
|
||||
"sack_ok": sack_ok,
|
||||
"has_timestamps": has_ts,
|
||||
"options_sig": options_sig,
|
||||
}
|
||||
|
||||
|
||||
# ─── GREASE helpers ──────────────────────────────────────────────────────────
|
||||
@@ -655,6 +693,13 @@ class SnifferEngine:
|
||||
self._tcp_syn: dict[tuple[str, int, str, int], dict[str, Any]] = {}
|
||||
self._tcp_rtt: dict[tuple[str, int, str, int], dict[str, Any]] = {}
|
||||
|
||||
# Per-flow timing aggregator. Key: (src_ip, src_port, dst_ip, dst_port).
|
||||
# Flow direction is client→decky; reverse packets are associated back
|
||||
# to the forward flow so we can track retransmits and inter-arrival.
|
||||
self._flows: dict[tuple[str, int, str, int], dict[str, Any]] = {}
|
||||
self._flow_last_cleanup: float = 0.0
|
||||
self._FLOW_CLEANUP_INTERVAL: float = 30.0
|
||||
|
||||
self._dedup_cache: dict[tuple[str, str, str], float] = {}
|
||||
self._dedup_last_cleanup: float = 0.0
|
||||
self._DEDUP_CLEANUP_INTERVAL: float = 60.0
|
||||
@@ -693,6 +738,16 @@ class SnifferEngine:
|
||||
"|" + fields.get("ja4", "") + "|" + fields.get("ja4s", ""))
|
||||
if event_type == "tls_certificate":
|
||||
return fields.get("subject_cn", "") + "|" + fields.get("issuer", "")
|
||||
if event_type == "tcp_syn_fingerprint":
|
||||
# Dedupe per (OS signature, options layout). One event per unique
|
||||
# stack profile from this attacker IP per dedup window.
|
||||
return fields.get("os_guess", "") + "|" + fields.get("options_sig", "")
|
||||
if event_type == "tcp_flow_timing":
|
||||
# Dedup per (attacker_ip, decky_port) — src_port is deliberately
|
||||
# excluded so a port scanner rotating source ports only produces
|
||||
# one timing event per dedup window. Behavior cadence doesn't
|
||||
# need per-ephemeral-port fidelity.
|
||||
return fields.get("dst_ip", "") + "|" + fields.get("dst_port", "")
|
||||
return fields.get("mechanisms", fields.get("resumption", ""))
|
||||
|
||||
def _is_duplicate(self, event_type: str, fields: dict[str, Any]) -> bool:
|
||||
@@ -719,6 +774,149 @@ class SnifferEngine:
|
||||
line = syslog_line(SERVICE_NAME, node_name, event_type, severity=severity, **fields)
|
||||
self._write_fn(line)
|
||||
|
||||
# ── Flow tracking (per-TCP-4-tuple timing + retransmits) ────────────────
|
||||
|
||||
def _flow_key(
|
||||
self,
|
||||
src_ip: str,
|
||||
src_port: int,
|
||||
dst_ip: str,
|
||||
dst_port: int,
|
||||
) -> tuple[str, int, str, int]:
|
||||
"""
|
||||
Canonicalize a packet to the *client→decky* direction so forward and
|
||||
reverse packets share one flow record.
|
||||
"""
|
||||
if dst_ip in self._ip_to_decky:
|
||||
return (src_ip, src_port, dst_ip, dst_port)
|
||||
# Otherwise src is the decky, flip.
|
||||
return (dst_ip, dst_port, src_ip, src_port)
|
||||
|
||||
def _update_flow(
|
||||
self,
|
||||
flow_key: tuple[str, int, str, int],
|
||||
now: float,
|
||||
seq: int,
|
||||
payload_len: int,
|
||||
direction_forward: bool,
|
||||
) -> None:
|
||||
"""Record one packet into the flow aggregator."""
|
||||
flow = self._flows.get(flow_key)
|
||||
if flow is None:
|
||||
flow = {
|
||||
"start": now,
|
||||
"last": now,
|
||||
"packets": 0,
|
||||
"bytes": 0,
|
||||
"iat_sum": 0.0,
|
||||
"iat_min": float("inf"),
|
||||
"iat_max": 0.0,
|
||||
"iat_count": 0,
|
||||
"forward_seqs": set(),
|
||||
"retransmits": 0,
|
||||
"emitted": False,
|
||||
}
|
||||
self._flows[flow_key] = flow
|
||||
|
||||
if flow["packets"] > 0:
|
||||
iat = now - flow["last"]
|
||||
if iat >= 0:
|
||||
flow["iat_sum"] += iat
|
||||
flow["iat_count"] += 1
|
||||
if iat < flow["iat_min"]:
|
||||
flow["iat_min"] = iat
|
||||
if iat > flow["iat_max"]:
|
||||
flow["iat_max"] = iat
|
||||
|
||||
flow["last"] = now
|
||||
flow["packets"] += 1
|
||||
flow["bytes"] += payload_len
|
||||
|
||||
# Retransmit detection: a forward-direction packet with payload whose
|
||||
# sequence number we've already seen is a retransmit. Empty SYN/ACKs
|
||||
# are excluded because they share seq legitimately.
|
||||
if direction_forward and payload_len > 0:
|
||||
if seq in flow["forward_seqs"]:
|
||||
flow["retransmits"] += 1
|
||||
else:
|
||||
flow["forward_seqs"].add(seq)
|
||||
|
||||
def _flush_flow(
|
||||
self,
|
||||
flow_key: tuple[str, int, str, int],
|
||||
node_name: str,
|
||||
) -> None:
|
||||
"""Emit one `tcp_flow_timing` event for *flow_key* and drop its state.
|
||||
|
||||
Trivial flows (scan probes: 1–2 packets, sub-second duration) are
|
||||
dropped silently — they add noise to the log pipeline without carrying
|
||||
usable behavioral signal (beacon cadence, exfil timing, retransmits
|
||||
are all meaningful only on longer-lived flows).
|
||||
"""
|
||||
flow = self._flows.pop(flow_key, None)
|
||||
if flow is None or flow.get("emitted"):
|
||||
return
|
||||
flow["emitted"] = True
|
||||
|
||||
# Skip uninteresting flows — keep the log pipeline from being flooded
|
||||
# by short-lived scan probes.
|
||||
duration = flow["last"] - flow["start"]
|
||||
if flow["packets"] < 4 and flow["retransmits"] == 0 and duration < 1.0:
|
||||
return
|
||||
|
||||
src_ip, src_port, dst_ip, dst_port = flow_key
|
||||
iat_count = flow["iat_count"]
|
||||
mean_iat_ms = round((flow["iat_sum"] / iat_count) * 1000, 2) if iat_count else 0.0
|
||||
min_iat_ms = round(flow["iat_min"] * 1000, 2) if iat_count else 0.0
|
||||
max_iat_ms = round(flow["iat_max"] * 1000, 2) if iat_count else 0.0
|
||||
duration_s = round(duration, 3)
|
||||
|
||||
self._log(
|
||||
node_name,
|
||||
"tcp_flow_timing",
|
||||
src_ip=src_ip,
|
||||
src_port=str(src_port),
|
||||
dst_ip=dst_ip,
|
||||
dst_port=str(dst_port),
|
||||
packets=str(flow["packets"]),
|
||||
bytes=str(flow["bytes"]),
|
||||
duration_s=str(duration_s),
|
||||
mean_iat_ms=str(mean_iat_ms),
|
||||
min_iat_ms=str(min_iat_ms),
|
||||
max_iat_ms=str(max_iat_ms),
|
||||
retransmits=str(flow["retransmits"]),
|
||||
)
|
||||
|
||||
def flush_all_flows(self) -> None:
|
||||
"""
|
||||
Flush every tracked flow (emit `tcp_flow_timing` events) and drop
|
||||
state. Safe to call from outside the sniff thread; used during
|
||||
shutdown and in tests.
|
||||
"""
|
||||
for key in list(self._flows.keys()):
|
||||
decky = self._ip_to_decky.get(key[2])
|
||||
if decky:
|
||||
self._flush_flow(key, decky)
|
||||
else:
|
||||
self._flows.pop(key, None)
|
||||
|
||||
def _flush_idle_flows(self) -> None:
|
||||
"""Flush any flow whose last packet was more than _FLOW_IDLE_TIMEOUT ago."""
|
||||
now = time.monotonic()
|
||||
if now - self._flow_last_cleanup < self._FLOW_CLEANUP_INTERVAL:
|
||||
return
|
||||
self._flow_last_cleanup = now
|
||||
stale: list[tuple[str, int, str, int]] = [
|
||||
k for k, f in self._flows.items()
|
||||
if now - f["last"] > _FLOW_IDLE_TIMEOUT
|
||||
]
|
||||
for key in stale:
|
||||
decky = self._ip_to_decky.get(key[2])
|
||||
if decky:
|
||||
self._flush_flow(key, decky)
|
||||
else:
|
||||
self._flows.pop(key, None)
|
||||
|
||||
def on_packet(self, pkt: Any) -> None:
|
||||
"""Process a single scapy packet. Called from the sniff thread."""
|
||||
try:
|
||||
@@ -743,21 +941,74 @@ class SnifferEngine:
|
||||
if node_name is None:
|
||||
return
|
||||
|
||||
# TCP SYN tracking for JA4L
|
||||
now = time.monotonic()
|
||||
|
||||
# Per-flow timing aggregation (covers all TCP traffic, not just TLS)
|
||||
flow_key = self._flow_key(src_ip, src_port, dst_ip, dst_port)
|
||||
direction_forward = (flow_key[0] == src_ip and flow_key[1] == src_port)
|
||||
tcp_payload_len = len(bytes(tcp.payload))
|
||||
self._update_flow(
|
||||
flow_key,
|
||||
now=now,
|
||||
seq=int(tcp.seq),
|
||||
payload_len=tcp_payload_len,
|
||||
direction_forward=direction_forward,
|
||||
)
|
||||
self._flush_idle_flows()
|
||||
|
||||
# TCP SYN tracking for JA4L + passive SYN fingerprint
|
||||
if flags & _TCP_SYN and not (flags & _TCP_ACK):
|
||||
key = (src_ip, src_port, dst_ip, dst_port)
|
||||
self._tcp_syn[key] = {"time": time.monotonic(), "ttl": ip.ttl}
|
||||
self._tcp_syn[key] = {"time": now, "ttl": ip.ttl}
|
||||
|
||||
# Emit passive OS fingerprint on the *client* SYN. Only do this
|
||||
# when the destination is a known decky, i.e. we're seeing an
|
||||
# attacker's initial packet.
|
||||
if dst_ip in self._ip_to_decky:
|
||||
tcp_fp = _extract_tcp_fingerprint(list(tcp.options or []))
|
||||
os_label = guess_os(
|
||||
ttl=ip.ttl,
|
||||
window=int(tcp.window),
|
||||
mss=tcp_fp["mss"],
|
||||
wscale=tcp_fp["wscale"],
|
||||
options_sig=tcp_fp["options_sig"],
|
||||
)
|
||||
target_node = self._ip_to_decky[dst_ip]
|
||||
self._log(
|
||||
target_node,
|
||||
"tcp_syn_fingerprint",
|
||||
src_ip=src_ip,
|
||||
src_port=str(src_port),
|
||||
dst_ip=dst_ip,
|
||||
dst_port=str(dst_port),
|
||||
ttl=str(ip.ttl),
|
||||
initial_ttl=str(initial_ttl(ip.ttl)),
|
||||
hop_distance=str(hop_distance(ip.ttl)),
|
||||
window=str(int(tcp.window)),
|
||||
mss=str(tcp_fp["mss"]),
|
||||
wscale=("" if tcp_fp["wscale"] is None else str(tcp_fp["wscale"])),
|
||||
options_sig=tcp_fp["options_sig"],
|
||||
has_sack=str(tcp_fp["sack_ok"]).lower(),
|
||||
has_timestamps=str(tcp_fp["has_timestamps"]).lower(),
|
||||
os_guess=os_label,
|
||||
)
|
||||
|
||||
elif flags & _TCP_SYN and flags & _TCP_ACK:
|
||||
rev_key = (dst_ip, dst_port, src_ip, src_port)
|
||||
syn_data = self._tcp_syn.pop(rev_key, None)
|
||||
if syn_data:
|
||||
rtt_ms = round((time.monotonic() - syn_data["time"]) * 1000, 2)
|
||||
rtt_ms = round((now - syn_data["time"]) * 1000, 2)
|
||||
self._tcp_rtt[rev_key] = {
|
||||
"rtt_ms": rtt_ms,
|
||||
"client_ttl": syn_data["ttl"],
|
||||
}
|
||||
|
||||
# Flush flow on FIN/RST (terminal packets).
|
||||
if flags & (_TCP_FIN | _TCP_RST):
|
||||
decky = self._ip_to_decky.get(flow_key[2])
|
||||
if decky:
|
||||
self._flush_flow(flow_key, decky)
|
||||
|
||||
payload = bytes(tcp.payload)
|
||||
if not payload:
|
||||
return
|
||||
|
||||
235
decnet/sniffer/p0f.py
Normal file
235
decnet/sniffer/p0f.py
Normal file
@@ -0,0 +1,235 @@
|
||||
"""
|
||||
Passive OS fingerprinting (p0f-lite) for the DECNET sniffer.
|
||||
|
||||
Pure-Python lookup module. Given the values of an incoming TCP SYN packet
|
||||
(TTL, window, MSS, window-scale, and TCP option ordering), returns a coarse
|
||||
OS bucket (linux / windows / macos_ios / freebsd / openbsd / nmap / unknown)
|
||||
plus derived hop distance and inferred initial TTL.
|
||||
|
||||
Rationale
|
||||
---------
|
||||
Full p0f v3 distinguishes several dozen OS/tool profiles by combining dozens
|
||||
of low-level quirks (OLEN, WSIZE, EOL padding, PCLASS, quirks, payload class).
|
||||
For DECNET we only need a coarse bucket — enough to tag an attacker as
|
||||
"linux beacon" vs "windows interactive" vs "active scan". The curated
|
||||
table below covers default stacks that dominate real-world attacker traffic.
|
||||
|
||||
References (public p0f v3 DB, nmap-os-db, and Mozilla OS Fingerprint table):
|
||||
https://github.com/p0f/p0f/blob/master/p0f.fp
|
||||
|
||||
No external dependencies.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# ─── TTL → initial TTL bucket ───────────────────────────────────────────────
|
||||
|
||||
# Common "hop 0" TTLs. Packets decrement TTL once per hop, so we round up
|
||||
# the observed TTL to the nearest known starting value.
|
||||
_TTL_BUCKETS: tuple[int, ...] = (32, 64, 128, 255)
|
||||
|
||||
|
||||
def initial_ttl(ttl: int) -> int:
|
||||
"""
|
||||
Round *ttl* up to the nearest known initial-TTL bucket.
|
||||
|
||||
A SYN with TTL=59 was almost certainly emitted by a Linux/BSD host
|
||||
(initial 64) five hops away; TTL=120 by a Windows host (initial 128)
|
||||
eight hops away.
|
||||
"""
|
||||
for bucket in _TTL_BUCKETS:
|
||||
if ttl <= bucket:
|
||||
return bucket
|
||||
return 255
|
||||
|
||||
|
||||
def hop_distance(ttl: int) -> int:
|
||||
"""
|
||||
Estimate hops between the attacker and the sniffer based on TTL.
|
||||
|
||||
Upper-bounded at 64 (anything further has most likely been mangled
|
||||
by a misconfigured firewall or a TTL-spoofing NAT).
|
||||
"""
|
||||
dist = initial_ttl(ttl) - ttl
|
||||
if dist < 0:
|
||||
return 0
|
||||
if dist > 64:
|
||||
return 64
|
||||
return dist
|
||||
|
||||
|
||||
# ─── OS signature table (TTL bucket, window, MSS, wscale, option-order) ─────
|
||||
|
||||
# Each entry is a set of loose predicates. If all predicates match, the
|
||||
# OS label is returned. First-match wins. `None` means "don't care".
|
||||
#
|
||||
# The option signatures use the short-code alphabet from
|
||||
# decnet/prober/tcpfp.py :: _OPT_CODES (M=MSS, N=NOP, W=WScale,
|
||||
# T=Timestamp, S=SAckOK, E=EOL).
|
||||
|
||||
_SIGNATURES: tuple[tuple[dict, str], ...] = (
|
||||
# ── nmap -sS / -sT default probe ───────────────────────────────────────
|
||||
# nmap crafts very distinctive SYNs: tiny window (1024/4096/etc.), full
|
||||
# option set including WScale=10 and SAckOK. Match these first so they
|
||||
# don't get misclassified as Linux.
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window_in": {1024, 2048, 3072, 4096, 31337, 32768, 65535},
|
||||
"mss": 1460,
|
||||
"wscale": 10,
|
||||
"options": "M,W,T,S,S",
|
||||
},
|
||||
"nmap",
|
||||
),
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window_in": {1024, 2048, 3072, 4096, 31337, 32768, 65535},
|
||||
"options_starts_with": "M,W,T,S",
|
||||
},
|
||||
"nmap",
|
||||
),
|
||||
# ── macOS / iOS default SYN (match before Linux — shares TTL 64) ──────
|
||||
# TTL 64, window 65535, MSS 1460, WScale 6, specific option order
|
||||
# M,N,W,N,N,T,S,E (Darwin signature with EOL padding).
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window": 65535,
|
||||
"wscale": 6,
|
||||
"options": "M,N,W,N,N,T,S,E",
|
||||
},
|
||||
"macos_ios",
|
||||
),
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window_in": {65535},
|
||||
"wscale_in": {5, 6},
|
||||
"has_timestamps": True,
|
||||
"options_ends_with": "E",
|
||||
},
|
||||
"macos_ios",
|
||||
),
|
||||
# ── FreeBSD default SYN (TTL 64, no EOL) ───────────────────────────────
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window": 65535,
|
||||
"wscale": 6,
|
||||
"has_sack": True,
|
||||
"has_timestamps": True,
|
||||
"options_no_eol": True,
|
||||
},
|
||||
"freebsd",
|
||||
),
|
||||
# ── Linux (kernel 3.x – 6.x) default SYN ───────────────────────────────
|
||||
# TTL 64, window 29200 / 64240 / 65535, MSS 1460, WScale 7, full options.
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window_min": 5000,
|
||||
"wscale_in": {6, 7, 8, 9, 10, 11, 12, 13, 14},
|
||||
"has_sack": True,
|
||||
"has_timestamps": True,
|
||||
},
|
||||
"linux",
|
||||
),
|
||||
# ── OpenBSD default SYN ─────────────────────────────────────────────────
|
||||
# TTL 64, window 16384, WScale 3-6, MSS 1460
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 64,
|
||||
"window_in": {16384, 16960},
|
||||
"wscale_in": {3, 4, 5, 6},
|
||||
},
|
||||
"openbsd",
|
||||
),
|
||||
# ── Windows 10/11/Server default SYN ────────────────────────────────────
|
||||
# TTL 128, window 64240/65535, MSS 1460, WScale 8, SACK+TS
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 128,
|
||||
"window_min": 8192,
|
||||
"wscale_in": {2, 6, 7, 8},
|
||||
"has_sack": True,
|
||||
},
|
||||
"windows",
|
||||
),
|
||||
# ── Windows 7/XP (legacy) ───────────────────────────────────────────────
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 128,
|
||||
"window_in": {8192, 16384, 65535},
|
||||
},
|
||||
"windows",
|
||||
),
|
||||
# ── Embedded / Cisco / network gear ─────────────────────────────────────
|
||||
(
|
||||
{
|
||||
"ttl_bucket": 255,
|
||||
},
|
||||
"embedded",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _match_signature(
|
||||
sig: dict,
|
||||
ttl: int,
|
||||
window: int,
|
||||
mss: int,
|
||||
wscale: int | None,
|
||||
options_sig: str,
|
||||
) -> bool:
|
||||
"""Evaluate every predicate in *sig* against the observed values."""
|
||||
tb = initial_ttl(ttl)
|
||||
if "ttl_bucket" in sig and sig["ttl_bucket"] != tb:
|
||||
return False
|
||||
if "window" in sig and sig["window"] != window:
|
||||
return False
|
||||
if "window_in" in sig and window not in sig["window_in"]:
|
||||
return False
|
||||
if "window_min" in sig and window < sig["window_min"]:
|
||||
return False
|
||||
if "mss" in sig and sig["mss"] != mss:
|
||||
return False
|
||||
if "wscale" in sig and sig["wscale"] != wscale:
|
||||
return False
|
||||
if "wscale_in" in sig and wscale not in sig["wscale_in"]:
|
||||
return False
|
||||
if "has_sack" in sig:
|
||||
if sig["has_sack"] != ("S" in options_sig):
|
||||
return False
|
||||
if "has_timestamps" in sig:
|
||||
if sig["has_timestamps"] != ("T" in options_sig):
|
||||
return False
|
||||
if "options" in sig and sig["options"] != options_sig:
|
||||
return False
|
||||
if "options_starts_with" in sig and not options_sig.startswith(sig["options_starts_with"]):
|
||||
return False
|
||||
if "options_ends_with" in sig and not options_sig.endswith(sig["options_ends_with"]):
|
||||
return False
|
||||
if "options_no_eol" in sig and sig["options_no_eol"] and "E" in options_sig:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def guess_os(
|
||||
ttl: int,
|
||||
window: int,
|
||||
mss: int = 0,
|
||||
wscale: int | None = None,
|
||||
options_sig: str = "",
|
||||
) -> str:
|
||||
"""
|
||||
Return a coarse OS bucket for the given SYN characteristics.
|
||||
|
||||
One of: "linux", "windows", "macos_ios", "freebsd", "openbsd",
|
||||
"embedded", "nmap", "unknown".
|
||||
"""
|
||||
for sig, label in _SIGNATURES:
|
||||
if _match_signature(sig, ttl, window, mss, wscale, options_sig):
|
||||
return label
|
||||
return "unknown"
|
||||
Reference in New Issue
Block a user