feat(prober/osfp): p0f v2 .fp parser + Signature scoring

First code layer of the OS-fingerprinting work on top of yesterday's
vendored p0f v2 database. Three new modules, all pure (no I/O outside
of the parser's file read):

- decnet/prober/osfp/base.py — Provider protocol + OsMatch dataclass
  matching the established Provider convention in decnet/geoip and
  decnet/bus. Docstring spells out the never-raise invariant: malformed
  input returns None, so a single bad event can't wedge a whole
  attacker-profile rebuild.

- decnet/prober/osfp/p0f/signature.py — Signature dataclass + three
  predicate helpers (WindowSpec / IntSpec / OptionToken) encoding the
  p0f v2 DSL's wildcard / modulo / MSS-multiple / MTU-multiple
  semantics. Scoring is our extension on top of upstream p0f's
  first-match-wins policy: each signature carries a precomputed
  specificity in [0, 1] so the factory can pick the most-specific
  match when multiple signatures fire against one observation.

- decnet/prober/osfp/p0f/format.py — .fp line parser. Every shipped
  field variant from the DSL spec at the top of p0f.fp is covered
  (Snn / Tnn / %nnn / * for window; T0 vs T; -/@/* os-genre prefixes;
  quirks as concatenated single-letter flags; '.' sentinels for
  no-options / no-quirks). Malformed lines log a warning and skip
  instead of aborting the whole file — 1 bad row must not cost the
  other 374.

19 parser tests + 15 scoring tests. Full vendored-DB smoke tests
confirm all 375 signatures parse round-trip (262 SYN + 61 SYN-ACK +
46 RST + 6 stray) and every computed specificity lands in [0, 1].
This commit is contained in:
2026-04-24 11:47:54 -04:00
parent 620e1f5b1d
commit 41ff6b4b03
6 changed files with 857 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
"""OS-fingerprint provider protocol + OsMatch result shape.
Each concrete provider (p0f v2 today; nmap-osdb / DECNET-observed DB
later) implements `Provider`. Callers go through
:func:`decnet.prober.osfp.factory.get_provider` or
:func:`decnet.prober.osfp.factory.get_all_providers` — direct imports
of a concrete class are forbidden, mirroring the convention in
``decnet/geoip`` and ``decnet/bus``.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional
@dataclass(frozen=True)
class OsMatch:
    """Immutable outcome of matching an observation against a provider's DB.

    Higher ``confidence`` is better. Providers derive it from how many
    signature fields matched exactly rather than through a wildcard,
    modulo or "any" predicate: a signature constraining every field
    scores 1.0, one that wildcards every field approaches 0.0. Keeping
    this explicit lets the profiler choose the most-specific match when
    several providers fire.
    """

    os: str
    flavor: str
    confidence: float
    provider: str
    is_userland: bool = False

    def __str__(self) -> str:
        """Render as ``"<os|userland> <flavor> (<confidence> via <provider>)"``."""
        # Userland matches are labelled generically instead of by OS name.
        if self.is_userland:
            label = "userland"
        else:
            label = self.os
        return f"{label} {self.flavor} ({self.confidence:.2f} via {self.provider})"
class Provider(ABC):
    """Abstract OS-fingerprint source.

    Providers consume a dict of observed TCP/IP quirks (``window``,
    ``wscale``, ``mss``, ``options_sig``, ``ttl``, ``df``,
    ``total_len``, ``quirks`` — not all fields required) and return a
    best-match :class:`OsMatch` or ``None`` when nothing matches.

    Providers MUST NOT raise on malformed or partial input — the
    upstream caller (`profiler/fingerprint.py::sniffer_rollup`) runs
    on data that may be missing any or all fields depending on the
    event mix, and a raising provider would wedge every attacker
    profile rebuild. Return ``None`` instead.
    """

    # Declared without a value here, so each concrete provider must set it.
    # NOTE(review): presumably the string reported in OsMatch.provider — confirm.
    name: str

    @abstractmethod
    def match(self, obs: dict[str, Any]) -> Optional[OsMatch]:
        """Return best-match OsMatch for *obs*, or None."""

View File

@@ -0,0 +1,243 @@
"""p0f v2 ``.fp`` file parser.
Format (from the DSL spec at the top of every shipped ``.fp`` file):
wwww:ttt:D:ss:OOO:QQ:OS:Details
Where:
wwww — window size: literal int | '*' | '%nnn' | 'Snn' | 'Tnn'
ttt — initial TTL (literal int: 32/64/128/255 typically)
D — DF bit: '0' or '1' (this parser also accepts '*' meaning "either")
ss — total IP packet length: literal int | '*' | '%nnn'
OOO — option order: comma/space-separated tokens, or '.' for none.
Tokens: N, E, S, T, T0, P, Wnnn/W*/W%nnn, Mnnn/M*/M%nnn, ?n
QQ — quirks: concatenated single-letter flags, or '.' for none.
Flags: P, Z, I, U, X, A, T, F, D, !, K, Q, 0, R
OS — genre, optionally prefixed '-' (userland), '@' (group),
'*' (random/bogus), or combinations (e.g. '-@Windows').
Details — free-text flavor/version.
Lines starting with '#' and blank lines are skipped.
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Optional
from decnet.prober.osfp.p0f.signature import (
IntSpec,
OptionToken,
Signature,
WindowSpec,
precompute_specificity,
)
# Module logger; the name is written out explicitly rather than taken from __name__.
logger = logging.getLogger("decnet.prober.osfp.p0f.format")

# One signature-side option token. Group 1 is the option kind, group 2 an
# optional value: '*', '%nnn' or 'nnn'.
# The first character class lists 'E' twice (harmless). Because that class is
# tried before the 'T0' alternative, the input "T0" is captured as kind 'T'
# with value '0', so the 'T0' alternative never wins. _parse_option_token
# therefore special-cases the literal "T0" before consulting this pattern.
_OPTION_TOKEN_RE = re.compile(r"^([NESTPE]|T0|[MW\?])(\*|%\d+|\d+)?$")
class P0fParseError(ValueError):
    """Raised on genuinely malformed signature lines.

    The loader (:func:`parse_p0f_v2`) catches these and skips the
    offending line with a logger warning — one bad row doesn't disable
    the whole DB.
    """
def parse_p0f_v2(path: Path) -> list[Signature]:
    """Parse a p0f v2 ``.fp`` file and return its signatures in file order.

    Blank lines and lines starting with ``#`` are ignored. Malformed
    lines are logged at WARNING and skipped rather than aborting the
    whole load — the vendored DB has ~375 entries and one corrupt row
    shouldn't prevent the other 374 from being usable.

    Args:
        path: Location of the ``.fp`` file. It is read as UTF-8 with
            undecodable bytes replaced, so a stray byte cannot abort
            the load.

    Returns:
        One :class:`Signature` per line that parsed successfully.

    Raises:
        OSError: If *path* cannot be opened. Only per-line
            :class:`P0fParseError` failures are swallowed here.
    """
    out: list[Signature] = []
    with path.open("r", encoding="utf-8", errors="replace") as fh:
        for lineno, raw in enumerate(fh, 1):
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            try:
                sig = _parse_line(line)
            except P0fParseError as exc:
                # "file:line: reason" — without the ": " separator the line
                # number ran straight into the error text ("p0f.fp:12bad DF").
                logger.warning(
                    "p0f parse: skipping %s:%d: %s", path.name, lineno, exc,
                )
                continue
            out.append(sig)
    logger.debug("p0f parse: loaded %d signatures from %s", len(out), path.name)
    return out
def _parse_line(line: str) -> Signature:
    """Convert one non-comment ``.fp`` row into a :class:`Signature`.

    The row is split on ':' at most seven times, so any further colons
    stay inside the trailing details text. A row with exactly seven
    fields is treated as having empty details.

    Raises:
        P0fParseError: Fewer than seven fields, or a field parser
            rejected its text.
    """
    from dataclasses import replace

    fields = line.split(":", 7)
    if len(fields) < 7:
        raise P0fParseError(f"expected 7+ colon-delimited fields, got {len(fields)}")
    if len(fields) == 7:
        fields.append("")  # no details column present
    wss_s, ttl_s, df_s, tot_s, opts_s, quirks_s, os_s, details = fields

    # Keyword arguments are evaluated top to bottom, so the field parsers
    # run (and can fail) in column order.
    bare = Signature(
        wss=_parse_wss(wss_s),
        ttl=_parse_int_field(ttl_s, field="ttl"),
        df=_parse_df(df_s),
        total_len=_parse_int_spec(tot_s),
        options=_parse_options(opts_s),
        quirks=_parse_quirks(quirks_s),
        **dict(
            zip(
                ("os", "is_userland", "is_approximate", "is_random"),
                _parse_os_genre(os_s),
            )
        ),
        flavor=details.strip(),
        notes="",
    )
    # Signature is frozen, so the computed specificity is attached by
    # building a copy instead of assigning to the field.
    return replace(bare, specificity=precompute_specificity(bare))
def _parse_wss(s: str) -> WindowSpec:
    """Parse the window-size column into a :class:`WindowSpec`.

    Recognised forms: ``*`` (any), ``%nnn`` (modulo), ``Snn``
    (MSS multiple), ``Tnn`` (MTU multiple) and a bare integer (literal).

    Raises:
        P0fParseError: The numeric part is not a valid integer.
    """
    text = s.strip()
    if text == "*":
        return WindowSpec("any")
    # (leading character, WindowSpec kind, label used in the error message)
    prefixed_forms = (
        ("%", "mod", "mod"),
        ("S", "mss_mul", "Snn"),
        ("T", "mtu_mul", "Tnn"),
    )
    for lead, kind, label in prefixed_forms:
        if not text.startswith(lead):
            continue
        try:
            return WindowSpec(kind, int(text[1:]))
        except ValueError as exc:
            raise P0fParseError(f"bad {label} window {text!r}") from exc
    try:
        return WindowSpec("literal", int(text))
    except ValueError as exc:
        raise P0fParseError(f"bad literal window {text!r}") from exc
def _parse_int_field(s: str, *, field: str) -> int:
"""Parse a bare int field (used for TTL). No wildcards allowed."""
try:
return int(s.strip())
except ValueError as exc:
raise P0fParseError(f"bad {field}: {s!r}") from exc
def _parse_df(s: str) -> Optional[bool]:
s = s.strip()
if s == "*":
return None
if s == "0":
return False
if s == "1":
return True
raise P0fParseError(f"bad DF {s!r}; expected 0/1/*")
def _parse_int_spec(s: str) -> IntSpec:
    """Parse an int-predicate column (used for total length) into an :class:`IntSpec`.

    ``*`` gives kind ``'any'``, ``%nnn`` kind ``'mod'``, and a bare
    integer kind ``'literal'``.

    Raises:
        P0fParseError: The numeric part is not a valid integer.
    """
    text = s.strip()
    if text == "*":
        return IntSpec("any")
    if text.startswith("%"):
        kind, digits = "mod", text[1:]
    else:
        kind, digits = "literal", text
    try:
        return IntSpec(kind, int(digits))
    except ValueError as exc:
        raise P0fParseError(f"bad {kind} int {text!r}") from exc
def _parse_options(s: str) -> tuple[OptionToken, ...]:
    """Parse the option-order column into an ordered tuple of tokens.

    Tokens may be separated by commas and/or whitespace. A ``.`` column,
    an empty column, or one holding only separators yields the
    one-element "no options" sentinel ``(OptionToken("."),)``.

    Raises:
        P0fParseError: Propagated from :func:`_parse_option_token`.
    """
    text = s.strip()
    no_options = (OptionToken("."),)
    if text in (".", ""):
        return no_options
    # Turning commas into spaces lets a single split() handle both delimiters.
    pieces = text.replace(",", " ").split()
    parsed = tuple(_parse_option_token(piece) for piece in pieces)
    return parsed if parsed else no_options
def _parse_option_token(raw: str) -> OptionToken:
    """Parse one signature-side option token.

    ``T0`` (timestamp zero) is recognised first. The flag options
    ``N``/``E``/``S``/``T``/``P`` come back as kind-only tokens; the
    pattern also permits a value after a flag letter, and that value is
    dropped. ``M``, ``W`` and ``?`` must carry a value: ``*`` (any),
    ``%nnn`` (modulo) or ``nnn`` (literal).

    Raises:
        P0fParseError: Unrecognised token, or ``M``/``W``/``?`` without
            a value.
    """
    # T0 — timestamp zero (not the TCP option '?0').
    if raw == "T0":
        return OptionToken("T0")

    found = _OPTION_TOKEN_RE.match(raw)
    if found is None:
        raise P0fParseError(f"bad option token {raw!r}")
    kind, val_raw = found.groups()

    if kind in ("N", "E", "S", "T", "P"):
        return OptionToken(kind)

    # Only M / W / ? reach this point; each needs a numeric predicate.
    if val_raw is None:
        raise P0fParseError(f"option {kind!r} missing required value")
    if val_raw == "*":
        return OptionToken(kind, IntSpec("any"))

    if val_raw.startswith("%"):
        spec_kind, digits = "mod", val_raw[1:]
    else:
        spec_kind, digits = "literal", val_raw
    try:
        number = int(digits)
    except ValueError as exc:
        raise P0fParseError(f"bad {kind} {spec_kind} value {val_raw!r}") from exc
    return OptionToken(kind, IntSpec(spec_kind, number))
def _parse_quirks(s: str) -> frozenset[str]:
s = s.strip()
if s == "." or not s:
return frozenset()
# Quirks are a concatenated string of single-letter flags. '!' is a
# valid quirk too.
return frozenset(c for c in s if not c.isspace())
def _parse_os_genre(s: str) -> tuple[str, bool, bool, bool]:
"""Strip p0f's genre-prefix modifiers and return (os_name, is_userland, is_approx, is_random)."""
is_userland = False
is_approx = False
is_random = False
s = s.strip()
# Prefixes can stack in any order — strip them all.
changed = True
while changed and s:
changed = False
if s.startswith("-"):
is_userland = True
s = s[1:]
changed = True
elif s.startswith("@"):
is_approx = True
s = s[1:]
changed = True
elif s.startswith("*"):
is_random = True
s = s[1:]
changed = True
return s, is_userland, is_approx, is_random

View File

@@ -0,0 +1,278 @@
"""p0f v2 signature + observation matching/scoring.
A :class:`Signature` is one parsed row from a ``.fp`` file. A match
against an observation dict (the kind ``sniffer_rollup`` hands us)
returns a confidence score in [0, 1], with higher scores indicating
more-specific matches. Wildcards and modulo predicates match but
contribute less to the confidence than an exact literal match, so
when multiple signatures fire against one observation we can pick the
most-specific one.
Observation dict shape (all keys optional — a provider returns None
if too few match-relevant fields are present):
{
"window": int | None, # TCP window size
"mss": int | None, # TCP MSS option value
"wscale": int | None, # TCP window-scale option value
"ttl": int | None, # initial-TTL bucket (32/64/128/255)
"df": bool | None, # IP Don't-Fragment flag
"total_len": int | None, # IP total length (SYN)
"options_sig": str | None, # e.g. "M,N,W,T" or "M1460,N,W7,S"
"quirks": frozenset[str] | None, # e.g. {"Z", "P"}
}
The scoring is our extension — upstream p0f is "first match wins"
using the order of entries in ``.fp``. We score so the factory can
compare across multiple DB files (p0f.fp + p0fa.fp) and return the
winner objectively.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any, Optional
# ─── Field predicates ──────────────────────────────────────────────────────
@dataclass(frozen=True)
class WindowSpec:
    """Window-size predicate parsed from a signature's 'wss' column.

    ``kind`` selects the rule applied by :meth:`matches`:

    - ``'literal'`` — window equals ``value``
    - ``'mss_mul'`` — window equals ``MSS * value`` (p0f "Snn")
    - ``'mtu_mul'`` — window equals ``(MSS + 40) * value`` (p0f "Tnn")
    - ``'mod'``     — window is a multiple of ``value`` (p0f "%nnn")
    - ``'any'``     — wildcard (p0f "*")
    """

    kind: str
    value: Optional[int] = None

    def matches(self, window: Optional[int], mss: Optional[int]) -> bool:
        """Return True when the observed *window* (and *mss*) satisfy this predicate.

        A missing window only satisfies ``'any'``. The MSS-relative kinds
        also need *mss*. An unknown kind, a missing ``value`` or a
        non-positive modulus never matches.
        """
        rule, factor = self.kind, self.value
        if rule == "any":
            return True
        if window is None:
            return False
        if rule == "literal":
            return window == factor
        if factor is None:
            # Every remaining rule needs a number to compare against.
            return False
        if rule == "mod":
            return factor > 0 and window % factor == 0
        if rule in ("mss_mul", "mtu_mul"):
            if mss is None:
                return False
            unit = mss if rule == "mss_mul" else mss + 40
            return window == unit * factor
        return False
@dataclass(frozen=True)
class IntSpec:
    """Integer predicate with wildcard and modulo forms (MSS / wscale / total_len)."""

    kind: str  # 'literal' | 'mod' | 'any'
    value: Optional[int] = None

    def matches(self, observed: Optional[int]) -> bool:
        """Return True when *observed* satisfies this predicate.

        ``'any'`` accepts everything, including a missing observation.
        ``'literal'`` needs equality with ``value``. ``'mod'`` needs a
        positive ``value`` that divides *observed*. Any other kind never
        matches.
        """
        if self.kind == "any":
            return True
        if observed is None:
            return False
        expected = self.value
        if self.kind == "literal":
            return observed == expected
        if self.kind == "mod":
            if expected is None or not expected > 0:
                return False
            return observed % expected == 0
        return False
@dataclass(frozen=True)
class OptionToken:
    """One TCP option as it appears in a signature's (or observation's) option list.

    - kind 'N' NOP, 'E' EOL, 'S' SACK-permitted, 'T' timestamp,
      'T0' zero-timestamp — flag options, no value
      (NOTE(review): letter meanings follow the p0f v2 DSL; confirm
      against the header of the vendored .fp file)
    - kind 'M'  MSS option, value = IntSpec
    - kind 'W'  window-scale option, value = IntSpec
    - kind '?'  unknown option number, value = IntSpec (literal = option number)
    - kind '.'  no-options sentinel (singleton — matches only an empty option list)
    """

    kind: str
    value: Optional[IntSpec] = None

    def matches_literal(self, token: "OptionToken") -> bool:
        """Return True when this signature-side token accepts the observed *token*.

        The signature side carries the wildcard/modulo predicate; the
        observed side is a literal, or kind-only for flag options. Kinds
        must be equal. A signature token without a predicate accepts any
        observed value; one with a predicate needs an observed value that
        satisfies it.
        """
        if token.kind != self.kind:
            return False
        predicate = self.value
        if predicate is None:
            return True
        observed = token.value
        if observed is None:
            return False
        # Feed the observed literal's number to the signature's predicate.
        return predicate.matches(observed.value)
# ─── Signature ─────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class Signature:
    """One parsed row from a p0f v2 ``.fp`` file.

    p0f's os-genre prefix modifiers are stripped from ``os`` and kept as
    three boolean flags:

    - ``is_userland``    (``-``) userland stack (not a real OS; flagged scanner/browser)
    - ``is_approximate`` (``@``) approximate / group match
    - ``is_random``      (``*``) random or bogus userland

    The flags survive for the profiler to decide e.g. "do I promote nmap
    to tool_guesses?"
    """

    wss: WindowSpec
    ttl: int
    df: Optional[bool]
    total_len: IntSpec
    options: tuple[OptionToken, ...]  # in order; use (OptionToken('.'),) for none
    quirks: frozenset[str]
    os: str
    flavor: str
    notes: str
    is_userland: bool = False  # '-' prefix
    is_approximate: bool = False  # '@' prefix
    is_random: bool = False  # '*' prefix (distinct from wildcard)
    # Cache: a crude "specificity budget" precomputed at parse time.
    # Higher = more constrained fields, used as a tie-breaker when two
    # signatures match the same observation.
    specificity: float = field(default=0.0)

    def score(self, obs: dict[str, Any]) -> Optional[float]:
        """Return the precomputed specificity if *obs* matches, else None.

        Fields are checked in order — window, TTL, DF, total length,
        options, quirks — and the first rejection ends the check.
        """
        # Window (may depend on the observed MSS).
        if not self.wss.matches(obs.get("window"), obs.get("mss")):
            return None

        # TTL — the initial-TTL bucket must be present and equal. The
        # profiler is expected to have rounded the observed TTL up to the
        # nearest bucket already via decnet.sniffer.p0f.initial_ttl.
        seen_ttl = obs.get("ttl")
        if seen_ttl is None or seen_ttl != self.ttl:
            return None

        # DF — None on the signature side is a wildcard; otherwise the
        # observation must carry a DF value with the same truthiness.
        if self.df is not None:
            seen_df = obs.get("df")
            if seen_df is None or bool(seen_df) != self.df:
                return None

        # Total length.
        if not self.total_len.matches(obs.get("total_len")):
            return None

        # Options — order-sensitive comparison.
        if not _options_match(self.options, obs.get("options_sig")):
            return None

        # Quirks — exact set equality; a missing/empty value is the empty set.
        seen_quirks = obs.get("quirks") or frozenset()
        if not isinstance(seen_quirks, frozenset):
            seen_quirks = frozenset(seen_quirks)
        if seen_quirks != self.quirks:
            return None

        # Every field accepted the observation.
        return self.specificity
def _options_match(sig_opts: tuple[OptionToken, ...], obs_sig: Optional[str]) -> bool:
    """Compare a signature's option sequence with an observed option string.

    The observation (comma/space separated) is tokenised first. A
    signature consisting of the lone '.' sentinel matches only an empty
    observation. Otherwise both sides must have the same length and every
    signature token must accept the observed token at the same position.
    """
    observed = _parse_observation_options(obs_sig)

    expects_no_options = len(sig_opts) == 1 and sig_opts[0].kind == "."
    if expects_no_options:
        return not observed

    if len(observed) != len(sig_opts):
        return False
    for wanted, seen in zip(sig_opts, observed):
        if not wanted.matches_literal(seen):
            return False
    return True
# One observed option: a single capital letter or '?', optionally followed by digits.
_OBS_TOKEN_RE = re.compile(r"^([A-Z\?])(\d+)?$")

# Kind given to observation tokens that cannot be parsed. No kind the .fp
# parser produces equals it, so OptionToken.matches_literal() rejects it on
# the kind comparison alone. The earlier fallback, a '?' token carrying
# literal -1, could be accepted by a wildcard/modulo '?' signature token
# such as '?*'.
_UNPARSED_KIND = "<unparsed>"


def _parse_observation_options(opts_sig: Optional[str]) -> list[OptionToken]:
    """Turn the observation-side options string into literal OptionTokens.

    The string comes from the tcp_syn_fingerprint / tcpfp_fingerprint SD
    fields. Comma and whitespace delimiters are both accepted; tokens
    look like 'M1460', 'W7', 'T', 'T0', 'N', 'E', '?47'.

    Args:
        opts_sig: Observed option string, or None/empty for "no options".

    Returns:
        One token per observed option, in order. A token that cannot be
        parsed is kept as a placeholder that no signature token matches,
        so it still counts toward the option-list length. Nothing is
        raised for malformed tokens.
    """
    if not opts_sig:
        return []
    out: list[OptionToken] = []
    # Turning commas into spaces lets a single split() handle both delimiters.
    for token in opts_sig.replace(",", " ").split():
        if token == "T0":  # nosec B105 — TCP option name ("Timestamp zero"), not a credential
            out.append(OptionToken("T0"))
            continue
        m = _OBS_TOKEN_RE.match(token)
        if m is None:
            # Unknown token — keep its position but make it unmatchable.
            # Better than raising.
            out.append(OptionToken(_UNPARSED_KIND))
            continue
        kind, num = m.group(1), m.group(2)
        if num is None:
            out.append(OptionToken(kind))
        else:
            out.append(OptionToken(kind, IntSpec("literal", int(num))))
    return out
def precompute_specificity(sig: Signature) -> float:
    """Crude specificity score for ranking signatures that match the same observation.

    Each field earns points out of a fixed budget of 10 (window 3, TTL 1,
    DF 1, total length 1, options 3, quirks 1); wildcard and modulo
    predicates earn fewer points than literals. The result is
    ``earned / 10`` rounded to four decimals.
    """
    # Window (budget 3 — very discriminating).
    window_pts = {"literal": 3.0, "mss_mul": 2.5, "mtu_mul": 2.5, "mod": 1.5}.get(
        sig.wss.kind, 0.0
    )

    # TTL is always literal: flat 1.
    ttl_pts = 1.0

    # DF (budget 1): earns only when the signature constrains it.
    df_pts = 0.0 if sig.df is None else 1.0

    # Total length (budget 1).
    length_pts = {"literal": 1.0, "mod": 0.5}.get(sig.total_len.kind, 0.0)

    # Options (budget 3 — highly discriminating when literal).
    opts = sig.options
    if len(opts) == 1 and opts[0].kind == ".":
        # "no options" is itself a signal.
        option_pts = 2.0
    elif opts:
        pinned = sum(
            1 for o in opts
            if o.value is None or o.value.kind == "literal"
        )
        option_pts = 3.0 * (pinned / len(opts))
    else:
        option_pts = 0.0

    # Quirks (budget 1 — most sigs have no quirks so this is a small edge).
    quirk_pts = 1.0 if sig.quirks else 0.0

    # Plain left-to-right addition in field order keeps float rounding stable.
    earned = 0.0 + window_pts + ttl_pts + df_pts + length_pts + option_pts + quirk_pts
    budget = 0.0 + 3 + 1 + 1 + 1 + 3 + 1
    return round(earned / budget, 4)

View File

View File

@@ -0,0 +1,152 @@
"""Tests for the p0f v2 .fp parser (decnet/prober/osfp/p0f/format.py)."""
from __future__ import annotations
from pathlib import Path
import pytest
from decnet.prober.osfp.p0f.format import P0fParseError, _parse_line, parse_p0f_v2
# ─── Line-parser unit tests ──────────────────────────────────────────────────
def test_parse_line_minimal_literal() -> None:
    """A fully literal row populates every Signature field as written."""
    parsed = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:2.6.x kernel")
    assert (parsed.os, parsed.flavor) == ("Linux", "2.6.x kernel")
    assert parsed.ttl == 64
    assert parsed.df is True
    assert (parsed.wss.kind, parsed.wss.value) == ("literal", 5840)
    assert (parsed.total_len.kind, parsed.total_len.value) == ("literal", 60)
    assert len(parsed.options) == 5
    # The leading option is the MSS token carrying 1460.
    leading = parsed.options[0]
    assert leading.kind == "M"
    assert leading.value is not None and leading.value.value == 1460
    assert parsed.quirks == frozenset()
    assert not parsed.is_userland
def test_parse_line_wildcard_window() -> None:
    """'*' in the window, length and option-value slots parses as kind 'any'."""
    parsed = _parse_line("*:128:1:*:M*,S,T,N,W*:.:Windows:XP SP1+")
    assert parsed.wss.kind == "any"
    assert parsed.total_len.kind == "any"
    mss_token = parsed.options[0]
    assert mss_token.kind == "M"
    assert mss_token.value is not None and mss_token.value.kind == "any"
def test_parse_line_mss_multiple_window() -> None:
    """An 'Snn' window parses as an MSS-multiple predicate."""
    window = _parse_line("S4:64:1:60:M*,S,T,N,W*:.:Linux:generic").wss
    assert (window.kind, window.value) == ("mss_mul", 4)
def test_parse_line_mtu_multiple_window() -> None:
    """A 'Tnn' window parses as an MTU-multiple predicate."""
    window = _parse_line("T3:64:1:60:M*,S,T,N,W*:.:Solaris:10").wss
    assert (window.kind, window.value) == ("mtu_mul", 3)
def test_parse_line_modulo_window() -> None:
    """A '%nnn' window parses as a modulo predicate."""
    window = _parse_line("%8192:64:1:60:M*,S,T,N,W*:.:Linux:probe").wss
    assert (window.kind, window.value) == ("mod", 8192)
def test_parse_line_userland_prefix() -> None:
    """A leading '-' sets is_userland and is removed from the OS name."""
    parsed = _parse_line("5840:64:1:60:M*,S,T,N,W*:.:-nmap:syn stealth")
    assert parsed.os == "nmap"
    assert parsed.is_userland is True
def test_parse_line_combined_prefixes() -> None:
    """Stacked '-@' prefixes set both flags and leave a bare OS name."""
    parsed = _parse_line("5840:64:1:60:M*:.:-@Windows:fuzzy match")
    assert parsed.os == "Windows"
    assert parsed.is_userland is True
    assert parsed.is_approximate is True
def test_parse_line_quirks_non_empty() -> None:
    """Concatenated quirk letters become a set of single-letter flags."""
    quirks = _parse_line("5840:64:1:60:M*,S,T,N,W*:PZ:Linux:with quirks").quirks
    assert quirks == frozenset({"P", "Z"})
def test_parse_line_no_options_sentinel() -> None:
    """A '.' options column yields exactly one '.' sentinel token."""
    options = _parse_line("5840:64:1:60:.:.:Linux:barebones").options
    assert len(options) == 1
    (only,) = options
    assert only.kind == "."
def test_parse_line_t0_timestamp_distinct_from_t() -> None:
    """'T0' keeps its own kind instead of collapsing into 'T'."""
    second = _parse_line("5840:64:1:60:M*,T0:.:Linux:broken timestamps").options[1]
    assert second.kind == "T0"
def test_parse_line_unknown_option_number() -> None:
    """'?47' parses as kind '?' carrying the option number 47."""
    parsed = _parse_line("5840:64:1:60:M*,?47:.:Weird:stack")
    token = parsed.options[1]
    assert token.kind == "?"
    assert token.value is not None
    assert token.value.value == 47
def test_parse_line_rejects_too_few_fields() -> None:
    """A row with only four columns is rejected."""
    short_row = "5840:64:1:60"
    with pytest.raises(P0fParseError):
        _parse_line(short_row)
def test_parse_line_rejects_bad_df() -> None:
    """A DF column other than 0/1/* is rejected."""
    bad_row = "5840:64:X:60:M*:.:Linux:bad"
    with pytest.raises(P0fParseError):
        _parse_line(bad_row)
def test_parse_line_rejects_bad_window_token() -> None:
    """A window column that is neither a number nor a known form is rejected."""
    bad_row = "Kfoo:64:1:60:M*:.:Linux:bad"
    with pytest.raises(P0fParseError):
        _parse_line(bad_row)
def test_parse_line_rejects_malformed_option() -> None:
    """An unrecognisable option token is rejected."""
    bad_row = "5840:64:1:60:!!!wat:.:Linux:bad"
    with pytest.raises(P0fParseError):
        _parse_line(bad_row)
# ─── File-level tests ────────────────────────────────────────────────────────
def test_parse_file_skips_comments_blanks_bad_lines(tmp_path: Path) -> None:
    """Comments, blank lines and a garbage row are skipped; the two valid rows load."""
    content = (
        "# comment\n"
        "\n"
        "5840:64:1:60:M1460,S,T,N,W7:.:Linux:2.6.x\n"
        "# another comment\n"
        "garbage line that should skip\n"
        "8192:128:1:48:M1460,N,W0,N,N,S:.:Windows:XP\n"
    )
    db_file = tmp_path / "test.fp"
    db_file.write_text(content)

    loaded = parse_p0f_v2(db_file)

    assert len(loaded) == 2
    names = {entry.os for entry in loaded}
    assert names == {"Linux", "Windows"}
def test_parse_vendored_syn_db_fully_loads() -> None:
    """The full vendored p0f.fp MUST parse without losing signatures.

    Upstream inventory: 262 SYN signatures. A regression that drops rows
    would silently degrade OS-fingerprint coverage.
    """
    repo_root = Path(__file__).resolve().parents[3]
    syn_db = repo_root / "decnet/prober/osfp/p0f/data/p0f.fp"
    loaded = parse_p0f_v2(syn_db)
    assert len(loaded) == 262, f"expected 262 SYN sigs, parser returned {len(loaded)}"
def test_parse_vendored_all_four_dbs_fully_load() -> None:
    """Same invariant across all four vendored databases."""
    data_dir = Path(__file__).resolve().parents[3] / "decnet/prober/osfp/p0f/data"
    inventory = {"p0f.fp": 262, "p0fa.fp": 61, "p0fr.fp": 46, "p0fo.fp": 6}
    for name in inventory:
        want = inventory[name]
        got = len(parse_p0f_v2(data_dir / name))
        assert got == want, f"{name}: expected {want}, got {got}"
def test_parse_vendored_specificity_in_range() -> None:
    """Every signature's computed specificity must land in [0, 1]."""
    repo_root = Path(__file__).resolve().parents[3]
    loaded = parse_p0f_v2(repo_root / "decnet/prober/osfp/p0f/data/p0f.fp")
    for entry in loaded:
        in_range = 0.0 <= entry.specificity <= 1.0
        assert in_range, (
            f"{entry.os}/{entry.flavor}: specificity out of range ({entry.specificity})"
        )

View File

@@ -0,0 +1,125 @@
"""Tests for signature matching + scoring."""
from __future__ import annotations
import pytest
from decnet.prober.osfp.p0f.format import _parse_line
def _obs(**overrides):
"""Baseline observation (Linux 2.6 on Ethernet), overridable."""
base = {
"window": 5840,
"ttl": 64,
"df": True,
"total_len": 60,
"options_sig": "M1460,S,T,N,W7",
"quirks": frozenset(),
"mss": 1460,
"wscale": 7,
}
base.update(overrides)
return base
# ─── Match / no-match ────────────────────────────────────────────────────────
def test_score_exact_match_is_high() -> None:
    """An all-literal signature matching the baseline scores at least 0.9."""
    literal_sig = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:2.6.x literal")
    got = literal_sig.score(_obs())
    assert got is not None
    assert got >= 0.9, f"literal-fields signature should score high, got {got}"
def test_score_wildcard_match_is_lower_than_literal() -> None:
    """Both signatures match the baseline, but the literal one scores higher."""
    baseline = _obs()
    literal_score = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:literal").score(baseline)
    wildcard_score = _parse_line("*:64:1:*:M*,S,T,N,W*:.:Linux:wildcard").score(baseline)
    assert literal_score is not None and wildcard_score is not None
    assert literal_score > wildcard_score, (
        f"literal ({literal_score}) should outscore wildcard ({wildcard_score})"
    )
def test_score_window_mismatch_returns_none() -> None:
    """A different observed window rejects a literal-window signature."""
    fixed = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:fixed")
    other_window = _obs(window=64240)
    assert fixed.score(other_window) is None
def test_score_ttl_mismatch_returns_none() -> None:
    """A different TTL bucket rejects the signature."""
    ttl64 = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:ttl64")
    other_ttl = _obs(ttl=128)
    assert ttl64.score(other_ttl) is None
def test_score_df_mismatch_returns_none() -> None:
    """A signature requiring DF=1 rejects an observation with DF clear."""
    df_required = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:df-required")
    df_clear = _obs(df=False)
    assert df_required.score(df_clear) is None
def test_score_df_wildcard_on_signature_matches_either() -> None:
    """A '*' DF column accepts both DF settings."""
    any_df = _parse_line("5840:64:*:60:M1460,S,T,N,W7:.:Linux:any-df")
    for observed_df in (True, False):
        assert any_df.score(_obs(df=observed_df)) is not None
def test_score_options_order_mismatch_returns_none() -> None:
    """Option order is significant: the same tokens reordered must NOT match."""
    ordered = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:ordered")
    shuffled = _obs(options_sig="S,T,M1460,N,W7")
    assert ordered.score(shuffled) is None
def test_score_options_missing_token_returns_none() -> None:
    """An observation with one option fewer than the signature is rejected."""
    five_opts = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:5opts")
    four_observed = _obs(options_sig="M1460,S,T,N")
    assert five_opts.score(four_observed) is None
def test_score_quirks_must_match_as_set() -> None:
    """Quirks match only on exact set equality."""
    with_pz = _parse_line("5840:64:1:60:M*,S,T,N,W*:PZ:Linux:with PZ")
    exact = frozenset({"P", "Z"})
    missing_z = frozenset({"P"})
    extra_i = frozenset({"P", "Z", "I"})
    assert with_pz.score(_obs(quirks=exact)) is not None
    assert with_pz.score(_obs(quirks=missing_z)) is None
    assert with_pz.score(_obs(quirks=extra_i)) is None
def test_score_mss_multiple_window() -> None:
    """'S4' requires window == 4 * observed MSS."""
    s4 = _parse_line("S4:64:1:60:M1460,S,T,N,W7:.:Linux:S4")
    # MSS 1460 → 5840; MSS 536 → 2144.
    for window, mss in ((5840, 1460), (2144, 536)):
        assert s4.score(_obs(window=window, mss=mss)) is not None
    # 5840 is not 4 * 536.
    assert s4.score(_obs(window=5840, mss=536)) is None
def test_score_modulo_window() -> None:
    """'%8192' accepts any multiple of 8192 and nothing else."""
    mod_sig = _parse_line("%8192:64:1:60:M1460,S,T,N,W7:.:Linux:mod8192")
    for multiple in (32768, 40960):
        assert mod_sig.score(_obs(window=multiple)) is not None
    assert mod_sig.score(_obs(window=32769)) is None
def test_score_no_options_sentinel() -> None:
    """A '.' options signature matches only an empty or absent option string."""
    no_opts = _parse_line("5840:64:1:60:.:.:Linux:no-opts")
    for empty in ("", None):
        assert no_opts.score(_obs(options_sig=empty)) is not None
    assert no_opts.score(_obs(options_sig="M1460")) is None
def test_score_missing_observation_fields_returns_none() -> None:
    """A signature needing a specific window can't match an observation without one.

    This is the safety invariant — sniffer_rollup may call score() with
    partial data. The same holds for a missing TTL.
    """
    strict = _parse_line("5840:64:1:60:M1460,S,T,N,W7:.:Linux:strict")
    without_window = _obs(window=None)
    without_ttl = _obs(ttl=None)
    assert strict.score(without_window) is None
    assert strict.score(without_ttl) is None
def test_score_option_value_wildcard_matches_any_literal() -> None:
    """'M*' and 'W*' accept whatever MSS / wscale values were observed."""
    wild = _parse_line("5840:64:1:60:M*,S,T,N,W*:.:Linux:wild-mss-wscale")
    for observed in ("M1460,S,T,N,W7", "M536,S,T,N,W2"):
        assert wild.score(_obs(options_sig=observed)) is not None
def test_score_option_value_modulo() -> None:
    """'M%4' accepts only observed MSS values divisible by 4."""
    mod4 = _parse_line("5840:64:1:60:M%4,S,T,N,W7:.:Linux:mss-mod-4")
    divisible = _obs(options_sig="M1460,S,T,N,W7")  # 1460 % 4 == 0
    not_divisible = _obs(options_sig="M1461,S,T,N,W7")
    assert mod4.score(divisible) is not None
    assert mod4.score(not_divisible) is None