- FastAPI + htmx + Jinja2 web frontend, started with --web flag - JWT HS256 auth (WEB_SECRET_KEY) with httpOnly cookies; access (15 min) + refresh (7 day) tokens; refresh rotation + JTI revocation in data/web.db - RBAC: superadmin > admin > reader enforced per route - Live SSE dashboard fed by tui/events broadcast queue - Config editor: keyword groups and channel list saved to data/runtime_config.json and hot-reloaded in-process (scorer.reload_from_config, signal_channel_changed) - config.py migrated to load groups/channels from runtime_config.json; falls back to hardcoded defaults when file absent - tui/events.py: subscribe/unsubscribe broadcast, set_bot_context/signal_channel_changed - utils/scorer.py: import config as _config (fixes local binding); reload_from_config() - utils/database.py: count_by_severity, recent_for_domains, count_by_severity_for_domains - 53 new tests (events bus, JWT lifecycle, web DB CRUD, RBAC enforcement, config round-trip); total 141 passing
192 lines
6.9 KiB
Python
192 lines
6.9 KiB
Python
"""
|
|
Tests for web RBAC enforcement and config JSON round-trip.
|
|
|
|
Uses FastAPI TestClient with a temporary web.db.
|
|
"""
|
|
|
|
import json
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def isolated_web(tmp_path, monkeypatch):
|
|
"""Redirect web.db and runtime_config.json to tmp dirs for each test."""
|
|
from pathlib import Path
|
|
import web.db as db_mod
|
|
import config as cfg_mod
|
|
|
|
db_path = tmp_path / "web.db"
|
|
cfg_path = tmp_path / "runtime_config.json"
|
|
|
|
monkeypatch.setattr(db_mod, "DB_FILE", db_path)
|
|
monkeypatch.setattr(cfg_mod, "RUNTIME_CONFIG_PATH", cfg_path)
|
|
|
|
db_mod.init_db()
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
from fastapi.testclient import TestClient
|
|
from web.app import create_app
|
|
app = create_app()
|
|
return TestClient(app, raise_server_exceptions=True)
|
|
|
|
|
|
def _login(client, username="superadmin", password="superpass") -> dict:
|
|
"""Log in and return the cookies dict."""
|
|
r = client.post("/login", data={"username": username, "password": password},
|
|
follow_redirects=False)
|
|
assert r.status_code == 303, f"Login failed: {r.text}"
|
|
return client.cookies
|
|
|
|
|
|
# ─── Auth flow ────────────────────────────────────────────────────────────────
|
|
|
|
class TestAuthFlow:
|
|
def test_login_sets_cookies(self, client):
|
|
r = client.post("/login", data={"username": "superadmin", "password": "superpass"},
|
|
follow_redirects=False)
|
|
assert r.status_code == 303
|
|
assert "access_token" in r.cookies
|
|
assert "refresh_token" in r.cookies
|
|
|
|
def test_invalid_password_returns_401(self, client):
|
|
r = client.post("/login", data={"username": "superadmin", "password": "wrong"},
|
|
follow_redirects=False)
|
|
assert r.status_code == 401
|
|
|
|
def test_unauthenticated_dashboard_redirected_or_401(self, client):
|
|
r = client.get("/dashboard", follow_redirects=False)
|
|
assert r.status_code in (401, 302, 303)
|
|
|
|
def test_logout_clears_cookies(self, client):
|
|
_login(client)
|
|
r = client.post("/logout", follow_redirects=False)
|
|
assert r.status_code == 303
|
|
|
|
def test_authenticated_dashboard_ok(self, client):
|
|
_login(client)
|
|
r = client.get("/dashboard")
|
|
assert r.status_code == 200
|
|
|
|
|
|
# ─── RBAC ─────────────────────────────────────────────────────────────────────
|
|
|
|
class TestRBAC:
|
|
def test_reader_cannot_access_config(self, client):
|
|
import web.db as db_mod
|
|
db_mod.create_user("reader1", "pass", "reader")
|
|
_login(client, "reader1", "pass")
|
|
r = client.get("/config/keywords")
|
|
assert r.status_code == 403
|
|
|
|
def test_admin_can_access_config(self, client):
|
|
import web.db as db_mod
|
|
db_mod.create_user("admin1", "pass", "admin")
|
|
_login(client, "admin1", "pass")
|
|
r = client.get("/config/keywords")
|
|
assert r.status_code == 200
|
|
|
|
def test_reader_cannot_access_users(self, client):
|
|
import web.db as db_mod
|
|
db_mod.create_user("reader2", "pass", "reader")
|
|
_login(client, "reader2", "pass")
|
|
r = client.get("/users")
|
|
assert r.status_code == 403
|
|
|
|
def test_admin_cannot_access_users(self, client):
|
|
import web.db as db_mod
|
|
db_mod.create_user("admin2", "pass", "admin")
|
|
_login(client, "admin2", "pass")
|
|
r = client.get("/users")
|
|
assert r.status_code == 403
|
|
|
|
def test_superadmin_can_access_users(self, client):
|
|
_login(client)
|
|
r = client.get("/users")
|
|
assert r.status_code == 200
|
|
|
|
|
|
# ─── Config round-trip ────────────────────────────────────────────────────────
|
|
|
|
class TestConfigRoundTrip:
|
|
def test_put_keywords_saves_and_reloads(self, client):
|
|
import config
|
|
_login(client)
|
|
|
|
groups = [
|
|
{
|
|
"id": "testorg",
|
|
"name": "Test Org",
|
|
"patterns": [
|
|
{"regex": r"testorg\.com", "label": "Domain"},
|
|
{"regex": r"@testorg\.com", "label": "Employees"},
|
|
],
|
|
}
|
|
]
|
|
r = client.put("/config/keywords", json={"groups": groups})
|
|
assert r.status_code == 200
|
|
assert r.json()["groups"] == 1
|
|
|
|
# Config module globals updated in-process
|
|
assert any("testorg" in kw for kw in config.TARGET_KEYWORDS)
|
|
|
|
def test_put_keywords_invalid_regex_rejected(self, client):
|
|
_login(client)
|
|
groups = [
|
|
{
|
|
"id": "bad",
|
|
"name": "Bad",
|
|
"patterns": [{"regex": "[invalid(regex", "label": "oops"}],
|
|
}
|
|
]
|
|
r = client.put("/config/keywords", json={"groups": groups})
|
|
assert r.status_code == 422
|
|
|
|
def test_put_channels_saves(self, client):
|
|
import config
|
|
_login(client)
|
|
channels = ["testchannel", -1002748707556]
|
|
r = client.put("/config/channels", json={"channels": channels})
|
|
assert r.status_code == 200
|
|
assert config.WATCHED_CHANNELS == channels
|
|
|
|
|
|
# ─── User management ─────────────────────────────────────────────────────────
|
|
|
|
class TestUserManagement:
|
|
def test_create_user(self, client):
|
|
_login(client)
|
|
r = client.post("/users", json={"username": "newuser", "password": "pass", "role": "reader"})
|
|
assert r.status_code == 200
|
|
assert "id" in r.json()
|
|
|
|
def test_cannot_create_duplicate_username(self, client):
|
|
_login(client)
|
|
client.post("/users", json={"username": "dup", "password": "p", "role": "reader"})
|
|
r = client.post("/users", json={"username": "dup", "password": "p", "role": "reader"})
|
|
assert r.status_code == 409
|
|
|
|
def test_update_user_role(self, client):
|
|
import web.db as db_mod
|
|
_login(client)
|
|
uid = db_mod.create_user("patchme", "p", "reader")
|
|
r = client.patch(f"/users/{uid}", json={"role": "admin"})
|
|
assert r.status_code == 200
|
|
assert db_mod.get_user_by_id(uid)["role"] == "admin"
|
|
|
|
def test_deactivate_user(self, client):
|
|
import web.db as db_mod
|
|
_login(client)
|
|
uid = db_mod.create_user("byebye", "p", "reader")
|
|
r = client.delete(f"/users/{uid}")
|
|
assert r.status_code == 200
|
|
assert db_mod.get_user_by_id(uid)["is_active"] == 0
|
|
|
|
def test_cannot_deactivate_self(self, client):
|
|
import web.db as db_mod
|
|
_login(client)
|
|
me = db_mod.get_user_by_username("superadmin")
|
|
r = client.delete(f"/users/{me['id']}")
|
|
assert r.status_code == 403
|