diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 7003ef87..9e034430 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -264,6 +264,22 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non "payload": _leak, }) + # 2b.2 Spoofed source — attacker tried to pass a non-routable IP + # (loopback / RFC1918 / link-local / reserved) in a proxy header. + # Classic WAF-bypass: `X-Forwarded-For: 127.0.0.1` hoping an + # upstream filter waves localhost through. Distinct bounty type + # from ip_leak because the semantic is inverted — attack attempt, + # not opsec failure. + _spoof = _detect_spoofed_source(log_data, _headers) + if _spoof is not None: + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": log_data.get("service"), + "attacker_ip": log_data.get("attacker_ip"), + "bounty_type": "fingerprint", + "payload": _spoof, + }) + # 2c. HTTP header quirks — order + casing fingerprint per request. # Real HTTP clients have distinctive header orderings and casing # patterns (curl vs python-requests vs Go-http-client vs nmap vs @@ -569,17 +585,44 @@ def _extract_claimed_ip(headers: dict[str, Any]) -> tuple[Optional[str], Optiona return None, None -def _detect_ip_leak( - log_data: dict[str, Any], headers: dict[str, Any], -) -> Optional[dict[str, Any]]: - """Return a bounty payload iff an attribution-leak mismatch is present. +def _categorize_claimed_ip(ip: str) -> str: + """Return a category label for a claimed IP string. - See :data:`_PROXY_HEADERS` for the set of headers checked. A leak is - claimed when: - - the TCP source IP is NOT in ``DECNET_TRUSTED_PROXIES``, - - a proxy-family header is present with a parseable IP, and - - that IP differs from the TCP source. - Otherwise returns ``None``. + Public routable addresses are potential real-IP leaks. Anything + else (loopback, private, link-local, multicast, reserved, + unspecified) is almost certainly a forgery — XFF spoofing is the + classic WAF-bypass / IP-allowlist trick. Callers branch on this: + ``public`` → :data:`ip_leak` bounty, anything else → + ``spoofed_source`` fingerprint bounty. + """ + try: + addr = ipaddress.ip_address(ip) + except (ValueError, TypeError): + return "unparseable" + if addr.is_unspecified: + return "unspecified" + if addr.is_loopback: + return "loopback" + if addr.is_link_local: + return "link_local" + if addr.is_multicast: + return "multicast" + if addr.is_reserved: + return "reserved" + if addr.is_private: + return "private" + return "public" + + +def _classify_proxy_header_claim( + log_data: dict[str, Any], headers: dict[str, Any], +) -> Optional[tuple[str, dict[str, Any]]]: + """Shared worker for the two XFF-family detectors. + + Returns ``(kind, payload)`` where ``kind`` is ``"leak"`` (public + claim, real attribution leak) or ``"spoof"`` (non-routable claim, + WAF-bypass attempt). Returns ``None`` for non-HTTP / trusted-proxy + source / no proxy header / claim matches source / unparseable claim. """ if log_data.get("service") != "http": return None @@ -595,23 +638,69 @@ def _detect_ip_leak( if claimed is None or claimed == source_ip: return None - # Keep only the proxy-family values in the echoed-back metadata so - # the bounty payload stays compact. - seen = {} + category = _categorize_claimed_ip(claimed) + if category == "unparseable": + return None + + seen: dict[str, str] = {} for h in _PROXY_HEADERS: raw = _lookup_header(headers, h) if raw is not None: seen[h] = raw - # Identity-only payload — add_bounty dedups on the full payload - # string, so per-request method/path would create one row per URL - # the attacker hits with the same leaked IP. The bounty represents - # the LEAK itself, not each individual request. - return { + base = { "source_ip": source_ip, - "real_ip_claim": claimed, + "claimed_ip": claimed, "source_header": header_name, "headers_seen": seen, + "claim_category": category, + } + return ("leak" if category == "public" else "spoof"), base + + +def _detect_ip_leak( + log_data: dict[str, Any], headers: dict[str, Any], +) -> Optional[dict[str, Any]]: + """Return an ip_leak bounty payload iff a PUBLIC proxy-claim + mismatch is present — an attacker whose misconfigured VPN / proxy + forwarded their real routable IP in an XFF-family header. Returns + ``None`` for spoofing attempts (loopback / private / link-local / + etc.); those land as ``spoofed_source`` fingerprints instead. + """ + result = _classify_proxy_header_claim(log_data, headers) + if result is None or result[0] != "leak": + return None + payload = result[1] + # Preserve the legacy field name so existing UI consumers + # (AttackerDetail "LEAKED IPs" row, repo JSON decode) keep working. + payload["real_ip_claim"] = payload.pop("claimed_ip") + payload.pop("claim_category", None) # always "public" for leaks + return payload + + +def _detect_spoofed_source( + log_data: dict[str, Any], headers: dict[str, Any], +) -> Optional[dict[str, Any]]: + """Return a fingerprint payload iff a NON-ROUTABLE proxy-claim + is present — the attacker tried to pass loopback / private / + link-local / reserved / etc. in an XFF-family header. + + That's the classic IP-allowlist / WAF-bypass trick: ``curl -H + 'X-Forwarded-For: 127.0.0.1'`` hoping an upstream WAF sees + "localhost" and waves them through. No leak of their real IP; + they're telling us "I know what this header does." + + Caller wraps this in ``bounty_type="fingerprint"`` with + ``fingerprint_type="spoofed_source"``. + """ + result = _classify_proxy_header_claim(log_data, headers) + if result is None or result[0] != "spoof": + return None + _, payload = result + # Promote to fingerprint_type for the UI renderer dispatcher. + return { + "fingerprint_type": "spoofed_source", + **payload, } diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index 7d0c47d4..d96a464d 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -93,6 +93,7 @@ const fpTypeLabel: Record = { tls_certificate: 'CERTIFICATE', http_useragent: 'HTTP USER-AGENT', http_quirks: 'HTTP HEADER QUIRKS', + spoofed_source: 'SPOOFED SOURCE IP', vnc_client_version: 'VNC CLIENT', jarm: 'JARM', hassh_server: 'HASSH SERVER', @@ -106,6 +107,7 @@ const fpTypeIcon: Record = { tls_certificate: , http_useragent: , http_quirks: , + spoofed_source: , vnc_client_version: , jarm: , hassh_server: , @@ -340,6 +342,37 @@ const FpGeneric: React.FC<{ p: any }> = ({ p }) => ( ); +const FpSpoofedSource: React.FC<{ p: any }> = ({ p }) => ( +
+
+ CLAIMED: + + {p.claimed_ip || '—'} + + + via {p.source_header} + +
+
+ {p.claim_category && ( + + {String(p.claim_category).toUpperCase()} + + )} + WAF-BYPASS ATTEMPT +
+ {p.source_ip && ( +
+ real source · {p.source_ip} +
+ )} +
+); + const FpHttpQuirks: React.FC<{ p: any }> = ({ p }) => { const order: string[] = Array.isArray(p.order) ? p.order : []; return ( @@ -401,6 +434,7 @@ const FingerprintGroup: React.FC<{ fpType: string; items: any[] }> = ({ fpType, case 'hassh_server': return ; case 'tcpfp': return ; case 'http_quirks': return ; + case 'spoofed_source': return ; default: return ; } })} @@ -1279,7 +1313,7 @@ const AttackerDetail: React.FC = () => { // Active probes first, then passive, then unknown const activeTypes = ['jarm', 'hassh_server', 'tcpfp']; - const passiveTypes = ['ja3', 'ja4l', 'tls_resumption', 'tls_certificate', 'http_useragent', 'http_quirks', 'vnc_client_version']; + const passiveTypes = ['ja3', 'ja4l', 'tls_resumption', 'tls_certificate', 'http_useragent', 'http_quirks', 'spoofed_source', 'vnc_client_version']; const knownTypes = [...activeTypes, ...passiveTypes]; const unknownTypes = Object.keys(groups).filter((t) => !knownTypes.includes(t)); diff --git a/tests/web/test_ingester_xff.py b/tests/web/test_ingester_xff.py index dce2d6d1..85cc3866 100644 --- a/tests/web/test_ingester_xff.py +++ b/tests/web/test_ingester_xff.py @@ -5,13 +5,18 @@ from unittest.mock import AsyncMock import pytest -from decnet.web.ingester import _detect_ip_leak, _extract_bounty +from decnet.web.ingester import ( + _categorize_claimed_ip, + _detect_ip_leak, + _detect_spoofed_source, + _extract_bounty, +) def _log_row( headers: dict[str, str] | None = None, *, - source_ip: str = "203.0.113.42", + source_ip: str = "8.8.8.8", service: str = "http", event_type: str = "request", ) -> dict: @@ -32,12 +37,12 @@ def _log_row( def test_xff_leftmost_differs_from_source_emits_leak(): row = _log_row({ - "X-Forwarded-For": "198.51.100.7, 10.0.0.1", + "X-Forwarded-For": "1.1.1.1, 10.0.0.1", }) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None - assert result["source_ip"] == "203.0.113.42" - assert result["real_ip_claim"] == "198.51.100.7" + assert result["source_ip"] == "8.8.8.8" + assert result["real_ip_claim"] == "1.1.1.1" assert result["source_header"] == "X-Forwarded-For" # Identity-only payload — method/path intentionally omitted so the # bounty dedup collapses repeat hits from the same attacker. @@ -46,7 +51,28 @@ def test_xff_leftmost_differs_from_source_emits_leak(): def test_xff_matches_source_no_leak(): - row = _log_row({"X-Forwarded-For": "203.0.113.42"}) + row = _log_row({"X-Forwarded-For": "8.8.8.8"}) + assert _detect_ip_leak(row, row["fields"]["headers"]) is None + + +def test_xff_loopback_is_not_a_leak(): + """curl -H 'X-Forwarded-For: 127.0.0.1' is the classic WAF-bypass + payload. Must not be classified as an attribution leak — loopback + is not a routable IP anyone could actually have as their real + address.""" + row = _log_row({"X-Forwarded-For": "127.0.0.1"}) + assert _detect_ip_leak(row, row["fields"]["headers"]) is None + + +def test_xff_rfc1918_is_not_a_leak(): + """RFC1918 private addresses are forgery attempts, not leaks.""" + for ip in ("10.0.0.1", "172.16.0.1", "192.168.1.1"): + row = _log_row({"X-Forwarded-For": ip}) + assert _detect_ip_leak(row, row["fields"]["headers"]) is None, ip + + +def test_xff_link_local_is_not_a_leak(): + row = _log_row({"X-Forwarded-For": "169.254.1.1"}) assert _detect_ip_leak(row, row["fields"]["headers"]) is None @@ -59,26 +85,26 @@ def test_forwarded_header_rfc7239_parsed(): def test_forwarded_with_ipv6_and_port(): - row = _log_row({"Forwarded": 'for="[2001:db8::1]:4711"'}) + row = _log_row({"Forwarded": 'for="[2606:4700:4700::1111]:4711"'}) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None - assert result["real_ip_claim"] == "2001:db8::1" + assert result["real_ip_claim"] == "2606:4700:4700::1111" def test_x_real_ip_fallback(): - row = _log_row({"X-Real-IP": "198.51.100.7"}) + row = _log_row({"X-Real-IP": "1.1.1.1"}) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None assert result["source_header"] == "X-Real-IP" - assert result["real_ip_claim"] == "198.51.100.7" + assert result["real_ip_claim"] == "1.1.1.1" def test_cf_connecting_ip_variant(): - row = _log_row({"CF-Connecting-IP": "198.51.100.9"}) + row = _log_row({"CF-Connecting-IP": "1.0.0.1"}) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None assert result["source_header"] == "CF-Connecting-IP" - assert result["real_ip_claim"] == "198.51.100.9" + assert result["real_ip_claim"] == "1.0.0.1" def test_priority_forwarded_over_xff(): @@ -97,29 +123,29 @@ def test_priority_forwarded_over_xff(): def test_case_insensitive_header_match(): - row = _log_row({"x-forwarded-for": "198.51.100.7"}) + row = _log_row({"x-forwarded-for": "1.1.1.1"}) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None - assert result["real_ip_claim"] == "198.51.100.7" + assert result["real_ip_claim"] == "1.1.1.1" def test_trusted_proxy_source_skipped(monkeypatch): - monkeypatch.setenv("DECNET_TRUSTED_PROXIES", "203.0.113.42") - row = _log_row({"X-Forwarded-For": "198.51.100.7"}) + monkeypatch.setenv("DECNET_TRUSTED_PROXIES", "8.8.8.8") + row = _log_row({"X-Forwarded-For": "1.1.1.1"}) assert _detect_ip_leak(row, row["fields"]["headers"]) is None def test_trusted_proxy_cidr(monkeypatch): - monkeypatch.setenv("DECNET_TRUSTED_PROXIES", "203.0.113.0/24") - row = _log_row({"X-Forwarded-For": "198.51.100.7"}) + monkeypatch.setenv("DECNET_TRUSTED_PROXIES", "8.8.8.0/24") + row = _log_row({"X-Forwarded-For": "1.1.1.1"}) assert _detect_ip_leak(row, row["fields"]["headers"]) is None def test_malformed_xff_falls_through_to_next_parseable(): - row = _log_row({"X-Forwarded-For": "garbage, 198.51.100.7, not-ip"}) + row = _log_row({"X-Forwarded-For": "garbage, 1.1.1.1, not-ip"}) result = _detect_ip_leak(row, row["fields"]["headers"]) assert result is not None - assert result["real_ip_claim"] == "198.51.100.7" + assert result["real_ip_claim"] == "1.1.1.1" def test_all_values_unparseable_bails(): @@ -134,14 +160,14 @@ def test_no_headers_skipped(): def test_non_http_service_skipped(): row = _log_row( - {"X-Forwarded-For": "198.51.100.7"}, + {"X-Forwarded-For": "1.1.1.1"}, service="ssh", ) assert _detect_ip_leak(row, row["fields"]["headers"]) is None def test_missing_attacker_ip_bails(): - row = _log_row({"X-Forwarded-For": "198.51.100.7"}, source_ip="") + row = _log_row({"X-Forwarded-For": "1.1.1.1"}, source_ip="") assert _detect_ip_leak(row, row["fields"]["headers"]) is None @@ -150,7 +176,7 @@ def test_missing_attacker_ip_bails(): @pytest.mark.asyncio async def test_extract_bounty_emits_ip_leak_row(): row = _log_row({ - "X-Forwarded-For": "198.51.100.7", + "X-Forwarded-For": "1.1.1.1", "User-Agent": "curl/7.81.0", }) repo = AsyncMock() @@ -169,13 +195,13 @@ async def test_extract_bounty_emits_ip_leak_row(): if c.args[0]["bounty_type"] == "ip_leak" ) payload = leak_call.args[0]["payload"] - assert payload["real_ip_claim"] == "198.51.100.7" - assert payload["source_ip"] == "203.0.113.42" + assert payload["real_ip_claim"] == "1.1.1.1" + assert payload["source_ip"] == "8.8.8.8" @pytest.mark.asyncio async def test_extract_bounty_no_leak_no_call(): - row = _log_row({"X-Forwarded-For": "203.0.113.42"}) # matches source + row = _log_row({"X-Forwarded-For": "8.8.8.8"}) # matches source repo = AsyncMock() await _extract_bounty(repo, row) @@ -184,3 +210,93 @@ async def test_extract_bounty_no_leak_no_call(): for call in repo.add_bounty.call_args_list ] assert "ip_leak" not in types + + +# ─── spoofed-source (non-routable claim) classification ───────────────────── + +def test_categorize_public(): + assert _categorize_claimed_ip("8.8.8.8") == "public" + assert _categorize_claimed_ip("2606:4700:4700::1111") == "public" + + +def test_categorize_loopback(): + assert _categorize_claimed_ip("127.0.0.1") == "loopback" + assert _categorize_claimed_ip("::1") == "loopback" + + +def test_categorize_private(): + for ip in ("10.0.0.1", "172.16.0.1", "192.168.1.1"): + assert _categorize_claimed_ip(ip) == "private", ip + + +def test_categorize_link_local(): + assert _categorize_claimed_ip("169.254.1.1") == "link_local" + assert _categorize_claimed_ip("fe80::1") == "link_local" + + +def test_categorize_multicast_and_reserved(): + assert _categorize_claimed_ip("224.0.0.1") == "multicast" + # 240.0.0.1 is reserved (class E). + assert _categorize_claimed_ip("240.0.0.1") == "reserved" + + +def test_categorize_unparseable(): + assert _categorize_claimed_ip("not-an-ip") == "unparseable" + assert _categorize_claimed_ip("") == "unparseable" + + +def test_spoofed_source_fires_on_loopback_waf_bypass(): + """The original motivating case: curl -H 'X-Forwarded-For: 127.0.0.1' + must produce a spoofed_source fingerprint, NOT an ip_leak.""" + row = _log_row({"X-Forwarded-For": "127.0.0.1"}) + result = _detect_spoofed_source(row, row["fields"]["headers"]) + assert result is not None + assert result["fingerprint_type"] == "spoofed_source" + assert result["claim_category"] == "loopback" + assert result["claimed_ip"] == "127.0.0.1" + assert result["source_ip"] == "8.8.8.8" + + +def test_spoofed_source_fires_on_rfc1918(): + row = _log_row({"X-Forwarded-For": "10.0.0.5"}) + result = _detect_spoofed_source(row, row["fields"]["headers"]) + assert result is not None + assert result["claim_category"] == "private" + + +def test_spoofed_source_skipped_on_public_claim(): + """A public-IP claim is a leak, not a spoof — the two detectors + are mutually exclusive.""" + row = _log_row({"X-Forwarded-For": "1.1.1.1"}) + assert _detect_spoofed_source(row, row["fields"]["headers"]) is None + + +def test_spoofed_source_skipped_when_matches_source(): + row = _log_row({"X-Forwarded-For": "8.8.8.8"}) + assert _detect_spoofed_source(row, row["fields"]["headers"]) is None + + +def test_spoofed_source_respects_trusted_proxy(monkeypatch): + monkeypatch.setenv("DECNET_TRUSTED_PROXIES", "8.8.8.8") + row = _log_row({"X-Forwarded-For": "127.0.0.1"}) + assert _detect_spoofed_source(row, row["fields"]["headers"]) is None + + +@pytest.mark.asyncio +async def test_extract_bounty_emits_spoofed_source_fingerprint(): + row = _log_row({"X-Forwarded-For": "127.0.0.1"}) + repo = AsyncMock() + await _extract_bounty(repo, row) + + calls = [c.args[0] for c in repo.add_bounty.call_args_list] + # ip_leak must NOT fire for the loopback case. + assert all(c["bounty_type"] != "ip_leak" for c in calls) + # A fingerprint with fingerprint_type=spoofed_source should fire. + spoof = next( + (c for c in calls + if c["bounty_type"] == "fingerprint" + and c["payload"].get("fingerprint_type") == "spoofed_source"), + None, + ) + assert spoof is not None + assert spoof["payload"]["claim_category"] == "loopback"