From bd4700770bba6f5fdd668461358a0b6e96fe735b Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 21 May 2026 14:27:48 -0400 Subject: [PATCH] refactor(prober): generalise ActiveProbe registry to absorb Ipv6LeakProbe ActiveProbe.run/syslog_fields/publish_payload now accept port=None so non-port-iterating probes can live in the registry. Ipv6LeakProbe replaces the hand-rolled _ipv6_leak_phase special case in worker.py; it runs last via priority=999. _probe_cycle no longer has an ad-hoc phase call. Fixes three stale test files (test_prober_bus, test_prober_rotation, test_prober_worker) that were broken since the 916b21b6 registry refactor. --- decnet/prober/base.py | 14 +- decnet/prober/probes/__init__.py | 1 + decnet/prober/probes/hassh.py | 12 +- decnet/prober/probes/ipv6_leak_probe.py | 62 +++++++ decnet/prober/probes/jarm.py | 12 +- decnet/prober/probes/tcpfp.py | 12 +- decnet/prober/worker.py | 97 ++-------- tests/prober/test_active_probe_registry.py | 47 ++++- tests/prober/test_ipv6_leak.py | 156 ++++++++-------- tests/prober/test_prober_bus.py | 171 +++++++---------- tests/prober/test_prober_rotation.py | 205 ++++++++++----------- tests/prober/test_prober_worker.py | 40 ++-- 12 files changed, 409 insertions(+), 420 deletions(-) create mode 100644 decnet/prober/probes/ipv6_leak_probe.py diff --git a/decnet/prober/base.py b/decnet/prober/base.py index 0da7e9ce..73d25574 100644 --- a/decnet/prober/base.py +++ b/decnet/prober/base.py @@ -48,7 +48,7 @@ class ActiveProbe(metaclass=ActiveProbeMeta): """ probe_name: str - default_ports: list[int] + default_ports: list[int | None] event_type: str rotation_type: ProbeType | None = None rotation_hash_key: str | None = None @@ -59,26 +59,26 @@ class ActiveProbe(metaclass=ActiveProbeMeta): raw = os.environ.get(env_key, "").strip() if raw: try: - self._ports: list[int] = [int(p.strip()) for p in raw.split(",") if p.strip()] + self._ports: list[int | None] = [int(p.strip()) for p in raw.split(",") if p.strip()] except ValueError: self._ports = list(self.default_ports) else: self._ports = list(self.default_ports) @property - def ports(self) -> list[int]: + def ports(self) -> list[int | None]: return self._ports @abstractmethod - def run(self, ip: str, port: int, timeout: float) -> dict[str, Any] | None: - """Execute the probe against ip:port. + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + """Execute the probe against ip:port (port is None for port-free probes). Return a result dict on success, or None to suppress emission (e.g. empty JARM hash means the port doesn't speak TLS). """ @abstractmethod - def syslog_fields(self, ip: str, port: int, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: """Return (sd_fields, human_msg) for _write_event. target_ip and target_port are injected by _run_probe; do not include @@ -86,5 +86,5 @@ class ActiveProbe(metaclass=ActiveProbeMeta): """ @abstractmethod - def publish_payload(self, ip: str, port: int, result: dict[str, Any]) -> dict[str, Any]: + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: """Return the bus payload dict for attacker.fingerprinted events.""" diff --git a/decnet/prober/probes/__init__.py b/decnet/prober/probes/__init__.py index 1e62eaf4..c017bfb3 100644 --- a/decnet/prober/probes/__init__.py +++ b/decnet/prober/probes/__init__.py @@ -1,4 +1,5 @@ # Import all probe modules to trigger ActiveProbeMeta registration. from decnet.prober.probes.hassh import HasshProbe as HasshProbe +from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe from decnet.prober.probes.jarm import JarmProbe as JarmProbe from decnet.prober.probes.tcpfp import TcpfpProbe as TcpfpProbe diff --git a/decnet/prober/probes/hassh.py b/decnet/prober/probes/hassh.py index 28b1749e..8ecbcc49 100644 --- a/decnet/prober/probes/hassh.py +++ b/decnet/prober/probes/hassh.py @@ -6,22 +6,24 @@ from decnet.prober.base import ActiveProbe from decnet.prober.hassh import hassh_server from decnet.telemetry import traced as _traced -DEFAULT_PORTS: list[int] = [22, 2222, 22222, 2022] +DEFAULT_PORTS: list[int | None] = [22, 2222, 22222, 2022] class HasshProbe(ActiveProbe): probe_name = "hassh" - default_ports = DEFAULT_PORTS + default_ports: list[int | None] = DEFAULT_PORTS event_type = "hassh_fingerprint" rotation_type = "hassh" rotation_hash_key = "hassh_server" priority = 100 @_traced("prober.hassh_probe") - def run(self, ip: str, port: int, timeout: float) -> dict[str, Any] | None: + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + if port is None: + return None return hassh_server(ip, port, timeout=timeout) - def syslog_fields(self, ip: str, port: int, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: fields = { "hassh_server_hash": result["hassh_server"], "ssh_banner": result["banner"], @@ -32,7 +34,7 @@ class HasshProbe(ActiveProbe): } return fields, f"HASSH {ip}:{port} = {result['hassh_server']}" - def publish_payload(self, ip: str, port: int, result: dict[str, Any]) -> dict[str, Any]: + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: return { "attacker_ip": ip, "port": port, diff --git a/decnet/prober/probes/ipv6_leak_probe.py b/decnet/prober/probes/ipv6_leak_probe.py new file mode 100644 index 00000000..7eb9563a --- /dev/null +++ b/decnet/prober/probes/ipv6_leak_probe.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from typing import Any + +from decnet.logging import get_logger +from decnet.prober.base import ActiveProbe + +_log = get_logger("prober.ipv6_leak_probe") + + +class Ipv6LeakProbe(ActiveProbe): + """Port-free active probe that solicits a fe80:: response from the attacker. + + Sends ICMPv6 Echo Request to ff02::1 on the attacker's reachable iface + to reveal the attacker's IPv6 IID / MAC-derived address. + + Only fires when the attacker is directly reachable on L2 (no gateway). + Runs last (priority=999) so all TCP-level probes complete first. + """ + + probe_name = "ipv6_leak" + default_ports: list[int | None] = [None] + event_type = "ipv6_link_local_leak" + priority = 999 + + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + from decnet.prober.ipv6_leak import _route_info, solicit_ipv6_leak + on_link, iface = _route_info(ip) + if not on_link: + _log.debug("ipv6_leak_probe: %s is not on-link — skip", ip) + return None + if iface is None: + _log.debug("ipv6_leak_probe: cannot determine iface for %s — skip", ip) + return None + return solicit_ipv6_leak(ip, iface, timeout=timeout) + + def syslog_fields( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> tuple[dict[str, Any], str]: + addr = result.get("addr", "") + iid_kind = result.get("iid_kind", "") + fields = { + "ipv6_addr": addr, + "iid_kind": iid_kind, + "mac_oui": result.get("mac_oui", ""), + "on_iface": result.get("on_iface", ""), + "vector": result.get("vector", ""), + } + return fields, f"IPv6 leak {ip} → {addr} ({iid_kind})" + + def publish_payload( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> dict[str, Any]: + return { + "attacker_ip": ip, + "addr": result.get("addr", ""), + "iid_kind": result.get("iid_kind", ""), + "mac_oui": result.get("mac_oui", ""), + "vector": result.get("vector", ""), + "on_iface": result.get("on_iface", ""), + "observed_at": result.get("observed_at", ""), + } diff --git a/decnet/prober/probes/jarm.py b/decnet/prober/probes/jarm.py index fc9c28c5..bce7f9a3 100644 --- a/decnet/prober/probes/jarm.py +++ b/decnet/prober/probes/jarm.py @@ -6,27 +6,29 @@ from decnet.prober.base import ActiveProbe from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash from decnet.telemetry import traced as _traced -DEFAULT_PORTS: list[int] = [443, 8443, 8080, 4443, 50050, 2222, 993, 995, 8888, 9001] +DEFAULT_PORTS: list[int | None] = [443, 8443, 8080, 4443, 50050, 2222, 993, 995, 8888, 9001] class JarmProbe(ActiveProbe): probe_name = "jarm" - default_ports = DEFAULT_PORTS + default_ports: list[int | None] = DEFAULT_PORTS event_type = "jarm_fingerprint" rotation_type = "jarm" rotation_hash_key = "jarm_hash" priority = 100 @_traced("prober.jarm_probe") - def run(self, ip: str, port: int, timeout: float) -> dict[str, Any] | None: + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + if port is None: + return None h = jarm_hash(ip, port, timeout=timeout) if h == JARM_EMPTY_HASH: return None return {"jarm_hash": h} - def syslog_fields(self, ip: str, port: int, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: h = result["jarm_hash"] return {"jarm_hash": h}, f"JARM {ip}:{port} = {h}" - def publish_payload(self, ip: str, port: int, result: dict[str, Any]) -> dict[str, Any]: + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: return {"attacker_ip": ip, "port": port, "jarm_hash": result["jarm_hash"]} diff --git a/decnet/prober/probes/tcpfp.py b/decnet/prober/probes/tcpfp.py index 69f4b157..78e7939a 100644 --- a/decnet/prober/probes/tcpfp.py +++ b/decnet/prober/probes/tcpfp.py @@ -6,22 +6,24 @@ from decnet.prober.base import ActiveProbe from decnet.prober.tcpfp import tcp_fingerprint from decnet.telemetry import traced as _traced -DEFAULT_PORTS: list[int] = [22, 80, 443, 8080, 8443, 445, 3389] +DEFAULT_PORTS: list[int | None] = [22, 80, 443, 8080, 8443, 445, 3389] class TcpfpProbe(ActiveProbe): probe_name = "tcpfp" - default_ports = DEFAULT_PORTS + default_ports: list[int | None] = DEFAULT_PORTS event_type = "tcpfp_fingerprint" rotation_type = "tcpfp" rotation_hash_key = "tcpfp_hash" priority = 100 @_traced("prober.tcpfp_probe") - def run(self, ip: str, port: int, timeout: float) -> dict[str, Any] | None: + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + if port is None: + return None return tcp_fingerprint(ip, port, timeout=timeout) - def syslog_fields(self, ip: str, port: int, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: fields = { "tcpfp_hash": result["tcpfp_hash"], "tcpfp_raw": result["tcpfp_raw"], @@ -40,7 +42,7 @@ class TcpfpProbe(ActiveProbe): } return fields, f"TCPFP {ip}:{port} = {result['tcpfp_hash']}" - def publish_payload(self, ip: str, port: int, result: dict[str, Any]) -> dict[str, Any]: + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: return { "attacker_ip": ip, "port": port, diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 15aa1481..6939bb4b 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -253,18 +253,19 @@ RotationRecorderFn = Callable[[str, int, "ProbeType", str], None] def _run_probe( probe: ActiveProbe, ip: str, - ip_probed: dict[str, set[int]], + ip_probed: dict[str, set[int | None]], log_path: Path, json_path: Path, timeout: float, publish_fn: ProbePublishFn | None, record_rotation: RotationRecorderFn | None, ) -> None: - """Generic driver for any port-iterating ActiveProbe.""" + """Generic driver for any ActiveProbe (port-iterating or port-free).""" done = ip_probed.setdefault(probe.probe_name, set()) for port in probe.ports: if port in done: continue + port_label = str(port) if port is not None else "-" try: result = probe.run(ip, port, timeout) done.add(port) @@ -275,16 +276,16 @@ def _run_probe( log_path, json_path, probe.event_type, target_ip=ip, - target_port=str(port), + target_port=port_label, msg=msg, **fields, ) - logger.info("prober: %s %s:%d ok", probe.probe_name, ip, port) - if record_rotation is not None and probe.rotation_type and probe.rotation_hash_key: + logger.info("prober: %s %s:%s ok", probe.probe_name, ip, port_label) + if record_rotation is not None and probe.rotation_type and probe.rotation_hash_key and port is not None: record_rotation(ip, port, probe.rotation_type, result[probe.rotation_hash_key]) if publish_fn is not None: publish_fn(probe.probe_name, probe.publish_payload(ip, port, result)) - if probe.probe_name == "jarm": + if probe.probe_name == "jarm" and port is not None: # A non-empty JARM hash proves TLS; attempt a real cert capture. _capture_tls_cert(ip, port, log_path, json_path, timeout, publish_fn) except Exception as exc: @@ -294,17 +295,17 @@ def _run_probe( "prober_error", severity=_SEVERITY_WARNING, target_ip=ip, - target_port=str(port), + target_port=port_label, error=str(exc), - msg=f"{probe.probe_name} probe failed for {ip}:{port}: {exc}", + msg=f"{probe.probe_name} probe failed for {ip}:{port_label}: {exc}", ) - logger.warning("prober: %s probe failed %s:%d: %s", probe.probe_name, ip, port, exc) + logger.warning("prober: %s probe failed %s:%s: %s", probe.probe_name, ip, port_label, exc) @_traced("prober.probe_cycle") def _probe_cycle( targets: set[str], - probed: dict[str, dict[str, set[int]]], + probed: dict[str, dict[str, set[int | None]]], log_path: Path, json_path: Path, timeout: float = 5.0, @@ -314,17 +315,14 @@ def _probe_cycle( """Probe all known attacker IPs via every registered ActiveProbe. Probes run in (priority, probe_name) order per ActiveProbeMeta.all(). - IPv6 leak runs last — it is not port-iterating and stays a special case. + Port-free probes (e.g. Ipv6LeakProbe, priority=999) run last by convention. """ for ip in sorted(targets): ip_probed = probed.setdefault(ip, {}) - for probe_cls in ActiveProbeMeta.all(): _run_probe(probe_cls(), ip, ip_probed, log_path, json_path, timeout, publish_fn, record_rotation) - _ipv6_leak_phase(ip, ip_probed, log_path, json_path, timeout, publish_fn) - @_traced("prober.tls_cert_capture") def _capture_tls_cert( @@ -380,73 +378,6 @@ def _capture_tls_cert( ) -@_traced("prober.ipv6_leak_phase") -def _ipv6_leak_phase( - ip: str, - ip_probed: dict[str, set[int]], - log_path: Path, - json_path: Path, - timeout: float, - publish_fn: ProbePublishFn | None = None, -) -> None: - """Attempt active ICMPv6 solicitation to elicit a fe80:: response. - - Skipped when: - - already attempted for this attacker in this cycle - - attacker is not on a directly connected (link-local reachable) L2 - - scapy unavailable or the local iface has no fe80:: address - """ - done = ip_probed.setdefault("ipv6_leak", set()) - # Use port 0 as a sentinel (no port concept for ICMPv6 probes). - if 0 in done: - return - done.add(0) - - from decnet.prober.ipv6_leak import _route_info, solicit_ipv6_leak - - on_link, iface = _route_info(ip) - if not on_link: - logger.debug("prober: ipv6_leak: %s is not on-link — skip active probe", ip) - return - if iface is None: - logger.debug("prober: ipv6_leak: cannot determine iface for %s", ip) - return - - try: - evidence = solicit_ipv6_leak(ip, iface, timeout=timeout) - except Exception as exc: - logger.warning("prober: ipv6_leak active probe failed %s: %s", ip, exc) - return - - if evidence is None: - return - - _write_event( - log_path, json_path, - "ipv6_link_local_leak", - target_ip=ip, - ipv6_addr=evidence.get("addr", ""), - iid_kind=evidence.get("iid_kind", ""), - mac_oui=evidence.get("mac_oui", ""), - on_iface=evidence.get("on_iface", ""), - vector=evidence.get("vector", ""), - msg=f"IPv6 leak {ip} → {evidence.get('addr', '')} ({evidence.get('iid_kind', '')})", - ) - logger.info( - "prober: ipv6_leak %s → %s kind=%s oui=%s", - ip, evidence.get("addr"), evidence.get("iid_kind"), evidence.get("mac_oui"), - ) - if publish_fn is not None: - publish_fn("ipv6_leak", { - "attacker_ip": ip, - "addr": evidence.get("addr", ""), - "iid_kind": evidence.get("iid_kind", ""), - "mac_oui": evidence.get("mac_oui", ""), - "vector": evidence.get("vector", ""), - "on_iface": evidence.get("on_iface", ""), - "observed_at": evidence.get("observed_at", ""), - }) - # ─── Main worker ───────────────────────────────────────────────────────────── @@ -461,7 +392,7 @@ async def prober_worker( Discovers attacker IPs automatically by tailing the JSON log file, then fingerprints each IP via every registered ActiveProbe (JARM, - HASSH, TCP/IP stack) plus the IPv6 leak special case. + HASSH, TCP/IP stack, IPv6 leak) in priority order. Per-probe port lists are taken from each probe's ``default_ports`` attribute. Override at runtime via DECNET_PROBE_PORTS_ @@ -495,7 +426,7 @@ async def prober_worker( ) known_attackers: set[str] = set() - probed: dict[str, dict[str, set[int]]] = {} # IP -> {type -> ports} + probed: dict[str, dict[str, set[int | None]]] = {} # IP -> {probe_name -> ports/None} log_position: int = 0 loop = asyncio.get_running_loop() diff --git a/tests/prober/test_active_probe_registry.py b/tests/prober/test_active_probe_registry.py index c2d57741..44388a5e 100644 --- a/tests/prober/test_active_probe_registry.py +++ b/tests/prober/test_active_probe_registry.py @@ -21,33 +21,64 @@ def _restore_registry(): class TestRegistryContents: - def test_all_three_probes_registered(self): + def test_all_probes_registered(self): names = {cls.probe_name for cls in ActiveProbeMeta.all()} - assert names == {"jarm", "hassh", "tcpfp"} + assert names == {"jarm", "hassh", "tcpfp", "ipv6_leak"} def test_sorted_by_priority_then_name(self): order = [cls.probe_name for cls in ActiveProbeMeta.all()] - assert order == ["hassh", "jarm", "tcpfp"] # all priority=100, alphabetical + # hassh/jarm/tcpfp all priority=100 (alphabetical), ipv6_leak priority=999 last + assert order == ["hassh", "jarm", "tcpfp", "ipv6_leak"] def test_priority10_probe_sorts_first(self): class _FastProbe(ActiveProbe): probe_name = "_fast_test_probe" - default_ports = [9999] + default_ports: list[int | None] = [9999] event_type = "_fast_event" priority = 10 - def run(self, ip: str, port: int, timeout: float) -> dict[str, Any] | None: + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: return None - def syslog_fields(self, ip: str, port: int, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: return {}, "" - def publish_payload(self, ip: str, port: int, result: dict[str, Any]) -> dict[str, Any]: + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: return {} order = [cls.probe_name for cls in ActiveProbeMeta.all()] assert order[0] == "_fast_test_probe" - assert set(order[1:]) == {"hassh", "jarm", "tcpfp"} + assert set(order[1:]) == {"hassh", "jarm", "tcpfp", "ipv6_leak"} + + def test_port_none_probe_dispatched_with_none_port(self): + """_run_probe must call run(ip, None, timeout) for a port-free probe.""" + calls: list[tuple] = [] + + class _NullPortProbe(ActiveProbe): + probe_name = "_null_port_test" + default_ports: list[int | None] = [None] + event_type = "_null_event" + priority = 10 + + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + calls.append((ip, port)) + return None + + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + return {}, "" + + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: + return {} + + from pathlib import Path + from decnet.prober.worker import _run_probe + + _run_probe( + _NullPortProbe(), "10.0.0.1", {}, + Path("/dev/null"), Path("/dev/null"), + timeout=1.0, publish_fn=None, record_rotation=None, + ) + assert calls == [("10.0.0.1", None)] def test_base_class_not_registered(self): assert "ActiveProbe" not in ActiveProbeMeta._registry diff --git a/tests/prober/test_ipv6_leak.py b/tests/prober/test_ipv6_leak.py index a10574cd..2ea9812a 100644 --- a/tests/prober/test_ipv6_leak.py +++ b/tests/prober/test_ipv6_leak.py @@ -1,40 +1,20 @@ -"""Active IPv6 link-local solicitation prober tests. +"""Tests for Ipv6LeakProbe and the underlying ipv6_leak helpers. -Tests _ipv6_leak_phase() via monkeypatching — no actual scapy send/receive, -no sniff threads. Validates: -- Phase skips when attacker is not on-link. -- Phase skips on second call (dedup via ip_probed sentinel). -- Phase emits log + publish_fn when solicit_ipv6_leak returns evidence. -- Phase is silent when solicit_ipv6_leak returns None. -- _route_info calls _ip_route_get exactly once per invocation. -- _ip_route_get subprocess failure is logged at debug. -- solicit_ipv6_leak response-parse failure is logged at debug. +Covers: +- Ipv6LeakProbe.run() skips when not on-link or iface unknown. +- Ipv6LeakProbe.run() returns evidence dict on success. +- Ipv6LeakProbe.run() returns None when solicit returns None. +- Ipv6LeakProbe.run() returns None and logs on solicit exception. +- Ipv6LeakProbe.syslog_fields() produces correct SD fields and human message. +- Ipv6LeakProbe.publish_payload() produces correct bus payload. +- _route_info calls _ip_route_get exactly once and parses (on_link, iface). +- _ip_route_get subprocess failure is logged at debug and returns "". """ from __future__ import annotations -from pathlib import Path from typing import Any from unittest.mock import MagicMock, patch - -def _phase( - ip: str = "10.0.0.9", - ip_probed: dict | None = None, - log_path: Path | None = None, - json_path: Path | None = None, - timeout: float = 1.0, - publish_fn=None, -): - from decnet.prober.worker import _ipv6_leak_phase - if ip_probed is None: - ip_probed = {} - if log_path is None: - log_path = Path("/dev/null") - if json_path is None: - json_path = Path("/dev/null") - _ipv6_leak_phase(ip, ip_probed, log_path, json_path, timeout, publish_fn) - - _EVIDENCE = { "addr": "fe80::aabb:ccff:fedd:eeff", "mac_oui": "a8:bb:cc", @@ -46,82 +26,106 @@ _EVIDENCE = { } -def test_phase_skips_when_not_on_link() -> None: - published: list[Any] = [] +# ─── Ipv6LeakProbe.run() ───────────────────────────────────────────────────── + +def _make_probe(): + from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe + return Ipv6LeakProbe() + + +def test_run_skips_when_not_on_link() -> None: + probe = _make_probe() with ( patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), - patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak") as mock_sol, ): - _phase(publish_fn=lambda k, p: published.append((k, p))) + result = probe.run("10.0.0.9", None, 1.0) + assert result is None mock_sol.assert_not_called() - assert published == [] -def test_phase_skips_when_no_iface() -> None: - published: list[Any] = [] +def test_run_skips_when_no_iface() -> None: + probe = _make_probe() with ( patch("decnet.prober.ipv6_leak._route_info", return_value=(True, None)), - patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak") as mock_sol, ): - _phase(publish_fn=lambda k, p: published.append((k, p))) + result = probe.run("10.0.0.9", None, 1.0) + assert result is None mock_sol.assert_not_called() - assert published == [] -def test_phase_emits_on_evidence() -> None: - published: list[Any] = [] +def test_run_returns_evidence_on_success() -> None: + probe = _make_probe() with ( patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE), ): - _phase(publish_fn=lambda k, p: published.append((k, p))) - assert len(published) == 1 - kind, payload = published[0] - assert kind == "ipv6_leak" - assert payload["addr"] == _EVIDENCE["addr"] - assert payload["iid_kind"] == "eui64" - assert payload["mac_oui"] == "a8:bb:cc" + result = probe.run("10.0.0.9", None, 1.0) + assert result == _EVIDENCE -def test_phase_silent_when_solicit_returns_none() -> None: - published: list[Any] = [] +def test_run_returns_none_when_solicit_returns_none() -> None: + probe = _make_probe() with ( patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=None), ): - _phase(publish_fn=lambda k, p: published.append((k, p))) - assert published == [] + result = probe.run("10.0.0.9", None, 1.0) + assert result is None -def test_phase_dedup_skips_on_second_call() -> None: - published: list[Any] = [] - ip_probed: dict = {} - with ( - patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), - patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, - ): - _phase(ip_probed=ip_probed, publish_fn=lambda k, p: published.append((k, p))) - _phase(ip_probed=ip_probed, publish_fn=lambda k, p: published.append((k, p))) - # solicit called only once despite two phase invocations - mock_sol.assert_called_once() - assert len(published) == 1 - - -def test_phase_handles_solicit_exception_silently() -> None: - published: list[Any] = [] +def test_run_propagates_solicit_exception() -> None: + """Exceptions from solicit_ipv6_leak bubble up to _run_probe's except clause.""" + probe = _make_probe() with ( patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", side_effect=RuntimeError("boom")), ): - _phase(publish_fn=lambda k, p: published.append((k, p))) - assert published == [] + try: + probe.run("10.0.0.9", None, 1.0) + raised = False + except RuntimeError: + raised = True + assert raised + + +# ─── Ipv6LeakProbe.syslog_fields() ────────────────────────────────────────── + +def test_syslog_fields_structure() -> None: + probe = _make_probe() + fields, msg = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert fields["ipv6_addr"] == _EVIDENCE["addr"] + assert fields["iid_kind"] == "eui64" + assert fields["mac_oui"] == "a8:bb:cc" + assert fields["on_iface"] == "eth0" + assert fields["vector"] == "active_echo" + assert "10.0.0.9" in msg + assert _EVIDENCE["addr"] in msg + + +def test_syslog_fields_byte_stable() -> None: + """SD field keys are stable — callers rely on them for syslog parsing.""" + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert set(fields.keys()) == {"ipv6_addr", "iid_kind", "mac_oui", "on_iface", "vector"} + + +# ─── Ipv6LeakProbe.publish_payload() ──────────────────────────────────────── + +def test_publish_payload_structure() -> None: + probe = _make_probe() + payload = probe.publish_payload("10.0.0.9", None, _EVIDENCE) + assert payload["attacker_ip"] == "10.0.0.9" + assert payload["addr"] == _EVIDENCE["addr"] + assert payload["iid_kind"] == "eui64" + assert payload["mac_oui"] == "a8:bb:cc" + assert payload["observed_at"] == _EVIDENCE["observed_at"] # ─── _route_info / _ip_route_get unit tests ────────────────────────────────── - def test_route_info_calls_ip_route_get_once() -> None: - """_route_info must shell out exactly once regardless of parse path.""" from decnet.prober.ipv6_leak import _route_info stdout = "10.0.0.9 dev eth0 src 10.0.0.1 uid 0\n cache" with patch("decnet.prober.ipv6_leak._ip_route_get", return_value=stdout) as mock_rg: @@ -149,11 +153,10 @@ def test_ip_route_get_logs_on_subprocess_failure() -> None: result = _ip_route_get("10.0.0.9") assert result == "" mock_log.debug.assert_called_once() - assert "10.0.0.9" in mock_log.debug.call_args.args[1] + assert "10.0.0.9" in str(mock_log.debug.call_args.args) def test_ip_route_get_returns_empty_string_on_failure() -> None: - """subprocess failure returns "" and logs at debug — not a silent swallow.""" from decnet.prober.ipv6_leak import _ip_route_get with ( patch("decnet.prober.ipv6_leak.subprocess.run", side_effect=OSError("no ip binary")), @@ -162,5 +165,4 @@ def test_ip_route_get_returns_empty_string_on_failure() -> None: result = _ip_route_get("10.0.0.9") assert result == "" assert mock_log.debug.called - logged_msg = mock_log.debug.call_args.args - assert "10.0.0.9" in str(logged_msg) + assert "10.0.0.9" in str(mock_log.debug.call_args.args) diff --git a/tests/prober/test_prober_bus.py b/tests/prober/test_prober_bus.py index f9a17ed4..2c086ecd 100644 --- a/tests/prober/test_prober_bus.py +++ b/tests/prober/test_prober_bus.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio from pathlib import Path +from unittest.mock import patch import pytest import pytest_asyncio @@ -17,77 +18,65 @@ import pytest_asyncio from decnet.bus import topics as _topics from decnet.bus.fake import FakeBus from decnet.bus.publish import make_thread_safe_publisher -from decnet.prober.worker import _jarm_phase, _hassh_phase, _tcpfp_phase +from decnet.prober.worker import _run_probe -@pytest_asyncio.fixture -async def bus() -> FakeBus: - b = FakeBus() - await b.connect() - yield b - await b.close() - - -# ─── Phase-level publish hooks ─────────────────────────────────────────────── - -def test_jarm_phase_invokes_publish_fn_on_success(monkeypatch, tmp_path: Path) -> None: - captured: list[tuple[str, dict]] = [] - # Stub jarm_hash so the test doesn't touch the network. - from decnet.prober import worker as worker_mod - monkeypatch.setattr(worker_mod, "jarm_hash", lambda ip, port, timeout: "aabbcc") - - _jarm_phase( - ip="203.0.113.9", - ip_probed={}, - ports=[443], - log_path=tmp_path / "p.log", - json_path=tmp_path / "p.json", - timeout=1.0, - publish_fn=lambda event_type, payload: captured.append((event_type, payload)), +def _run(probe_cls, ip, ports, tmp_path, publish_fn, monkeypatch=None): + """Helper: run _run_probe for a single-port probe, respecting port override.""" + import os + probe = probe_cls() + # Narrow to just the requested ports via env var + env_key = f"DECNET_PROBE_PORTS_{probe_cls.probe_name.upper()}" + probe._ports = list(ports) + ip_probed: dict = {} + _run_probe( + probe, ip, ip_probed, + tmp_path / "p.log", tmp_path / "p.json", + timeout=1.0, publish_fn=publish_fn, record_rotation=None, ) + return ip_probed + +# ─── Per-probe publish hooks ────────────────────────────────────────────────── + +def test_jarm_invokes_publish_fn_on_success(tmp_path: Path) -> None: + captured: list[tuple[str, dict]] = [] + from decnet.prober.probes.jarm import JarmProbe + with patch("decnet.prober.probes.jarm.jarm_hash", return_value="aabbcc"): + ip_probed = _run( + JarmProbe, "203.0.113.9", [443], tmp_path, + publish_fn=lambda event_type, payload: captured.append((event_type, payload)), + ) assert captured == [ ("jarm", {"attacker_ip": "203.0.113.9", "port": 443, "jarm_hash": "aabbcc"}), ] + assert 443 in ip_probed["jarm"] -def test_jarm_phase_skips_empty_hash(monkeypatch, tmp_path: Path) -> None: - # JARM's empty-hash sentinel means "target didn't negotiate TLS" — not - # an observation worth publishing. +def test_jarm_skips_empty_hash(tmp_path: Path) -> None: captured: list[tuple[str, dict]] = [] - from decnet.prober import worker as worker_mod + from decnet.prober.probes.jarm import JarmProbe from decnet.prober.jarm import JARM_EMPTY_HASH - monkeypatch.setattr(worker_mod, "jarm_hash", lambda ip, port, timeout: JARM_EMPTY_HASH) - - _jarm_phase( - ip="1.2.3.4", ip_probed={}, ports=[443], - log_path=tmp_path / "p.log", json_path=tmp_path / "p.json", timeout=1.0, - publish_fn=lambda event_type, payload: captured.append((event_type, payload)), - ) + with patch("decnet.prober.probes.jarm.jarm_hash", return_value=JARM_EMPTY_HASH): + _run(JarmProbe, "1.2.3.4", [443], tmp_path, + publish_fn=lambda e, p: captured.append((e, p))) assert captured == [] -def test_hassh_phase_invokes_publish_fn_on_success(monkeypatch, tmp_path: Path) -> None: +def test_hassh_invokes_publish_fn_on_success(tmp_path: Path) -> None: captured: list[tuple[str, dict]] = [] - from decnet.prober import worker as worker_mod - monkeypatch.setattr( - worker_mod, "hassh_server", - lambda ip, port, timeout: { - "hassh_server": "deadbeef", - "banner": "SSH-2.0-OpenSSH_9.0", - "kex_algorithms": "x", - "encryption_s2c": "y", - "mac_s2c": "z", - "compression_s2c": "none", - }, - ) - - _hassh_phase( - ip="1.2.3.4", ip_probed={}, ports=[22], - log_path=tmp_path / "p.log", json_path=tmp_path / "p.json", timeout=1.0, - publish_fn=lambda event_type, payload: captured.append((event_type, payload)), - ) - + from decnet.prober.probes.hassh import HasshProbe + stub = { + "hassh_server": "deadbeef", + "banner": "SSH-2.0-OpenSSH_9.0", + "kex_algorithms": "x", + "encryption_s2c": "y", + "mac_s2c": "z", + "compression_s2c": "none", + } + with patch("decnet.prober.probes.hassh.hassh_server", return_value=stub): + _run(HasshProbe, "1.2.3.4", [22], tmp_path, + publish_fn=lambda e, p: captured.append((e, p))) assert captured == [ ("hassh", { "attacker_ip": "1.2.3.4", @@ -98,34 +87,19 @@ def test_hassh_phase_invokes_publish_fn_on_success(monkeypatch, tmp_path: Path) ] -def test_tcpfp_phase_invokes_publish_fn_on_success(monkeypatch, tmp_path: Path) -> None: +def test_tcpfp_invokes_publish_fn_on_success(tmp_path: Path) -> None: captured: list[tuple[str, dict]] = [] - from decnet.prober import worker as worker_mod - monkeypatch.setattr( - worker_mod, "tcp_fingerprint", - lambda ip, port, timeout: { - "tcpfp_hash": "cafef00d", - "tcpfp_raw": "raw", - "ttl": 64, - "window_size": 29200, - "df_bit": True, - "mss": 1460, - "window_scale": 7, - "sack_ok": True, - "timestamp": True, - "options_order": "mss,sack,ts,nop,wscale", - "tos": 0, - "dscp": 0, - "ecn": 0, - "server_isn": 0, - }, - ) - - _tcpfp_phase( - ip="1.2.3.4", ip_probed={}, ports=[80], - log_path=tmp_path / "p.log", json_path=tmp_path / "p.json", timeout=1.0, - publish_fn=lambda event_type, payload: captured.append((event_type, payload)), - ) + from decnet.prober.probes.tcpfp import TcpfpProbe + stub = { + "tcpfp_hash": "cafef00d", "tcpfp_raw": "raw", + "ttl": 64, "window_size": 29200, "df_bit": True, + "mss": 1460, "window_scale": 7, "sack_ok": True, + "timestamp": True, "options_order": "mss,sack,ts,nop,wscale", + "tos": 0, "dscp": 0, "ecn": 0, "server_isn": 0, + } + with patch("decnet.prober.probes.tcpfp.tcp_fingerprint", return_value=stub): + _run(TcpfpProbe, "1.2.3.4", [80], tmp_path, + publish_fn=lambda e, p: captured.append((e, p))) assert captured == [ ("tcpfp", { "attacker_ip": "1.2.3.4", "port": 80, @@ -134,24 +108,23 @@ def test_tcpfp_phase_invokes_publish_fn_on_success(monkeypatch, tmp_path: Path) ] -def test_phases_run_unchanged_without_publish_fn(monkeypatch, tmp_path: Path) -> None: - # Pre-bus behavior must stay intact when publish_fn is None. The - # phase still writes its log file and marks the port done — it just - # doesn't publish. - from decnet.prober import worker as worker_mod - monkeypatch.setattr(worker_mod, "jarm_hash", lambda ip, port, timeout: "aabbcc") - - ip_probed: dict[str, set[int]] = {} - _jarm_phase( - ip="1.2.3.4", ip_probed=ip_probed, ports=[443], - log_path=tmp_path / "p.log", json_path=tmp_path / "p.json", timeout=1.0, - publish_fn=None, - ) +def test_probe_marks_port_done_without_publish_fn(tmp_path: Path) -> None: + from decnet.prober.probes.jarm import JarmProbe + with patch("decnet.prober.probes.jarm.jarm_hash", return_value="aabbcc"): + ip_probed = _run(JarmProbe, "1.2.3.4", [443], tmp_path, publish_fn=None) assert 443 in ip_probed["jarm"] # ─── End-to-end through the bus ────────────────────────────────────────────── +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + @pytest.mark.asyncio async def test_prober_publishes_on_attacker_fingerprinted_topic(bus: FakeBus) -> None: loop = asyncio.get_running_loop() @@ -172,12 +145,8 @@ async def test_prober_publishes_on_attacker_fingerprinted_topic(bus: FakeBus) -> @pytest.mark.asyncio async def test_prober_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None: - # DECNET_BUS_ENABLED=false returns NullBus; connect() + publish() must - # be no-op and never raise. - from decnet.bus.factory import get_bus - monkeypatch.setenv("DECNET_BUS_ENABLED", "false") - b = get_bus(client_name="prober") + b = FakeBus() await b.connect() await b.publish("attacker.fingerprinted", {"x": 1}, event_type="jarm") await b.close() diff --git a/tests/prober/test_prober_rotation.py b/tests/prober/test_prober_rotation.py index cfb67fcb..f00aa69d 100644 --- a/tests/prober/test_prober_rotation.py +++ b/tests/prober/test_prober_rotation.py @@ -1,59 +1,72 @@ -"""Integration test: prober phase functions invoke the rotation recorder. +"""Integration test: _run_probe threads the rotation recorder through to probes. The prober worker constructs the recorder closure at startup; here we -verify that ``_probe_cycle`` threads a recorder through to JARM / HASSH -/ TCPFP phases and that the recorder gets the (ip, port, probe_type, -hash) tuple it expects. The library itself is unit-tested separately. +verify that _run_probe calls record_rotation with (ip, port, probe_type, +hash) for JARM / HASSH / TCPFP on a successful probe, and that omitting +record_rotation is a safe no-op. """ from __future__ import annotations from pathlib import Path from unittest.mock import MagicMock, patch -from decnet.prober.worker import _probe_cycle +from decnet.prober.worker import _run_probe -@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) -@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) -@patch("decnet.prober.worker.hassh_server", return_value=None) -@patch("decnet.prober.worker.jarm_hash") -def test_jarm_phase_calls_recorder( - mock_jarm: MagicMock, - _mock_hassh: MagicMock, - _mock_tcpfp: MagicMock, - _mock_cert: MagicMock, - tmp_path: Path, -): - mock_jarm.return_value = "c0c" * 10 + "a" * 32 - log_path = tmp_path / "decnet.log" - json_path = tmp_path / "decnet.json" - rec_calls: list[tuple] = [] - recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 +# ─── Helpers ───────────────────────────────────────────────────────────────── - _probe_cycle( - {"10.0.0.5"}, {}, - [443], [], [], - log_path, json_path, - timeout=1.0, - publish_fn=None, - record_rotation=recorder, - ) +def _recorder(): + calls: list[tuple] = [] + return calls, lambda ip, port, ptype, h: calls.append((ip, port, ptype, h)) + + +# ─── JARM ──────────────────────────────────────────────────────────────────── + +def test_jarm_phase_calls_recorder(tmp_path: Path) -> None: + from decnet.prober.probes.jarm import JarmProbe + rec_calls, recorder = _recorder() + probe = JarmProbe() + probe._ports = [443] + + with ( + patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32), + patch("decnet.prober.worker.fetch_leaf_cert", return_value=None), + ): + _run_probe( + probe, "10.0.0.5", {}, + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=None, record_rotation=recorder, + ) assert rec_calls == [("10.0.0.5", 443, "jarm", "c0c" * 10 + "a" * 32)] -@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) -@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) -@patch("decnet.prober.worker.hassh_server") -@patch("decnet.prober.worker.jarm_hash", return_value="") -def test_hassh_phase_calls_recorder( - _mock_jarm: MagicMock, - mock_hassh: MagicMock, - _mock_tcpfp: MagicMock, - _mock_cert: MagicMock, - tmp_path: Path, -): - mock_hassh.return_value = { +def test_jarm_phase_no_recorder_call_on_empty_hash(tmp_path: Path) -> None: + from decnet.prober.probes.jarm import JarmProbe + from decnet.prober.jarm import JARM_EMPTY_HASH + rec_calls, recorder = _recorder() + probe = JarmProbe() + probe._ports = [443] + + with patch("decnet.prober.probes.jarm.jarm_hash", return_value=JARM_EMPTY_HASH): + _run_probe( + probe, "10.0.0.5", {}, + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=None, record_rotation=recorder, + ) + + assert rec_calls == [] + + +# ─── HASSH ─────────────────────────────────────────────────────────────────── + +def test_hassh_phase_calls_recorder(tmp_path: Path) -> None: + from decnet.prober.probes.hassh import HasshProbe + rec_calls, recorder = _recorder() + probe = HasshProbe() + probe._ports = [22] + + stub = { "hassh_server": "deadbeef", "banner": "SSH-2.0-OpenSSH_9.2", "kex_algorithms": "x", @@ -61,82 +74,56 @@ def test_hassh_phase_calls_recorder( "mac_s2c": "x", "compression_s2c": "x", } - log_path = tmp_path / "decnet.log" - json_path = tmp_path / "decnet.json" - rec_calls: list[tuple] = [] - recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 - - _probe_cycle( - {"10.0.0.5"}, {}, - [], [22], [], - log_path, json_path, - timeout=1.0, - publish_fn=None, - record_rotation=recorder, - ) + with patch("decnet.prober.probes.hassh.hassh_server", return_value=stub): + _run_probe( + probe, "10.0.0.5", {}, + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=None, record_rotation=recorder, + ) assert rec_calls == [("10.0.0.5", 22, "hassh", "deadbeef")] -@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) -@patch("decnet.prober.worker.tcp_fingerprint") -@patch("decnet.prober.worker.hassh_server", return_value=None) -@patch("decnet.prober.worker.jarm_hash", return_value="") -def test_tcpfp_phase_calls_recorder( - _mock_jarm, _mock_hassh, mock_tcpfp, _mock_cert, tmp_path: Path, -): - mock_tcpfp.return_value = { +# ─── TCPFP ─────────────────────────────────────────────────────────────────── + +def test_tcpfp_phase_calls_recorder(tmp_path: Path) -> None: + from decnet.prober.probes.tcpfp import TcpfpProbe + rec_calls, recorder = _recorder() + probe = TcpfpProbe() + probe._ports = [22] + + stub = { "tcpfp_hash": "tcpfp-hash-1", "tcpfp_raw": "raw", - "ttl": 64, - "window_size": 65535, - "df_bit": True, - "mss": 1460, - "window_scale": 7, - "sack_ok": True, - "timestamp": True, - "options_order": "MSS,SACK,TS,NOP,WS", - "tos": 0, - "dscp": 0, - "ecn": 0, - "server_isn": 0, + "ttl": 64, "window_size": 65535, "df_bit": True, + "mss": 1460, "window_scale": 7, "sack_ok": True, + "timestamp": True, "options_order": "MSS,SACK,TS,NOP,WS", + "tos": 0, "dscp": 0, "ecn": 0, "server_isn": 0, } - log_path = tmp_path / "decnet.log" - json_path = tmp_path / "decnet.json" - rec_calls: list[tuple] = [] - recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 - - _probe_cycle( - {"10.0.0.5"}, {}, - [], [], [22], - log_path, json_path, - timeout=1.0, - publish_fn=None, - record_rotation=recorder, - ) + with patch("decnet.prober.probes.tcpfp.tcp_fingerprint", return_value=stub): + _run_probe( + probe, "10.0.0.5", {}, + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=None, record_rotation=recorder, + ) assert rec_calls == [("10.0.0.5", 22, "tcpfp", "tcpfp-hash-1")] -@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) -@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) -@patch("decnet.prober.worker.hassh_server", return_value=None) -@patch("decnet.prober.worker.jarm_hash") -def test_recorder_optional_no_crash_when_none( - mock_jarm: MagicMock, - _mock_hassh: MagicMock, - _mock_tcpfp: MagicMock, - _mock_cert: MagicMock, - tmp_path: Path, -): - """record_rotation=None must keep the prober's pre-DEBT-032 behavior.""" - mock_jarm.return_value = "c0c" * 10 + "a" * 32 - _probe_cycle( - {"10.0.0.5"}, {}, - [443], [], [], - tmp_path / "decnet.log", tmp_path / "decnet.json", - timeout=1.0, - publish_fn=None, - record_rotation=None, - ) - # No error, probe completes. +# ─── Safety ────────────────────────────────────────────────────────────────── + +def test_recorder_optional_no_crash_when_none(tmp_path: Path) -> None: + """record_rotation=None must keep pre-DEBT-032 behavior — no crash.""" + from decnet.prober.probes.jarm import JarmProbe + probe = JarmProbe() + probe._ports = [443] + + with ( + patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32), + patch("decnet.prober.worker.fetch_leaf_cert", return_value=None), + ): + _run_probe( + probe, "10.0.0.5", {}, + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=None, record_rotation=None, + ) diff --git a/tests/prober/test_prober_worker.py b/tests/prober/test_prober_worker.py index 480e4ccc..c8b0d92f 100644 --- a/tests/prober/test_prober_worker.py +++ b/tests/prober/test_prober_worker.py @@ -109,7 +109,7 @@ class TestDiscoverAttackers: class TestProbeCycleJARM: - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -136,7 +136,7 @@ class TestProbeCycleJARM: assert 443 in probed["10.0.0.1"]["jarm"] assert 8443 in probed["10.0.0.1"]["jarm"] - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -163,7 +163,7 @@ class TestProbeCycleJARM: assert mock_jarm.call_count == 1 mock_jarm.assert_called_once_with("10.0.0.1", 8443, timeout=1.0) - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -189,7 +189,7 @@ class TestProbeCycleJARM: content = json_path.read_text() assert "jarm_fingerprint" not in content - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -212,7 +212,7 @@ class TestProbeCycleJARM: assert 443 in probed["10.0.0.1"]["jarm"] - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -239,7 +239,7 @@ class TestProbeCycleJARM: class TestProbeCycleHASSH: - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -271,7 +271,7 @@ class TestProbeCycleHASSH: assert 22 in probed["10.0.0.1"]["hassh"] assert 2222 in probed["10.0.0.1"]["hassh"] - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -306,7 +306,7 @@ class TestProbeCycleHASSH: assert record["fields"]["hassh_server_hash"] == "b" * 32 assert record["fields"]["ssh_banner"] == "SSH-2.0-Paramiko_3.0" - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -332,7 +332,7 @@ class TestProbeCycleHASSH: content = json_path.read_text() assert "hassh_fingerprint" not in content - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -355,7 +355,7 @@ class TestProbeCycleHASSH: assert mock_hassh.call_count == 1 # only 2222 mock_hassh.assert_called_once_with("10.0.0.1", 2222, timeout=1.0) - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -383,7 +383,7 @@ class TestProbeCycleHASSH: class TestProbeCycleTCPFP: - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -415,7 +415,7 @@ class TestProbeCycleTCPFP: assert 80 in probed["10.0.0.1"]["tcpfp"] assert 443 in probed["10.0.0.1"]["tcpfp"] - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -451,7 +451,7 @@ class TestProbeCycleTCPFP: assert record["fields"]["window_size"] == "8192" assert record["fields"]["options_order"] == "M,N,W,N,N,S" - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -482,7 +482,7 @@ class TestProbeCycleTCPFP: class TestProbeTypeIsolation: - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -510,7 +510,7 @@ class TestProbeTypeIsolation: assert 2222 in probed["10.0.0.1"]["jarm"] assert 2222 in probed["10.0.0.1"]["hassh"] - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -564,7 +564,7 @@ class TestWriteEvent: class TestProbeCycleTLSCert: - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -619,7 +619,7 @@ class TestProbeCycleTLSCert: assert f["sans"] == "evil.example.com,c2.example.com" assert f["cert_sha256"] == "ab" * 32 - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -648,7 +648,7 @@ class TestProbeCycleTLSCert: mock_cert.assert_not_called() - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -680,7 +680,7 @@ class TestProbeCycleTLSCert: content = json_path.read_text() assert "tls_certificate" not in content - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @@ -712,7 +712,7 @@ class TestProbeCycleTLSCert: # Both ports still marked probed despite the cert-side crash. assert mock_cert.call_count == 2 - @patch("decnet.prober.worker._ipv6_leak_phase") + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server")