From 41ff6b4b03899cbddc438f98b46d4d94ec306be2 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 24 Apr 2026 11:47:54 -0400 Subject: [PATCH] feat(prober/osfp): p0f v2 .fp parser + Signature scoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First code layer of the OS-fingerprinting work on top of yesterday's vendored p0f v2 database. Three new modules, all pure (no I/O outside of the parser's file read): - decnet/prober/osfp/base.py — Provider protocol + OsMatch dataclass matching the established Provider convention in decnet/geoip and decnet/bus. Docstring spells out the never-raise invariant: malformed input returns None, so a single bad event can't wedge a whole attacker-profile rebuild. - decnet/prober/osfp/p0f/signature.py — Signature dataclass + three predicate helpers (WindowSpec / IntSpec / OptionToken) encoding the p0f v2 DSL's wildcard / modulo / MSS-multiple / MTU-multiple semantics. Scoring is our extension on top of upstream p0f's first-match-wins policy: each signature carries a precomputed specificity in [0, 1] so the factory can pick the most-specific match when multiple signatures fire against one observation. - decnet/prober/osfp/p0f/format.py — .fp line parser. Every shipped field variant from the DSL spec at the top of p0f.fp is covered (Snn / Tnn / %nnn / * for window; T0 vs T; -/@/* os-genre prefixes; quirks as concatenated single-letter flags; '.' sentinels for no-options / no-quirks). Malformed lines log a warning and skip instead of aborting the whole file — 1 bad row must not cost the other 374. 19 parser tests + 15 scoring tests. Full vendored-DB smoke tests confirm all 375 signatures parse round-trip (262 SYN + 61 SYN-ACK + 46 RST + 6 stray) and every computed specificity lands in [0, 1]. 
@dataclass(frozen=True)
class OsMatch:
    """Immutable outcome of matching one observation against a provider DB.

    Consumers should prefer the match with the higher ``confidence``.
    Providers report confidence as the fraction of signature fields
    that matched exactly (as opposed to wildcard / modulo / "any"
    predicates): a signature with every field constrained scores 1.0,
    one with every field wildcarded approaches 0.0. Making this
    explicit lets the profiler pick the most-specific match when
    several providers fire.

    The string form is ``"<os> <flavor> (<confidence> via <provider>)"``
    with the confidence shown to two decimals. For userland matches the
    ``os`` name is replaced by the literal word ``userland``.
    """

    # OS genre name (e.g. "Linux").
    os: str
    # Free-text flavor / version detail.
    flavor: str
    # Match confidence; see the class docstring for how it is derived.
    confidence: float
    # Name of the provider that produced this match.
    provider: str
    # True when the match is flagged as a userland stack.
    is_userland: bool = False

    def __str__(self) -> str:
        """Render a short human-readable summary of the match."""
        if self.is_userland:
            label = "userland"
        else:
            label = self.os
        shown_confidence = format(self.confidence, ".2f")
        return f"{label} {self.flavor} ({shown_confidence} via {self.provider})"
def parse_p0f_v2(path: Path) -> list[Signature]:
    """Load every parseable signature from a p0f v2 ``.fp`` file.

    The file is read as UTF-8 with undecodable bytes replaced. Each
    line is stripped; blank lines and lines starting with ``#`` are
    ignored. A line that makes ``_parse_line`` raise
    :class:`P0fParseError` is reported at WARNING level (file name,
    1-based line number, reason) and skipped, so one corrupt row does
    not cost the rest of the database. Any other exception — including
    errors opening the file — propagates to the caller.

    Returns the signatures in file order.
    """
    signatures: list[Signature] = []
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for number, text in enumerate(handle, start=1):
            entry = text.strip()
            # Blank lines and '#' comments carry no signature.
            if entry == "" or entry[0] == "#":
                continue
            try:
                parsed = _parse_line(entry)
            except P0fParseError as exc:
                logger.warning(
                    "p0f parse: skipping %s:%d — %s", path.name, number, exc,
                )
            else:
                signatures.append(parsed)
    logger.debug("p0f parse: loaded %d signatures from %s", len(signatures), path.name)
    return signatures
