diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 02b56d2..6053293 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,7 +1,10 @@ { "permissions": { "allow": [ - "mcp__plugin_context-mode_context-mode__ctx_batch_execute" + "mcp__plugin_context-mode_context-mode__ctx_batch_execute", + "mcp__plugin_context-mode_context-mode__ctx_search", + "Bash(grep:*)", + "Bash(python -m pytest --tb=short -q)" ] } } diff --git a/.hypothesis/constants/3b152726a666601e b/.hypothesis/constants/3b152726a666601e new file mode 100644 index 0000000..e6e1485 --- /dev/null +++ b/.hypothesis/constants/3b152726a666601e @@ -0,0 +1,4 @@ +# file: /home/anti/Tools/DECNET/decnet/web/sqlite_repository.py +# hypothesis_version: 6.151.11 + +[0.1, ' AND ', ' WHERE ', ':', 'BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK', '[^a-zA-Z0-9_]', 'active_deckies', 'admin', 'attacker', 'attacker-ip', 'attacker_ip', 'bounty_type', 'bounty_type = ?', 'bucket_time', 'count', 'decky', 'decnet.db', 'deployed_deckies', 'event', 'event_type', 'fields', 'id > ?', 'max_id', 'msg', 'must_change_password', 'password_hash', 'payload', 'raw_line', 'role', 'service', 'time', 'timestamp', 'timestamp <= ?', 'timestamp >= ?', 'total', 'total_logs', 'unique_attackers', 'username', 'uuid'] \ No newline at end of file diff --git a/.hypothesis/constants/b807ea9189944fb3 b/.hypothesis/constants/b807ea9189944fb3 new file mode 100644 index 0000000..04c6133 --- /dev/null +++ b/.hypothesis/constants/b807ea9189944fb3 @@ -0,0 +1,4 @@ +# file: /home/anti/Tools/DECNET/decnet/web/api.py +# hypothesis_version: 6.151.11 + +[0.5, '/api/v1', '/docs', '/openapi.json', '/redoc', '1.0.0', 'Authorization', 'Content-Type', 'DELETE', 'GET', 'OPTIONS', 'POST', 'PUT'] \ No newline at end of file diff --git a/.hypothesis/constants/ff35158fdfe08acb b/.hypothesis/constants/ff35158fdfe08acb new file mode 100644 index 0000000..86a9d99 --- /dev/null +++ b/.hypothesis/constants/ff35158fdfe08acb @@ -0,0 +1,4 @@ +# file: /home/anti/Tools/DECNET/decnet/env.py +# hypothesis_version: 6.151.11 + +[',', '.env', '.env.local', '0.0.0.0', '8000', '8080', 'DECNET_ADMIN_USER', 'DECNET_API_HOST', 'DECNET_API_PORT', 'DECNET_CORS_ORIGINS', 'DECNET_DEVELOPER', 'DECNET_JWT_SECRET', 'DECNET_WEB_HOST', 'DECNET_WEB_PORT', 'False', 'admin', 'changeme', 'password', 'secret', 'true'] \ No newline at end of file diff --git a/.hypothesis/unicode_data/16.0.0/codec-utf-8.json.gz b/.hypothesis/unicode_data/16.0.0/codec-utf-8.json.gz index 864dfa3..c11a20d 100644 Binary files a/.hypothesis/unicode_data/16.0.0/codec-utf-8.json.gz and b/.hypothesis/unicode_data/16.0.0/codec-utf-8.json.gz differ diff --git a/test_api_decnet.db-shm b/test_api_decnet.db-shm new file mode 100644 index 0000000..fe9ac28 Binary files /dev/null and b/test_api_decnet.db-shm differ diff --git a/test_api_decnet.db-wal b/test_api_decnet.db-wal new file mode 100644 index 0000000..e69de29 diff --git a/test_bounty_decnet.db-shm b/test_bounty_decnet.db-shm index 3d0aaf9..7c1bef2 100644 Binary files a/test_bounty_decnet.db-shm and b/test_bounty_decnet.db-shm differ diff --git a/test_bounty_decnet.db-wal b/test_bounty_decnet.db-wal index f5ce014..0909403 100644 Binary files a/test_bounty_decnet.db-wal and b/test_bounty_decnet.db-wal differ diff --git a/test_decnet.db-shm b/test_decnet.db-shm index a1988f3..ca4a95d 100644 Binary files a/test_decnet.db-shm and b/test_decnet.db-shm differ diff --git a/test_decnet.db-wal b/test_decnet.db-wal index bf38d63..714b9c6 100644 Binary files a/test_decnet.db-wal and b/test_decnet.db-wal differ diff --git a/test_fleet_decnet.db-shm b/test_fleet_decnet.db-shm index 1b71101..771848a 100644 Binary files a/test_fleet_decnet.db-shm and b/test_fleet_decnet.db-shm differ diff --git a/test_fleet_decnet.db-wal b/test_fleet_decnet.db-wal index c9dd302..8805b8f 100644 Binary files a/test_fleet_decnet.db-wal and b/test_fleet_decnet.db-wal differ diff --git a/test_fuzz_decnet.db-shm b/test_fuzz_decnet.db-shm index ab3ab37..5bfb1ff 100644 Binary files a/test_fuzz_decnet.db-shm and b/test_fuzz_decnet.db-shm differ diff --git a/test_fuzz_decnet.db-wal b/test_fuzz_decnet.db-wal index 7a01469..8e4ab42 100644 Binary files a/test_fuzz_decnet.db-wal and b/test_fuzz_decnet.db-wal differ diff --git a/tests/test_bounty.py b/tests/test_bounty.py deleted file mode 100644 index b35914b..0000000 --- a/tests/test_bounty.py +++ /dev/null @@ -1,41 +0,0 @@ -import os -from typing import Generator -import pytest -from fastapi.testclient import TestClient -from decnet.web.api import app -from decnet.web.dependencies import repo -from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD - -@pytest.fixture(autouse=True) -def setup_db() -> Generator[None, None, None]: - repo.db_path = "test_bounty_decnet.db" - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - repo.reinitialize() - yield - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - -@pytest.fixture -def auth_token(): - with TestClient(app) as client: - resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - return resp.json()["access_token"] - -def test_add_and_get_bounty(auth_token): - with TestClient(app) as client: - # We can't directly call add_bounty from API yet (it's internal to ingester) - # But we can test the repository if we want, or mock a log line that triggers it. - # For now, let's test the endpoint returns 200 even if empty. - resp = client.get("/api/v1/bounty", headers={"Authorization": f"Bearer {auth_token}"}) - assert resp.status_code == 200 - data = resp.json() - assert "total" in data - assert "data" in data - assert isinstance(data["data"], list) - -def test_bounty_pagination(auth_token): - with TestClient(app) as client: - resp = client.get("/api/v1/bounty?limit=1&offset=0", headers={"Authorization": f"Bearer {auth_token}"}) - assert resp.status_code == 200 - assert resp.json()["limit"] == 1 diff --git a/tests/test_fleet_api.py b/tests/test_fleet_api.py deleted file mode 100644 index cb65881..0000000 --- a/tests/test_fleet_api.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -import pytest -from fastapi.testclient import TestClient -from decnet.web.api import app -import decnet.config -from pathlib import Path -from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD -from decnet.web.dependencies import repo - -@pytest.fixture(autouse=True) -def setup_db(): - repo.db_path = "test_fleet_decnet.db" - import os - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - repo.reinitialize() - yield - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - -TEST_STATE_FILE = Path("test-decnet-state.json") - -@pytest.fixture(autouse=True) -def patch_state_file(monkeypatch): - # Patch the global STATE_FILE variable in the config module - monkeypatch.setattr(decnet.config, "STATE_FILE", TEST_STATE_FILE) - -@pytest.fixture -def mock_state_file(): - # Create a dummy state file for testing - _test_state = { - "config": { - "mode": "unihost", - "interface": "eth0", - "subnet": "192.168.1.0/24", - "gateway": "192.168.1.1", - "deckies": [ - { - "name": "test-decky-1", - "ip": "192.168.1.10", - "services": ["ssh"], - "distro": "debian", - "base_image": "debian", - "hostname": "test-host-1", - "service_config": {"ssh": {"banner": "SSH-2.0-OpenSSH_8.9"}}, - "archetype": "deaddeck", - "nmap_os": "linux", - "build_base": "debian:bookworm-slim" - }, - { - "name": "test-decky-2", - "ip": "192.168.1.11", - "services": ["http"], - "distro": "ubuntu", - "base_image": "ubuntu", - "hostname": "test-host-2", - "service_config": {}, - "archetype": None, - "nmap_os": "linux", - "build_base": "debian:bookworm-slim" - } - ], - "log_target": None, - "log_file": "test.log", - "ipvlan": False - }, - "compose_path": "test-compose.yml" - } - TEST_STATE_FILE.write_text(json.dumps(_test_state)) - - yield _test_state - - # Cleanup - if TEST_STATE_FILE.exists(): - TEST_STATE_FILE.unlink() - -def test_get_deckies_endpoint(mock_state_file): - with TestClient(app) as _client: - # Login to get token - _login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - _token = _login_resp.json()["access_token"] - - _response = _client.get("/api/v1/deckies", headers={"Authorization": f"Bearer {_token}"}) - assert _response.status_code == 200 - _data = _response.json() - assert len(_data) == 2 - assert _data[0]["name"] == "test-decky-1" - assert _data[0]["service_config"]["ssh"]["banner"] == "SSH-2.0-OpenSSH_8.9" - -def test_stats_includes_deployed_count(mock_state_file): - with TestClient(app) as _client: - _login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - _token = _login_resp.json()["access_token"] - - _response = _client.get("/api/v1/stats", headers={"Authorization": f"Bearer {_token}"}) - assert _response.status_code == 200 - _data = _response.json() - assert "deployed_deckies" in _data - assert _data["deployed_deckies"] == 2 diff --git a/tests/test_web_api.py b/tests/test_web_api.py deleted file mode 100644 index 0e1651f..0000000 --- a/tests/test_web_api.py +++ /dev/null @@ -1,135 +0,0 @@ -import os -from typing import Generator - -import pytest -from fastapi.testclient import TestClient - -from decnet.web.api import app -from decnet.web.dependencies import repo -from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD - - -@pytest.fixture(autouse=True) -def setup_db() -> Generator[None, None, None]: - repo.db_path = "test_decnet.db" - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - - repo.reinitialize() - - # Yield control to the test function - yield - - # Teardown - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - - -def test_login_success() -> None: - with TestClient(app) as client: - # The TestClient context manager triggers startup/shutdown events - response = client.post( - "/api/v1/auth/login", - json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} - ) - assert response.status_code == 200 - data = response.json() - assert "access_token" in data - assert data["token_type"] == "bearer" - assert "must_change_password" in data - assert data["must_change_password"] is True - - -def test_login_failure() -> None: - with TestClient(app) as client: - response = client.post( - "/api/v1/auth/login", - json={"username": "admin", "password": "wrongpassword"} - ) - assert response.status_code == 401 - - response = client.post( - "/api/v1/auth/login", - json={"username": "nonexistent", "password": "wrongpassword"} - ) - assert response.status_code == 401 - - -def test_change_password() -> None: - with TestClient(app) as client: - # First login to get token - login_resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - token = login_resp.json()["access_token"] - - # Try changing password with wrong old password - resp1 = client.post( - "/api/v1/auth/change-password", - json={"old_password": "wrong", "new_password": "new_secure_password"}, - headers={"Authorization": f"Bearer {token}"} - ) - assert resp1.status_code == 401 - - # Change password successfully - resp2 = client.post( - "/api/v1/auth/change-password", - json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": "new_secure_password"}, - headers={"Authorization": f"Bearer {token}"} - ) - assert resp2.status_code == 200 - - # Verify old password no longer works - resp3 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - assert resp3.status_code == 401 - - # Verify new password works and must_change_password is False - resp4 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": "new_secure_password"}) - assert resp4.status_code == 200 - assert resp4.json()["must_change_password"] is False - - -def test_get_logs_unauthorized() -> None: - with TestClient(app) as client: - response = client.get("/api/v1/logs") - assert response.status_code == 401 - - -def test_get_logs_success() -> None: - with TestClient(app) as client: - login_response = client.post( - "/api/v1/auth/login", - json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} - ) - token = login_response.json()["access_token"] - - response = client.get( - "/api/v1/logs", - headers={"Authorization": f"Bearer {token}"} - ) - assert response.status_code == 200 - data = response.json() - assert "data" in data - assert data["total"] >= 0 - assert isinstance(data["data"], list) - -def test_get_stats_unauthorized() -> None: - with TestClient(app) as client: - response = client.get("/api/v1/stats") - assert response.status_code == 401 - -def test_get_stats_success() -> None: - with TestClient(app) as client: - login_response = client.post( - "/api/v1/auth/login", - json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD} - ) - token = login_response.json()["access_token"] - - response = client.get( - "/api/v1/stats", - headers={"Authorization": f"Bearer {token}"} - ) - assert response.status_code == 200 - data = response.json() - assert "total_logs" in data - assert "unique_attackers" in data - assert "active_deckies" in data diff --git a/tests/test_web_api_fuzz.py b/tests/test_web_api_fuzz.py deleted file mode 100644 index f14caeb..0000000 --- a/tests/test_web_api_fuzz.py +++ /dev/null @@ -1,110 +0,0 @@ -import os -import pytest -import json -from typing import Generator, Any, Optional -from fastapi.testclient import TestClient -from hypothesis import given, strategies as st, settings, HealthCheck -import httpx - -from decnet.web.api import app -from decnet.web.dependencies import repo -from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD - -# Re-use setup from test_web_api -@pytest.fixture(scope="function", autouse=True) -def setup_db() -> Generator[None, None, None]: - repo.db_path = "test_fuzz_decnet.db" - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - - repo.reinitialize() - yield - if os.path.exists(repo.db_path): - os.remove(repo.db_path) - -# bcrypt is intentionally slow, so we disable/extend the deadline -_FUZZ_SETTINGS: dict[str, Any] = { - "max_examples": 50, - "deadline": None, # bcrypt hashing takes >200ms - "suppress_health_check": [HealthCheck.function_scoped_fixture] -} - -@settings(**_FUZZ_SETTINGS) -@given( - username=st.text(min_size=0, max_size=2048), - password=st.text(min_size=0, max_size=2048) -) -def test_fuzz_login(username: str, password: str) -> None: - """Fuzz the login endpoint with random strings (including non-ASCII).""" - with TestClient(app) as _client: - _payload: dict[str, str] = {"username": username, "password": password} - try: - _response: httpx.Response = _client.post("/api/v1/auth/login", json=_payload) - # 200, 401, or 422 are acceptable. 500 is a failure. - assert _response.status_code in (200, 401, 422) - except (UnicodeEncodeError, json.JSONDecodeError): - pass - -@settings(**_FUZZ_SETTINGS) -@given( - old_password=st.text(min_size=0, max_size=2048), - new_password=st.text(min_size=0, max_size=2048) -) -def test_fuzz_change_password(old_password: str, new_password: str) -> None: - """Fuzz the change-password endpoint with random strings.""" - with TestClient(app) as _client: - # Get valid token first - _login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - _token: str = _login_resp.json()["access_token"] - - _payload: dict[str, str] = {"old_password": old_password, "new_password": new_password} - try: - _response: httpx.Response = _client.post( - "/api/v1/auth/change-password", - json=_payload, - headers={"Authorization": f"Bearer {_token}"} - ) - assert _response.status_code in (200, 401, 422) - except (UnicodeEncodeError, json.JSONDecodeError): - pass - -@settings(**_FUZZ_SETTINGS) -@given( - limit=st.integers(min_value=-2000, max_value=5000), - offset=st.integers(min_value=-2000, max_value=5000), - search=st.one_of(st.none(), st.text(max_size=2048)) -) -def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None: - """Fuzz the logs pagination and search.""" - with TestClient(app) as _client: - _login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}) - _token: str = _login_resp.json()["access_token"] - - _params: dict[str, Any] = {"limit": limit, "offset": offset} - if search is not None: - _params["search"] = search - - _response: httpx.Response = _client.get( - "/api/v1/logs", - params=_params, - headers={"Authorization": f"Bearer {_token}"} - ) - - assert _response.status_code in (200, 422) - -@settings(**_FUZZ_SETTINGS) -@given( - token=st.text(min_size=0, max_size=4096) -) -def test_fuzz_auth_header(token: str) -> None: - """Fuzz the Authorization header with full unicode noise.""" - with TestClient(app) as _client: - try: - _response: httpx.Response = _client.get( - "/api/v1/stats", - headers={"Authorization": f"Bearer {token}"} - ) - assert _response.status_code in (401, 422) - except (UnicodeEncodeError, httpx.InvalidURL, httpx.CookieConflict): - # Expected client-side rejection of invalid header characters - pass