Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
321 lines
13 KiB
Python
321 lines
13 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Tests for IcmpErrorProbe and the underlying icmp_error helpers.
|
|
|
|
Covers:
|
|
- IcmpErrorProbe.run() returns helper result verbatim.
|
|
- IcmpErrorProbe.run() returns None when helper returns None.
|
|
- IcmpErrorProbe.syslog_fields() — stable key set, correct flag encoding, human msg.
|
|
- IcmpErrorProbe.publish_payload() — correct bus payload shape.
|
|
- _probe_port_unreachable / _probe_time_exceeded / _probe_frag_needed /
|
|
_probe_param_problem — returned-reply and silent-timeout cases.
|
|
- _probe_time_exceeded skipped when on-link.
|
|
- elicit_icmp_errors returns None when scapy is unavailable.
|
|
- Fingerprint hash is deterministic for identical inputs.
|
|
- Matrix encoding table-driven across all four present/absent combinations.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import importlib
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, call, patch
|
|
|
|
# ─── fixtures ────────────────────────────────────────────────────────────────
|
|
|
|
_SILENT: dict[str, Any] = {
|
|
"returned": False,
|
|
"rtt_ms": None,
|
|
"src_ip": None,
|
|
"icmp_code": None,
|
|
"echo_len": None,
|
|
"echo_bytes_hex": None,
|
|
}
|
|
|
|
_EVIDENCE: dict[str, Any] = {
|
|
"matrix": "PT..",
|
|
"fingerprint_hash": "abcdef1234567890abcdef1234567890",
|
|
"errors": {
|
|
"port_unreachable": {
|
|
"sent": True, "returned": True, "rtt_ms": 1.5, "src_ip": "10.0.0.9",
|
|
"icmp_code": 3, "echo_len": 28, "echo_bytes_hex": "aabbcc",
|
|
},
|
|
"time_exceeded": {
|
|
"sent": True, "returned": True, "rtt_ms": 0.8, "src_ip": "192.168.1.1",
|
|
"icmp_code": 0, "echo_len": 28, "echo_bytes_hex": "ddeeff",
|
|
},
|
|
"frag_needed": dict(_SILENT),
|
|
"param_problem": dict(_SILENT),
|
|
},
|
|
"observed_at": "2026-01-01T00:00:00+00:00",
|
|
}
|
|
|
|
|
|
def _make_probe():
|
|
from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe
|
|
return IcmpErrorProbe()
|
|
|
|
|
|
# ─── IcmpErrorProbe.run() ─────────────────────────────────────────────────────
|
|
|
|
def test_run_returns_evidence() -> None:
|
|
probe = _make_probe()
|
|
with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=_EVIDENCE) as mock_fn:
|
|
result = probe.run("10.0.0.9", None, 2.0)
|
|
assert result == _EVIDENCE
|
|
mock_fn.assert_called_once_with("10.0.0.9", timeout=2.0)
|
|
|
|
|
|
def test_run_returns_none_when_helper_returns_none() -> None:
|
|
probe = _make_probe()
|
|
with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=None):
|
|
result = probe.run("10.0.0.9", None, 2.0)
|
|
assert result is None
|
|
|
|
|
|
# ─── IcmpErrorProbe.syslog_fields() ──────────────────────────────────────────
|
|
|
|
_EXPECTED_SD_KEYS = {
|
|
"icmp_matrix",
|
|
"icmp_fp_hash",
|
|
"icmp_port_unreach",
|
|
"icmp_time_exceeded",
|
|
"icmp_frag_needed",
|
|
"icmp_param_problem",
|
|
"icmp_port_unreach_rtt_ms",
|
|
"icmp_time_exceeded_rtt_ms",
|
|
"icmp_frag_needed_rtt_ms",
|
|
"icmp_param_problem_rtt_ms",
|
|
"icmp_time_exceeded_hop",
|
|
}
|
|
|
|
|
|
def test_syslog_fields_byte_stable() -> None:
|
|
probe = _make_probe()
|
|
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert set(fields.keys()) == _EXPECTED_SD_KEYS
|
|
|
|
|
|
def test_syslog_fields_flag_encoding() -> None:
|
|
probe = _make_probe()
|
|
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert fields["icmp_port_unreach"] == "1"
|
|
assert fields["icmp_time_exceeded"] == "1"
|
|
assert fields["icmp_frag_needed"] == "0"
|
|
assert fields["icmp_param_problem"] == "0"
|
|
|
|
|
|
def test_syslog_fields_rtt_populated() -> None:
|
|
probe = _make_probe()
|
|
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert fields["icmp_port_unreach_rtt_ms"] == "1.5"
|
|
assert fields["icmp_time_exceeded_rtt_ms"] == "0.8"
|
|
assert fields["icmp_frag_needed_rtt_ms"] == ""
|
|
assert fields["icmp_param_problem_rtt_ms"] == ""
|
|
|
|
|
|
def test_syslog_fields_time_exceeded_hop() -> None:
|
|
probe = _make_probe()
|
|
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert fields["icmp_time_exceeded_hop"] == "192.168.1.1"
|
|
|
|
|
|
def test_syslog_fields_human_msg_contains_ip_and_matrix() -> None:
|
|
probe = _make_probe()
|
|
_, msg = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert "10.0.0.9" in msg
|
|
assert "PT.." in msg
|
|
|
|
|
|
def test_syslog_fields_matrix_and_hash_present() -> None:
|
|
probe = _make_probe()
|
|
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
|
|
assert fields["icmp_matrix"] == "PT.."
|
|
assert fields["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"]
|
|
|
|
|
|
# ─── IcmpErrorProbe.publish_payload() ────────────────────────────────────────
|
|
|
|
def test_publish_payload_structure() -> None:
|
|
probe = _make_probe()
|
|
payload = probe.publish_payload("10.0.0.9", None, _EVIDENCE)
|
|
assert payload["attacker_ip"] == "10.0.0.9"
|
|
assert payload["icmp_matrix"] == "PT.."
|
|
assert payload["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"]
|
|
assert payload["errors"] is _EVIDENCE["errors"]
|
|
assert payload["observed_at"] == _EVIDENCE["observed_at"]
|
|
|
|
|
|
# ─── helper: _parse_reply ────────────────────────────────────────────────────
|
|
|
|
def _make_mock_resp(icmp_type: int, icmp_code: int, src_ip: str, payload_bytes: bytes = b"\x00" * 20) -> MagicMock:
|
|
"""Build a minimal scapy-shaped response mock."""
|
|
resp = MagicMock()
|
|
resp.time = 0.0
|
|
|
|
icmp_layer = MagicMock()
|
|
icmp_layer.type = icmp_type
|
|
icmp_layer.code = icmp_code
|
|
icmp_layer.payload = MagicMock()
|
|
icmp_layer.payload.__bytes__ = lambda self: payload_bytes
|
|
# make bytes(icmp_layer.payload) work
|
|
type(icmp_layer.payload).__bytes__ = lambda self: payload_bytes
|
|
|
|
ip_layer = MagicMock()
|
|
ip_layer.src = src_ip
|
|
|
|
def getitem(key):
|
|
from scapy.all import ICMP, IP # type: ignore[attr-defined]
|
|
if key is IP or (isinstance(key, type) and key.__name__ == "IP"):
|
|
return ip_layer
|
|
if key is ICMP or (isinstance(key, type) and key.__name__ == "ICMP"):
|
|
return icmp_layer
|
|
raise KeyError(key)
|
|
|
|
resp.__getitem__ = getitem
|
|
return resp
|
|
|
|
|
|
# ─── helper: primitive probe unit tests ──────────────────────────────────────
|
|
|
|
def test_probe_port_unreachable_silent_on_none_response() -> None:
|
|
from decnet.prober.icmp_error import _probe_port_unreachable
|
|
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
|
|
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
|
|
with patch("scapy.all.sr1", return_value=None):
|
|
result = _probe_port_unreachable("10.0.0.9", 0.1)
|
|
assert result["returned"] is False
|
|
assert result["rtt_ms"] is None
|
|
|
|
|
|
def test_probe_frag_needed_silent_on_none_response() -> None:
|
|
from decnet.prober.icmp_error import _probe_frag_needed
|
|
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
|
|
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
|
|
with patch("scapy.all.sr1", return_value=None):
|
|
result = _probe_frag_needed("10.0.0.9", 0.1)
|
|
assert result["returned"] is False
|
|
|
|
|
|
def test_probe_param_problem_silent_on_none_response() -> None:
|
|
from decnet.prober.icmp_error import _probe_param_problem
|
|
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
|
|
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
|
|
with patch("scapy.all.sr1", return_value=None):
|
|
result = _probe_param_problem("10.0.0.9", 0.1)
|
|
assert result["returned"] is False
|
|
|
|
|
|
def test_probe_time_exceeded_skipped_when_on_link() -> None:
|
|
from decnet.prober.icmp_error import _probe_time_exceeded
|
|
with patch("scapy.all.sr1") as mock_sr1:
|
|
result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=True)
|
|
assert result["returned"] is False
|
|
mock_sr1.assert_not_called()
|
|
|
|
|
|
def test_probe_time_exceeded_silent_when_not_on_link() -> None:
|
|
from decnet.prober.icmp_error import _probe_time_exceeded
|
|
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
|
|
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
|
|
with patch("scapy.all.sr1", return_value=None):
|
|
result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=False)
|
|
assert result["returned"] is False
|
|
|
|
|
|
# ─── helper: elicit_icmp_errors ──────────────────────────────────────────────
|
|
|
|
def test_elicit_returns_none_when_scapy_unavailable() -> None:
|
|
from decnet.prober.icmp_error import elicit_icmp_errors
|
|
|
|
real_import = __builtins__.__import__ if hasattr(__builtins__, "__import__") else __import__
|
|
|
|
def _import_blocker(name, *args, **kwargs):
|
|
if name.startswith("scapy"):
|
|
raise ImportError(f"mocked: {name}")
|
|
return real_import(name, *args, **kwargs)
|
|
|
|
import builtins
|
|
with patch.object(builtins, "__import__", side_effect=_import_blocker):
|
|
result = elicit_icmp_errors("10.0.0.9", 0.1)
|
|
assert result is None
|
|
|
|
|
|
def test_elicit_returns_dict_with_all_keys() -> None:
|
|
from decnet.prober.icmp_error import elicit_icmp_errors
|
|
|
|
silent = dict(_SILENT)
|
|
# At least one primitive must have sent=True or elicit returns None.
|
|
sent_silent = {**silent, "sent": True}
|
|
with (
|
|
patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=sent_silent),
|
|
patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent),
|
|
patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent),
|
|
patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent),
|
|
patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")),
|
|
):
|
|
result = elicit_icmp_errors("10.0.0.9", 0.1)
|
|
|
|
assert result is not None
|
|
assert set(result.keys()) == {"matrix", "fingerprint_hash", "errors", "observed_at"}
|
|
assert set(result["errors"].keys()) == {
|
|
"port_unreachable", "time_exceeded", "frag_needed", "param_problem"
|
|
}
|
|
|
|
|
|
def test_elicit_returns_none_when_all_silent_no_caps() -> None:
|
|
from decnet.prober.icmp_error import elicit_icmp_errors
|
|
|
|
silent = dict(_SILENT) # all sent=False (PermissionError path)
|
|
with (
|
|
patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=silent),
|
|
patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent),
|
|
patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent),
|
|
patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent),
|
|
patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")),
|
|
):
|
|
result = elicit_icmp_errors("10.0.0.9", 0.1)
|
|
|
|
assert result is None
|
|
|
|
|
|
def test_fingerprint_hash_stable() -> None:
|
|
from decnet.prober.icmp_error import _build_matrix, _compute_hash
|
|
|
|
errors = {
|
|
"port_unreachable": {"returned": True, "icmp_code": 3, "echo_len": 28},
|
|
"time_exceeded": {"returned": False, "icmp_code": None, "echo_len": None},
|
|
"frag_needed": {"returned": False, "icmp_code": None, "echo_len": None},
|
|
"param_problem": {"returned": False, "icmp_code": None, "echo_len": None},
|
|
}
|
|
matrix = _build_matrix(errors) # type: ignore[arg-type]
|
|
h1 = _compute_hash(matrix, errors) # type: ignore[arg-type]
|
|
h2 = _compute_hash(matrix, errors) # type: ignore[arg-type]
|
|
assert h1 == h2
|
|
assert len(h1) == 32
|
|
|
|
|
|
def test_matrix_encoding_table() -> None:
|
|
"""Matrix encodes presence/absence for all four primitives correctly."""
|
|
from decnet.prober.icmp_error import _build_matrix
|
|
|
|
def _ret(code: int | None) -> dict[str, Any]:
|
|
return {"returned": True, "icmp_code": code, "echo_len": 8}
|
|
|
|
def _sil() -> dict[str, Any]:
|
|
return {"returned": False, "icmp_code": None, "echo_len": None}
|
|
|
|
# All silent
|
|
m = _build_matrix({"port_unreachable": _sil(), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type]
|
|
assert m == "...."
|
|
|
|
# All returned with codes
|
|
m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _ret(0), "frag_needed": _ret(4), "param_problem": _ret(0)}) # type: ignore[arg-type]
|
|
assert m == "PTFX"
|
|
|
|
# Mixed — first and third returned
|
|
m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _sil(), "frag_needed": _ret(4), "param_problem": _sil()}) # type: ignore[arg-type]
|
|
assert m == "P.F."
|
|
|
|
# Returned but code is None → '~' (wrong type / parse failure)
|
|
m = _build_matrix({"port_unreachable": _ret(None), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type]
|
|
assert m == "~..."
|