Sends four crafted stimuli (UDP/closed-port, TTL=1, DF+oversized,
bad IP option) and records which ICMP error classes come back, the
per-error RTT, and the bytes echoed in each ICMP body. Absence is
as informative as a reply — Linux rate-limiting is a fingerprint signal.
Returns None when no packets could be sent (no CAP_NET_RAW), so the
probe is a no-op in non-root test environments. Port-free ActiveProbe
subclass (priority=850), metaclass auto-registered in the registry.
Also fixes three sets of stale tests left over from the TlsCertProbe
migration (4b2759e0):
- test_active_probe_registry: closed name/order sets updated for
tls_certificate and icmp_error
- test_prober_rotation: dead patches on worker.fetch_leaf_cert removed
- test_prober_worker (TestProbeCycleTLSCert): rewritten to test
TlsCertProbe as an independent registry probe, patch target updated
from worker.fetch_leaf_cert to probes.tlscert_probe.fetch_leaf_cert
73 lines
2.8 KiB
Python
73 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from decnet.logging import get_logger
|
|
from decnet.prober.base import ActiveProbe
|
|
|
|
_log = get_logger("prober.icmp_error_probe")
|
|
|
|
|
|
class IcmpErrorProbe(ActiveProbe):
|
|
"""Port-free probe that elicits ICMP error replies from the attacker.
|
|
|
|
Sends four crafted stimuli (UDP/closed-port, TTL=1, DF+oversized, bad
|
|
IP option) and records which ICMP error classes the target emits, the
|
|
per-error RTT, and bytes echoed back in each ICMP error body.
|
|
|
|
Silent responses are as fingerprint-worthy as replies: Linux emits at
|
|
most 1 ICMP error/sec, so rate-limited absences reveal OS behaviour.
|
|
|
|
Requires root / CAP_NET_RAW. Scapy is lazy-imported inside the helper.
|
|
"""
|
|
|
|
probe_name = "icmp_error"
|
|
default_ports: list[int | None] = [None]
|
|
event_type = "icmp_error_leak"
|
|
priority = 850 # after TCP/TLS (100-200), before ipv6_leak (999)
|
|
|
|
def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None:
|
|
from decnet.prober.icmp_error import elicit_icmp_errors
|
|
return elicit_icmp_errors(ip, timeout=timeout)
|
|
|
|
def syslog_fields(
|
|
self, ip: str, port: int | None, result: dict[str, Any]
|
|
) -> tuple[dict[str, Any], str]:
|
|
matrix = result.get("matrix", "")
|
|
fp_hash = result.get("fingerprint_hash", "")
|
|
errors = result.get("errors", {})
|
|
|
|
def _flag(key: str) -> str:
|
|
return "1" if errors.get(key, {}).get("returned", False) else "0"
|
|
|
|
def _rtt(key: str) -> str:
|
|
v = errors.get(key, {}).get("rtt_ms")
|
|
return str(v) if v is not None else ""
|
|
|
|
fields: dict[str, Any] = {
|
|
"icmp_matrix": matrix,
|
|
"icmp_fp_hash": fp_hash,
|
|
"icmp_port_unreach": _flag("port_unreachable"),
|
|
"icmp_time_exceeded": _flag("time_exceeded"),
|
|
"icmp_frag_needed": _flag("frag_needed"),
|
|
"icmp_param_problem": _flag("param_problem"),
|
|
"icmp_port_unreach_rtt_ms": _rtt("port_unreachable"),
|
|
"icmp_time_exceeded_rtt_ms": _rtt("time_exceeded"),
|
|
"icmp_frag_needed_rtt_ms": _rtt("frag_needed"),
|
|
"icmp_param_problem_rtt_ms": _rtt("param_problem"),
|
|
"icmp_time_exceeded_hop": errors.get("time_exceeded", {}).get("src_ip") or "",
|
|
}
|
|
msg = f"ICMP leak {ip} → matrix={matrix} fp={fp_hash[:8]}"
|
|
return fields, msg
|
|
|
|
def publish_payload(
|
|
self, ip: str, port: int | None, result: dict[str, Any]
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"attacker_ip": ip,
|
|
"icmp_matrix": result.get("matrix", ""),
|
|
"icmp_fp_hash": result.get("fingerprint_hash", ""),
|
|
"errors": result.get("errors", {}),
|
|
"observed_at": result.get("observed_at", ""),
|
|
}
|