test: add profiler behavioral analysis and RBAC endpoint tests
- test_profiler_behavioral.py: attacker behavior pattern matching tests - api/test_rbac.py: comprehensive RBAC role separation tests - api/config/: configuration API endpoint tests (CRUD, reinit, user management)
This commit is contained in:
0
tests/api/config/__init__.py
Normal file
0
tests/api/config/__init__.py
Normal file
BIN
tests/api/config/__pycache__/__init__.cpython-314.pyc
Normal file
BIN
tests/api/config/__pycache__/__init__.cpython-314.pyc
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1
tests/api/config/conftest.py
Normal file
1
tests/api/config/conftest.py
Normal file
@@ -0,0 +1 @@
|
||||
# viewer_token fixture is now in tests/api/conftest.py (shared across all API tests)
|
||||
57
tests/api/config/test_deploy_limit.py
Normal file
57
tests/api/config/test_deploy_limit.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def contract_test_mode(monkeypatch):
|
||||
"""Skip actual Docker deployment in tests."""
|
||||
monkeypatch.setenv("DECNET_CONTRACT_TEST", "true")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_network():
|
||||
"""Mock network detection so deploy doesn't call `ip addr show`."""
|
||||
with patch("decnet.web.router.fleet.api_deploy_deckies.get_host_ip", return_value="192.168.1.100"):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_deploy_respects_limit(client, auth_token, mock_state_file):
|
||||
"""Deploy should reject if total deckies would exceed limit."""
|
||||
await repo.set_state("config_limits", {"deployment_limit": 1})
|
||||
await repo.set_state("deployment", mock_state_file)
|
||||
|
||||
ini = """[decky-new]
|
||||
services = ssh
|
||||
"""
|
||||
resp = await client.post(
|
||||
"/api/v1/deckies/deploy",
|
||||
json={"ini_content": ini},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
# 2 existing + 1 new = 3 > limit of 1
|
||||
assert resp.status_code == 409
|
||||
assert "limit" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_deploy_within_limit(client, auth_token, mock_state_file):
|
||||
"""Deploy should succeed when within limit."""
|
||||
await repo.set_state("config_limits", {"deployment_limit": 100})
|
||||
await repo.set_state("deployment", mock_state_file)
|
||||
|
||||
ini = """[decky-new]
|
||||
services = ssh
|
||||
"""
|
||||
resp = await client.post(
|
||||
"/api/v1/deckies/deploy",
|
||||
json={"ini_content": ini},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
# Should not fail due to limit
|
||||
if resp.status_code == 409:
|
||||
assert "limit" not in resp.json()["detail"].lower()
|
||||
else:
|
||||
assert resp.status_code == 200
|
||||
69
tests/api/config/test_get_config.py
Normal file
69
tests/api/config/test_get_config.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_config_defaults_admin(client, auth_token):
|
||||
"""Admin gets full config with users list and defaults."""
|
||||
resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["role"] == "admin"
|
||||
assert data["deployment_limit"] == 10
|
||||
assert data["global_mutation_interval"] == "30m"
|
||||
assert "users" in data
|
||||
assert isinstance(data["users"], list)
|
||||
assert len(data["users"]) >= 1
|
||||
# Ensure no password_hash leaked
|
||||
for user in data["users"]:
|
||||
assert "password_hash" not in user
|
||||
assert "uuid" in user
|
||||
assert "username" in user
|
||||
assert "role" in user
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_config_viewer_no_users(client, auth_token, viewer_token):
|
||||
"""Viewer gets config without users list — server-side gating."""
|
||||
resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["role"] == "viewer"
|
||||
assert data["deployment_limit"] == 10
|
||||
assert data["global_mutation_interval"] == "30m"
|
||||
assert "users" not in data
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_config_returns_stored_values(client, auth_token):
|
||||
"""Config returns stored values after update."""
|
||||
await client.put(
|
||||
"/api/v1/config/deployment-limit",
|
||||
json={"deployment_limit": 42},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
await client.put(
|
||||
"/api/v1/config/global-mutation-interval",
|
||||
json={"global_mutation_interval": "7d"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
|
||||
resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["deployment_limit"] == 42
|
||||
assert data["global_mutation_interval"] == "7d"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_config_unauthenticated(client):
|
||||
resp = await client.get("/api/v1/config")
|
||||
assert resp.status_code == 401
|
||||
76
tests/api/config/test_reinit.py
Normal file
76
tests/api/config/test_reinit.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import pytest
|
||||
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def enable_developer_mode(monkeypatch):
|
||||
monkeypatch.setattr("decnet.web.router.config.api_reinit.DECNET_DEVELOPER", True)
|
||||
monkeypatch.setattr("decnet.web.router.config.api_get_config.DECNET_DEVELOPER", True)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reinit_purges_data(client, auth_token):
|
||||
"""Admin can purge all logs, bounties, and attackers in developer mode."""
|
||||
# Seed some data
|
||||
await repo.add_log({
|
||||
"decky": "d1", "service": "ssh", "event_type": "connect",
|
||||
"attacker_ip": "1.2.3.4", "raw_line": "test", "fields": "{}",
|
||||
})
|
||||
await repo.add_bounty({
|
||||
"decky": "d1", "service": "ssh", "attacker_ip": "1.2.3.4",
|
||||
"bounty_type": "credential", "payload": '{"user":"root"}',
|
||||
})
|
||||
|
||||
resp = await client.delete(
|
||||
"/api/v1/config/reinit",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["deleted"]["logs"] >= 1
|
||||
assert data["deleted"]["bounties"] >= 1
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reinit_viewer_forbidden(client, auth_token, viewer_token):
|
||||
resp = await client.delete(
|
||||
"/api/v1/config/reinit",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reinit_forbidden_without_developer_mode(client, auth_token, monkeypatch):
|
||||
monkeypatch.setattr("decnet.web.router.config.api_reinit.DECNET_DEVELOPER", False)
|
||||
|
||||
resp = await client.delete(
|
||||
"/api/v1/config/reinit",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
assert "developer mode" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_config_includes_developer_mode(client, auth_token):
|
||||
"""Admin config response includes developer_mode when enabled."""
|
||||
resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["developer_mode"] is True
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_config_excludes_developer_mode_when_disabled(client, auth_token, monkeypatch):
|
||||
monkeypatch.setattr("decnet.web.router.config.api_get_config.DECNET_DEVELOPER", False)
|
||||
|
||||
resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert "developer_mode" not in resp.json()
|
||||
77
tests/api/config/test_update_config.py
Normal file
77
tests/api/config/test_update_config.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_deployment_limit_admin(client, auth_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/deployment-limit",
|
||||
json={"deployment_limit": 50},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["message"] == "Deployment limit updated"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_deployment_limit_out_of_range(client, auth_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/deployment-limit",
|
||||
json={"deployment_limit": 0},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 422
|
||||
|
||||
resp = await client.put(
|
||||
"/api/v1/config/deployment-limit",
|
||||
json={"deployment_limit": 501},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 422
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_deployment_limit_viewer_forbidden(client, auth_token, viewer_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/deployment-limit",
|
||||
json={"deployment_limit": 50},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_global_mutation_interval_admin(client, auth_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/global-mutation-interval",
|
||||
json={"global_mutation_interval": "7d"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["message"] == "Global mutation interval updated"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_global_mutation_interval_invalid(client, auth_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/global-mutation-interval",
|
||||
json={"global_mutation_interval": "abc"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 422
|
||||
|
||||
resp = await client.put(
|
||||
"/api/v1/config/global-mutation-interval",
|
||||
json={"global_mutation_interval": "0m"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 422
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_global_mutation_interval_viewer_forbidden(client, auth_token, viewer_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/config/global-mutation-interval",
|
||||
json={"global_mutation_interval": "7d"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
188
tests/api/config/test_user_management.py
Normal file
188
tests/api/config/test_user_management.py
Normal file
@@ -0,0 +1,188 @@
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_create_user(client, auth_token):
|
||||
resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "newuser", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["username"] == "newuser"
|
||||
assert data["role"] == "viewer"
|
||||
assert data["must_change_password"] is True
|
||||
assert "password_hash" not in data
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_create_user_duplicate(client, auth_token):
|
||||
await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "dupuser", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "dupuser", "password": "securepass456", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 409
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_create_user_viewer_forbidden(client, auth_token, viewer_token):
|
||||
resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "blocked", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_delete_user(client, auth_token):
|
||||
# Create a user to delete
|
||||
create_resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "todelete", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
user_uuid = create_resp.json()["uuid"]
|
||||
|
||||
resp = await client.delete(
|
||||
f"/api/v1/config/users/{user_uuid}",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_delete_self_forbidden(client, auth_token):
|
||||
# Get own UUID from config
|
||||
config_resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
users = config_resp.json()["users"]
|
||||
admin_uuid = next(u["uuid"] for u in users if u["role"] == "admin")
|
||||
|
||||
resp = await client.delete(
|
||||
f"/api/v1/config/users/{admin_uuid}",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_delete_nonexistent_user(client, auth_token):
|
||||
resp = await client.delete(
|
||||
"/api/v1/config/users/00000000-0000-0000-0000-000000000000",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_user_role(client, auth_token):
|
||||
create_resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "roletest", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
user_uuid = create_resp.json()["uuid"]
|
||||
|
||||
resp = await client.put(
|
||||
f"/api/v1/config/users/{user_uuid}/role",
|
||||
json={"role": "admin"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Verify role changed
|
||||
config_resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
updated = next(u for u in config_resp.json()["users"] if u["uuid"] == user_uuid)
|
||||
assert updated["role"] == "admin"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_own_role_forbidden(client, auth_token):
|
||||
config_resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
admin_uuid = next(u["uuid"] for u in config_resp.json()["users"] if u["role"] == "admin")
|
||||
|
||||
resp = await client.put(
|
||||
f"/api/v1/config/users/{admin_uuid}/role",
|
||||
json={"role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reset_user_password(client, auth_token):
|
||||
create_resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "resetme", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
user_uuid = create_resp.json()["uuid"]
|
||||
|
||||
resp = await client.put(
|
||||
f"/api/v1/config/users/{user_uuid}/reset-password",
|
||||
json={"new_password": "newpass12345"},
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Verify must_change_password is set
|
||||
config_resp = await client.get(
|
||||
"/api/v1/config",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
updated = next(u for u in config_resp.json()["users"] if u["uuid"] == user_uuid)
|
||||
assert updated["must_change_password"] is True
|
||||
|
||||
# Verify new password works
|
||||
login_resp = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "resetme", "password": "newpass12345"},
|
||||
)
|
||||
assert login_resp.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_all_user_endpoints_viewer_forbidden(client, auth_token, viewer_token):
|
||||
"""Viewer cannot access any user management endpoints."""
|
||||
resp = await client.post(
|
||||
"/api/v1/config/users",
|
||||
json={"username": "x", "password": "securepass123", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
resp = await client.delete(
|
||||
"/api/v1/config/users/fake-uuid",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
resp = await client.put(
|
||||
"/api/v1/config/users/fake-uuid/role",
|
||||
json={"role": "admin"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
resp = await client.put(
|
||||
"/api/v1/config/users/fake-uuid/reset-password",
|
||||
json={"new_password": "securepass123"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
116
tests/api/test_rbac.py
Normal file
116
tests/api/test_rbac.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""RBAC matrix tests — verify role enforcement on every API endpoint."""
|
||||
import pytest
|
||||
|
||||
|
||||
# ── Read-only endpoints: viewer + admin should both get access ──────────
|
||||
|
||||
_VIEWER_ENDPOINTS = [
|
||||
("GET", "/api/v1/logs"),
|
||||
("GET", "/api/v1/logs/histogram"),
|
||||
("GET", "/api/v1/bounty"),
|
||||
("GET", "/api/v1/deckies"),
|
||||
("GET", "/api/v1/stats"),
|
||||
("GET", "/api/v1/attackers"),
|
||||
("GET", "/api/v1/config"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS)
|
||||
async def test_viewer_can_access_read_endpoints(client, viewer_token, method, path):
|
||||
resp = await client.request(
|
||||
method, path, headers={"Authorization": f"Bearer {viewer_token}"}
|
||||
)
|
||||
assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS)
|
||||
async def test_admin_can_access_read_endpoints(client, auth_token, method, path):
|
||||
resp = await client.request(
|
||||
method, path, headers={"Authorization": f"Bearer {auth_token}"}
|
||||
)
|
||||
assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
|
||||
|
||||
|
||||
# ── Admin-only endpoints: viewer must get 403 ──────────────────────────
|
||||
|
||||
_ADMIN_ENDPOINTS = [
|
||||
("PUT", "/api/v1/config/deployment-limit", {"deployment_limit": 5}),
|
||||
("PUT", "/api/v1/config/global-mutation-interval", {"global_mutation_interval": "1d"}),
|
||||
("POST", "/api/v1/config/users", {"username": "rbac-test", "password": "pass123456", "role": "viewer"}),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS)
|
||||
async def test_viewer_blocked_from_admin_endpoints(client, viewer_token, method, path, body):
|
||||
resp = await client.request(
|
||||
method, path,
|
||||
json=body,
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403, f"{method} {path} returned {resp.status_code}"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS)
|
||||
async def test_admin_can_access_admin_endpoints(client, auth_token, method, path, body):
|
||||
resp = await client.request(
|
||||
method, path,
|
||||
json=body,
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}"
|
||||
|
||||
|
||||
# ── Unauthenticated access: must get 401 ───────────────────────────────
|
||||
|
||||
_ALL_PROTECTED = [
|
||||
("GET", "/api/v1/logs"),
|
||||
("GET", "/api/v1/stats"),
|
||||
("GET", "/api/v1/deckies"),
|
||||
("GET", "/api/v1/bounty"),
|
||||
("GET", "/api/v1/attackers"),
|
||||
("GET", "/api/v1/config"),
|
||||
("PUT", "/api/v1/config/deployment-limit"),
|
||||
("POST", "/api/v1/config/users"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("method,path", _ALL_PROTECTED)
|
||||
async def test_unauthenticated_returns_401(client, method, path):
|
||||
resp = await client.request(method, path)
|
||||
assert resp.status_code == 401, f"{method} {path} returned {resp.status_code}"
|
||||
|
||||
|
||||
# ── Fleet write endpoints: viewer must get 403 ─────────────────────────
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_blocked_from_deploy(client, viewer_token):
|
||||
resp = await client.post(
|
||||
"/api/v1/deckies/deploy",
|
||||
json={"ini_content": "[decky-rbac-test]\nservices=ssh"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_blocked_from_mutate(client, viewer_token):
|
||||
resp = await client.post(
|
||||
"/api/v1/deckies/test-decky/mutate",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_blocked_from_mutate_interval(client, viewer_token):
|
||||
resp = await client.put(
|
||||
"/api/v1/deckies/test-decky/mutate-interval",
|
||||
json={"mutate_interval": "5d"},
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
Reference in New Issue
Block a user