New test walks app.routes, classifies each APIRoute as admin/viewer/open by identity-matching require_admin / require_viewer closures inside the route's dependency tree, then asserts: - admin routes return 403 to a viewer JWT - viewer routes return neither 401 nor 403 to a viewer JWT SSE routes skipped (separate scope under F6). Role hints deliberately NOT encoded in the OpenAPI spec — classification stays server-side so /openapi.json can't be used to enumerate admin routes. Resolves THREAT_MODEL F2/I + F5/E; paired with the existing test_schemathesis.py::test_auth_enforcement (401-half coverage).
178 lines
6.1 KiB
Python
178 lines
6.1 KiB
Python
"""
|
|
RBAC contract test — every route is classified by server-side dependency
|
|
introspection and exercised with a viewer JWT.
|
|
|
|
Covers THREAT_MODEL.md F2/E (mutation bypass via missing `require_admin`)
|
|
and F5/E (mutation routes returning 403 for viewer). The 401-unauth half
|
|
is covered by `test_schemathesis.py::test_auth_enforcement`.
|
|
|
|
We deliberately do NOT annotate role hints in the OpenAPI spec —
|
|
classification stays server-side so an attacker reading /openapi.json
|
|
can't enumerate admin routes.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import httpx
|
|
import pytest
|
|
from fastapi.routing import APIRoute
|
|
|
|
from decnet.web.api import app
|
|
from decnet.web.dependencies import (
|
|
require_admin,
|
|
require_viewer,
|
|
require_stream_viewer,
|
|
get_current_user_unchecked,
|
|
get_current_user,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Route classification (runs at import time)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ADMIN_CALLS = {require_admin}
|
|
_VIEWER_CALLS = {require_viewer, require_stream_viewer, get_current_user_unchecked, get_current_user}
|
|
|
|
|
|
def _walk_deps(dependant) -> set:
|
|
"""Recursively collect every dependency `call` in the tree."""
|
|
calls: set = set()
|
|
stack = list(dependant.dependencies)
|
|
while stack:
|
|
d = stack.pop()
|
|
if d.call is not None:
|
|
calls.add(d.call)
|
|
stack.extend(d.dependencies)
|
|
return calls
|
|
|
|
|
|
def _classify(route: APIRoute) -> str:
|
|
calls = _walk_deps(route.dependant)
|
|
if calls & _ADMIN_CALLS:
|
|
return "admin"
|
|
if calls & _VIEWER_CALLS:
|
|
return "viewer"
|
|
return "open"
|
|
|
|
|
|
def _is_sse(route: APIRoute) -> bool:
|
|
"""SSE endpoints keep the connection open — authz fires before the stream
|
|
starts, but httpx won't return until the server closes. Skip them here;
|
|
F6 gets its own dedicated verification pass."""
|
|
return route.path.endswith(("/events", "/stream", "/status-events"))
|
|
|
|
|
|
def _collect() -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
|
|
"""Return (admin_routes, viewer_routes) as (method, path, name) triples."""
|
|
admin: list[tuple[str, str, str]] = []
|
|
viewer: list[tuple[str, str, str]] = []
|
|
for route in app.routes:
|
|
if not isinstance(route, APIRoute):
|
|
continue
|
|
if _is_sse(route):
|
|
continue
|
|
cls = _classify(route)
|
|
for method in sorted(route.methods - {"HEAD", "OPTIONS"}):
|
|
entry = (method, route.path, route.name)
|
|
if cls == "admin":
|
|
admin.append(entry)
|
|
elif cls == "viewer":
|
|
viewer.append(entry)
|
|
return admin, viewer
|
|
|
|
|
|
ADMIN_ROUTES, VIEWER_ROUTES = _collect()
|
|
|
|
assert ADMIN_ROUTES, "no admin routes discovered — classifier is broken"
|
|
assert VIEWER_ROUTES, "no viewer routes discovered — classifier is broken"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path-param substitution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ZERO_UUID = "00000000-0000-0000-0000-000000000000"
|
|
|
|
|
|
def _substitute_path(path: str, route: APIRoute) -> str:
|
|
"""Fill `{param}` placeholders with dummy values that satisfy path regex.
|
|
|
|
Authz (403) fires before route-handler execution, so the values don't
|
|
need to match real DB rows — they only need to survive FastAPI's
|
|
param-type coercion. Heuristic by param name keeps this independent
|
|
of pydantic-version internals.
|
|
"""
|
|
out = path
|
|
while "{" in out:
|
|
start = out.index("{")
|
|
end = out.index("}", start)
|
|
name = out[start + 1 : end].lower()
|
|
if "uuid" in name:
|
|
value = _ZERO_UUID
|
|
elif name.endswith("_id") or name == "id":
|
|
value = "1"
|
|
else:
|
|
value = "x"
|
|
out = out[:start] + value + out[end + 1 :]
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_WRITE_METHODS = {"POST", "PUT", "PATCH", "DELETE"}
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"method,path,name",
|
|
ADMIN_ROUTES,
|
|
ids=lambda t: f"{t[0]} {t[1]}" if isinstance(t, tuple) else str(t),
|
|
)
|
|
async def test_admin_route_rejects_viewer(client, viewer_token, method, path, name):
|
|
"""Every admin-classified route must return 403 when hit with a viewer JWT.
|
|
|
|
If a route returns 422 instead, the `Depends(require_admin)` parameter
|
|
is declared after a body/query param in the route signature — move it
|
|
earlier so authz runs before schema validation. A 401 means the token
|
|
was rejected outright (viewer user seeding broken → check conftest).
|
|
"""
|
|
url = _substitute_path(path, _route_lookup(method, path))
|
|
kwargs = {"headers": {"Authorization": f"Bearer {viewer_token}"}}
|
|
if method in _WRITE_METHODS:
|
|
kwargs["json"] = {}
|
|
resp = await client.request(method, url, **kwargs)
|
|
assert resp.status_code == 403, (
|
|
f"{method} {path} (name={name}): expected 403 for viewer, "
|
|
f"got {resp.status_code} — body={resp.text[:200]!r}. "
|
|
"If 422: move Depends(require_admin) before the body param in the signature. "
|
|
"If 401: viewer token invalid."
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"method,path,name",
|
|
VIEWER_ROUTES,
|
|
ids=lambda t: f"{t[0]} {t[1]}" if isinstance(t, tuple) else str(t),
|
|
)
|
|
async def test_viewer_route_does_not_reject_viewer(client, viewer_token, method, path, name):
|
|
"""Viewer-accessible routes must not return 401/403 for a valid viewer JWT."""
|
|
url = _substitute_path(path, _route_lookup(method, path))
|
|
kwargs = {"headers": {"Authorization": f"Bearer {viewer_token}"}}
|
|
if method in _WRITE_METHODS:
|
|
kwargs["json"] = {}
|
|
resp = await client.request(method, url, **kwargs)
|
|
assert resp.status_code not in (401, 403), (
|
|
f"{method} {path} (name={name}): viewer unexpectedly got {resp.status_code} "
|
|
f"— body={resp.text[:200]!r}"
|
|
)
|
|
|
|
|
|
def _route_lookup(method: str, path: str) -> APIRoute:
|
|
for route in app.routes:
|
|
if isinstance(route, APIRoute) and route.path == path and method in route.methods:
|
|
return route
|
|
raise LookupError(f"{method} {path}")
|