diff --git a/decnet/canary/base.py b/decnet/canary/base.py index 160dcd19..d9e05552 100644 --- a/decnet/canary/base.py +++ b/decnet/canary/base.py @@ -100,6 +100,12 @@ class CanaryArtifact: planting. Never leaked to the attacker-facing surface. """ + fingerprint_nonce: Optional[str] = None + """Per-mint HMAC nonce for fingerprint canaries; ``None`` for everything + else. Cultivator reads this and persists it on ``CanaryToken.fingerprint_nonce`` + so the worker can validate incoming ``?k=`` params. + """ + class CanaryGenerator(ABC): """Produces a fake artifact from scratch.""" diff --git a/decnet/canary/cultivator.py b/decnet/canary/cultivator.py index 88288ff3..a0c915c1 100644 --- a/decnet/canary/cultivator.py +++ b/decnet/canary/cultivator.py @@ -160,7 +160,7 @@ async def cultivate( # attribute a callback if the artifact trips during the plant # itself (improbable but possible — DOCX viewers can preview # autoplay-style). - await repo.create_canary_token({ + token_data: dict = { "kind": _GENERATOR_TO_KIND.get(gen_name, "http"), "decky_name": plan.decky_name, "instrumenter": None, @@ -171,7 +171,10 @@ async def cultivate( "placed_at": datetime.now(timezone.utc), "created_by": created_by, "state": "planted", - }) + } + if artifact.fingerprint_nonce is not None: + token_data["fingerprint_nonce"] = artifact.fingerprint_nonce + await repo.create_canary_token(token_data) # Carry the placement_path on the artifact so the orchestrator's # plant_file call uses it. We don't mutate the generator's diff --git a/decnet/canary/fingerprint_payload.js b/decnet/canary/fingerprint_payload.js index 152c9cdf..3c15aa95 100644 --- a/decnet/canary/fingerprint_payload.js +++ b/decnet/canary/fingerprint_payload.js @@ -3,11 +3,12 @@ // canary worker. Ported from canary-self-test.html with the rendering UI // stripped out. // -// Two placeholders are substituted by the Python builder BEFORE +// Three placeholders are substituted by the Python builder BEFORE // javascript-obfuscator runs: // // {{BEACON_URL}} → full URL to /c/ (no trailing slash) // {{MINT_UUID}} → per-mint UUID, baked into the string-array post-obf +// {{MINT_NONCE}} → 16-hex HMAC nonce; the worker rejects ?d=/?o= without it // // Beacon strategy (MVP): a bare GET pixel for "I was opened" reliability, // then a fingerprint payload sent as a base64-URL query param on a second @@ -17,6 +18,7 @@ (async function () { var BEACON_URL = "{{BEACON_URL}}"; var MINT_UUID = "{{MINT_UUID}}"; + var MINT_NONCE = "{{MINT_NONCE}}"; var fp = { mint: MINT_UUID }; function fire(url) { @@ -27,7 +29,7 @@ } // 1) bare-open beacon — fires regardless of whether the rest succeeds - fire(BEACON_URL + "?o=1"); + fire(BEACON_URL + "?o=1&k=" + MINT_NONCE); function sha256(str) { var buf = new TextEncoder().encode(str); @@ -276,13 +278,13 @@ // chunk if URL would exceed safe limit (~6KB) var MAX = 6000; if (b64.length <= MAX) { - fire(BEACON_URL + "?d=" + b64); + fire(BEACON_URL + "?d=" + b64 + "&k=" + MINT_NONCE); } else { var sid = (Math.random() * 1e9 | 0).toString(36); var total = Math.ceil(b64.length / MAX); for (var ci = 0; ci < total; ci++) { var part = b64.substr(ci * MAX, MAX); - fire(BEACON_URL + "?s=" + sid + "&i=" + ci + "&n=" + total + "&d=" + part); + fire(BEACON_URL + "?s=" + sid + "&i=" + ci + "&n=" + total + "&d=" + part + "&k=" + MINT_NONCE); } } } catch (e) { /* swallow */ } diff --git a/decnet/canary/generators/fingerprint_html.py b/decnet/canary/generators/fingerprint_html.py index 9046538f..122264cf 100644 --- a/decnet/canary/generators/fingerprint_html.py +++ b/decnet/canary/generators/fingerprint_html.py @@ -21,7 +21,7 @@ import hashlib import uuid from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator -from decnet.canary.obfuscator import render_fingerprint_js +from decnet.canary.obfuscator import render_fingerprint_js, nonce_for _MINT_NAMESPACE = uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") @@ -111,10 +111,12 @@ class FingerprintHtmlGenerator(CanaryGenerator): def generate(self, ctx: CanaryContext) -> CanaryArtifact: mint_uuid = _mint_uuid_for(ctx.callback_token) + nonce = nonce_for(ctx.callback_token, mint_uuid) payload = render_fingerprint_js( callback_token=ctx.callback_token, http_base=ctx.http_base, mint_uuid=mint_uuid, + nonce=nonce, ) rows, row_count = _build_rows(ctx.callback_token) body = _PAGE_TEMPLATE.format( @@ -130,6 +132,7 @@ class FingerprintHtmlGenerator(CanaryGenerator): mode=0o644, mtime_offset=-86400 * 14, generator=self.name, + fingerprint_nonce=nonce, notes=[ f"obfuscated fingerprinter beacons={beacon}", f"mint_uuid={mint_uuid}", diff --git a/decnet/canary/generators/fingerprint_svg.py b/decnet/canary/generators/fingerprint_svg.py index fb6c418a..78fda748 100644 --- a/decnet/canary/generators/fingerprint_svg.py +++ b/decnet/canary/generators/fingerprint_svg.py @@ -15,7 +15,7 @@ from __future__ import annotations from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator from decnet.canary.generators.fingerprint_html import _mint_uuid_for, _stable_int -from decnet.canary.obfuscator import render_fingerprint_js +from decnet.canary.obfuscator import render_fingerprint_js, nonce_for _DIAGRAM_TEMPLATE = """ @@ -57,10 +57,12 @@ class FingerprintSvgGenerator(CanaryGenerator): def generate(self, ctx: CanaryContext) -> CanaryArtifact: mint_uuid = _mint_uuid_for(ctx.callback_token) + nonce = nonce_for(ctx.callback_token, mint_uuid) payload = render_fingerprint_js( callback_token=ctx.callback_token, http_base=ctx.http_base, mint_uuid=mint_uuid, + nonce=nonce, ) region = _REGIONS[_stable_int(ctx.callback_token, "reg") % len(_REGIONS)] ver = 1 + (_stable_int(ctx.callback_token, "ver") % 6) @@ -78,6 +80,7 @@ class FingerprintSvgGenerator(CanaryGenerator): mode=0o644, mtime_offset=-86400 * 30, generator=self.name, + fingerprint_nonce=nonce, notes=[ f"obfuscated fingerprinter beacons={beacon}", f"mint_uuid={mint_uuid}", diff --git a/decnet/canary/obfuscator.py b/decnet/canary/obfuscator.py index 1ce4225a..81b93e3e 100644 --- a/decnet/canary/obfuscator.py +++ b/decnet/canary/obfuscator.py @@ -22,6 +22,7 @@ from stdout. Stderr surfaces obfuscator failures. from __future__ import annotations import hashlib +import hmac import json import os import subprocess # nosec B404 — Node helper exec is the whole point @@ -44,6 +45,36 @@ class ObfuscatorError(RuntimeError): """Raised when the Node helper fails or returns empty output.""" +class FingerprintSecretMissing(RuntimeError): + """Raised when ``DECNET_CANARY_FINGERPRINT_SECRET`` is unset. + + Fingerprint canaries embed a per-mint nonce derived from this + server-side secret; without it the worker cannot validate incoming + fingerprint beacons, so we fail loud at mint time rather than ship + a defeatable canary. + """ + + +_FINGERPRINT_SECRET_ENV = "DECNET_CANARY_FINGERPRINT_SECRET" # nosec B105 — this is an env var name, not a hardcoded password + + +def nonce_for(callback_token: str, mint_uuid: str) -> str: + """Compute the per-mint fingerprint nonce. + + HMAC-SHA256 keyed on the server-side master secret, message is + ``callback_token + "|" + mint_uuid``. Truncated to 16 hex chars + (~64 bits of entropy) — enough to defeat slug-only forgery while + fitting comfortably into a query string. + """ + secret = os.environ.get(_FINGERPRINT_SECRET_ENV, "") + if not secret: + raise FingerprintSecretMissing( + f"{_FINGERPRINT_SECRET_ENV} is unset; fingerprint canaries cannot mint" + ) + msg = f"{callback_token}|{mint_uuid}".encode("utf-8") + return hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()[:16] + + def _seed_from_token(callback_token: str) -> int: """Derive a 31-bit numeric seed from the callback token. @@ -124,13 +155,16 @@ def obfuscate(code: str, *, callback_token: str) -> str: def render_fingerprint_js( - *, callback_token: str, http_base: str, mint_uuid: str, + *, callback_token: str, http_base: str, mint_uuid: str, nonce: str, ) -> str: """Build the obfuscated fingerprint JS for a single mint. - Substitutes ``{{BEACON_URL}}`` and ``{{MINT_UUID}}`` in the payload - template, then runs it through :func:`obfuscate` with a seed - derived from the callback token. + Substitutes ``{{BEACON_URL}}``, ``{{MINT_UUID}}``, and + ``{{MINT_NONCE}}`` in the payload template, then runs it through + :func:`obfuscate` with a seed derived from the callback token. + The nonce is appended as ``&k=`` on every beacon URL the JS emits; + the worker rejects fingerprint payloads whose ``?k=`` doesn't match + the row's :attr:`CanaryToken.fingerprint_nonce`. """ template = _PAYLOAD.read_text(encoding="utf-8") beacon = f"{http_base.rstrip('/')}/c/{callback_token}" @@ -138,5 +172,6 @@ def render_fingerprint_js( template .replace("{{BEACON_URL}}", beacon) .replace("{{MINT_UUID}}", mint_uuid) + .replace("{{MINT_NONCE}}", nonce) ) return obfuscate(src, callback_token=callback_token) diff --git a/decnet/canary/worker.py b/decnet/canary/worker.py index 67452c0b..1152b1da 100644 --- a/decnet/canary/worker.py +++ b/decnet/canary/worker.py @@ -30,6 +30,8 @@ import base64 import binascii import json import os +import time +import uuid from datetime import datetime, timezone from typing import Any, Optional @@ -53,6 +55,41 @@ _TRANSPARENT_GIF = bytes.fromhex( ) +# Namespace used by fingerprint generators to derive mint UUID. +# Must stay in sync with fingerprint_html._MINT_NAMESPACE. +_MINT_NAMESPACE = uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") + +# In-memory per-(token_uuid, src_ip) rate limiter for fingerprint persists. +# Maps (token_uuid, src_ip) -> list of monotonic timestamps. +# Not shared across worker restarts or processes — acceptable for MVP. +_FP_RATE_WINDOW_S = 60 +_FP_RATE_LIMIT = 30 +_fp_rate_buckets: dict[tuple[str, str], list[float]] = {} + + +def _fp_rate_allowed(token_uuid: str, src_ip: str) -> bool: + key = (token_uuid, src_ip) + now = time.monotonic() + cutoff = now - _FP_RATE_WINDOW_S + bucket = _fp_rate_buckets.get(key, []) + bucket = [t for t in bucket if t > cutoff] + if len(bucket) >= _FP_RATE_LIMIT: + _fp_rate_buckets[key] = bucket + return False + bucket.append(now) + _fp_rate_buckets[key] = bucket + return True + + +def _is_valid_fp_shape(fp: dict) -> bool: + """Layer B — structural sanity check on a decoded fingerprint blob.""" + if not isinstance(fp.get("mint"), str) or not fp["mint"]: + return False + known_keys = {"nav", "scr", "tz", "cv", "gl", "au", "ft", "rtc"} + present = sum(1 for k in known_keys if isinstance(fp.get(k), dict)) + return present >= 3 + + def _http_base() -> str: return os.environ.get("DECNET_CANARY_HTTP_BASE", "http://localhost:8088").rstrip("/") @@ -107,8 +144,9 @@ def _build_app(repo: BaseRepository, bus: BaseBus) -> FastAPI: @app.get("/c/{slug}") async def callback(slug: str, request: Request) -> Response: + raw_nonce = request.query_params.get("k") + fp_meta, parsed_fp = _extract_fingerprint(request.query_params) merged_headers = dict(request.headers) - fp_meta = _extract_fingerprint(request.query_params) if fp_meta: merged_headers.update(fp_meta) await _record_hit( @@ -119,6 +157,8 @@ def _build_app(repo: BaseRepository, bus: BaseBus) -> FastAPI: request_path=str(request.url.path), dns_qname=None, raw_headers=merged_headers, + parsed_fp=parsed_fp, + raw_nonce=raw_nonce, ) # Always 200 with a tiny image so the attacker's client sees # a "success" — same return regardless of whether the slug is @@ -143,61 +183,58 @@ def _build_app(repo: BaseRepository, bus: BaseBus) -> FastAPI: _FP_CHUNK_MAX = 8 * 1024 -def _extract_fingerprint(qp: Any) -> dict[str, Any]: - """Decode the fingerprint-payload query params into reserved keys. +def _extract_fingerprint(qp: Any) -> tuple[dict[str, Any], Optional[dict]]: + """Decode fingerprint-payload query params into (meta_dict, parsed_fp). The obfuscated browser payload may send three shapes on ``GET /c/``: * ``?o=1`` — bare-open beacon, fired before fingerprinting starts. * ``?d=`` — single-shot fingerprint dump. - * ``?s=&i=&n=&d=`` — chunked dump, - one request per chunk; the reassembler joins by ``s`` and ``i``. + * ``?s=&i=&n=&d=`` — chunked dump. - Returns a flat dict whose keys are namespaced under a ``_fp`` prefix - so they can't collide with real HTTP header names when merged into - ``raw_headers``. Unknown / malformed input returns ``{}`` — we - never raise; the trigger row records the hit either way. + Returns a tuple of: + - ``meta`` — flat dict with ``_fp_*`` keys to merge into raw_headers. + - ``parsed_fp`` — the decoded fingerprint dict for validation, or ``None`` + when there's no ``?d=`` or decoding fails. """ out: dict[str, Any] = {} + parsed_fp: Optional[dict] = None if not qp: - return out + return out, parsed_fp o = qp.get("o") if hasattr(qp, "get") else None if o: out["_fp_open"] = "1" d = qp.get("d") if hasattr(qp, "get") else None if not d: - return out + return out, parsed_fp if len(d) > _FP_CHUNK_MAX: out["_fp_oversize"] = "1" - return out + return out, parsed_fp sid = qp.get("s") idx = qp.get("i") total = qp.get("n") if sid and idx and total: - # Chunked payload: keep raw base64url + metadata; reassembly is - # a downstream concern (a later worker pass will join chunks - # by ``_fp_sid`` and decode the concatenation). out["_fp_sid"] = sid out["_fp_idx"] = idx out["_fp_total"] = total out["_fp_chunk"] = d - return out + return out, parsed_fp - # Single-shot: decode now so the API consumer sees a structured - # dict rather than a long opaque base64 blob. + # Single-shot: decode and pass back as parsed_fp; validation runs in + # _record_hit after token lookup so we have the stored nonce at hand. try: padded = d + "=" * (-len(d) % 4) raw = base64.urlsafe_b64decode(padded.encode("ascii")) parsed = json.loads(raw.decode("utf-8")) except (binascii.Error, ValueError, UnicodeDecodeError): out["_fp_decode_error"] = "1" - return out + return out, parsed_fp if isinstance(parsed, dict): - out["_fp"] = parsed + parsed_fp = parsed else: out["_fp_decode_error"] = "1" - return out + return out, parsed_fp def _client_ip(request: Request) -> str: @@ -225,16 +262,58 @@ async def _record_hit( request_path: Optional[str], dns_qname: Optional[str], raw_headers: Optional[dict], + parsed_fp: Optional[dict] = None, + raw_nonce: Optional[str] = None, ) -> None: """Resolve slug -> token, persist a trigger, publish on the bus. Unknown slugs are silently swallowed: returning the same response for known and unknown slugs is the stealth posture, and persisting every random scan would clutter the DB. + + When *parsed_fp* is present (single-shot fingerprint decode succeeded), + it is validated through four layers before being merged into raw_headers: + A) nonce match against CanaryToken.fingerprint_nonce, + B) structural shape check, + C) mint UUID consistency, + D) per-(token, IP) rate limit. + Each failure drops the structured ``_fp`` and sets a ``_fp_*_invalid`` flag. + The trigger row always lands regardless — the GET hit is itself forensic. """ token = await repo.get_canary_token_by_slug(slug) if token is None: return + + final_headers: dict[str, Any] = dict(raw_headers or {}) + + if parsed_fp is not None: + stored_nonce: Optional[str] = token.get("fingerprint_nonce") + + # Layer A — nonce + if stored_nonce is not None and raw_nonce != stored_nonce: + final_headers["_fp_invalid_nonce"] = "1" + parsed_fp = None + + # Layer B — shape (only when nonce passed or no nonce enforced) + if parsed_fp is not None and not _is_valid_fp_shape(parsed_fp): + final_headers["_fp_invalid_shape"] = "1" + parsed_fp = None + + # Layer C — mint UUID consistency + if parsed_fp is not None: + expected_mint = str(uuid.uuid5(_MINT_NAMESPACE, slug)) + if parsed_fp.get("mint") != expected_mint: + final_headers["_fp_invalid_mint"] = "1" + parsed_fp = None + + # Layer D — rate limit + if parsed_fp is not None and not _fp_rate_allowed(token["uuid"], src_ip): + final_headers["_fp_rate_limited"] = "1" + parsed_fp = None + + if parsed_fp is not None: + final_headers["_fp"] = parsed_fp + trigger_id = await repo.record_canary_trigger({ "token_uuid": token["uuid"], "occurred_at": datetime.now(timezone.utc), @@ -242,7 +321,7 @@ async def _record_hit( "user_agent": user_agent, "request_path": request_path, "dns_qname": dns_qname, - "raw_headers": raw_headers or {}, + "raw_headers": final_headers, }) try: await bus.publish( diff --git a/decnet/web/db/models/canary.py b/decnet/web/db/models/canary.py index f7da096b..e58c8ba2 100644 --- a/decnet/web/db/models/canary.py +++ b/decnet/web/db/models/canary.py @@ -132,6 +132,12 @@ class CanaryToken(SQLModel, table=True): last_error: Optional[str] = Field( default=None, sa_column=Column("last_error", Text, nullable=True), ) + # 16-hex HMAC nonce embedded in fingerprint canary JS payloads. NULL for + # all non-fingerprint generators. Derived at mint time from + # HMAC-SHA256(DECNET_CANARY_FINGERPRINT_SECRET, callback_token + mint_uuid) + # truncated to 16 chars; the worker validates incoming ?n= against this + # value to reject slug-only fingerprint spoofs. + fingerprint_nonce: Optional[str] = Field(default=None, max_length=16) class CanaryTrigger(SQLModel, table=True): diff --git a/tests/canary/conftest.py b/tests/canary/conftest.py index 72c97f85..56de1e8d 100644 --- a/tests/canary/conftest.py +++ b/tests/canary/conftest.py @@ -9,11 +9,29 @@ both accept a stripped-down skeleton with just ``[Content_Types].xml``, from __future__ import annotations import io +import os import zipfile import pytest +@pytest.fixture(autouse=True, scope="session") +def _canary_fingerprint_secret(): + """Ensure DECNET_CANARY_FINGERPRINT_SECRET is set for all canary tests. + + Fingerprint generators call nonce_for() which raises if the env var + is unset. A test-only sentinel value is fine — it just needs to exist. + """ + key = "DECNET_CANARY_FINGERPRINT_SECRET" + prev = os.environ.get(key) + os.environ.setdefault(key, "test-secret-for-canary-tests-only") + yield + if prev is None: + os.environ.pop(key, None) + else: + os.environ[key] = prev + + _DOCX_CONTENT_TYPES = ( '' '' diff --git a/tests/canary/test_fingerprint_generators.py b/tests/canary/test_fingerprint_generators.py index b2dd4c4f..bbb393a3 100644 --- a/tests/canary/test_fingerprint_generators.py +++ b/tests/canary/test_fingerprint_generators.py @@ -101,3 +101,24 @@ def test_mint_uuid_stable_across_html_and_svg() -> None: html_uuid = next(n for n in html.notes if n.startswith("mint_uuid=")) svg_uuid = next(n for n in svg.notes if n.startswith("mint_uuid=")) assert html_uuid == svg_uuid + + +def test_fingerprint_html_nonce_populated_and_matches_hmac() -> None: + """Artifact carries ``fingerprint_nonce`` matching HMAC derivation.""" + import uuid as _uuid + from decnet.canary.obfuscator import nonce_for + + art = get_generator("fingerprint_html").generate(_ctx("nonce-tok")) + assert art.fingerprint_nonce is not None + assert len(art.fingerprint_nonce) == 16 + _MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") + expected_mint = str(_uuid.uuid5(_MINT_NS, "nonce-tok")) + expected_nonce = nonce_for("nonce-tok", expected_mint) + assert art.fingerprint_nonce == expected_nonce + + +def test_fingerprint_svg_nonce_matches_html_for_same_token() -> None: + """Both generators derive the same nonce for the same callback token.""" + html = get_generator("fingerprint_html").generate(_ctx("nonce-tok2")) + svg = get_generator("fingerprint_svg").generate(_ctx("nonce-tok2")) + assert html.fingerprint_nonce == svg.fingerprint_nonce diff --git a/tests/canary/test_obfuscator.py b/tests/canary/test_obfuscator.py index 8b868832..d9ffe701 100644 --- a/tests/canary/test_obfuscator.py +++ b/tests/canary/test_obfuscator.py @@ -55,10 +55,12 @@ def test_render_fingerprint_js_substitutes_then_obfuscates() -> None: callback_token="tok-12345", http_base="https://canary.example.test", mint_uuid="11111111-2222-3333-4444-555555555555", + nonce="deadbeef01234567", ) # Template placeholders must NOT survive into the output. assert "{{BEACON_URL}}" not in out assert "{{MINT_UUID}}" not in out + assert "{{MINT_NONCE}}" not in out assert out.strip() # Should be syntactically valid JS — Node parses it without throwing. proc = subprocess.run( @@ -74,6 +76,7 @@ def test_render_fingerprint_js_is_deterministic() -> None: callback_token="tok-12345", http_base="https://canary.example.test", mint_uuid="11111111-2222-3333-4444-555555555555", + nonce="deadbeef01234567", ) a = obfuscator.render_fingerprint_js(**kw) b = obfuscator.render_fingerprint_js(**kw) diff --git a/tests/canary/test_worker_http.py b/tests/canary/test_worker_http.py index 66611ce7..1a5e20d5 100644 --- a/tests/canary/test_worker_http.py +++ b/tests/canary/test_worker_http.py @@ -112,16 +112,24 @@ async def test_xff_is_honored(repo: SQLiteRepository, bus: FakeBus) -> None: async def test_fingerprint_query_param_decoded_into_raw_headers( repo: SQLiteRepository, bus: FakeBus, ) -> None: - """``?d=`` is decoded into raw_headers["_fp"] as a dict.""" + """``?d=`` is decoded into raw_headers["_fp"] when valid.""" import base64 import json + import uuid as _uuid + _MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") + mint_id = str(_uuid.uuid5(_MINT_NS, "slug-FP1")) await repo.create_canary_token({ "uuid": "tok-fp1", "kind": "http", "decky_name": "web1", "generator": "fingerprint_html", "placement_path": "/x", "callback_token": "slug-FP1", "secret_seed": "s", "created_by": "u1", }) - fp = {"mint": "abc-123", "nav": {"ua": "Test/1.0"}, "id": "h" * 64} + # Token has no fingerprint_nonce → Layer A skipped; must satisfy B + C. + fp = { + "mint": mint_id, + "nav": {"ua": "Test/1.0"}, "scr": {"w": 1920}, "tz": {"z": "UTC"}, + "id": "h" * 64, + } blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode() app = _build_app(repo, bus) with TestClient(app) as client: @@ -212,6 +220,153 @@ async def test_oversize_fingerprint_dropped( assert "_fp" not in headers +def _make_fp_blob(slug: str, extra_keys: int = 3) -> tuple[str, str]: + """Return (b64url_blob, mint_uuid) for a fingerprint matching *slug*.""" + import base64 + import json + import uuid as _uuid + + _MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") + mint_id = str(_uuid.uuid5(_MINT_NS, slug)) + base_keys = ["nav", "scr", "tz", "cv", "gl"] + fp: dict = {"mint": mint_id} + for k in base_keys[:extra_keys]: + fp[k] = {"ok": True} + fp["id"] = "a" * 64 + blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode() + return blob, mint_id + + +@pytest.mark.asyncio +async def test_fp_valid_nonce_persists(repo: SQLiteRepository, bus: FakeBus) -> None: + """Valid nonce + valid shape + correct mint UUID → ``_fp`` is persisted.""" + import json + + blob, _ = _make_fp_blob("slug-NONCE1") + await repo.create_canary_token({ + "uuid": "tok-n1", "kind": "http", "decky_name": "web1", + "generator": "fingerprint_html", "placement_path": "/x", + "callback_token": "slug-NONCE1", "secret_seed": "s", "created_by": "u1", + "fingerprint_nonce": "deadbeef01234567", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + client.get(f"/c/slug-NONCE1?d={blob}&k=deadbeef01234567") + triggers = await repo.list_canary_triggers("tok-n1") + headers = json.loads(triggers[0]["raw_headers"]) + assert "_fp" in headers + assert "_fp_invalid_nonce" not in headers + + +@pytest.mark.asyncio +async def test_fp_invalid_nonce_rejected(repo: SQLiteRepository, bus: FakeBus) -> None: + """Wrong ``?k=`` value → ``_fp_invalid_nonce=1``, no ``_fp``.""" + import json + + blob, _ = _make_fp_blob("slug-NONCE2") + await repo.create_canary_token({ + "uuid": "tok-n2", "kind": "http", "decky_name": "web1", + "generator": "fingerprint_html", "placement_path": "/x", + "callback_token": "slug-NONCE2", "secret_seed": "s", "created_by": "u1", + "fingerprint_nonce": "deadbeef01234567", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + client.get(f"/c/slug-NONCE2?d={blob}&k=wrongnonce000000") + triggers = await repo.list_canary_triggers("tok-n2") + headers = json.loads(triggers[0]["raw_headers"]) + assert headers["_fp_invalid_nonce"] == "1" + assert "_fp" not in headers + + +@pytest.mark.asyncio +async def test_fp_invalid_shape_rejected(repo: SQLiteRepository, bus: FakeBus) -> None: + """Fewer than 3 known dict keys → ``_fp_invalid_shape=1``, no ``_fp``.""" + import base64 + import json + import uuid as _uuid + + _MINT_NS = _uuid.UUID("a3f7c821-9d1e-4b6a-8c2d-1e4f9a7b3c5d") + mint_id = str(_uuid.uuid5(_MINT_NS, "slug-SHAPE1")) + fp = {"mint": mint_id, "nav": {"ua": "x"}} # only 1 known dict key + blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode() + await repo.create_canary_token({ + "uuid": "tok-sh1", "kind": "http", "decky_name": "web1", + "generator": "fingerprint_html", "placement_path": "/x", + "callback_token": "slug-SHAPE1", "secret_seed": "s", "created_by": "u1", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + client.get(f"/c/slug-SHAPE1?d={blob}") + triggers = await repo.list_canary_triggers("tok-sh1") + headers = json.loads(triggers[0]["raw_headers"]) + assert headers["_fp_invalid_shape"] == "1" + assert "_fp" not in headers + + +@pytest.mark.asyncio +async def test_fp_invalid_mint_rejected(repo: SQLiteRepository, bus: FakeBus) -> None: + """Wrong mint UUID in payload → ``_fp_invalid_mint=1``, no ``_fp``.""" + import base64 + import json + + fp = { + "mint": "wrong-uuid-entirely", + "nav": {"x": 1}, "scr": {"x": 1}, "tz": {"x": 1}, + "id": "a" * 64, + } + blob = base64.urlsafe_b64encode(json.dumps(fp).encode()).rstrip(b"=").decode() + await repo.create_canary_token({ + "uuid": "tok-mint1", "kind": "http", "decky_name": "web1", + "generator": "fingerprint_html", "placement_path": "/x", + "callback_token": "slug-MINT1", "secret_seed": "s", "created_by": "u1", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + client.get(f"/c/slug-MINT1?d={blob}") + triggers = await repo.list_canary_triggers("tok-mint1") + headers = json.loads(triggers[0]["raw_headers"]) + assert headers["_fp_invalid_mint"] == "1" + assert "_fp" not in headers + + +@pytest.mark.asyncio +async def test_fp_rate_limited_on_excess_submissions( + repo: SQLiteRepository, bus: FakeBus, +) -> None: + """31st rapid-fire submission → ``_fp_rate_limited=1``, no ``_fp``.""" + import json + import decnet.canary.worker as _worker + + # Reset the rate bucket so other tests don't bleed in. + _worker._fp_rate_buckets.clear() + + blob, _ = _make_fp_blob("slug-RATE1") + await repo.create_canary_token({ + "uuid": "tok-rate1", "kind": "http", "decky_name": "web1", + "generator": "fingerprint_html", "placement_path": "/x", + "callback_token": "slug-RATE1", "secret_seed": "s", "created_by": "u1", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + for _ in range(31): + client.get( + f"/c/slug-RATE1?d={blob}", + headers={"X-Forwarded-For": "1.2.3.4"}, + ) + triggers = await repo.list_canary_triggers("tok-rate1") + # list_canary_triggers orders DESC (newest first) — index 0 is the 31st hit. + newest_headers = json.loads(triggers[0]["raw_headers"]) + assert newest_headers["_fp_rate_limited"] == "1" + assert "_fp" not in newest_headers + # Oldest (30th or earlier) should be clean. + oldest_headers = json.loads(triggers[-1]["raw_headers"]) + assert "_fp_rate_limited" not in oldest_headers + assert "_fp" in oldest_headers + + _worker._fp_rate_buckets.clear() + + @pytest.mark.asyncio async def test_no_decnet_strings_in_response(repo: SQLiteRepository, bus: FakeBus) -> None: """Stealth posture: nothing in the HTTP surface mentions DECNET."""