def _parse_wss(s: str) -> WindowSpec:
    """Parse the window-size field of a signature line.

    Accepted forms (after stripping surrounding whitespace):

    - ``*``     → ``WindowSpec("any")``
    - ``%nnn``  → ``WindowSpec("mod", nnn)``; ``nnn`` must be positive
    - ``Snn``   → ``WindowSpec("mss_mul", nn)``
    - ``Tnn``   → ``WindowSpec("mtu_mul", nn)``
    - ``nnn``   → ``WindowSpec("literal", nnn)``

    Raises:
        P0fParseError: the numeric part is not an integer, or a
            modulo window has a zero or negative modulus.
    """
    s = s.strip()
    if s == "*":
        return WindowSpec("any")

    # (leading character, WindowSpec kind, label used in the error text)
    prefixed_forms = (
        ("%", "mod", "mod"),
        ("S", "mss_mul", "Snn"),
        ("T", "mtu_mul", "Tnn"),
    )
    for prefix, kind, label in prefixed_forms:
        if not s.startswith(prefix):
            continue
        try:
            value = int(s[1:])
        except ValueError as exc:
            raise P0fParseError(f"bad {label} window {s!r}") from exc
        # WindowSpec.matches only accepts a 'mod' predicate whose value
        # is > 0, so a null/negative modulus would load a signature that
        # can never match. Report it as malformed instead.
        if kind == "mod" and value <= 0:
            raise P0fParseError(
                f"bad mod window {s!r}: modulus must be positive"
            )
        return WindowSpec(kind, value)

    try:
        return WindowSpec("literal", int(s))
    except ValueError as exc:
        raise P0fParseError(f"bad literal window {s!r}") from exc
