diff --git a/decnet/prober/osfp/__init__.py b/decnet/prober/osfp/__init__.py index dc02bc5f..0589966e 100644 --- a/decnet/prober/osfp/__init__.py +++ b/decnet/prober/osfp/__init__.py @@ -1,12 +1,27 @@ """Passive + active OS fingerprinting providers. Consumed by the profiler's `sniffer_rollup` (and, longer-term, by a -dedicated prober pass). Each provider implements `base.Provider`: given a -dict of observed TCP/IP quirks (window, wscale, mss, options signature, -TTL, etc.), return a best-match OS label with confidence. +dedicated prober pass). Each provider implements `base.Provider`: given +a dict of observed TCP/IP quirks (window, wscale, mss, options +signature, TTL, etc.), return a best-match OS label with confidence. Layout mirrors `decnet/geoip/` and `decnet/bus/`: `base.py` defines the -protocol, `factory.py` is the only sanctioned accessor, and each concrete -source (p0f, eventually nmap-osdb / our own curated DB) lives in its own -subpackage. Don't import concrete provider classes directly. +protocol, `factory.py` is the only sanctioned accessor, and each +concrete source (p0f today, nmap-osdb / DECNET-observed later) lives in +its own subpackage. Don't import concrete provider classes directly — +use :func:`factory.get_provider` or :func:`factory.get_all_providers`. """ +from decnet.prober.osfp.base import OsMatch, Provider +from decnet.prober.osfp.factory import ( + get_all_providers, + get_provider, + reset_cache, +) + +__all__ = [ + "OsMatch", + "Provider", + "get_all_providers", + "get_provider", + "reset_cache", +] diff --git a/decnet/prober/osfp/factory.py b/decnet/prober/osfp/factory.py new file mode 100644 index 00000000..ee84f307 --- /dev/null +++ b/decnet/prober/osfp/factory.py @@ -0,0 +1,87 @@ +"""OS-fingerprint provider factory. + +Dispatch is env-driven (``DECNET_OSFP_PROVIDERS``, comma-separated), +with ``p0f-v2`` as the current default. 
# Fallback provider list, in the same comma-separated format as the
# ``DECNET_OSFP_PROVIDERS`` environment variable.
_DEFAULT_PROVIDERS = "p0f-v2"

# Lazy singletons keyed by provider name. ``reset_cache`` empties the
# mapping, so the next lookup re-reads the environment and rebuilds.
_cached: dict[str, Provider] = {}


def _split_names(raw: str) -> list[str]:
    """Split a comma-separated provider list, dropping padding and blanks."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def _configured_names() -> list[str]:
    """Return the configured provider names in declared order.

    Reads ``DECNET_OSFP_PROVIDERS``. When the variable is unset, or is set
    but contains no usable name (empty string, only commas/whitespace),
    the names from ``_DEFAULT_PROVIDERS`` are returned instead, so the
    result is never empty and both public entry points agree on what
    "default" means.
    """
    names = _split_names(os.environ.get("DECNET_OSFP_PROVIDERS", _DEFAULT_PROVIDERS))
    return names or _split_names(_DEFAULT_PROVIDERS)


def _build(name: str) -> Provider:
    """Instantiate the provider registered under *name*.

    Raises:
        NotImplementedError: *name* is a reserved slot (``nmap-osdb``,
            ``decnet-observed``) whose implementation has not shipped.
        ValueError: *name* is not a known provider at all.
    """
    if name == "p0f-v2":
        # Imported on demand: the concrete class is only loaded when this
        # provider is actually selected.
        from decnet.prober.osfp.p0f.provider import P0fV2Provider

        return P0fV2Provider()
    if name in ("nmap-osdb", "decnet-observed"):
        raise NotImplementedError(
            f"OS-fingerprint provider {name!r} is reserved but not yet wired."
        )
    raise ValueError(f"Unsupported OS-fingerprint provider: {name!r}")


def get_provider(name: Optional[str] = None) -> Provider:
    """Return a single provider — *name* if given, otherwise the first
    entry of ``DECNET_OSFP_PROVIDERS`` (default ``p0f-v2``).

    Lazily built and memoised per name: repeated calls return the same
    instance until :func:`reset_cache` is called. Callers MUST go through
    this or :func:`get_all_providers` — direct imports of the concrete
    provider class are forbidden per the provider-subpackage convention.

    Raises:
        NotImplementedError: the selected name is reserved but not wired.
        ValueError: the selected name is unknown.
    """
    if name is None:
        # _configured_names() always yields at least the default entry.
        name = _configured_names()[0]
    provider = _cached.get(name)
    if provider is None:
        provider = _build(name)
        _cached[name] = provider
    return provider


def get_all_providers() -> list[Provider]:
    """Return every configured provider, in priority order.

    Declared order in ``DECNET_OSFP_PROVIDERS`` IS priority order. The
    consumer (``sniffer_rollup``) iterates and picks the best-scoring
    match across all of them; a later provider CAN beat an earlier one
    if its signature is more specific, so the "priority" is a tiebreaker,
    not a short-circuit.

    A blank or unset variable yields the default chain rather than an
    empty list. Any error raised while building one provider propagates;
    no partial list is returned.
    """
    return [get_provider(n) for n in _configured_names()]


