From ec66e01f55d938faac92ae8a21812a60154934fe Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 9 Apr 2026 12:24:09 -0400 Subject: [PATCH] fix: add missing __init__.py to tests/api subpackages to fix relative imports --- tests/api/__init__.py | 0 tests/api/auth/__init__.py | 0 tests/api/auth/test_change_pass.py | 61 +++++++++++++++++ tests/api/auth/test_login.py | 49 +++++++++++++ tests/api/bounty/__init__.py | 0 tests/api/bounty/test_get_bounties.py | 19 +++++ tests/api/conftest.py | 99 +++++++++++++++++++++++++++ tests/api/fleet/__init__.py | 0 tests/api/fleet/test_get_deckies.py | 16 +++++ tests/api/logs/__init__.py | 0 tests/api/logs/test_get_logs.py | 53 ++++++++++++++ tests/api/stats/__init__.py | 0 tests/api/stats/test_get_stats.py | 58 ++++++++++++++++ 13 files changed, 355 insertions(+) create mode 100644 tests/api/__init__.py create mode 100644 tests/api/auth/__init__.py create mode 100644 tests/api/auth/test_change_pass.py create mode 100644 tests/api/auth/test_login.py create mode 100644 tests/api/bounty/__init__.py create mode 100644 tests/api/bounty/test_get_bounties.py create mode 100644 tests/api/conftest.py create mode 100644 tests/api/fleet/__init__.py create mode 100644 tests/api/fleet/test_get_deckies.py create mode 100644 tests/api/logs/__init__.py create mode 100644 tests/api/logs/test_get_logs.py create mode 100644 tests/api/stats/__init__.py create mode 100644 tests/api/stats/test_get_stats.py diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/auth/__init__.py b/tests/api/auth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/auth/test_change_pass.py b/tests/api/auth/test_change_pass.py new file mode 100644 index 0000000..2830d74 --- /dev/null +++ b/tests/api/auth/test_change_pass.py @@ -0,0 +1,61 @@ +import json +from fastapi.testclient import TestClient +from decnet.web.api import app +from hypothesis import given, strategies as st, settings +import httpx +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD +from ..conftest import _FUZZ_SETTINGS + +def test_change_password() -> None: + with TestClient(app) as client: + # First login to get token + login_resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + token = login_resp.json()["access_token"] + + # Try changing password with wrong old password + resp1 = client.post( + "/api/v1/auth/change-password", + json={"old_password": "wrong", "new_password": "new_secure_password"}, + headers={"Authorization": f"Bearer {token}"} + ) + assert resp1.status_code == 401 + + # Change password successfully + resp2 = client.post( + "/api/v1/auth/change-password", + json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": "new_secure_password"}, + headers={"Authorization": f"Bearer {token}"} + ) + assert resp2.status_code == 200 + + # Verify old password no longer works + resp3 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + assert resp3.status_code == 401 + + # Verify new password works and must_change_password is False + resp4 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": "new_secure_password"}) + assert resp4.status_code == 200 + assert resp4.json()["must_change_password"] is False + +@settings(**_FUZZ_SETTINGS) +@given( + old_password=st.text(min_size=0, max_size=2048), + new_password=st.text(min_size=0, max_size=2048) +) +def test_fuzz_change_password(old_password: str, new_password: str) -> None: + """Fuzz the change-password endpoint with random strings.""" + with TestClient(app) as _client: + # Get valid token first + _login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + _token: str = _login_resp.json()["access_token"] + + _payload: dict[str, str] = {"old_password": old_password, "new_password": new_password} + try: + _response: httpx.Response = _client.post( + "/api/v1/auth/change-password", + json=_payload, + headers={"Authorization": f"Bearer {_token}"} + ) + assert _response.status_code in (200, 401, 422) + except (UnicodeEncodeError, json.JSONDecodeError): + pass diff --git a/tests/api/auth/test_login.py b/tests/api/auth/test_login.py new file mode 100644 index 0000000..220b7b4 --- /dev/null +++ b/tests/api/auth/test_login.py @@ -0,0 +1,49 @@ +import json +from fastapi.testclient import TestClient +from decnet.web.api import app +from hypothesis import given, strategies as st, settings +import httpx +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD +from ..conftest import _FUZZ_SETTINGS + +def test_login_success() -> None: + with TestClient(app) as client: + response = client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert data["token_type"] == "bearer" + assert "must_change_password" in data + assert data["must_change_password"] is True + +def test_login_failure() -> None: + with TestClient(app) as client: + response = client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": "wrongpassword"} + ) + assert response.status_code == 401 + + response = client.post( + "/api/v1/auth/login", + json={"username": "nonexistent", "password": "wrongpassword"} + ) + assert response.status_code == 401 + +@settings(**_FUZZ_SETTINGS) +@given( + username=st.text(min_size=0, max_size=2048), + password=st.text(min_size=0, max_size=2048) +) +def test_fuzz_login(username: str, password: str) -> None: + """Fuzz the login endpoint with random strings (including non-ASCII).""" + with TestClient(app) as _client: + _payload: dict[str, str] = {"username": username, "password": password} + try: + _response: httpx.Response = _client.post("/api/v1/auth/login", json=_payload) + assert _response.status_code in (200, 401, 422) + except (UnicodeEncodeError, json.JSONDecodeError): + pass diff --git a/tests/api/bounty/__init__.py b/tests/api/bounty/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/bounty/test_get_bounties.py b/tests/api/bounty/test_get_bounties.py new file mode 100644 index 0000000..11a331f --- /dev/null +++ b/tests/api/bounty/test_get_bounties.py @@ -0,0 +1,19 @@ +from fastapi.testclient import TestClient +from decnet.web.api import app + +def test_add_and_get_bounty(auth_token): + with TestClient(app) as client: + # We can't directly call add_bounty from API yet (it's internal to ingester) + # But we can test the endpoint returns 200 even if empty. + resp = client.get("/api/v1/bounty", headers={"Authorization": f"Bearer {auth_token}"}) + assert resp.status_code == 200 + data = resp.json() + assert "total" in data + assert "data" in data + assert isinstance(data["data"], list) + +def test_bounty_pagination(auth_token): + with TestClient(app) as client: + resp = client.get("/api/v1/bounty?limit=1&offset=0", headers={"Authorization": f"Bearer {auth_token}"}) + assert resp.status_code == 200 + assert resp.json()["limit"] == 1 diff --git a/tests/api/conftest.py b/tests/api/conftest.py new file mode 100644 index 0000000..195c155 --- /dev/null +++ b/tests/api/conftest.py @@ -0,0 +1,99 @@ +import os +import json +import pytest +from typing import Generator, Any +from pathlib import Path +from fastapi.testclient import TestClient +from hypothesis import HealthCheck + +from decnet.web.api import app +from decnet.web.dependencies import repo +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD +import decnet.config + +TEST_STATE_FILE = Path("test-decnet-state.json") + +@pytest.fixture(scope="function", autouse=True) +def setup_db() -> Generator[None, None, None]: + # Use a unique DB for each test process/thread if possible, but for now just one + repo.db_path = "test_api_decnet.db" + if os.path.exists(repo.db_path): + try: + os.remove(repo.db_path) + except OSError: + pass + + repo.reinitialize() + yield + if os.path.exists(repo.db_path): + try: + os.remove(repo.db_path) + except OSError: + pass + +@pytest.fixture +def auth_token() -> str: + with TestClient(app) as client: + resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + return resp.json()["access_token"] + +@pytest.fixture(autouse=True) +def patch_state_file(monkeypatch): + monkeypatch.setattr(decnet.config, "STATE_FILE", TEST_STATE_FILE) + +@pytest.fixture +def mock_state_file(): + _test_state = { + "config": { + "mode": "unihost", + "interface": "eth0", + "subnet": "192.168.1.0/24", + "gateway": "192.168.1.1", + "deckies": [ + { + "name": "test-decky-1", + "ip": "192.168.1.10", + "services": ["ssh"], + "distro": "debian", + "base_image": "debian", + "hostname": "test-host-1", + "service_config": {"ssh": {"banner": "SSH-2.0-OpenSSH_8.9"}}, + "archetype": "deaddeck", + "nmap_os": "linux", + "build_base": "debian:bookworm-slim", + "mutate_interval": 30, + "last_mutated": 0.0 + }, + { + "name": "test-decky-2", + "ip": "192.168.1.11", + "services": ["http"], + "distro": "ubuntu", + "base_image": "ubuntu", + "hostname": "test-host-2", + "service_config": {}, + "archetype": None, + "nmap_os": "linux", + "build_base": "debian:bookworm-slim", + "mutate_interval": 30, + "last_mutated": 0.0 + } + ], + "log_target": None, + "log_file": "test.log", + "ipvlan": False, + "mutate_interval": 30 + }, + "compose_path": "test-compose.yml" + } + TEST_STATE_FILE.write_text(json.dumps(_test_state)) + yield _test_state + if TEST_STATE_FILE.exists(): + TEST_STATE_FILE.unlink() + +# Share fuzz settings across API tests +_FUZZ_SETTINGS: dict[str, Any] = { + "max_examples": 50, + "deadline": None, + "suppress_health_check": [HealthCheck.function_scoped_fixture] +} diff --git a/tests/api/fleet/__init__.py b/tests/api/fleet/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/fleet/test_get_deckies.py b/tests/api/fleet/test_get_deckies.py new file mode 100644 index 0000000..a7d6189 --- /dev/null +++ b/tests/api/fleet/test_get_deckies.py @@ -0,0 +1,16 @@ +from fastapi.testclient import TestClient +from decnet.web.api import app +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD + +def test_get_deckies_endpoint(mock_state_file): + with TestClient(app) as _client: + # Login to get token + _login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + _token = _login_resp.json()["access_token"] + + _response = _client.get("/api/v1/deckies", headers={"Authorization": f"Bearer {_token}"}) + assert _response.status_code == 200 + _data = _response.json() + assert len(_data) == 2 + assert _data[0]["name"] == "test-decky-1" + assert _data[0]["service_config"]["ssh"]["banner"] == "SSH-2.0-OpenSSH_8.9" diff --git a/tests/api/logs/__init__.py b/tests/api/logs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/logs/test_get_logs.py b/tests/api/logs/test_get_logs.py new file mode 100644 index 0000000..12471bb --- /dev/null +++ b/tests/api/logs/test_get_logs.py @@ -0,0 +1,53 @@ +from typing import Any, Optional +from fastapi.testclient import TestClient +from decnet.web.api import app +from hypothesis import given, strategies as st, settings +import httpx +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD +from ..conftest import _FUZZ_SETTINGS + +def test_get_logs_unauthorized() -> None: + with TestClient(app) as client: + response = client.get("/api/v1/logs") + assert response.status_code == 401 + +def test_get_logs_success() -> None: + with TestClient(app) as client: + login_response = client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} + ) + token = login_response.json()["access_token"] + + response = client.get( + "/api/v1/logs", + headers={"Authorization": f"Bearer {token}"} + ) + assert response.status_code == 200 + data = response.json() + assert "data" in data + assert data["total"] >= 0 + assert isinstance(data["data"], list) + +@settings(**_FUZZ_SETTINGS) +@given( + limit=st.integers(min_value=-2000, max_value=5000), + offset=st.integers(min_value=-2000, max_value=5000), + search=st.one_of(st.none(), st.text(max_size=2048)) +) +def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None: + with TestClient(app) as _client: + _login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + _token: str = _login_resp.json()["access_token"] + + _params: dict[str, Any] = {"limit": limit, "offset": offset} + if search is not None: + _params["search"] = search + + _response: httpx.Response = _client.get( + "/api/v1/logs", + params=_params, + headers={"Authorization": f"Bearer {_token}"} + ) + + assert _response.status_code in (200, 422) diff --git a/tests/api/stats/__init__.py b/tests/api/stats/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/stats/test_get_stats.py b/tests/api/stats/test_get_stats.py new file mode 100644 index 0000000..7849530 --- /dev/null +++ b/tests/api/stats/test_get_stats.py @@ -0,0 +1,58 @@ +from typing import Any +from fastapi.testclient import TestClient +from decnet.web.api import app +from hypothesis import given, strategies as st, settings +import httpx +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD +from ..conftest import _FUZZ_SETTINGS + +def test_get_stats_unauthorized() -> None: + with TestClient(app) as client: + response = client.get("/api/v1/stats") + assert response.status_code == 401 + +def test_get_stats_success() -> None: + with TestClient(app) as client: + login_response = client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} + ) + token = login_response.json()["access_token"] + + response = client.get( + "/api/v1/stats", + headers={"Authorization": f"Bearer {token}"} + ) + assert response.status_code == 200 + data = response.json() + assert "total_logs" in data + assert "unique_attackers" in data + assert "active_deckies" in data + +def test_stats_includes_deployed_count(mock_state_file): + with TestClient(app) as _client: + _login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) + _token = _login_resp.json()["access_token"] + + _response = _client.get("/api/v1/stats", headers={"Authorization": f"Bearer {_token}"}) + assert _response.status_code == 200 + _data = _response.json() + assert "deployed_deckies" in _data + assert _data["deployed_deckies"] == 2 + +@settings(**_FUZZ_SETTINGS) +@given( + token=st.text(min_size=0, max_size=4096) +) +def test_fuzz_auth_header(token: str) -> None: + """Fuzz the Authorization header with full unicode noise.""" + with TestClient(app) as _client: + try: + _response: httpx.Response = _client.get( + "/api/v1/stats", + headers={"Authorization": f"Bearer {token}"} + ) + assert _response.status_code in (401, 422) + except (UnicodeEncodeError, httpx.InvalidURL, httpx.CookieConflict): + # Expected client-side rejection of invalid header characters + pass