Files
DECNET/tests/api/test_rbac.py
anti 8db593a544 test(api): repair pre-existing rotted tests (SSE ticket flow, password policy)
These had been red since the changes they cover landed — invisible because
the pre-commit gate runs mypy/ruff/bandit/pip-audit but NOT pytest, so failing
tests don't block commits and quietly accumulate.

- SSE stream/events auth migrated from ?token=<jwt> to a single-use ?ticket=
  (commit efb4e49d). Three tests still passed a raw JWT as ?token= and got
  401. Updated to mint a ticket via POST /auth/sse-ticket and pass ?ticket=
  (attacker events, topology events, /stream).
- The user-creation password policy is min_length=12; the RBAC admin-access
  test still used a 10-char password and was rejected. Bumped to a valid one.
2026-06-16 12:06:56 -04:00

118 lines
4.1 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""RBAC matrix tests — verify role enforcement on every API endpoint."""
import pytest
# ── Read-only endpoints: viewer + admin should both get access ──────────
_VIEWER_ENDPOINTS = [
("GET", "/api/v1/logs"),
("GET", "/api/v1/logs/histogram"),
("GET", "/api/v1/bounty"),
("GET", "/api/v1/deckies"),
("GET", "/api/v1/stats"),
("GET", "/api/v1/attackers"),
("GET", "/api/v1/config"),
]
@pytest.mark.anyio
@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS)
async def test_viewer_can_access_read_endpoints(client, viewer_token, method, path):
    """Each read-only endpoint answers 200 when called with the viewer token.

    Runs once per ``(method, path)`` pair in ``_VIEWER_ENDPOINTS``.
    """
    # Authenticate as the viewer via a standard bearer header.
    auth_headers = {"Authorization": f"Bearer {viewer_token}"}
    resp = await client.request(method, path, headers=auth_headers)
    assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
@pytest.mark.anyio
@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS)
async def test_admin_can_access_read_endpoints(client, auth_token, method, path):
    """Each read-only endpoint answers 200 when called with ``auth_token``.

    Runs once per ``(method, path)`` pair in ``_VIEWER_ENDPOINTS``.
    ``auth_token`` is presumably the admin fixture (per the test name) —
    confirm against conftest.
    """
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    resp = await client.request(method, path, headers=auth_headers)
    assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
# ── Admin-only endpoints: viewer must get 403 ──────────────────────────
_ADMIN_ENDPOINTS = [
("PUT", "/api/v1/config/deployment-limit", {"deployment_limit": 5}),
("PUT", "/api/v1/config/global-mutation-interval", {"global_mutation_interval": "1d"}),
("POST", "/api/v1/config/users", {"username": "rbac-test", "password": "pass-123456789", "role": "viewer"}),
]
@pytest.mark.anyio
@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS)
async def test_viewer_blocked_from_admin_endpoints(client, viewer_token, method, path, body):
    """Each admin-only endpoint answers 403 when called with the viewer token.

    Runs once per ``(method, path, body)`` triple in ``_ADMIN_ENDPOINTS``;
    ``body`` is sent as the JSON payload.
    """
    auth_headers = {"Authorization": f"Bearer {viewer_token}"}
    resp = await client.request(method, path, json=body, headers=auth_headers)
    assert resp.status_code == 403, f"{method} {path} returned {resp.status_code}"
@pytest.mark.anyio
@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS)
async def test_admin_can_access_admin_endpoints(client, auth_token, method, path, body):
    """Each admin-only endpoint answers 200 when called with ``auth_token``.

    Runs once per ``(method, path, body)`` triple in ``_ADMIN_ENDPOINTS``;
    ``body`` is sent as the JSON payload. ``auth_token`` is presumably the
    admin fixture (per the test name) — confirm against conftest.
    """
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    resp = await client.request(method, path, json=body, headers=auth_headers)
    assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
# ── Unauthenticated access: must get 401 ───────────────────────────────
_ALL_PROTECTED = [
("GET", "/api/v1/logs"),
("GET", "/api/v1/stats"),
("GET", "/api/v1/deckies"),
("GET", "/api/v1/bounty"),
("GET", "/api/v1/attackers"),
("GET", "/api/v1/config"),
("PUT", "/api/v1/config/deployment-limit"),
("POST", "/api/v1/config/users"),
]
@pytest.mark.anyio
@pytest.mark.parametrize("method,path", _ALL_PROTECTED)
async def test_unauthenticated_returns_401(client, method, path):
    """Each endpoint in ``_ALL_PROTECTED`` answers 401 without credentials.

    The request carries no Authorization header and no body, including for
    the PUT/POST entries.
    """
    resp = await client.request(method, path)
    assert resp.status_code == 401, f"{method} {path} returned {resp.status_code}"
# ── Fleet write endpoints: viewer must get 403 ─────────────────────────
@pytest.mark.anyio
async def test_viewer_blocked_from_deploy(client, viewer_token):
    """The deploy endpoint answers 403 when called with the viewer token.

    Posts a minimal INI payload to ``/api/v1/deckies/deploy`` as JSON.
    """
    path = "/api/v1/deckies/deploy"
    resp = await client.post(
        path,
        json={"ini_content": "[decky-rbac-test]\nservices=ssh"},
        headers={"Authorization": f"Bearer {viewer_token}"},
    )
    # Same failure-message shape as the parametrized tests in this module, so
    # a red run names the endpoint and the status actually returned.
    assert resp.status_code == 403, f"POST {path} returned {resp.status_code}"
@pytest.mark.anyio
async def test_viewer_blocked_from_mutate(client, viewer_token):
    """The mutate endpoint answers 403 when called with the viewer token.

    Posts to ``/api/v1/deckies/test-decky/mutate`` with no body.
    """
    path = "/api/v1/deckies/test-decky/mutate"
    resp = await client.post(
        path,
        headers={"Authorization": f"Bearer {viewer_token}"},
    )
    # Same failure-message shape as the parametrized tests in this module, so
    # a red run names the endpoint and the status actually returned.
    assert resp.status_code == 403, f"POST {path} returned {resp.status_code}"
@pytest.mark.anyio
async def test_viewer_blocked_from_mutate_interval(client, viewer_token):
    """The mutate-interval endpoint answers 403 when called with the viewer token.

    Sends a PUT to ``/api/v1/deckies/test-decky/mutate-interval`` with a
    ``mutate_interval`` JSON payload.
    """
    path = "/api/v1/deckies/test-decky/mutate-interval"
    resp = await client.put(
        path,
        json={"mutate_interval": "5d"},
        headers={"Authorization": f"Bearer {viewer_token}"},
    )
    # Same failure-message shape as the parametrized tests in this module, so
    # a red run names the endpoint and the status actually returned.
    assert resp.status_code == 403, f"PUT {path} returned {resp.status_code}"