fix(xff): split leak from spoof — loopback/private claims aren't leaks

An attacker hitting /admin with `X-Forwarded-For: 127.0.0.1` was
previously flagged as an IP leak. It isn't — that's the classic
IP-allowlist / WAF-bypass payload ("treat me as localhost and skip
your auth checks"). Misclassifying it as "LEAKED IPs" in the UI
confuses analysts and burns trust in the signal.

Split by claim category. After pulling the left-most claimed IP
from the proxy header, classify:

- public (routable) → bounty_type=ip_leak (real attribution leak;
  the attacker's upstream proxy forwarded their real IP).
- loopback / private / link-local / multicast / reserved /
  unspecified → bounty_type=fingerprint, fingerprint_type=
  spoofed_source (WAF-bypass / allowlist-probing attempt; the
  attacker is telling us they know what XFF does).
- unparseable → dropped.

Same extraction pipeline; diverges only at the last step. A new
shared _classify_proxy_header_claim returns (kind, payload);
_detect_ip_leak keeps its public-only contract for backward-
compat; _detect_spoofed_source is the new sibling.

UI renderer FpSpoofedSource shows the claimed IP in warn color with
the claim_category tag (LOOPBACK / PRIVATE / ...) and a WAF-BYPASS
ATTEMPT badge — distinct visual from the "LEAKED IPs" row which
stays reserved for genuine public-IP leaks.

Test addresses updated: RFC 5737 doc ranges (198.51.100.0/24,
203.0.113.0/24) are flagged `is_private` (not globally routable) in
Python's ipaddress module, so they now correctly belong to the spoof bucket — tests
that meant to exercise real public IPs now use 8.8.8.8 / 1.1.1.1 /
Cloudflare DNS. Added eleven new tests locking the classifier +
the two detectors' mutual exclusion.
This commit is contained in:
2026-04-24 18:06:29 -04:00
parent 2c876b4d86
commit 6d1d69443a
3 changed files with 285 additions and 46 deletions

View File

@@ -264,6 +264,22 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non
"payload": _leak,
})
# 2b.2 Spoofed source — attacker tried to pass a non-routable IP
# (loopback / RFC1918 / link-local / reserved) in a proxy header.
# Classic WAF-bypass: `X-Forwarded-For: 127.0.0.1` hoping an
# upstream filter waves localhost through. Distinct bounty type
# from ip_leak because the semantic is inverted — attack attempt,
# not opsec failure.
_spoof = _detect_spoofed_source(log_data, _headers)
if _spoof is not None:
await repo.add_bounty({
"decky": log_data.get("decky"),
"service": log_data.get("service"),
"attacker_ip": log_data.get("attacker_ip"),
"bounty_type": "fingerprint",
"payload": _spoof,
})
# 2c. HTTP header quirks — order + casing fingerprint per request.
# Real HTTP clients have distinctive header orderings and casing
# patterns (curl vs python-requests vs Go-http-client vs nmap vs
@@ -569,17 +585,44 @@ def _extract_claimed_ip(headers: dict[str, Any]) -> tuple[Optional[str], Optiona
return None, None
def _detect_ip_leak(
log_data: dict[str, Any], headers: dict[str, Any],
) -> Optional[dict[str, Any]]:
"""Return a bounty payload iff an attribution-leak mismatch is present.
def _categorize_claimed_ip(ip: str) -> str:
"""Return a category label for a claimed IP string.
See :data:`_PROXY_HEADERS` for the set of headers checked. A leak is
claimed when:
- the TCP source IP is NOT in ``DECNET_TRUSTED_PROXIES``,
- a proxy-family header is present with a parseable IP, and
- that IP differs from the TCP source.
Otherwise returns ``None``.
Public routable addresses are potential real-IP leaks. Anything
else (loopback, private, link-local, multicast, reserved,
unspecified) is almost certainly a forgery — XFF spoofing is the
classic WAF-bypass / IP-allowlist trick. Callers branch on this:
``public`` → :data:`ip_leak` bounty, anything else →
``spoofed_source`` fingerprint bounty.
"""
try:
addr = ipaddress.ip_address(ip)
except (ValueError, TypeError):
return "unparseable"
if addr.is_unspecified:
return "unspecified"
if addr.is_loopback:
return "loopback"
if addr.is_link_local:
return "link_local"
if addr.is_multicast:
return "multicast"
if addr.is_reserved:
return "reserved"
if addr.is_private:
return "private"
return "public"
def _classify_proxy_header_claim(
log_data: dict[str, Any], headers: dict[str, Any],
) -> Optional[tuple[str, dict[str, Any]]]:
"""Shared worker for the two XFF-family detectors.
Returns ``(kind, payload)`` where ``kind`` is ``"leak"`` (public
claim, real attribution leak) or ``"spoof"`` (non-routable claim,
WAF-bypass attempt). Returns ``None`` for non-HTTP / trusted-proxy
source / no proxy header / claim matches source / unparseable claim.
"""
if log_data.get("service") != "http":
return None
@@ -595,23 +638,69 @@ def _detect_ip_leak(
if claimed is None or claimed == source_ip:
return None
# Keep only the proxy-family values in the echoed-back metadata so
# the bounty payload stays compact.
seen = {}
category = _categorize_claimed_ip(claimed)
if category == "unparseable":
return None
seen: dict[str, str] = {}
for h in _PROXY_HEADERS:
raw = _lookup_header(headers, h)
if raw is not None:
seen[h] = raw
# Identity-only payload — add_bounty dedups on the full payload
# string, so per-request method/path would create one row per URL
# the attacker hits with the same leaked IP. The bounty represents
# the LEAK itself, not each individual request.
return {
base = {
"source_ip": source_ip,
"real_ip_claim": claimed,
"claimed_ip": claimed,
"source_header": header_name,
"headers_seen": seen,
"claim_category": category,
}
return ("leak" if category == "public" else "spoof"), base
def _detect_ip_leak(
    log_data: dict[str, Any], headers: dict[str, Any],
) -> Optional[dict[str, Any]]:
    """Return an ip_leak bounty payload iff a PUBLIC proxy-claim
    mismatch is present — an attacker whose misconfigured VPN / proxy
    forwarded their real routable IP in an XFF-family header. Returns
    ``None`` for spoofing attempts (loopback / private / link-local /
    etc.); those land as ``spoofed_source`` fingerprints instead.

    :param log_data: parsed log event, passed through to
        ``_classify_proxy_header_claim``.
    :param headers: request headers, passed through to
        ``_classify_proxy_header_claim``.
    :returns: a dict with the legacy keys ``source_ip``,
        ``real_ip_claim``, ``source_header`` and ``headers_seen`` (in
        that order), or ``None`` when the classifier reports nothing
        or reports a non-leak.
    """
    result = _classify_proxy_header_claim(log_data, headers)
    if result is None:
        return None
    kind, claim = result
    if kind != "leak":
        return None
    # Rebuild the payload in the legacy shape AND the legacy key order.
    # Existing UI consumers read ``real_ip_claim``; ``claim_category``
    # is dropped because it is always "public" for leaks.
    # NOTE(review): bounty dedup is described as matching on the full
    # payload string — if that serialization is order-sensitive, key
    # order must match rows written before the leak/spoof split.
    # Confirm against add_bounty.
    return {
        "source_ip": claim["source_ip"],
        "real_ip_claim": claim["claimed_ip"],
        "source_header": claim["source_header"],
        "headers_seen": claim["headers_seen"],
    }
def _detect_spoofed_source(
    log_data: dict[str, Any], headers: dict[str, Any],
) -> Optional[dict[str, Any]]:
    """Return a fingerprint payload iff the proxy-header claim is
    classified as a spoof — a NON-ROUTABLE address (loopback /
    private / link-local / reserved / etc.) in an XFF-family header.

    This is the IP-allowlist / WAF-bypass probe, e.g. ``curl -H
    'X-Forwarded-For: 127.0.0.1'``: the sender is not leaking a real
    address, they are hoping something upstream trusts "localhost".

    The returned dict is the classifier's payload with
    ``fingerprint_type="spoofed_source"`` placed first; the caller
    stores it under ``bounty_type="fingerprint"``. Returns ``None``
    when the classifier reports nothing or reports a leak.
    """
    outcome = _classify_proxy_header_claim(log_data, headers)
    if outcome is None:
        return None
    kind, claim = outcome
    if kind != "spoof":
        return None
    # The UI renderer dispatches on fingerprint_type, so tag the payload
    # with it up front and then layer the classifier's fields on top.
    fingerprint: dict[str, Any] = {"fingerprint_type": "spoofed_source"}
    fingerprint.update(claim)
    return fingerprint