No wildcards allowed.""" + try: + return int(s.strip()) + except ValueError as exc: + raise P0fParseError(f"bad {field}: {s!r}") from exc + + +def _parse_df(s: str) -> Optional[bool]: + s = s.strip() + if s == "*": + return None + if s == "0": + return False + if s == "1": + return True + raise P0fParseError(f"bad DF {s!r}; expected 0/1/*") + + +def _parse_int_spec(s: str) -> IntSpec: + s = s.strip() + if s == "*": + return IntSpec("any") + if s.startswith("%"): + try: + return IntSpec("mod", int(s[1:])) + except ValueError as exc: + raise P0fParseError(f"bad mod int {s!r}") from exc + try: + return IntSpec("literal", int(s)) + except ValueError as exc: + raise P0fParseError(f"bad literal int {s!r}") from exc + + +def _parse_options(s: str) -> tuple[OptionToken, ...]: + s = s.strip() + if s in (".", ""): + return (OptionToken("."),) + normalized = s.replace(",", " ") + tokens: list[OptionToken] = [] + for raw in normalized.split(): + tok = raw.strip() + if not tok: + continue + tokens.append(_parse_option_token(tok)) + if not tokens: + return (OptionToken("."),) + return tuple(tokens) + + +def _parse_option_token(raw: str) -> OptionToken: + # T0 — timestamp zero (not the TCP option '?0'). + if raw == "T0": + return OptionToken("T0") + m = _OPTION_TOKEN_RE.match(raw) + if not m: + raise P0fParseError(f"bad option token {raw!r}") + kind, val_raw = m.group(1), m.group(2) + if kind in ("N", "E", "S", "T", "P"): + return OptionToken(kind) + # M / W / ? expect a numeric predicate (or wildcard). 
+ if val_raw is None: + raise P0fParseError(f"option {kind!r} missing required value") + if val_raw == "*": + spec = IntSpec("any") + elif val_raw.startswith("%"): + try: + spec = IntSpec("mod", int(val_raw[1:])) + except ValueError as exc: + raise P0fParseError(f"bad {kind} mod value {val_raw!r}") from exc + else: + try: + spec = IntSpec("literal", int(val_raw)) + except ValueError as exc: + raise P0fParseError(f"bad {kind} literal value {val_raw!r}") from exc + return OptionToken(kind, spec) + + +def _parse_quirks(s: str) -> frozenset[str]: + s = s.strip() + if s == "." or not s: + return frozenset() + # Quirks are a concatenated string of single-letter flags. '!' is a + # valid quirk too. + return frozenset(c for c in s if not c.isspace()) + + +def _parse_os_genre(s: str) -> tuple[str, bool, bool, bool]: + """Strip p0f's genre-prefix modifiers and return (os_name, is_userland, is_approx, is_random).""" + is_userland = False + is_approx = False + is_random = False + s = s.strip() + # Prefixes can stack in any order — strip them all. + changed = True + while changed and s: + changed = False + if s.startswith("-"): + is_userland = True + s = s[1:] + changed = True + elif s.startswith("@"): + is_approx = True + s = s[1:] + changed = True + elif s.startswith("*"): + is_random = True + s = s[1:] + changed = True + return s, is_userland, is_approx, is_random diff --git a/decnet/prober/osfp/p0f/signature.py b/decnet/prober/osfp/p0f/signature.py new file mode 100644 index 00000000..5c08dec5 --- /dev/null +++ b/decnet/prober/osfp/p0f/signature.py @@ -0,0 +1,278 @@ +"""p0f v2 signature + observation matching/scoring. + +A :class:`Signature` is one parsed row from a ``.fp`` file. A match +against an observation dict (the kind ``sniffer_rollup`` hands us) +returns a confidence score in [0, 1], with higher scores indicating +more-specific matches. 
@dataclass(frozen=True)
class WindowSpec:
    """Parsed 'wss' field. Encodes p0f v2's window-size predicate DSL:

    - 'literal' → observed window == value
    - 'mss_mul' → observed window == MSS * value        (p0f "Snn")
    - 'mtu_mul' → observed window == (MSS+40) * value   (p0f "Tnn")
    - 'mod'     → observed window % value == 0          (p0f "%nnn")
    - 'any'     → wildcard                              (p0f "*")
    """

    kind: str
    value: Optional[int] = None

    def matches(self, window: Optional[int], mss: Optional[int]) -> bool:
        """Return True when the observed *window* satisfies this predicate.

        *mss* is only consulted for the 'mss_mul' / 'mtu_mul' kinds.
        The method never raises on malformed observations: a missing or
        non-integer *window* (or, for the multiple kinds, *mss*) simply
        fails to match. An unknown ``kind``, a missing ``value`` on a
        non-wildcard kind, or a non-positive modulus also yields False.
        """
        if self.kind == "any":
            return True
        # Observations can be partial or malformed; anything that is not
        # an int (None, str, ...) must reject rather than blow up in the
        # arithmetic below.
        if not isinstance(window, int):
            return False
        if self.kind == "literal":
            return window == self.value
        if self.value is None:
            return False
        if self.kind == "mod":
            return self.value > 0 and window % self.value == 0
        if self.kind in ("mss_mul", "mtu_mul"):
            if not isinstance(mss, int):
                return False
            # "Tnn" multiplies the MTU, taken here as MSS + 40.
            unit = mss if self.kind == "mss_mul" else mss + 40
            return window == unit * self.value
        return False
@dataclass(frozen=True)
class Signature:
    """A single parsed row of a p0f v2 ``.fp`` file, plus its scoring.

    p0f's os-genre prefix modifiers are stripped from ``os`` and kept
    as three boolean flags instead:

    - ``is_userland``    — ``-`` prefix: userland stack (not a real OS;
      flagged scanner/browser)
    - ``is_approximate`` — ``@`` prefix: approximate / group match
    - ``is_random``      — ``*`` prefix: random or bogus userland

    The flags survive here for the profiler to decide e.g. "do I
    promote nmap to tool_guesses?"
    """

    wss: WindowSpec
    ttl: int
    df: Optional[bool]
    total_len: IntSpec
    options: tuple[OptionToken, ...]  # in order; use (OptionToken('.'),) for none
    quirks: frozenset[str]
    os: str
    flavor: str
    notes: str
    is_userland: bool = False  # '-' prefix
    is_approximate: bool = False  # '@' prefix
    is_random: bool = False  # '*' prefix (distinct from wildcard)

    # Cache: a crude "specificity budget" precomputed at parse time.
    # Higher = more constrained fields, used as a tie-breaker when two
    # signatures match the same observation.
    specificity: float = field(default=0.0)

    def score(self, obs: dict[str, Any]) -> Optional[float]:
        """Score *obs* against this signature.

        Fields are checked in the order window, TTL, DF, total length,
        options, quirks; the first one that rejects the observation
        ends the check with ``None``. When every field accepts, the
        precomputed ``specificity`` is returned as the confidence.
        """
        # Window (the MSS is needed for the Snn / Tnn window forms).
        window_ok = self.wss.matches(obs.get("window"), obs.get("mss"))
        if not window_ok:
            return None

        # TTL — compared for exact equality. The profiler is expected to
        # have rounded the observed TTL up to the nearest initial-TTL
        # bucket already via decnet.sniffer.p0f.initial_ttl.
        seen_ttl = obs.get("ttl")
        if seen_ttl is None or seen_ttl != self.ttl:
            return None

        # DF — a None on the signature side is a wildcard.
        if self.df is not None:
            seen_df = obs.get("df")
            if seen_df is None or bool(seen_df) != self.df:
                return None

        # Total length.
        length_ok = self.total_len.matches(obs.get("total_len"))
        if not length_ok:
            return None

        # Options — order-sensitive sequence match.
        options_ok = _options_match(self.options, obs.get("options_sig"))
        if not options_ok:
            return None

        # Quirks — compared as sets; a missing/empty value means "none".
        seen_quirks = obs.get("quirks") or frozenset()
        if not isinstance(seen_quirks, frozenset):
            seen_quirks = frozenset(seen_quirks)
        if self.quirks != seen_quirks:
            return None

        return self.specificity