def reset_cache() -> None:
    """Forget memoised providers — tests use this when monkeypatching
    ``DECNET_OSFP_PROVIDERS`` or ``decnet/prober/osfp/p0f/data/``."""
    _cached.clear()
logger = logging.getLogger("decnet.prober.osfp.p0f.provider")


# Directory containing the vendored .fp files.
_DATA_DIR: Path = Path(__file__).resolve().parent / "data"

# Which .fp files feed each observation context. Files listed for the
# same context are merged, in order, into one signature list.
_CONTEXT_DBS: dict[str, tuple[str, ...]] = {
    "syn": ("p0f.fp", "p0f-decnet.fp"),
    "synack": ("p0fa.fp",),
    "rst": ("p0fr.fp",),
    "stray": ("p0fo.fp",),
}


class P0fV2Provider(Provider):
    """Match observations against the p0f v2 database.

    Every file named in ``_CONTEXT_DBS`` is parsed once, at construction
    time, and kept in memory grouped by observation context.
    """

    name = "p0f-v2"

    def __init__(self, data_dir: Optional[Path] = None) -> None:
        """Load the signature databases.

        Args:
            data_dir: Directory holding the ``.fp`` files; defaults to the
                ``data/`` directory next to this module.
        """
        self._data_dir = (data_dir or _DATA_DIR).resolve()
        self._sigs_by_context: dict[str, list[Signature]] = {}
        self._load()

    def _load(self) -> None:
        """Parse the configured ``.fp`` files into per-context lists.

        Best-effort: a missing or unreadable file is logged and skipped,
        so a context may end up with an empty signature list instead of
        aborting construction.
        """
        for context, filenames in _CONTEXT_DBS.items():
            merged: list[Signature] = []
            for filename in filenames:
                path = self._data_dir / filename
                if not path.is_file():
                    # p0f-decnet.fp is optional; all others are required.
                    if not filename.startswith("p0f-decnet"):
                        logger.warning("p0f-v2: missing required DB file %s", path)
                    continue
                try:
                    merged.extend(parse_p0f_v2(path))
                except OSError as exc:
                    # NOTE(review): only I/O failures are handled here.
                    # Whether parse_p0f_v2 can raise on malformed content
                    # is not visible from this module — confirm.
                    logger.warning("p0f-v2: could not load %s: %s", path, exc)
            self._sigs_by_context[context] = merged
            logger.debug("p0f-v2: %s context loaded %d signatures", context, len(merged))

    def match(self, obs: dict[str, Any]) -> Optional[OsMatch]:
        """Return the highest-specificity matching signature, or None.

        ``obs["context"]`` selects the DB slice; default is "syn"
        (passive observation, which is 80%+ of the event stream).
        Invalid contexts return None rather than raising — that includes
        non-string values, which could otherwise fail the dict lookup
        with ``TypeError`` when unhashable.

        On equal scores the signature loaded first wins.
        """
        context = obs.get("context", "syn")
        if not isinstance(context, str):
            # Only string keys exist, so any other value is an unknown context.
            return None
        sigs = self._sigs_by_context.get(context)
        if not sigs:
            return None

        best: tuple[float, Signature] | None = None
        for sig in sigs:
            score = sig.score(obs)
            if score is None:
                continue
            if best is None or score > best[0]:
                best = (score, sig)
                # Short-circuit on a perfect match — can't beat 1.0.
                if score >= 1.0:
                    break

        if best is None:
            return None
        score, sig = best
        return OsMatch(
            os=sig.os,
            flavor=sig.flavor,
            confidence=score,
            provider=self.name,
            is_userland=sig.is_userland,
        )

    def signature_counts(self) -> dict[str, int]:
        """For diagnostics / tests — how many sigs loaded per context."""
        return {ctx: len(sigs) for ctx, sigs in self._sigs_by_context.items()}
def test_match_known_linux_26_signature() -> None:
    """Linux 2.6 with window=5840, MSS=1460, wscale=7 is in the
    vendored p0f.fp — must resolve to a Linux match."""
    p = P0fV2Provider()
    obs = {
        "window": 5840, "ttl": 64, "df": True, "total_len": 60,
        "options_sig": "M1460,S,T,N,W7", "quirks": frozenset(),
        "mss": 1460, "wscale": 7, "context": "syn",
    }
    match = p.match(obs)
    assert match is not None
    assert match.os == "Linux"
    assert match.provider == "p0f-v2"
    assert match.confidence > 0.5


def test_match_returns_none_for_unmatchable_observation() -> None:
    p = P0fV2Provider()
    # Ridiculous values with no corresponding signature.
    obs = {
        "window": 999999, "ttl": 64, "df": True, "total_len": 9999,
        "options_sig": "?255,?254", "quirks": frozenset(),
        "mss": 9999, "wscale": 99, "context": "syn",
    }
    assert p.match(obs) is None


