Files
stealergram/tests/test_web_rbac.py
anti b4025b8a41 Fix test isolation: redirect hits.db to tmp_path in web RBAC tests
Dashboard route queries utils.database, which was pointing at the real
data/hits.db instead of a temp file, causing no such table: hits.
2026-05-19 10:21:37 -04:00

198 lines
7.1 KiB
Python

"""
Tests for web RBAC enforcement and config JSON round-trip.
Uses FastAPI TestClient with a temporary web.db.
"""
import json
import pytest
@pytest.fixture(autouse=True)
def isolated_web(tmp_path, monkeypatch):
"""Redirect web.db and runtime_config.json to tmp dirs for each test."""
from pathlib import Path
import web.db as db_mod
import config as cfg_mod
db_path = tmp_path / "web.db"
cfg_path = tmp_path / "runtime_config.json"
import utils.database as hitdb_mod
hits_path = tmp_path / "hits.db"
monkeypatch.setattr(db_mod, "DB_FILE", db_path)
monkeypatch.setattr(cfg_mod, "RUNTIME_CONFIG_PATH", cfg_path)
monkeypatch.setattr(hitdb_mod, "DB_FILE", hits_path)
db_mod.init_db()
hitdb_mod.init_db()
@pytest.fixture
def client():
from fastapi.testclient import TestClient
from web.app import create_app
app = create_app()
return TestClient(app, raise_server_exceptions=True)
def _login(client, username="superadmin", password="superpass") -> dict:
"""Log in and return the cookies dict."""
r = client.post("/login", data={"username": username, "password": password},
follow_redirects=False)
assert r.status_code == 303, f"Login failed: {r.text}"
return client.cookies
# ─── Auth flow ────────────────────────────────────────────────────────────────
class TestAuthFlow:
def test_login_sets_cookies(self, client):
r = client.post("/login", data={"username": "superadmin", "password": "superpass"},
follow_redirects=False)
assert r.status_code == 303
assert "access_token" in r.cookies
assert "refresh_token" in r.cookies
def test_invalid_password_returns_401(self, client):
r = client.post("/login", data={"username": "superadmin", "password": "wrong"},
follow_redirects=False)
assert r.status_code == 401
def test_unauthenticated_dashboard_redirected_or_401(self, client):
r = client.get("/dashboard", follow_redirects=False)
assert r.status_code in (401, 302, 303)
def test_logout_clears_cookies(self, client):
_login(client)
r = client.post("/logout", follow_redirects=False)
assert r.status_code == 303
def test_authenticated_dashboard_ok(self, client):
_login(client)
r = client.get("/dashboard")
assert r.status_code == 200
# ─── RBAC ─────────────────────────────────────────────────────────────────────
class TestRBAC:
def test_reader_cannot_access_config(self, client):
import web.db as db_mod
db_mod.create_user("reader1", "pass", "reader")
_login(client, "reader1", "pass")
r = client.get("/config/keywords")
assert r.status_code == 403
def test_admin_can_access_config(self, client):
import web.db as db_mod
db_mod.create_user("admin1", "pass", "admin")
_login(client, "admin1", "pass")
r = client.get("/config/keywords")
assert r.status_code == 200
def test_reader_cannot_access_users(self, client):
import web.db as db_mod
db_mod.create_user("reader2", "pass", "reader")
_login(client, "reader2", "pass")
r = client.get("/users")
assert r.status_code == 403
def test_admin_cannot_access_users(self, client):
import web.db as db_mod
db_mod.create_user("admin2", "pass", "admin")
_login(client, "admin2", "pass")
r = client.get("/users")
assert r.status_code == 403
def test_superadmin_can_access_users(self, client):
_login(client)
r = client.get("/users")
assert r.status_code == 200
# ─── Config round-trip ────────────────────────────────────────────────────────
class TestConfigRoundTrip:
def test_put_keywords_saves_and_reloads(self, client):
import config
_login(client)
groups = [
{
"id": "testorg",
"name": "Test Org",
"patterns": [
{"regex": r"testorg\.com", "label": "Domain"},
{"regex": r"@testorg\.com", "label": "Employees"},
],
}
]
r = client.put("/config/keywords", json={"groups": groups})
assert r.status_code == 200
assert r.json()["groups"] == 1
# Config module globals updated in-process
assert any("testorg" in kw for kw in config.TARGET_KEYWORDS)
def test_put_keywords_invalid_regex_rejected(self, client):
_login(client)
groups = [
{
"id": "bad",
"name": "Bad",
"patterns": [{"regex": "[invalid(regex", "label": "oops"}],
}
]
r = client.put("/config/keywords", json={"groups": groups})
assert r.status_code == 422
def test_put_channels_saves(self, client):
import config
_login(client)
channels = ["testchannel", -1002748707556]
r = client.put("/config/channels", json={"channels": channels})
assert r.status_code == 200
assert config.WATCHED_CHANNELS == channels
# ─── User management ─────────────────────────────────────────────────────────
class TestUserManagement:
def test_create_user(self, client):
_login(client)
r = client.post("/users", json={"username": "newuser", "password": "pass", "role": "reader"})
assert r.status_code == 200
assert "id" in r.json()
def test_cannot_create_duplicate_username(self, client):
_login(client)
client.post("/users", json={"username": "dup", "password": "p", "role": "reader"})
r = client.post("/users", json={"username": "dup", "password": "p", "role": "reader"})
assert r.status_code == 409
def test_update_user_role(self, client):
import web.db as db_mod
_login(client)
uid = db_mod.create_user("patchme", "p", "reader")
r = client.patch(f"/users/{uid}", json={"role": "admin"})
assert r.status_code == 200
assert db_mod.get_user_by_id(uid)["role"] == "admin"
def test_deactivate_user(self, client):
import web.db as db_mod
_login(client)
uid = db_mod.create_user("byebye", "p", "reader")
r = client.delete(f"/users/{uid}")
assert r.status_code == 200
assert db_mod.get_user_by_id(uid)["is_active"] == 0
def test_cannot_deactivate_self(self, client):
import web.db as db_mod
_login(client)
me = db_mod.get_user_by_username("superadmin")
r = client.delete(f"/users/{me['id']}")
assert r.status_code == 403