Files
DECNET/tests/ttp/test_http_fingerprint_lifter.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

222 lines
7.4 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Per-predicate unit tests for :class:`HttpFingerprintLifter` (PR2).
Covers HFP-0001 (scanner JA4H), HFP-0002 (h2/h3 settings probe),
and HFP-0003 (QUIC probe) using synthetic CompiledRule stubs injected
directly into the lifter's RuleIndex — no YAML on disk required.
"""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from tests.ttp._stub_store import StubRuleStore
_EMITS_BY_RULE: dict[str, tuple] = {
"HFP-0001": (("T1592", "002", "TA0043", 0.6),),
"HFP-0002": (("T1046", None, "TA0043", 0.6),),
"HFP-0003": (("T1046", None, "TA0043", 0.6),),
}
def _rule(rule_id: str, applies_to: str = "http_fingerprint") -> CompiledRule:
    """Build a minimal ``CompiledRule`` stub for *rule_id*.

    The rule's name is its id, its version is 1, and its match spec and
    evidence fields are empty.  ``emits`` comes from ``_EMITS_BY_RULE`` and
    falls back to an empty tuple for ids that have no entry there.

    Args:
        rule_id: Identifier used for both ``rule_id`` and ``name``.
        applies_to: Single source kind placed in the ``applies_to`` frozenset.

    Returns:
        A ``CompiledRule`` carrying a default-constructed ``RuleState``.
    """
    emitted = _EMITS_BY_RULE.get(rule_id, ())
    source_kinds = frozenset({applies_to})
    return CompiledRule(
        rule_id=rule_id,
        rule_version=1,
        name=rule_id,
        applies_to=source_kinds,
        match_spec={},
        emits=emitted,
        evidence_fields=(),
        state=RuleState(),
    )
def _make_lifter(*rule_ids: str) -> HttpFingerprintLifter:
    """Return a lifter with one stub rule installed per id in *rule_ids*.

    The stubs are passed to a ``StubRuleStore`` and are also installed
    directly into the lifter's ``_index``.  Called with no arguments, the
    returned lifter has no rules installed.
    """
    stubs = [_rule(rule_id) for rule_id in rule_ids]
    store = StubRuleStore(compiled=stubs)
    lifter = HttpFingerprintLifter(store)
    # Reach into the private index so each stub is registered explicitly.
    for stub in stubs:
        lifter._index.install(stub)
    return lifter
def _ev(payload: dict[str, Any]) -> TaggerEvent:
    """Wrap *payload* in an ``http_fingerprint`` ``TaggerEvent``.

    The source and attacker ids are fixed placeholders; the identity,
    session and decky ids are all ``None``.
    """
    return TaggerEvent(
        payload=payload,
        source_kind="http_fingerprint",
        source_id="src-fp",
        attacker_uuid="att-1",
        identity_uuid=None,
        session_id=None,
        decky_id=None,
    )
# ── HFP-0001: scanner JA4H prefix match ─────────────────────────────
class TestScannerJA4H:
    """HFP-0001: tags scanner-style JA4H hashes and ignores everything else."""

    def test_curl_h1_ja4h_fires(self):
        """A curl-default h1 JA4H yields a tag with technique T1592."""
        lifter = _make_lifter("HFP-0001")
        event = _ev({
            "ja4h": "GE11nn0000_02_abc123def456_000000000000",
            "protocol": "h1",
            "client_ip": "1.2.3.4",
            "seen_at": "2026-05-10T00:00:00Z",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags, "HFP-0001 must fire on curl-default JA4H prefix"
        assert tags[0].technique_id == "T1592"

    def test_curl_h2_ja4h_fires(self):
        """The h2 variant of the curl-style JA4H also produces a tag."""
        lifter = _make_lifter("HFP-0001")
        event = _ev({
            "ja4h": "GE20nn0000_02_abc123def456_000000000000",
            "protocol": "h2",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags

    def test_browser_ja4h_no_fire(self):
        """A browser-style JA4H produces no tags."""
        lifter = _make_lifter("HFP-0001")
        event = _ev({
            "ja4h": "GE11cn0000_08_realbrwsr1234_000000000000",
            "protocol": "h1",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags == []

    def test_missing_ja4h_no_fire(self):
        """A payload without a ``ja4h`` key produces no tags."""
        lifter = _make_lifter("HFP-0001")
        event = _ev({"protocol": "h1"})
        tags = asyncio.run(lifter.tag(event))
        assert tags == []

    def test_evidence_keys_match_typeddict(self):
        """The emitted evidence has exactly the expected key set."""
        lifter = _make_lifter("HFP-0001")
        event = _ev({
            "ja4h": "GE11nn0000_02_abc123def456_000000000000",
            "protocol": "h1",
            "client_ip": "10.0.0.1",
            "seen_at": "2026-05-10T00:00:00Z",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags
        evidence = tags[0].evidence
        expected_keys = {"kind", "hash", "protocol", "client_ip", "seen_at", "raw"}
        assert set(evidence) == expected_keys
        assert evidence["kind"] == "ja4h"
        assert evidence["protocol"] == "h1"

    def test_rule_not_installed_no_fire(self):
        """With no rules installed, even a matching JA4H produces no tags."""
        lifter = _make_lifter()  # no rules installed
        event = _ev({
            "ja4h": "GE11nn0000_02_abc_000000000000",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags == []
# ── HFP-0002: h2/h3 settings probe ──────────────────────────────────
class TestH2H3Probe:
    """HFP-0002: tags ``http2_settings`` / ``http3_settings`` fingerprints."""

    def test_h2_settings_fires(self):
        """An ``http2_settings`` fingerprint yields a T1046 tag."""
        lifter = _make_lifter("HFP-0002")
        event = _ev({
            "fingerprint_type": "http2_settings",
            "settings": {"HEADER_TABLE_SIZE": 65536},
            "client_ip": "5.6.7.8",
            "seen_at": "2026-05-10T00:00:00Z",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags, "HFP-0002 must fire on http2_settings"
        assert tags[0].technique_id == "T1046"

    def test_h3_settings_fires(self):
        """An ``http3_settings`` fingerprint fires with protocol ``h3``."""
        lifter = _make_lifter("HFP-0002")
        event = _ev({
            "fingerprint_type": "http3_settings",
            "settings": {"QPACK_MAX_TABLE_CAPACITY": 0},
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags
        evidence = tags[0].evidence
        assert evidence["protocol"] == "h3"

    def test_h2_settings_evidence_carries_raw(self):
        """The evidence ``raw`` field equals the submitted settings dict."""
        lifter = _make_lifter("HFP-0002")
        settings = {"HEADER_TABLE_SIZE": 4096, "MAX_CONCURRENT_STREAMS": 100}
        event = _ev({
            "fingerprint_type": "http2_settings",
            "settings": settings,
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags
        assert tags[0].evidence["raw"] == settings

    def test_ja4h_event_does_not_fire_h2_probe(self):
        """A JA4H-only payload produces no tags when only HFP-0002 is loaded."""
        lifter = _make_lifter("HFP-0002")
        event = _ev({
            "ja4h": "GE11nn0000_02_abc_000000000000",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags == []

    def test_unknown_fp_type_no_fire(self):
        """A ``fingerprint_type`` of ``ja3`` produces no tags."""
        lifter = _make_lifter("HFP-0002")
        event = _ev({
            "fingerprint_type": "ja3",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags == []
# ── HFP-0003: QUIC probe ─────────────────────────────────────────────
class TestQuicProbe:
    """HFP-0003: tags payloads that carry a ``ja4_quic`` fingerprint."""

    def test_ja4_quic_fires(self):
        """A payload with ``ja4_quic`` yields a T1046 tag."""
        lifter = _make_lifter("HFP-0003")
        event = _ev({
            "ja4_quic": "q13d0310h2_002f,0035_0403,0804_h3",
            "client_ip": "9.8.7.6",
            "seen_at": "2026-05-10T00:00:00Z",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags, "HFP-0003 must fire on ja4_quic"
        assert tags[0].technique_id == "T1046"

    def test_evidence_protocol_is_h3(self):
        """The evidence reports protocol ``h3`` and kind ``ja4_quic``."""
        lifter = _make_lifter("HFP-0003")
        event = _ev({
            "ja4_quic": "q13d0310h2_002f,0035_0403,0804_h3",
        })
        tags = asyncio.run(lifter.tag(event))
        assert tags
        first = tags[0]
        assert first.evidence["protocol"] == "h3"
        assert first.evidence["kind"] == "ja4_quic"

    def test_missing_ja4_quic_no_fire(self):
        """A payload without ``ja4_quic`` produces no tags."""
        lifter = _make_lifter("HFP-0003")
        event = _ev({"client_ip": "1.1.1.1"})
        tags = asyncio.run(lifter.tag(event))
        assert tags == []
# ── Combined: all three rules installed ──────────────────────────────
class TestAllRulesCombined:
    """Behaviour with HFP-0001, HFP-0002 and HFP-0003 installed together."""

    def test_only_matching_rule_fires(self):
        """An ``http2_settings`` payload is tagged by HFP-0002 alone."""
        lifter = _make_lifter("HFP-0001", "HFP-0002", "HFP-0003")
        # h2_settings payload should only fire HFP-0002
        event = _ev({
            "fingerprint_type": "http2_settings",
            "settings": {},
        })
        tags = asyncio.run(lifter.tag(event))
        fired = {tag.rule_id for tag in tags}
        assert "HFP-0002" in fired
        assert "HFP-0001" not in fired
        assert "HFP-0003" not in fired

    def test_empty_payload_no_errors(self):
        """An empty payload returns an empty list."""
        lifter = _make_lifter("HFP-0001", "HFP-0002", "HFP-0003")
        event = _ev({})
        tags = asyncio.run(lifter.tag(event))
        assert tags == []

    def test_handles_only_http_fingerprint(self):
        """``HANDLES`` contains ``http_fingerprint`` and nothing else."""
        expected = frozenset({"http_fingerprint"})
        assert HttpFingerprintLifter.HANDLES == expected