def precompute_specificity(sig: Signature) -> float:
    """Rate how tightly *sig* constrains an observation, in [0, 1].

    Used to rank signatures that all match the same observation. Each
    field earns points out of a fixed budget of 10:

    - window (3): literal 3, Snn/Tnn multiple 2.5, modulo 1.5, else 0
    - TTL (1): always 1
    - DF (1): 1 unless wildcarded (None)
    - total length (1): literal 1, modulo 0.5, else 0
    - options (3): 3 × the share of tokens that are flags or literal
      valued; the '.' no-options sentinel earns a flat 2
    - quirks (1): 1 when any quirk is set

    The result is the earned share of the budget rounded to 4 decimals.
    """
    window_points = {"literal": 3.0, "mss_mul": 2.5, "mtu_mul": 2.5, "mod": 1.5}
    length_points = {"literal": 1.0, "mod": 0.5}
    budget = 10.0  # 3 + 1 + 1 + 1 + 3 + 1

    earned = 0.0
    earned += window_points.get(sig.wss.kind, 0.0)
    earned += 1.0  # TTL is always a literal
    earned += 1.0 if sig.df is not None else 0.0
    earned += length_points.get(sig.total_len.kind, 0.0)

    opts = sig.options
    if len(opts) == 1 and opts[0].kind == ".":
        # "no options" is itself a signal.
        earned += 2.0
    elif opts:
        pinned = [
            tok for tok in opts
            if tok.value is None or tok.value.kind == "literal"
        ]
        earned += 3.0 * (len(pinned) / len(opts))

    # Most sigs have no quirks, so this is only a small edge.
    earned += 1.0 if sig.quirks else 0.0
    return round(earned / budget, 4)
