From b80e62190477e943448563e8ca3888b0436d0aa7 Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 21 May 2026 14:16:42 -0400 Subject: [PATCH] fix(prober): consolidate ip route get to single call + log bare excepts _route_info() calls _ip_route_get once and returns (on_link, iface); worker._ipv6_leak_phase now calls it instead of the two separate helpers. Bare except clauses at _ip_route_get and response parse now log at debug. --- decnet/prober/ipv6_leak.py | 38 +++++++++++------- decnet/prober/worker.py | 7 ++-- tests/prober/test_ipv6_leak.py | 70 ++++++++++++++++++++++++++++------ 3 files changed, 86 insertions(+), 29 deletions(-) diff --git a/decnet/prober/ipv6_leak.py b/decnet/prober/ipv6_leak.py index e9c5cf61..77a2eaf7 100644 --- a/decnet/prober/ipv6_leak.py +++ b/decnet/prober/ipv6_leak.py @@ -35,26 +35,37 @@ def _ip_route_get(attacker_v4: str) -> str: capture_output=True, text=True, timeout=2, ) return out.stdout - except Exception: + except Exception as exc: + _log.debug("ipv6_leak: ip route get failed for %s: %s", attacker_v4, exc) return "" +def _route_info(attacker_v4: str) -> tuple[bool, str | None]: + """Return (on_link, iface) with a single `ip route get` invocation. + + on_link is True when there is no intermediate gateway ("via" absent). + iface is the local interface name, or None if not parseable. + """ + stdout = _ip_route_get(attacker_v4) + parts = stdout.split() + on_link = "via" not in parts + iface: str | None = None + if "dev" in parts: + idx = parts.index("dev") + iface = parts[idx + 1] if idx + 1 < len(parts) else None + return on_link, iface + + def _resolve_iface_for_ip(attacker_v4: str) -> str | None: """Return the local interface name that would route to attacker_v4.""" - stdout = _ip_route_get(attacker_v4) - parts = stdout.split() - if "dev" in parts: - idx = parts.index("dev") - return parts[idx + 1] if idx + 1 < len(parts) else None - return None + _, iface = _route_info(attacker_v4) + return iface def _is_on_link(attacker_v4: str) -> bool: - """Return True only when the attacker is directly reachable on L2. - - Checks that `ip route get` shows no intermediate gateway (no "via"). - """ - return "via" not in _ip_route_get(attacker_v4) + """Return True only when the attacker is directly reachable on L2.""" + on_link, _ = _route_info(attacker_v4) + return on_link def solicit_ipv6_leak( @@ -100,7 +111,8 @@ def solicit_ipv6_leak( try: src_addr: str = resp[IPv6].src - except Exception: + except Exception as exc: + _log.debug("ipv6_leak: response parse failed on %s: %s", attacker_v4, exc) return None if not _is_link_local(src_addr): diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 76e7663c..15aa1481 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -402,13 +402,12 @@ def _ipv6_leak_phase( return done.add(0) - from decnet.prober.ipv6_leak import _is_on_link, _resolve_iface_for_ip, solicit_ipv6_leak + from decnet.prober.ipv6_leak import _route_info, solicit_ipv6_leak - if not _is_on_link(ip): + on_link, iface = _route_info(ip) + if not on_link: logger.debug("prober: ipv6_leak: %s is not on-link — skip active probe", ip) return - - iface = _resolve_iface_for_ip(ip) if iface is None: logger.debug("prober: ipv6_leak: cannot determine iface for %s", ip) return diff --git a/tests/prober/test_ipv6_leak.py b/tests/prober/test_ipv6_leak.py index 79de990c..a10574cd 100644 --- a/tests/prober/test_ipv6_leak.py +++ b/tests/prober/test_ipv6_leak.py @@ -6,6 +6,9 @@ no sniff threads. Validates: - Phase skips on second call (dedup via ip_probed sentinel). - Phase emits log + publish_fn when solicit_ipv6_leak returns evidence. - Phase is silent when solicit_ipv6_leak returns None. +- _route_info calls _ip_route_get exactly once per invocation. +- _ip_route_get subprocess failure is logged at debug. +- solicit_ipv6_leak response-parse failure is logged at debug. """ from __future__ import annotations @@ -46,8 +49,7 @@ _EVIDENCE = { def test_phase_skips_when_not_on_link() -> None: published: list[Any] = [] with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=False), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, ): _phase(publish_fn=lambda k, p: published.append((k, p))) @@ -58,8 +60,7 @@ def test_phase_skips_when_not_on_link() -> None: def test_phase_skips_when_no_iface() -> None: published: list[Any] = [] with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value=None), + patch("decnet.prober.ipv6_leak._route_info", return_value=(True, None)), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, ): _phase(publish_fn=lambda k, p: published.append((k, p))) @@ -70,8 +71,7 @@ def test_phase_skips_when_no_iface() -> None: def test_phase_emits_on_evidence() -> None: published: list[Any] = [] with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE), ): _phase(publish_fn=lambda k, p: published.append((k, p))) @@ -86,8 +86,7 @@ def test_phase_emits_on_evidence() -> None: def test_phase_silent_when_solicit_returns_none() -> None: published: list[Any] = [] with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=None), ): _phase(publish_fn=lambda k, p: published.append((k, p))) @@ -98,8 +97,7 @@ def test_phase_dedup_skips_on_second_call() -> None: published: list[Any] = [] ip_probed: dict = {} with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, ): _phase(ip_probed=ip_probed, publish_fn=lambda k, p: published.append((k, p))) @@ -112,9 +110,57 @@ def test_phase_dedup_skips_on_second_call() -> None: def test_phase_handles_solicit_exception_silently() -> None: published: list[Any] = [] with ( - patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), - patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak._route_info", return_value=(True, "eth0")), patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", side_effect=RuntimeError("boom")), ): _phase(publish_fn=lambda k, p: published.append((k, p))) assert published == [] + + +# ─── _route_info / _ip_route_get unit tests ────────────────────────────────── + + +def test_route_info_calls_ip_route_get_once() -> None: + """_route_info must shell out exactly once regardless of parse path.""" + from decnet.prober.ipv6_leak import _route_info + stdout = "10.0.0.9 dev eth0 src 10.0.0.1 uid 0\n cache" + with patch("decnet.prober.ipv6_leak._ip_route_get", return_value=stdout) as mock_rg: + on_link, iface = _route_info("10.0.0.9") + mock_rg.assert_called_once_with("10.0.0.9") + assert on_link is True + assert iface == "eth0" + + +def test_route_info_detects_gateway() -> None: + from decnet.prober.ipv6_leak import _route_info + stdout = "10.0.0.9 via 192.168.1.1 dev eth0 src 192.168.1.50\n cache" + with patch("decnet.prober.ipv6_leak._ip_route_get", return_value=stdout): + on_link, iface = _route_info("10.0.0.9") + assert on_link is False + assert iface == "eth0" + + +def test_ip_route_get_logs_on_subprocess_failure() -> None: + from decnet.prober.ipv6_leak import _ip_route_get + with ( + patch("decnet.prober.ipv6_leak.subprocess.run", side_effect=OSError("no ip")), + patch("decnet.prober.ipv6_leak._log") as mock_log, + ): + result = _ip_route_get("10.0.0.9") + assert result == "" + mock_log.debug.assert_called_once() + assert "10.0.0.9" in mock_log.debug.call_args.args[1] + + +def test_ip_route_get_returns_empty_string_on_failure() -> None: + """subprocess failure returns "" and logs at debug — not a silent swallow.""" + from decnet.prober.ipv6_leak import _ip_route_get + with ( + patch("decnet.prober.ipv6_leak.subprocess.run", side_effect=OSError("no ip binary")), + patch("decnet.prober.ipv6_leak._log") as mock_log, + ): + result = _ip_route_get("10.0.0.9") + assert result == "" + assert mock_log.debug.called + logged_msg = mock_log.debug.call_args.args + assert "10.0.0.9" in str(logged_msg)