Files
stealergram/tests/test_web_db.py
anti 4c104cddd2 Add web frontend with JWT auth, RBAC, SSE dashboard, and config editor
- FastAPI + htmx + Jinja2 web frontend, started with --web flag
- JWT HS256 auth (WEB_SECRET_KEY) with httpOnly cookies; access (15 min) +
  refresh (7 day) tokens; refresh rotation + JTI revocation in data/web.db
- RBAC: superadmin > admin > reader enforced per route
- Live SSE dashboard fed by tui/events broadcast queue
- Config editor: keyword groups and channel list saved to data/runtime_config.json
  and hot-reloaded in-process (scorer.reload_from_config, signal_channel_changed)
- config.py migrated to load groups/channels from runtime_config.json;
  falls back to hardcoded defaults when file absent
- tui/events.py: subscribe/unsubscribe broadcast, set_bot_context/signal_channel_changed
- utils/scorer.py: import config as _config (fixes local binding); reload_from_config()
- utils/database.py: count_by_severity, recent_for_domains, count_by_severity_for_domains
- 53 new tests (events bus, JWT lifecycle, web DB CRUD, RBAC enforcement,
  config round-trip); total 141 passing
2026-04-02 11:41:46 -03:00

109 lines
3.9 KiB
Python

"""
Tests for web/db.py — user store and refresh token management.
"""
import pytest
from pathlib import Path
@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
    """Give each test the ``web.db`` module backed by its own temporary file.

    ``DB_FILE`` on the module is patched to ``<tmp_path>/test_web.db`` and
    ``init_db()`` is run against it; the patched module object is returned so
    tests can call its functions directly.
    """
    import web.db as web_db

    monkeypatch.setattr(web_db, "DB_FILE", tmp_path / "test_web.db")
    web_db.init_db()
    return web_db
class TestInitDb:
    """Checks on the account looked up under ``WEB_ADMIN_USER`` after ``init_db()``."""

    def test_creates_superadmin_on_first_run(self, tmp_db):
        """The ``WEB_ADMIN_USER`` account exists, is a superadmin, and is active."""
        from os import environ

        superadmin = tmp_db.get_user_by_username(environ["WEB_ADMIN_USER"])

        assert superadmin is not None
        assert superadmin["role"] == "superadmin"
        assert superadmin["is_active"] == 1

    def test_second_init_does_not_duplicate(self, tmp_db):
        """Running ``init_db()`` again leaves exactly one ``WEB_ADMIN_USER`` row."""
        from os import environ

        expected_name = environ["WEB_ADMIN_USER"]

        # The tmp_db fixture has already initialised the database once.
        tmp_db.init_db()

        usernames = [row["username"] for row in tmp_db.list_users()]
        assert usernames.count(expected_name) == 1
class TestUserCRUD:
    """Create / read / update / deactivate / list operations on the user store.

    Every lookup result is checked with ``is not None`` before it is
    subscripted, so a missing row is reported as a failed assertion rather
    than as a ``TypeError`` raised from inside the test body.
    """

    def test_create_and_get_user(self, tmp_db):
        """A created user can be fetched by the id that ``create_user`` returned."""
        uid = tmp_db.create_user("alice", "pass1", "reader")
        user = tmp_db.get_user_by_id(uid)
        assert user is not None
        assert user["username"] == "alice"
        assert user["role"] == "reader"

    def test_get_by_username(self, tmp_db):
        """A created user can be fetched by username."""
        tmp_db.create_user("bob", "pass2", "admin")
        user = tmp_db.get_user_by_username("bob")
        assert user is not None
        assert user["role"] == "admin"

    def test_update_role(self, tmp_db):
        """``update_user(role=...)`` changes the stored role."""
        uid = tmp_db.create_user("carol", "pass3", "reader")
        tmp_db.update_user(uid, role="admin")
        user = tmp_db.get_user_by_id(uid)
        assert user is not None
        assert user["role"] == "admin"

    def test_update_password_is_hashed(self, tmp_db):
        """``update_user(password=...)`` stores a verifiable, non-plaintext value."""
        from web.auth import verify_password

        uid = tmp_db.create_user("dave", "oldpass", "reader")
        tmp_db.update_user(uid, password="newpass")
        user = tmp_db.get_user_by_id(uid)
        assert user is not None
        # The stored value must not be the plaintext itself; without this the
        # test would pass even if the password were saved unhashed.
        assert user["password_hash"] != "newpass"
        assert verify_password("newpass", user["password_hash"])
        assert not verify_password("oldpass", user["password_hash"])

    def test_deactivate_user(self, tmp_db):
        """A deactivated user is hidden from username lookup but kept by id."""
        uid = tmp_db.create_user("eve", "pass4", "reader")
        tmp_db.deactivate_user(uid)
        # get_user_by_username filters is_active=1
        assert tmp_db.get_user_by_username("eve") is None
        # get_user_by_id still returns the row
        user = tmp_db.get_user_by_id(uid)
        assert user is not None
        assert user["is_active"] == 0

    def test_list_users(self, tmp_db):
        """``list_users`` includes every user that was created."""
        tmp_db.create_user("u1", "p", "reader")
        tmp_db.create_user("u2", "p", "admin")
        users = tmp_db.list_users()
        usernames = [u["username"] for u in users]
        assert "u1" in usernames
        assert "u2" in usernames
class TestRefreshTokens:
    """Validity checks for stored, revoked, expired, and unknown refresh tokens."""

    @staticmethod
    def _store_token(db, *, days=0, seconds=0):
        """Store a token for ``"user-x"`` under a fresh random JTI and return the JTI.

        The expiry is the current UTC time shifted by ``days`` and ``seconds``;
        negative values place it in the past.
        """
        from datetime import datetime, timedelta, timezone
        import uuid

        token_id = str(uuid.uuid4())
        expiry = datetime.now(timezone.utc) + timedelta(days=days, seconds=seconds)
        db.store_refresh_token(token_id, "user-x", expiry)
        return token_id

    def test_store_and_validate(self, tmp_db):
        """A freshly stored, unexpired token is reported valid."""
        token_id = self._store_token(tmp_db, days=7)
        assert tmp_db.is_refresh_token_valid(token_id)

    def test_revoked_token_invalid(self, tmp_db):
        """A token is reported invalid once it has been revoked."""
        token_id = self._store_token(tmp_db, days=7)
        tmp_db.revoke_refresh_token(token_id)
        assert not tmp_db.is_refresh_token_valid(token_id)

    def test_expired_token_invalid(self, tmp_db):
        """A token whose expiry is one second in the past is reported invalid."""
        token_id = self._store_token(tmp_db, seconds=-1)
        assert not tmp_db.is_refresh_token_valid(token_id)

    def test_unknown_jti_invalid(self, tmp_db):
        """A JTI that was never stored is reported invalid."""
        assert not tmp_db.is_refresh_token_valid("nonexistent-jti")