diff --git a/pyproject.toml b/pyproject.toml index 55a0736..88708b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,16 +30,31 @@ dev = [ "pip-audit>=2.0", "httpx>=0.27.0", "hypothesis>=6.0", + "pytest-cov>=7.0", + "pytest-asyncio>=1.0", + "freezegun>=1.5", + "schemathesis>=4.0", ] [project.scripts] decnet = "decnet.cli:app" [tool.pytest.ini_options] +asyncio_mode = "auto" filterwarnings = [ "ignore::pytest.PytestUnhandledThreadExceptionWarning", + "ignore::DeprecationWarning", ] +[tool.coverage.run] +source = ["decnet"] +omit = ["*/tests/*", "templates/*"] + +[tool.coverage.report] +show_missing = true +skip_covered = false +# Run with: pytest --cov --cov-report=term-missing + [tool.setuptools.packages.find] where = ["."] include = ["decnet*"] diff --git a/tests/api/logs/test_histogram.py b/tests/api/logs/test_histogram.py new file mode 100644 index 0000000..608c13e --- /dev/null +++ b/tests/api/logs/test_histogram.py @@ -0,0 +1,95 @@ +""" +Histogram bucketing tests using freezegun. + +freeze_time controls Python's datetime.now() so we can compute +explicit bucket timestamps deterministically, then pass them to +add_log and verify SQLite groups them into the right buckets. +""" +import json +import pytest +from datetime import datetime, timedelta +from freezegun import freeze_time +from decnet.web.sqlite_repository import SQLiteRepository + + +@pytest.fixture +def repo(tmp_path): + return SQLiteRepository(db_path=str(tmp_path / "histogram_test.db")) + + +def _log(decky="d", service="ssh", ip="1.2.3.4", timestamp=None): + return { + "decky": decky, + "service": service, + "event_type": "connect", + "attacker_ip": ip, + "raw_line": "test", + "fields": "{}", + "msg": "", + **({"timestamp": timestamp} if timestamp else {}), + } + + +async def test_histogram_empty_db(repo): + result = await repo.get_log_histogram() + assert result == [] + + +@freeze_time("2026-04-09 12:00:00") +async def test_histogram_single_bucket(repo): + now = datetime.now() + ts = now.strftime("%Y-%m-%d %H:%M:%S") + + for _ in range(5): + await repo.add_log(_log(timestamp=ts)) + + result = await repo.get_log_histogram(interval_minutes=15) + assert len(result) == 1 + assert result[0]["count"] == 5 + + +@freeze_time("2026-04-09 12:00:00") +async def test_histogram_two_buckets(repo): + now = datetime.now() + bucket_a = now.strftime("%Y-%m-%d %H:%M:%S") + bucket_b = (now + timedelta(minutes=20)).strftime("%Y-%m-%d %H:%M:%S") + + for _ in range(3): + await repo.add_log(_log(timestamp=bucket_a)) + for _ in range(7): + await repo.add_log(_log(timestamp=bucket_b)) + + result = await repo.get_log_histogram(interval_minutes=15) + assert len(result) == 2 + counts = {r["count"] for r in result} + assert counts == {3, 7} + + +@freeze_time("2026-04-09 12:00:00") +async def test_histogram_respects_start_end_filter(repo): + now = datetime.now() + inside = now.strftime("%Y-%m-%d %H:%M:%S") + outside = (now - timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S") + + await repo.add_log(_log(timestamp=inside)) + await repo.add_log(_log(timestamp=outside)) + + start = (now - timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S") + end = (now + timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S") + + result = await repo.get_log_histogram(start_time=start, end_time=end, interval_minutes=15) + total = sum(r["count"] for r in result) + assert total == 1 + + +@freeze_time("2026-04-09 12:00:00") +async def test_histogram_search_filter(repo): + now = datetime.now() + ts = now.strftime("%Y-%m-%d %H:%M:%S") + + await repo.add_log(_log(decky="ssh-decky", service="ssh", timestamp=ts)) + await repo.add_log(_log(decky="ftp-decky", service="ftp", timestamp=ts)) + + result = await repo.get_log_histogram(search="service:ssh", interval_minutes=15) + total = sum(r["count"] for r in result) + assert total == 1 diff --git a/tests/api/test_repository.py b/tests/api/test_repository.py new file mode 100644 index 0000000..f05a244 --- /dev/null +++ b/tests/api/test_repository.py @@ -0,0 +1,164 @@ +""" +Direct async tests for SQLiteRepository. +These exercise the DB layer without going through the HTTP stack, +covering DEBT-006 (zero test coverage on the database layer). +""" +import json +import pytest +from decnet.web.sqlite_repository import SQLiteRepository + + +@pytest.fixture +def repo(tmp_path): + return SQLiteRepository(db_path=str(tmp_path / "test.db")) + + +async def test_add_and_get_log(repo): + await repo.add_log({ + "decky": "decky-01", + "service": "ssh", + "event_type": "connect", + "attacker_ip": "10.0.0.1", + "raw_line": "SSH connect from 10.0.0.1", + "fields": json.dumps({"port": 22}), + "msg": "new connection", + }) + logs = await repo.get_logs(limit=10, offset=0) + assert len(logs) == 1 + assert logs[0]["decky"] == "decky-01" + assert logs[0]["service"] == "ssh" + assert logs[0]["attacker_ip"] == "10.0.0.1" + + +async def test_get_total_logs(repo): + for i in range(5): + await repo.add_log({ + "decky": f"decky-0{i}", + "service": "ssh", + "event_type": "connect", + "attacker_ip": f"10.0.0.{i}", + "raw_line": "test", + "fields": "{}", + "msg": "", + }) + total = await repo.get_total_logs() + assert total == 5 + + +async def test_search_filter_by_decky(repo): + await repo.add_log({"decky": "target", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "x", "fields": "{}", "msg": ""}) + await repo.add_log({"decky": "other", "service": "ftp", "event_type": "login", + "attacker_ip": "2.2.2.2", "raw_line": "y", "fields": "{}", "msg": ""}) + + logs = await repo.get_logs(search="decky:target") + assert len(logs) == 1 + assert logs[0]["decky"] == "target" + + +async def test_search_filter_by_service(repo): + await repo.add_log({"decky": "d1", "service": "rdp", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "x", "fields": "{}", "msg": ""}) + await repo.add_log({"decky": "d2", "service": "smtp", "event_type": "connect", + "attacker_ip": "1.1.1.2", "raw_line": "y", "fields": "{}", "msg": ""}) + + logs = await repo.get_logs(search="service:rdp") + assert len(logs) == 1 + assert logs[0]["service"] == "rdp" + + +async def test_search_filter_by_json_field(repo): + await repo.add_log({"decky": "d1", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "x", + "fields": json.dumps({"username": "root"}), "msg": ""}) + await repo.add_log({"decky": "d2", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.2", "raw_line": "y", + "fields": json.dumps({"username": "admin"}), "msg": ""}) + + logs = await repo.get_logs(search="username:root") + assert len(logs) == 1 + assert json.loads(logs[0]["fields"])["username"] == "root" + + +async def test_get_logs_after_id(repo): + for i in range(4): + await repo.add_log({"decky": "d", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": f"line {i}", + "fields": "{}", "msg": ""}) + + max_id = await repo.get_max_log_id() + assert max_id == 4 + + # Add one more after we captured max_id + await repo.add_log({"decky": "d", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "line 4", "fields": "{}", "msg": ""}) + + new_logs = await repo.get_logs_after_id(last_id=max_id) + assert len(new_logs) == 1 + + +async def test_full_text_search(repo): + await repo.add_log({"decky": "d1", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": "supersecretstring", + "fields": "{}", "msg": ""}) + await repo.add_log({"decky": "d2", "service": "ftp", "event_type": "login", + "attacker_ip": "2.2.2.2", "raw_line": "nothing special", + "fields": "{}", "msg": ""}) + + logs = await repo.get_logs(search="supersecretstring") + assert len(logs) == 1 + + +async def test_pagination(repo): + for i in range(10): + await repo.add_log({"decky": "d", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.1.1.1", "raw_line": f"line {i}", + "fields": "{}", "msg": ""}) + + page1 = await repo.get_logs(limit=4, offset=0) + page2 = await repo.get_logs(limit=4, offset=4) + page3 = await repo.get_logs(limit=4, offset=8) + + assert len(page1) == 4 + assert len(page2) == 4 + assert len(page3) == 2 + # No duplicates across pages + ids1 = {r["id"] for r in page1} + ids2 = {r["id"] for r in page2} + assert ids1.isdisjoint(ids2) + + +async def test_add_and_get_bounty(repo): + await repo.add_bounty({ + "decky": "decky-01", + "service": "ssh", + "attacker_ip": "10.0.0.1", + "bounty_type": "credentials", + "payload": {"username": "root", "password": "toor"}, + }) + bounties = await repo.get_bounties(limit=10, offset=0) + assert len(bounties) == 1 + assert bounties[0]["decky"] == "decky-01" + assert bounties[0]["bounty_type"] == "credentials" + + +async def test_user_lifecycle(repo): + import uuid + uid = str(uuid.uuid4()) + await repo.create_user({ + "uuid": uid, + "username": "testuser", + "password_hash": "hashed_pw", + "role": "viewer", + "must_change_password": True, + }) + + user = await repo.get_user_by_username("testuser") + assert user is not None + assert user["role"] == "viewer" + assert user["must_change_password"] == 1 + + await repo.update_user_password(uid, "new_hashed_pw", must_change_password=False) + updated = await repo.get_user_by_uuid(uid) + assert updated["password_hash"] == "new_hashed_pw" + assert updated["must_change_password"] == 0 diff --git a/tests/api/test_schemathesis.py b/tests/api/test_schemathesis.py new file mode 100644 index 0000000..d29063d --- /dev/null +++ b/tests/api/test_schemathesis.py @@ -0,0 +1,22 @@ +""" +Schemathesis contract tests. + +Generates requests from the OpenAPI spec and verifies that no input causes a 5xx. + +Currently scoped to `not_a_server_error` only — full response-schema conformance +(including undocumented 401 responses) is blocked by DEBT-020 (missing error +response declarations across all protected endpoints). Once DEBT-020 is resolved, +replace the checks list with the default (remove the argument) for full compliance. + +Requires DECNET_DEVELOPER=true (set in tests/conftest.py) to expose /openapi.json. +""" +import schemathesis +from schemathesis.checks import not_a_server_error +from decnet.web.api import app + +schema = schemathesis.openapi.from_asgi("/openapi.json", app) + + +@schemathesis.pytest.parametrize(api=schema) +def test_schema_compliance(case): + case.call_and_validate(checks=[not_a_server_error]) diff --git a/tests/conftest.py b/tests/conftest.py index 4f1bcb6..efb3d0d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,3 +9,5 @@ import os os.environ.setdefault("DECNET_JWT_SECRET", "test-jwt-secret-not-for-production-use") os.environ.setdefault("DECNET_ADMIN_PASSWORD", "test-admin-password-1234!") os.environ.setdefault("DECNET_ADMIN_USER", "admin") +# Expose OpenAPI schema so schemathesis can load it during tests +os.environ.setdefault("DECNET_DEVELOPER", "true")