_parse_line("T3:64:1:60:M*,S,T,N,W*:.:Solaris:10") + assert sig.wss.kind == "mtu_mul" and sig.wss.value == 3 + + +def test_parse_line_modulo_window() -> None: + sig = _parse_line("%8192:64:1:60:M*,S,T,N,W*:.:Linux:probe") + assert sig.wss.kind == "mod" and sig.wss.value == 8192 + + +def test_parse_line_userland_prefix() -> None: + sig = _parse_line("5840:64:1:60:M*,S,T,N,W*:.:-nmap:syn stealth") + assert sig.is_userland is True + assert sig.os == "nmap" + + +def test_parse_line_combined_prefixes() -> None: + sig = _parse_line("5840:64:1:60:M*:.:-@Windows:fuzzy match") + assert sig.is_userland is True + assert sig.is_approximate is True + assert sig.os == "Windows" + + +def test_parse_line_quirks_non_empty() -> None: + sig = _parse_line("5840:64:1:60:M*,S,T,N,W*:PZ:Linux:with quirks") + assert sig.quirks == frozenset({"P", "Z"}) + + +def test_parse_line_no_options_sentinel() -> None: + sig = _parse_line("5840:64:1:60:.:.:Linux:barebones") + assert len(sig.options) == 1 + assert sig.options[0].kind == "." + + +def test_parse_line_t0_timestamp_distinct_from_t() -> None: + sig = _parse_line("5840:64:1:60:M*,T0:.:Linux:broken timestamps") + assert sig.options[1].kind == "T0" + + +def test_parse_line_unknown_option_number() -> None: + sig = _parse_line("5840:64:1:60:M*,?47:.:Weird:stack") + unknown = sig.options[1] + assert unknown.kind == "?" 
+ assert unknown.value is not None and unknown.value.value == 47 + + +def test_parse_line_rejects_too_few_fields() -> None: + with pytest.raises(P0fParseError): + _parse_line("5840:64:1:60") + + +def test_parse_line_rejects_bad_df() -> None: + with pytest.raises(P0fParseError): + _parse_line("5840:64:X:60:M*:.:Linux:bad") + + +def test_parse_line_rejects_bad_window_token() -> None: + with pytest.raises(P0fParseError): + _parse_line("Kfoo:64:1:60:M*:.:Linux:bad") + + +def test_parse_line_rejects_malformed_option() -> None: + with pytest.raises(P0fParseError): + _parse_line("5840:64:1:60:!!!wat:.:Linux:bad") + + +# ─── File-level tests ──────────────────────────────────────────────────────── + + +def test_parse_file_skips_comments_blanks_bad_lines(tmp_path: Path) -> None: + fp = tmp_path / "test.fp" + fp.write_text( + "# comment\n" + "\n" + "5840:64:1:60:M1460,S,T,N,W7:.:Linux:2.6.x\n" + "# another comment\n" + "garbage line that should skip\n" + "8192:128:1:48:M1460,N,W0,N,N,S:.:Windows:XP\n" + ) + sigs = parse_p0f_v2(fp) + assert len(sigs) == 2 + assert {s.os for s in sigs} == {"Linux", "Windows"} + + +def test_parse_vendored_syn_db_fully_loads() -> None: + """The full vendored p0f.fp MUST parse without losing signatures. + Upstream inventory: 262 SYN signatures. 
A regression that drops rows + would silently degrade OS-fingerprint coverage.""" + data = Path(__file__).resolve().parents[3] / "decnet/prober/osfp/p0f/data/p0f.fp" + sigs = parse_p0f_v2(data) + assert len(sigs) == 262, f"expected 262 SYN sigs, parser returned {len(sigs)}" + + +def test_parse_vendored_all_four_dbs_fully_load() -> None: + """Same invariant across all four vendored databases.""" + base = Path(__file__).resolve().parents[3] / "decnet/prober/osfp/p0f/data" + expected = {"p0f.fp": 262, "p0fa.fp": 61, "p0fr.fp": 46, "p0fo.fp": 6} + for name, want in expected.items(): + sigs = parse_p0f_v2(base / name) + assert len(sigs) == want, f"{name}: expected {want}, got {len(sigs)}" + + +def test_parse_vendored_specificity_in_range() -> None: + """Every signature's computed specificity must land in [0, 1].""" + data = Path(__file__).resolve().parents[3] / "decnet/prober/osfp/p0f/data/p0f.fp" + for sig in parse_p0f_v2(data): + assert 0.0 <= sig.specificity <= 1.0, ( + f"{sig.os}/{sig.flavor}: specificity out of range ({sig.specificity})" + ) diff --git a/tests/prober/osfp/test_signature.py b/tests/prober/osfp/test_signature.py new file mode 100644 index 00000000..1ccc45ed --- /dev/null +++ b/tests/prober/osfp/test_signature.py @@ -0,0 +1,125 @@ +"""Tests for signature matching + scoring.""" +from __future__ import annotations + +import pytest + +from decnet.prober.osfp.p0f.format import _parse_line + + +def _obs(**overrides): + """Baseline observation (Linux 2.6 on Ethernet), overridable.""" + base = { + "window": 5840, + "ttl": 64, + "df": True, + "total_len": 60, + "options_sig": "M1460,S,T,N,W7", + "quirks": frozenset(), + "mss": 1460, + "wscale": 7, + } + base.update(overrides) + return base + + +# ─── Match / no-match ──────────────────────────────────────────────────────── + + +def test_score_exact_match_is_high() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:2.6.x literal") + score = sig.score(_obs()) + assert score is not None + assert 
score >= 0.9, f"literal-fields signature should score high, got {score}" + + +def test_score_wildcard_match_is_lower_than_literal() -> None: + literal = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:literal") + wildcard = _parse_line("*:64:1:*:M*,S,T,N,W*:.:Linux:wildcard") + obs = _obs() + ls = literal.score(obs) + ws = wildcard.score(obs) + assert ls is not None and ws is not None + assert ls > ws, f"literal ({ls}) should outscore wildcard ({ws})" + + +def test_score_window_mismatch_returns_none() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:fixed") + assert sig.score(_obs(window=64240)) is None + + +def test_score_ttl_mismatch_returns_none() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:ttl64") + assert sig.score(_obs(ttl=128)) is None + + +def test_score_df_mismatch_returns_none() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:df-required") + assert sig.score(_obs(df=False)) is None + + +def test_score_df_wildcard_on_signature_matches_either() -> None: + sig = _parse_line("5840:64:*:60:M1460,S,T,N,W7:.:Linux:any-df") + assert sig.score(_obs(df=True)) is not None + assert sig.score(_obs(df=False)) is not None + + +def test_score_options_order_mismatch_returns_none() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:ordered") + # Same tokens, different order — must NOT match. 
+ assert sig.score(_obs(options_sig="S,T,M1460,N,W7")) is None + + +def test_score_options_missing_token_returns_none() -> None: + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:5opts") + assert sig.score(_obs(options_sig="M1460,S,T,N")) is None + + +def test_score_quirks_must_match_as_set() -> None: + sig = _parse_line("5840:64:1:60:M*,S,T,N,W*:PZ:Linux:with PZ") + assert sig.score(_obs(quirks=frozenset({"P", "Z"}))) is not None + assert sig.score(_obs(quirks=frozenset({"P"}))) is None # missing Z + assert sig.score(_obs(quirks=frozenset({"P", "Z", "I"}))) is None # extra I + + +def test_score_mss_multiple_window() -> None: + # S4 = 4 * MSS. With MSS=1460 → window=5840. + sig = _parse_line("S4:64:1:60:M1460,S,T,N,W7:.:Linux:S4") + assert sig.score(_obs(window=5840, mss=1460)) is not None + # With MSS=536 → S4 expects window=2144 + assert sig.score(_obs(window=2144, mss=536)) is not None + assert sig.score(_obs(window=5840, mss=536)) is None + + +def test_score_modulo_window() -> None: + sig = _parse_line("%8192:64:1:60:M1460,S,T,N,W7:.:Linux:mod8192") + assert sig.score(_obs(window=32768)) is not None + assert sig.score(_obs(window=40960)) is not None + assert sig.score(_obs(window=32769)) is None + + +def test_score_no_options_sentinel() -> None: + sig = _parse_line("5840:64:1:60:.:.:Linux:no-opts") + assert sig.score(_obs(options_sig="")) is not None + assert sig.score(_obs(options_sig=None)) is not None + assert sig.score(_obs(options_sig="M1460")) is None + + +def test_score_missing_observation_fields_returns_none() -> None: + """A signature that requires a specific window can't match when the + observation has no window. 
This is the safety invariant — + sniffer_rollup may call score() with partial data.""" + sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:strict") + assert sig.score(_obs(window=None)) is None + assert sig.score(_obs(ttl=None)) is None + + +def test_score_option_value_wildcard_matches_any_literal() -> None: + sig = _parse_line("5840:64:1:60:M*,S,T,N,W*:.:Linux:wild-mss-wscale") + assert sig.score(_obs(options_sig="M1460,S,T,N,W7")) is not None + assert sig.score(_obs(options_sig="M536,S,T,N,W2")) is not None + + +def test_score_option_value_modulo() -> None: + sig = _parse_line("5840:64:1:60:M%4,S,T,N,W7:.:Linux:mss-mod-4") + assert sig.score(_obs(options_sig="M1460,S,T,N,W7")) is not None # 1460 % 4 == 0 + assert sig.score(_obs(options_sig="M1461,S,T,N,W7")) is None