diff --git a/tests/live/test_mysql_backend_live.py b/tests/live/test_mysql_backend_live.py new file mode 100644 index 0000000..0845af3 --- /dev/null +++ b/tests/live/test_mysql_backend_live.py @@ -0,0 +1,208 @@ +""" +Live integration tests for the MySQL dashboard backend. + +Requires a real MySQL server. Skipped unless ``DECNET_DB_URL`` (or +``DECNET_MYSQL_TEST_URL``) is exported pointing at a running instance, +e.g. a throw-away docker container: + + docker run -d --rm --name decnet-mysql-test \ + -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=decnet \ + -e MYSQL_USER=decnet -e MYSQL_PASSWORD=decnet \ + -p 3307:3306 mysql:8 + + # Either url works; the connecting account MUST have CREATE/DROP DATABASE + # privilege because each xdist worker uses its own throwaway schema. + export DECNET_DB_URL='mysql+aiomysql://root:root@127.0.0.1:3307/decnet' + pytest -m live tests/live/test_mysql_backend_live.py + +Each worker creates ``test_decnet_`` on session start and drops it +on session end. ```` is ``master`` outside xdist, ``gw0``/``gw1``/… +under it, so parallel runs never clash. +""" +from __future__ import annotations + +import json +import os +import uuid as _uuid +from datetime import datetime, timedelta, timezone +from urllib.parse import urlparse, urlunparse + +import pytest +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + +from decnet.web.db.mysql.repository import MySQLRepository + + +LIVE_URL = os.environ.get("DECNET_MYSQL_TEST_URL") or os.environ.get("DECNET_DB_URL") + +pytestmark = [ + pytest.mark.live, + pytest.mark.skipif( + not (LIVE_URL and LIVE_URL.startswith("mysql")), + reason="Set DECNET_DB_URL=mysql+aiomysql://... to run MySQL live tests", + ), +] + + +def _worker_id() -> str: + """Return a stable identifier for the current xdist worker (``master`` when single-process).""" + return os.environ.get("PYTEST_XDIST_WORKER", "master") + + +def _split_url(url: str) -> tuple[str, str]: + """Return (server_url_without_db, test_db_name).""" + parsed = urlparse(url) + server_url = urlunparse(parsed._replace(path="")) + db_name = f"test_decnet_{_worker_id()}" + return server_url, db_name + + +def _url_with_db(server_url: str, db_name: str) -> str: + parsed = urlparse(server_url) + return urlunparse(parsed._replace(path=f"/{db_name}")) + + +@pytest.fixture(scope="session") +async def mysql_test_db_url(): + """Create a per-worker throwaway database, yield its URL, drop it on teardown. + + Uses the configured URL's credentials to CREATE/DROP. If the account + lacks that privilege you'll see a clear SQL error — grant it with:: + + GRANT ALL PRIVILEGES ON `test\\_decnet\\_%`.* TO 'decnet'@'%'; + + or point ``DECNET_MYSQL_TEST_URL`` at a root-level URL. + """ + server_url, db_name = _split_url(LIVE_URL) + + admin = create_async_engine(server_url, isolation_level="AUTOCOMMIT") + try: + async with admin.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS `{db_name}`")) + await conn.execute(text(f"CREATE DATABASE `{db_name}`")) + finally: + await admin.dispose() + + yield _url_with_db(server_url, db_name) + + # Teardown — always drop, even if tests errored. + admin = create_async_engine(server_url, isolation_level="AUTOCOMMIT") + try: + async with admin.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS `{db_name}`")) + finally: + await admin.dispose() + + +@pytest.fixture +async def mysql_repo(mysql_test_db_url): + """Fresh schema per test — truncate between tests to keep them isolated.""" + repo = MySQLRepository(url=mysql_test_db_url) + await repo.initialize() + yield repo + + # Per-test cleanup: truncate with FK checks disabled so order doesn't matter. + async with repo.engine.begin() as conn: + await conn.execute(text("SET FOREIGN_KEY_CHECKS = 0")) + for tbl in ("attacker_behavior", "attackers", "logs", "bounty", "state", "users"): + await conn.execute(text(f"TRUNCATE TABLE `{tbl}`")) + await conn.execute(text("SET FOREIGN_KEY_CHECKS = 1")) + await repo.engine.dispose() + + +async def test_schema_creation_and_admin_seed(mysql_repo): + user = await mysql_repo.get_user_by_username(os.environ.get("DECNET_ADMIN_USER", "admin")) + assert user is not None + assert user["role"] == "admin" + + +async def test_add_and_query_logs(mysql_repo): + await mysql_repo.add_log({ + "decky": "decky-01", "service": "ssh", "event_type": "connect", + "attacker_ip": "10.0.0.7", "raw_line": "connect from 10.0.0.7", + "fields": json.dumps({"port": 22}), "msg": "conn", + }) + logs = await mysql_repo.get_logs(limit=10) + assert any(lg["attacker_ip"] == "10.0.0.7" for lg in logs) + assert await mysql_repo.get_total_logs() >= 1 + + +async def test_json_field_search(mysql_repo): + await mysql_repo.add_log({ + "decky": "d1", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.2.3.4", "raw_line": "x", + "fields": json.dumps({"username": "root"}), "msg": "", + }) + hits = await mysql_repo.get_logs(search="username:root") + assert any("1.2.3.4" == h["attacker_ip"] for h in hits) + + +async def test_histogram_buckets(mysql_repo): + now = datetime.now(timezone.utc).replace(microsecond=0) + for i in range(3): + await mysql_repo.add_log({ + "decky": "h", "service": "ssh", "event_type": "connect", + "attacker_ip": "9.9.9.9", + "raw_line": f"line {i}", "fields": "{}", "msg": "", + "timestamp": (now - timedelta(minutes=i)).isoformat(), + }) + buckets = await mysql_repo.get_log_histogram(interval_minutes=5) + assert buckets, "expected at least one histogram bucket" + for b in buckets: + assert "time" in b and "count" in b + assert b["count"] >= 1 + + +async def test_bounty_roundtrip(mysql_repo): + await mysql_repo.add_bounty({ + "decky": "decky-01", "service": "ssh", "attacker_ip": "10.0.0.1", + "bounty_type": "credentials", + "payload": {"username": "root", "password": "toor"}, + }) + out = await mysql_repo.get_bounties() + assert any(b["bounty_type"] == "credentials" for b in out) + + +async def test_user_crud(mysql_repo): + uid = str(_uuid.uuid4()) + await mysql_repo.create_user({ + "uuid": uid, "username": "live_tester", + "password_hash": "hashed", "role": "viewer", "must_change_password": True, + }) + u = await mysql_repo.get_user_by_uuid(uid) + assert u and u["username"] == "live_tester" + await mysql_repo.update_user_role(uid, "admin") + u2 = await mysql_repo.get_user_by_uuid(uid) + assert u2["role"] == "admin" + ok = await mysql_repo.delete_user(uid) + assert ok + assert await mysql_repo.get_user_by_uuid(uid) is None + + +async def test_purge_clears_tables(mysql_repo): + await mysql_repo.add_log({ + "decky": "p", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "x", "fields": "{}", "msg": "", + }) + await mysql_repo.purge_logs_and_bounties() + assert await mysql_repo.get_total_logs() == 0 + + +async def test_large_commands_blob_round_trips(mysql_repo): + """Attacker.commands must handle >64 KiB (MEDIUMTEXT) — was 1406 errors on TEXT.""" + big_commands = [ + {"service": "ssh", "decky": "d", "command": "A" * 512, + "timestamp": "2026-04-15T12:00:00+00:00"} + for _ in range(500) # ~250 KiB + ] + ip = "8.8.8.8" + now = datetime.now(timezone.utc) + row_uuid = await mysql_repo.upsert_attacker({ + "ip": ip, "first_seen": now, "last_seen": now, + "event_count": 0, "service_count": 0, "decky_count": 0, + "commands": json.dumps(big_commands), + }) + got = await mysql_repo.get_attacker_by_uuid(row_uuid) + assert got is not None + assert len(got["commands"]) == 500 diff --git a/tests/mysql_spinup.sh b/tests/mysql_spinup.sh new file mode 100755 index 0000000..71fccd2 --- /dev/null +++ b/tests/mysql_spinup.sh @@ -0,0 +1,20 @@ +# start the instance +docker run -d --rm --name decnet-mysql \ + -e MYSQL_ROOT_PASSWORD=root \ + -e MYSQL_DATABASE=decnet \ + -e MYSQL_USER=decnet \ + -e MYSQL_PASSWORD=decnet \ + -p 3307:3306 mysql:8 + +until docker exec decnet-mysql mysqladmin ping -h127.0.0.1 -uroot -proot --silent; do + sleep 1 +done + +echo "MySQL up." + +export DECNET_DB_TYPE=mysql +export DECNET_DB_URL='mysql+aiomysql://decnet:decnet@127.0.0.1:3307/decnet' + +source ../.venv/bin/activate + +sudo ../.venv/bin/decnet api diff --git a/tests/test_mysql_histogram_sql.py b/tests/test_mysql_histogram_sql.py new file mode 100644 index 0000000..1088560 --- /dev/null +++ b/tests/test_mysql_histogram_sql.py @@ -0,0 +1,70 @@ +""" +Inspection-level tests for the MySQL-dialect SQL emitted by MySQLRepository. + +We compile the SQLAlchemy statements against the MySQL dialect and assert on +the string form — no live MySQL server is required. +""" +import pytest +from sqlalchemy import func, select, literal_column +from sqlalchemy.dialects import mysql +from sqlmodel.sql.expression import SelectOfScalar + +from decnet.web.db.models import Log + + +def _compile(stmt) -> str: + """Compile a statement to MySQL-dialect SQL with literal values inlined.""" + return str(stmt.compile( + dialect=mysql.dialect(), + compile_kwargs={"literal_binds": True}, + )) + + +def test_mysql_histogram_uses_from_unixtime_bucket(): + """The MySQL dialect must bucket with UNIX_TIMESTAMP DIV N * N wrapped in FROM_UNIXTIME.""" + bucket_seconds = 900 # 15 min + bucket_expr = literal_column( + f"FROM_UNIXTIME((UNIX_TIMESTAMP(timestamp) DIV {bucket_seconds}) * {bucket_seconds})" + ).label("bucket_time") + stmt: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log) + + sql = _compile(stmt) + assert "FROM_UNIXTIME" in sql + assert "UNIX_TIMESTAMP" in sql + assert "DIV 900" in sql + # Sanity: SQLite-only strftime must NOT appear in the MySQL-dialect output. + assert "strftime" not in sql + assert "unixepoch" not in sql + + +def test_mysql_json_unquote_predicate_shape(): + """MySQL JSON filter uses JSON_UNQUOTE(JSON_EXTRACT(...)).""" + from decnet.web.db.mysql.repository import MySQLRepository + + # Build a dummy instance without touching the engine. We only need _json_field_equals, + # which is a pure function of the key. + repo = MySQLRepository.__new__(MySQLRepository) # bypass __init__ / no DB connection + predicate = repo._json_field_equals("username") + + # text() objects carry their literal SQL in .text + assert "JSON_UNQUOTE" in predicate.text + assert "JSON_EXTRACT(fields, '$.username')" in predicate.text + assert ":val" in predicate.text + + +@pytest.mark.parametrize("key", ["user", "port", "sess_id"]) +def test_mysql_json_predicate_safe_for_reasonable_keys(key): + """Keys matching [A-Za-z0-9_]+ are inserted verbatim; verify no SQL breakage.""" + from decnet.web.db.mysql.repository import MySQLRepository + repo = MySQLRepository.__new__(MySQLRepository) + pred = repo._json_field_equals(key) + assert f"'$.{key}'" in pred.text + + +def test_sqlite_histogram_still_uses_strftime(): + """Regression guard — SQLite implementation must keep its strftime-based bucket.""" + from decnet.web.db.sqlite.repository import SQLiteRepository + import inspect + src = inspect.getsource(SQLiteRepository.get_log_histogram) + assert "strftime" in src + assert "unixepoch" in src diff --git a/tests/test_mysql_url_builder.py b/tests/test_mysql_url_builder.py new file mode 100644 index 0000000..ae14710 --- /dev/null +++ b/tests/test_mysql_url_builder.py @@ -0,0 +1,78 @@ +""" +Unit tests for decnet.web.db.mysql.database.build_mysql_url / resolve_url. + +No MySQL server is required — these are pure URL-construction tests. +""" +import pytest + +from decnet.web.db.mysql.database import build_mysql_url, resolve_url + + +def test_build_url_defaults(monkeypatch): + for v in ("DECNET_DB_HOST", "DECNET_DB_PORT", "DECNET_DB_NAME", + "DECNET_DB_USER", "DECNET_DB_PASSWORD", "DECNET_DB_URL"): + monkeypatch.delenv(v, raising=False) + # PYTEST_* is set by pytest itself, so empty password is allowed here. + url = build_mysql_url() + assert url == "mysql+aiomysql://decnet:@localhost:3306/decnet" + + +def test_build_url_from_env(monkeypatch): + monkeypatch.setenv("DECNET_DB_HOST", "db.internal") + monkeypatch.setenv("DECNET_DB_PORT", "3307") + monkeypatch.setenv("DECNET_DB_NAME", "decnet_prod") + monkeypatch.setenv("DECNET_DB_USER", "svc_decnet") + monkeypatch.setenv("DECNET_DB_PASSWORD", "hunter2") + url = build_mysql_url() + assert url == "mysql+aiomysql://svc_decnet:hunter2@db.internal:3307/decnet_prod" + + +def test_build_url_percent_encodes_password(monkeypatch): + """Passwords with @ : / # etc must not break URL parsing.""" + monkeypatch.setenv("DECNET_DB_PASSWORD", "p@ss:word/!#") + url = build_mysql_url(user="u", host="h", port=3306, database="d") + # @ → %40, : → %3A, / → %2F, # → %23, ! → %21 + assert "p%40ss%3Aword%2F%21%23" in url + assert url.startswith("mysql+aiomysql://u:") + assert url.endswith("@h:3306/d") + + +def test_build_url_component_args_override_env(monkeypatch): + monkeypatch.setenv("DECNET_DB_HOST", "ignored") + monkeypatch.setenv("DECNET_DB_PASSWORD", "env-pw") + url = build_mysql_url(host="arg.host", user="arg-user", password="arg-pw", + port=9999, database="arg-db") + assert url == "mysql+aiomysql://arg-user:arg-pw@arg.host:9999/arg-db" + + +def test_resolve_url_prefers_explicit_arg(monkeypatch): + monkeypatch.setenv("DECNET_DB_URL", "mysql+aiomysql://env-url/x") + assert resolve_url("mysql+aiomysql://explicit/y") == "mysql+aiomysql://explicit/y" + + +def test_resolve_url_uses_env_url_before_components(monkeypatch): + monkeypatch.setenv("DECNET_DB_URL", "mysql+aiomysql://env-user:env-pw@env-host/env-db") + monkeypatch.setenv("DECNET_DB_HOST", "ignored.host") + assert resolve_url() == "mysql+aiomysql://env-user:env-pw@env-host/env-db" + + +def test_resolve_url_falls_back_to_components(monkeypatch): + monkeypatch.delenv("DECNET_DB_URL", raising=False) + monkeypatch.setenv("DECNET_DB_HOST", "fallback.host") + monkeypatch.setenv("DECNET_DB_PASSWORD", "pw") + url = resolve_url() + assert "fallback.host" in url + assert url.startswith("mysql+aiomysql://") + + +def test_build_url_requires_password_outside_pytest(monkeypatch): + """Without a password and not in a pytest run, construction must fail loudly.""" + for v in ("DECNET_DB_URL", "DECNET_DB_PASSWORD"): + monkeypatch.delenv(v, raising=False) + # Strip every PYTEST_* env var so the safety check trips. + import os + for k in list(os.environ): + if k.startswith("PYTEST"): + monkeypatch.delenv(k, raising=False) + with pytest.raises(ValueError, match="DECNET_DB_PASSWORD is not set"): + build_mysql_url()