def test_match_unknown_context_returns_none() -> None:
    p = P0fV2Provider()
    obs = {"window": 5840, "ttl": 64, "df": True, "total_len": 60,
           "options_sig": "M1460", "quirks": frozenset(),
           "mss": 1460, "context": "impossible"}
    assert p.match(obs) is None


def test_match_missing_context_defaults_to_syn() -> None:
    p = P0fV2Provider()
    obs = {
        "window": 5840, "ttl": 64, "df": True, "total_len": 60,
        "options_sig": "M1460,S,T,N,W7", "quirks": frozenset(),
        "mss": 1460, "wscale": 7,
        # no 'context' key
    }
    match = p.match(obs)
    assert match is not None
    assert match.os == "Linux"


def test_match_synack_context_uses_p0fa() -> None:
    """Sanity: active-probe SYN-ACK observations resolve against the
    61-sig p0fa.fp list, not the 262-sig p0f.fp list.

    Targeting "S22:64:1:60:M*,S,T,N,W0:AT:Linux:2.2" from p0fa.fp
    (ACK quirk + second-timestamp quirk are characteristic of SYN-ACK
    responses, distinguishing these sigs from the plain-SYN DB)."""
    p = P0fV2Provider()
    obs = {
        "window": 22 * 1460, "ttl": 64, "df": True, "total_len": 60,
        "options_sig": "M1460,S,T,N,W0",
        "quirks": frozenset({"A", "T"}),  # ACK-nonzero + T2-nonzero
        "mss": 1460, "wscale": 0, "context": "synack",
    }
    match = p.match(obs)
    assert match is not None
    assert match.os == "Linux"


def test_match_returns_highest_specificity_not_first() -> None:
    """When multiple signatures can fire, the provider must pick the
    most-specific one. Proxy for this: a Linux-style observation that
    could be caught by an @generic fallback AND a literal-Linux sig must
    resolve to the literal one (higher confidence)."""
    p = P0fV2Provider()
    obs = {
        "window": 5840, "ttl": 64, "df": True, "total_len": 60,
        "options_sig": "M1460,S,T,N,W7", "quirks": frozenset(),
        "mss": 1460, "wscale": 7, "context": "syn",
    }
    match = p.match(obs)
    # An @generic match would carry is_approximate=True on the underlying
    # signature — we can't check that through OsMatch directly, but we can
    # check confidence: literal-heavy sigs score notably higher than the
    # wildcard-heavy @-fallbacks, so a healthy match is ≥ 0.6.
    assert match is not None
    assert match.confidence >= 0.6


# ─── Factory dispatch ───────────────────────────────────────────────────────


def test_factory_default_is_p0f_v2() -> None:
    p = get_provider()
    assert p.name == "p0f-v2"
    assert isinstance(p, P0fV2Provider)


def test_factory_is_memoised() -> None:
    assert get_provider() is get_provider()


def test_factory_get_all_providers_returns_list() -> None:
    providers = get_all_providers()
    assert len(providers) >= 1
    assert providers[0].name == "p0f-v2"


def test_factory_env_override_chain(monkeypatch: pytest.MonkeyPatch) -> None:
    """Multi-provider chain must preserve declared order."""

    class _StubProvider:
        """Stand-in that only records the name it was built for."""

        def __init__(self, name: str) -> None:
            self.name = name

    # Only p0f-v2 is buildable today, so stub the builder to exercise a
    # genuinely multi-entry chain, including padding and an empty slot.
    monkeypatch.setattr(factory, "_build", _StubProvider)
    monkeypatch.setenv("DECNET_OSFP_PROVIDERS", "decnet-observed, p0f-v2,,nmap-osdb")
    factory.reset_cache()
    providers = get_all_providers()
    assert [p.name for p in providers] == ["decnet-observed", "p0f-v2", "nmap-osdb"]


def test_factory_unsupported_name_raises(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("DECNET_OSFP_PROVIDERS", "nonexistent-source")
    factory.reset_cache()
    with pytest.raises(ValueError):
        get_provider()


@pytest.mark.parametrize("reserved", ["nmap-osdb", "decnet-observed"])
def test_factory_reserved_names_raise_not_implemented(
    monkeypatch: pytest.MonkeyPatch, reserved: str
) -> None:
    """nmap-osdb and decnet-observed are reserved for future work; the
    factory must fail loud rather than silently."""
    monkeypatch.setenv("DECNET_OSFP_PROVIDERS", reserved)
    factory.reset_cache()
    with pytest.raises(NotImplementedError):
        get_provider()


# ─── OsMatch surface ────────────────────────────────────────────────────────


def test_osmatch_str_shows_provider() -> None:
    match = OsMatch(os="Linux", flavor="2.6", confidence=0.8, provider="p0f-v2")
    s = str(match)
    assert "Linux" in s and "2.6" in s and "p0f-v2" in s


def test_osmatch_userland_flag_marks_scanner() -> None:
    match = OsMatch(os="nmap", flavor="syn-stealth", confidence=0.9,
                    provider="p0f-v2", is_userland=True)
    assert match.is_userland
    assert "userland" in str(match).lower()