Files
DECNET/tests/web/test_ingester_http_quirks.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

228 lines
8.3 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""HTTP header-quirks fingerprint extraction in the ingester."""
from __future__ import annotations
from unittest.mock import AsyncMock
import pytest
from decnet.web.ingester import (
_casing_category,
_guess_tool_from_order,
_http_quirks_fingerprint,
_short_hash,
_extract_bounty,
)
def _log_row(headers: dict[str, str], *, service: str = "http") -> dict:
return {
"decky": "http-01",
"service": service,
"attacker_ip": "1.2.3.4",
"event_type": "request",
"fields": {
"method": "GET",
"path": "/",
"headers": headers,
},
}
# ─── casing classifier ─────────────────────────────────────────────────────
def test_casing_title():
assert _casing_category("User-Agent") == "title"
assert _casing_category("Host") == "title"
assert _casing_category("X-Forwarded-For") == "title"
def test_casing_lower():
assert _casing_category("user-agent") == "lower"
assert _casing_category("x-forwarded-for") == "lower"
def test_casing_upper():
assert _casing_category("USER-AGENT") == "upper"
def test_casing_mixed():
assert _casing_category("USer-AgEnt") == "mixed"
# ─── order + casing hash stability ──────────────────────────────────────────
def test_same_order_same_hash():
row_a = _log_row({"Host": "x", "User-Agent": "curl/8", "Accept": "*/*"})
row_b = _log_row({"Host": "y", "User-Agent": "curl/7", "Accept": "*/*"})
fa = _http_quirks_fingerprint(row_a, row_a["fields"]["headers"])
fb = _http_quirks_fingerprint(row_b, row_b["fields"]["headers"])
assert fa["order_hash"] == fb["order_hash"]
assert fa["casing_hash"] == fb["casing_hash"]
def test_different_order_different_hash():
row_a = _log_row({"Host": "x", "User-Agent": "a", "Accept": "*/*"})
row_b = _log_row({"Accept": "*/*", "User-Agent": "a", "Host": "x"})
fa = _http_quirks_fingerprint(row_a, row_a["fields"]["headers"])
fb = _http_quirks_fingerprint(row_b, row_b["fields"]["headers"])
assert fa["order_hash"] != fb["order_hash"]
def test_different_casing_different_hash():
row_a = _log_row({"Host": "x", "User-Agent": "a"})
row_b = _log_row({"host": "x", "user-agent": "a"})
fa = _http_quirks_fingerprint(row_a, row_a["fields"]["headers"])
fb = _http_quirks_fingerprint(row_b, row_b["fields"]["headers"])
assert fa["casing_hash"] != fb["casing_hash"]
assert fa["casing_category"] == "title"
assert fb["casing_category"] == "lower"
def test_volatile_headers_excluded_from_hash():
"""Content-Length, Cookie, XFF etc. are per-request; the identity
hash must not depend on them, otherwise two requests from the same
stack — one with Cookie, one without — would dedup-miss at the
bounty layer and spam the AttackerDetail page."""
row_a = _log_row({
"Host": "x", "User-Agent": "a", "Content-Length": "100",
})
row_b = _log_row({
"Host": "x", "User-Agent": "a", "Content-Length": "999",
"Cookie": "session=abc",
})
fa = _http_quirks_fingerprint(row_a, row_a["fields"]["headers"])
fb = _http_quirks_fingerprint(row_b, row_b["fields"]["headers"])
# Whole payload must be identical — add_bounty dedups on the full
# serialized payload, so ANY per-request-varying field would spawn
# new rows. This assertion is the contract.
assert fa == fb
assert fa["stable_count"] == 2
# ─── tool guesses ──────────────────────────────────────────────────────────
def test_curl_signature_guessed():
assert _guess_tool_from_order(["host", "user-agent", "accept"]) == "curl"
def test_python_requests_signature_guessed():
assert _guess_tool_from_order([
"host", "user-agent", "accept-encoding", "accept", "connection",
]) == "python-requests"
def test_go_http_client_signature_guessed():
assert _guess_tool_from_order([
"host", "user-agent", "accept-encoding",
]) == "go-http-client"
def test_nmap_nse_signature_guessed():
# Short order starting with host, user-agent → nmap-nse.
assert _guess_tool_from_order(["host", "user-agent"]) == "nmap-nse"
def test_unknown_tool_returns_none():
assert _guess_tool_from_order(["accept", "host", "user-agent"]) is None
def test_fingerprint_includes_tool_guess_curl():
row = _log_row({
"Host": "target", "User-Agent": "curl/8.0", "Accept": "*/*",
})
f = _http_quirks_fingerprint(row, row["fields"]["headers"])
assert f["tool_guess"] == "curl"
# ─── gating ─────────────────────────────────────────────────────────────────
def test_non_http_service_skipped():
row = _log_row({"Host": "x"}, service="ssh")
assert _http_quirks_fingerprint(row, row["fields"]["headers"]) is None
def test_empty_headers_skipped():
row = _log_row({})
assert _http_quirks_fingerprint(row, {}) is None
def test_only_volatile_headers_still_emits():
"""If every header is in the volatile set we still want a fingerprint,
just with empty order — "zero stable headers" is itself a signal."""
row = _log_row({"Content-Length": "10", "Cookie": "a=b"})
f = _http_quirks_fingerprint(row, row["fields"]["headers"])
assert f is not None
assert f["stable_count"] == 0
assert f["order"] == []
# ─── end-to-end via _extract_bounty ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_extract_bounty_emits_http_quirks():
row = _log_row({
"Host": "target", "User-Agent": "curl/8.0", "Accept": "*/*",
})
repo = AsyncMock()
await _extract_bounty(repo, row)
calls = [
c.args[0] for c in repo.add_bounty.call_args_list
]
# Expect: http_useragent fingerprint + http_quirks fingerprint.
fp_types = [
c["payload"].get("fingerprint_type")
for c in calls
if c["bounty_type"] == "fingerprint"
]
assert "http_useragent" in fp_types
assert "http_quirks" in fp_types
quirks = next(
c for c in calls
if c["bounty_type"] == "fingerprint"
and c["payload"].get("fingerprint_type") == "http_quirks"
)
assert quirks["payload"]["tool_guess"] == "curl"
assert quirks["payload"]["casing_category"] == "title"
@pytest.mark.asyncio
async def test_extract_bounty_non_http_skips_quirks():
row = _log_row({"Host": "x"}, service="ssh")
repo = AsyncMock()
await _extract_bounty(repo, row)
for call in repo.add_bounty.call_args_list:
payload = call.args[0].get("payload") or {}
assert payload.get("fingerprint_type") != "http_quirks"
def test_payload_stable_across_paths_and_methods():
"""Two requests from the same stack hitting different paths/methods
must produce byte-identical payloads so (ip, type, payload) dedup
collapses them into one bounty row. If this test breaks, check
whether a per-request field snuck back into _http_quirks_fingerprint."""
headers = {"Host": "target", "User-Agent": "curl/8.0", "Accept": "*/*"}
row_get = {
"decky": "http-01", "service": "http", "attacker_ip": "1.2.3.4",
"event_type": "request",
"fields": {"method": "GET", "path": "/admin", "headers": headers},
}
row_post = {
"decky": "http-01", "service": "http", "attacker_ip": "1.2.3.4",
"event_type": "request",
"fields": {"method": "POST", "path": "/wp-login.php", "headers": headers},
}
fa = _http_quirks_fingerprint(row_get, headers)
fb = _http_quirks_fingerprint(row_post, headers)
assert fa == fb, "payload must not depend on request method/path"
# ─── hash stability across restarts ─────────────────────────────────────────
def test_short_hash_deterministic():
assert _short_hash("abc") == _short_hash("abc")
assert _short_hash("abc") != _short_hash("def")
assert len(_short_hash("anything")) == 16