feat(canary): API-trashing defense — 4-layer fingerprint validation
Adds per-mint nonce gating, structural shape validation, mint UUID consistency checks, and a per-(token, IP) rate limiter to the canary worker so attackers who extract a canary from a decky filesystem cannot poison fingerprint forensics by replaying or forging ?d= submissions. Changes: base.py fingerprint_nonce: Optional[str] added to CanaryArtifact so generators can surface the nonce to the cultivator without coupling the generator directly to DB code. obfuscator.py nonce_for(callback_token, mint_uuid): HMAC-SHA256 keyed on DECNET_CANARY_FINGERPRINT_SECRET, truncated to 16 hex chars. FingerprintSecretMissing raised at mint time if env var is unset. render_fingerprint_js() now accepts nonce= and substitutes MINT_NONCE. fingerprint_payload.js New MINT_NONCE placeholder. Appended as &k= on all beacon URLs (bare-open, single-shot, chunked). Using &k= avoids colliding with &n= (chunk total). fingerprint_html.py / fingerprint_svg.py Derive nonce via nonce_for() and pass to render_fingerprint_js(). Set artifact.fingerprint_nonce so the cultivator can persist it. cultivator.py Passes fingerprint_nonce into create_canary_token() when present on the artifact; NULL for all non-fingerprint generators. canary.py (model) fingerprint_nonce: Optional[str] = Field(default=None, max_length=16) added to CanaryToken. None for non-fingerprint tokens. worker.py _extract_fingerprint now returns (meta_dict, parsed_fp) tuple. _record_hit accepts parsed_fp + raw_nonce and runs 4 layers after token lookup: nonce match, shape check, mint UUID consistency, rate limit. Each failure sets _fp_invalid_* flag and drops structured _fp. Trigger row always lands regardless. tests/canary/conftest.py Session-scoped autouse fixture sets DECNET_CANARY_FINGERPRINT_SECRET so fingerprint generator and worker tests work offline. tests 5 new worker HTTP tests and 2 new generator tests covering each validation layer.
This commit is contained in:
@@ -9,11 +9,29 @@ both accept a stripped-down skeleton with just ``[Content_Types].xml``,
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
import zipfile
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="session")
|
||||
def _canary_fingerprint_secret():
|
||||
"""Ensure DECNET_CANARY_FINGERPRINT_SECRET is set for all canary tests.
|
||||
|
||||
Fingerprint generators call nonce_for() which raises if the env var
|
||||
is unset. A test-only sentinel value is fine — it just needs to exist.
|
||||
"""
|
||||
key = "DECNET_CANARY_FINGERPRINT_SECRET"
|
||||
prev = os.environ.get(key)
|
||||
os.environ.setdefault(key, "test-secret-for-canary-tests-only")
|
||||
yield
|
||||
if prev is None:
|
||||
os.environ.pop(key, None)
|
||||
else:
|
||||
os.environ[key] = prev
|
||||
|
||||
|
||||
_DOCX_CONTENT_TYPES = (
|
||||
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
|
||||
'<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
|
||||
|
||||
@@ -101,3 +101,24 @@ def test_mint_uuid_stable_across_html_and_svg() -> None:
|
||||
html_uuid = next(n for n in html.notes if n.startswith("mint_uuid="))
|
||||
svg_uuid = next(n for n in svg.notes if n.startswith("mint_uuid="))
|
||||
assert html_uuid == svg_uuid
|
||||
|
||||
|
||||
def test_fingerprint_html_nonce_populated_and_matches_hmac() -> None:
|
||||
"""Artifact carries ``fingerprint_nonce`` matching HMAC derivation."""
|
||||
import uuid as _uuid
|
||||
from decnet.canary.obfuscator import nonce_for
|
||||
|
||||
art = get_generator("fingerprint_html").generate(_ctx("nonce-tok"))
|
||||
assert art.fingerprint_nonce is not None
|
||||
assert len(art.fingerprint_nonce) == 16
|
||||
_MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d")
|
||||
expected_mint = str(_uuid.uuid5(_MINT_NS, "nonce-tok"))
|
||||
expected_nonce = nonce_for("nonce-tok", expected_mint)
|
||||
assert art.fingerprint_nonce == expected_nonce
|
||||
|
||||
|
||||
def test_fingerprint_svg_nonce_matches_html_for_same_token() -> None:
|
||||
"""Both generators derive the same nonce for the same callback token."""
|
||||
html = get_generator("fingerprint_html").generate(_ctx("nonce-tok2"))
|
||||
svg = get_generator("fingerprint_svg").generate(_ctx("nonce-tok2"))
|
||||
assert html.fingerprint_nonce == svg.fingerprint_nonce
|
||||
|
||||
@@ -55,10 +55,12 @@ def test_render_fingerprint_js_substitutes_then_obfuscates() -> None:
|
||||
callback_token="tok-12345",
|
||||
http_base="https://canary.example.test",
|
||||
mint_uuid="11111111-2222-3333-4444-555555555555",
|
||||
nonce="deadbeef01234567",
|
||||
)
|
||||
# Template placeholders must NOT survive into the output.
|
||||
assert "{{BEACON_URL}}" not in out
|
||||
assert "{{MINT_UUID}}" not in out
|
||||
assert "{{MINT_NONCE}}" not in out
|
||||
assert out.strip()
|
||||
# Should be syntactically valid JS — Node parses it without throwing.
|
||||
proc = subprocess.run(
|
||||
@@ -74,6 +76,7 @@ def test_render_fingerprint_js_is_deterministic() -> None:
|
||||
callback_token="tok-12345",
|
||||
http_base="https://canary.example.test",
|
||||
mint_uuid="11111111-2222-3333-4444-555555555555",
|
||||
nonce="deadbeef01234567",
|
||||
)
|
||||
a = obfuscator.render_fingerprint_js(**kw)
|
||||
b = obfuscator.render_fingerprint_js(**kw)
|
||||
|
||||
@@ -112,16 +112,24 @@ async def test_xff_is_honored(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
async def test_fingerprint_query_param_decoded_into_raw_headers(
|
||||
repo: SQLiteRepository, bus: FakeBus,
|
||||
) -> None:
|
||||
"""``?d=<b64url(json)>`` is decoded into raw_headers["_fp"] as a dict."""
|
||||
"""``?d=<b64url(json)>`` is decoded into raw_headers["_fp"] when valid."""
|
||||
import base64
|
||||
import json
|
||||
import uuid as _uuid
|
||||
|
||||
_MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d")
|
||||
mint_id = str(_uuid.uuid5(_MINT_NS, "slug-FP1"))
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-fp1", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-FP1", "secret_seed": "s", "created_by": "u1",
|
||||
})
|
||||
fp = {"mint": "abc-123", "nav": {"ua": "Test/1.0"}, "id": "h" * 64}
|
||||
# Token has no fingerprint_nonce → Layer A skipped; must satisfy B + C.
|
||||
fp = {
|
||||
"mint": mint_id,
|
||||
"nav": {"ua": "Test/1.0"}, "scr": {"w": 1920}, "tz": {"z": "UTC"},
|
||||
"id": "h" * 64,
|
||||
}
|
||||
blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode()
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
@@ -212,6 +220,153 @@ async def test_oversize_fingerprint_dropped(
|
||||
assert "_fp" not in headers
|
||||
|
||||
|
||||
def _make_fp_blob(slug: str, extra_keys: int = 3) -> tuple[str, str]:
|
||||
"""Return (b64url_blob, mint_uuid) for a fingerprint matching *slug*."""
|
||||
import base64
|
||||
import json
|
||||
import uuid as _uuid
|
||||
|
||||
_MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d")
|
||||
mint_id = str(_uuid.uuid5(_MINT_NS, slug))
|
||||
base_keys = ["nav", "scr", "tz", "cv", "gl"]
|
||||
fp: dict = {"mint": mint_id}
|
||||
for k in base_keys[:extra_keys]:
|
||||
fp[k] = {"ok": True}
|
||||
fp["id"] = "a" * 64
|
||||
blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode()
|
||||
return blob, mint_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fp_valid_nonce_persists(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
"""Valid nonce + valid shape + correct mint UUID → ``_fp`` is persisted."""
|
||||
import json
|
||||
|
||||
blob, _ = _make_fp_blob("slug-NONCE1")
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-n1", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-NONCE1", "secret_seed": "s", "created_by": "u1",
|
||||
"fingerprint_nonce": "deadbeef01234567",
|
||||
})
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
client.get(f"/c/slug-NONCE1?d={blob}&k=deadbeef01234567")
|
||||
triggers = await repo.list_canary_triggers("tok-n1")
|
||||
headers = json.loads(triggers[0]["raw_headers"])
|
||||
assert "_fp" in headers
|
||||
assert "_fp_invalid_nonce" not in headers
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fp_invalid_nonce_rejected(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
"""Wrong ``?k=`` value → ``_fp_invalid_nonce=1``, no ``_fp``."""
|
||||
import json
|
||||
|
||||
blob, _ = _make_fp_blob("slug-NONCE2")
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-n2", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-NONCE2", "secret_seed": "s", "created_by": "u1",
|
||||
"fingerprint_nonce": "deadbeef01234567",
|
||||
})
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
client.get(f"/c/slug-NONCE2?d={blob}&k=wrongnonce000000")
|
||||
triggers = await repo.list_canary_triggers("tok-n2")
|
||||
headers = json.loads(triggers[0]["raw_headers"])
|
||||
assert headers["_fp_invalid_nonce"] == "1"
|
||||
assert "_fp" not in headers
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fp_invalid_shape_rejected(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
"""Fewer than 3 known dict keys → ``_fp_invalid_shape=1``, no ``_fp``."""
|
||||
import base64
|
||||
import json
|
||||
import uuid as _uuid
|
||||
|
||||
_MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d")
|
||||
mint_id = str(_uuid.uuid5(_MINT_NS, "slug-SHAPE1"))
|
||||
fp = {"mint": mint_id, "nav": {"ua": "x"}} # only 1 known dict key
|
||||
blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode()
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-sh1", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-SHAPE1", "secret_seed": "s", "created_by": "u1",
|
||||
})
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
client.get(f"/c/slug-SHAPE1?d={blob}")
|
||||
triggers = await repo.list_canary_triggers("tok-sh1")
|
||||
headers = json.loads(triggers[0]["raw_headers"])
|
||||
assert headers["_fp_invalid_shape"] == "1"
|
||||
assert "_fp" not in headers
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fp_invalid_mint_rejected(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
"""Wrong mint UUID in payload → ``_fp_invalid_mint=1``, no ``_fp``."""
|
||||
import base64
|
||||
import json
|
||||
|
||||
fp = {
|
||||
"mint": "wrong-uuid-entirely",
|
||||
"nav": {"x": 1}, "scr": {"x": 1}, "tz": {"x": 1},
|
||||
"id": "a" * 64,
|
||||
}
|
||||
blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode()
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-mint1", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-MINT1", "secret_seed": "s", "created_by": "u1",
|
||||
})
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
client.get(f"/c/slug-MINT1?d={blob}")
|
||||
triggers = await repo.list_canary_triggers("tok-mint1")
|
||||
headers = json.loads(triggers[0]["raw_headers"])
|
||||
assert headers["_fp_invalid_mint"] == "1"
|
||||
assert "_fp" not in headers
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fp_rate_limited_on_excess_submissions(
|
||||
repo: SQLiteRepository, bus: FakeBus,
|
||||
) -> None:
|
||||
"""31st rapid-fire submission → ``_fp_rate_limited=1``, no ``_fp``."""
|
||||
import json
|
||||
import decnet.canary.worker as _worker
|
||||
|
||||
# Reset the rate bucket so other tests don't bleed in.
|
||||
_worker._fp_rate_buckets.clear()
|
||||
|
||||
blob, _ = _make_fp_blob("slug-RATE1")
|
||||
await repo.create_canary_token({
|
||||
"uuid": "tok-rate1", "kind": "http", "decky_name": "web1",
|
||||
"generator": "fingerprint_html", "placement_path": "/x",
|
||||
"callback_token": "slug-RATE1", "secret_seed": "s", "created_by": "u1",
|
||||
})
|
||||
app = _build_app(repo, bus)
|
||||
with TestClient(app) as client:
|
||||
for _ in range(31):
|
||||
client.get(
|
||||
f"/c/slug-RATE1?d={blob}",
|
||||
headers={"X-Forwarded-For": "1.2.3.4"},
|
||||
)
|
||||
triggers = await repo.list_canary_triggers("tok-rate1")
|
||||
# list_canary_triggers orders DESC (newest first) — index 0 is the 31st hit.
|
||||
newest_headers = json.loads(triggers[0]["raw_headers"])
|
||||
assert newest_headers["_fp_rate_limited"] == "1"
|
||||
assert "_fp" not in newest_headers
|
||||
# Oldest (30th or earlier) should be clean.
|
||||
oldest_headers = json.loads(triggers[-1]["raw_headers"])
|
||||
assert "_fp_rate_limited" not in oldest_headers
|
||||
assert "_fp" in oldest_headers
|
||||
|
||||
_worker._fp_rate_buckets.clear()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_decnet_strings_in_response(repo: SQLiteRepository, bus: FakeBus) -> None:
|
||||
"""Stealth posture: nothing in the HTTP surface mentions DECNET."""
|
||||
|
||||
Reference in New Issue
Block a user