refactor: modularize API tests to match router structure
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
{
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"mcp__plugin_context-mode_context-mode__ctx_batch_execute"
|
||||
"mcp__plugin_context-mode_context-mode__ctx_batch_execute",
|
||||
"mcp__plugin_context-mode_context-mode__ctx_search",
|
||||
"Bash(grep:*)",
|
||||
"Bash(python -m pytest --tb=short -q)"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
4
.hypothesis/constants/3b152726a666601e
Normal file
4
.hypothesis/constants/3b152726a666601e
Normal file
@@ -0,0 +1,4 @@
|
||||
# file: /home/anti/Tools/DECNET/decnet/web/sqlite_repository.py
|
||||
# hypothesis_version: 6.151.11
|
||||
|
||||
[0.1, ' AND ', ' WHERE ', ':', 'BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK', '[^a-zA-Z0-9_]', 'active_deckies', 'admin', 'attacker', 'attacker-ip', 'attacker_ip', 'bounty_type', 'bounty_type = ?', 'bucket_time', 'count', 'decky', 'decnet.db', 'deployed_deckies', 'event', 'event_type', 'fields', 'id > ?', 'max_id', 'msg', 'must_change_password', 'password_hash', 'payload', 'raw_line', 'role', 'service', 'time', 'timestamp', 'timestamp <= ?', 'timestamp >= ?', 'total', 'total_logs', 'unique_attackers', 'username', 'uuid']
|
||||
4
.hypothesis/constants/b807ea9189944fb3
Normal file
4
.hypothesis/constants/b807ea9189944fb3
Normal file
@@ -0,0 +1,4 @@
|
||||
# file: /home/anti/Tools/DECNET/decnet/web/api.py
|
||||
# hypothesis_version: 6.151.11
|
||||
|
||||
[0.5, '/api/v1', '/docs', '/openapi.json', '/redoc', '1.0.0', 'Authorization', 'Content-Type', 'DELETE', 'GET', 'OPTIONS', 'POST', 'PUT']
|
||||
4
.hypothesis/constants/ff35158fdfe08acb
Normal file
4
.hypothesis/constants/ff35158fdfe08acb
Normal file
@@ -0,0 +1,4 @@
|
||||
# file: /home/anti/Tools/DECNET/decnet/env.py
|
||||
# hypothesis_version: 6.151.11
|
||||
|
||||
[',', '.env', '.env.local', '0.0.0.0', '8000', '8080', 'DECNET_ADMIN_USER', 'DECNET_API_HOST', 'DECNET_API_PORT', 'DECNET_CORS_ORIGINS', 'DECNET_DEVELOPER', 'DECNET_JWT_SECRET', 'DECNET_WEB_HOST', 'DECNET_WEB_PORT', 'False', 'admin', 'changeme', 'password', 'secret', 'true']
|
||||
Binary file not shown.
BIN
test_api_decnet.db-shm
Normal file
BIN
test_api_decnet.db-shm
Normal file
Binary file not shown.
0
test_api_decnet.db-wal
Normal file
0
test_api_decnet.db-wal
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,41 +0,0 @@
|
||||
import os
|
||||
from typing import Generator
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from decnet.web.api import app
|
||||
from decnet.web.dependencies import repo
|
||||
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db() -> Generator[None, None, None]:
|
||||
repo.db_path = "test_bounty_decnet.db"
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
repo.reinitialize()
|
||||
yield
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
@pytest.fixture
|
||||
def auth_token():
|
||||
with TestClient(app) as client:
|
||||
resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
return resp.json()["access_token"]
|
||||
|
||||
def test_add_and_get_bounty(auth_token):
|
||||
with TestClient(app) as client:
|
||||
# We can't directly call add_bounty from API yet (it's internal to ingester)
|
||||
# But we can test the repository if we want, or mock a log line that triggers it.
|
||||
# For now, let's test the endpoint returns 200 even if empty.
|
||||
resp = client.get("/api/v1/bounty", headers={"Authorization": f"Bearer {auth_token}"})
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "total" in data
|
||||
assert "data" in data
|
||||
assert isinstance(data["data"], list)
|
||||
|
||||
def test_bounty_pagination(auth_token):
|
||||
with TestClient(app) as client:
|
||||
resp = client.get("/api/v1/bounty?limit=1&offset=0", headers={"Authorization": f"Bearer {auth_token}"})
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["limit"] == 1
|
||||
@@ -1,99 +0,0 @@
|
||||
import json
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from decnet.web.api import app
|
||||
import decnet.config
|
||||
from pathlib import Path
|
||||
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db():
|
||||
repo.db_path = "test_fleet_decnet.db"
|
||||
import os
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
repo.reinitialize()
|
||||
yield
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
TEST_STATE_FILE = Path("test-decnet-state.json")
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_state_file(monkeypatch):
|
||||
# Patch the global STATE_FILE variable in the config module
|
||||
monkeypatch.setattr(decnet.config, "STATE_FILE", TEST_STATE_FILE)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_state_file():
|
||||
# Create a dummy state file for testing
|
||||
_test_state = {
|
||||
"config": {
|
||||
"mode": "unihost",
|
||||
"interface": "eth0",
|
||||
"subnet": "192.168.1.0/24",
|
||||
"gateway": "192.168.1.1",
|
||||
"deckies": [
|
||||
{
|
||||
"name": "test-decky-1",
|
||||
"ip": "192.168.1.10",
|
||||
"services": ["ssh"],
|
||||
"distro": "debian",
|
||||
"base_image": "debian",
|
||||
"hostname": "test-host-1",
|
||||
"service_config": {"ssh": {"banner": "SSH-2.0-OpenSSH_8.9"}},
|
||||
"archetype": "deaddeck",
|
||||
"nmap_os": "linux",
|
||||
"build_base": "debian:bookworm-slim"
|
||||
},
|
||||
{
|
||||
"name": "test-decky-2",
|
||||
"ip": "192.168.1.11",
|
||||
"services": ["http"],
|
||||
"distro": "ubuntu",
|
||||
"base_image": "ubuntu",
|
||||
"hostname": "test-host-2",
|
||||
"service_config": {},
|
||||
"archetype": None,
|
||||
"nmap_os": "linux",
|
||||
"build_base": "debian:bookworm-slim"
|
||||
}
|
||||
],
|
||||
"log_target": None,
|
||||
"log_file": "test.log",
|
||||
"ipvlan": False
|
||||
},
|
||||
"compose_path": "test-compose.yml"
|
||||
}
|
||||
TEST_STATE_FILE.write_text(json.dumps(_test_state))
|
||||
|
||||
yield _test_state
|
||||
|
||||
# Cleanup
|
||||
if TEST_STATE_FILE.exists():
|
||||
TEST_STATE_FILE.unlink()
|
||||
|
||||
def test_get_deckies_endpoint(mock_state_file):
|
||||
with TestClient(app) as _client:
|
||||
# Login to get token
|
||||
_login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
_token = _login_resp.json()["access_token"]
|
||||
|
||||
_response = _client.get("/api/v1/deckies", headers={"Authorization": f"Bearer {_token}"})
|
||||
assert _response.status_code == 200
|
||||
_data = _response.json()
|
||||
assert len(_data) == 2
|
||||
assert _data[0]["name"] == "test-decky-1"
|
||||
assert _data[0]["service_config"]["ssh"]["banner"] == "SSH-2.0-OpenSSH_8.9"
|
||||
|
||||
def test_stats_includes_deployed_count(mock_state_file):
|
||||
with TestClient(app) as _client:
|
||||
_login_resp = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
_token = _login_resp.json()["access_token"]
|
||||
|
||||
_response = _client.get("/api/v1/stats", headers={"Authorization": f"Bearer {_token}"})
|
||||
assert _response.status_code == 200
|
||||
_data = _response.json()
|
||||
assert "deployed_deckies" in _data
|
||||
assert _data["deployed_deckies"] == 2
|
||||
@@ -1,135 +0,0 @@
|
||||
import os
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from decnet.web.api import app
|
||||
from decnet.web.dependencies import repo
|
||||
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db() -> Generator[None, None, None]:
|
||||
repo.db_path = "test_decnet.db"
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
repo.reinitialize()
|
||||
|
||||
# Yield control to the test function
|
||||
yield
|
||||
|
||||
# Teardown
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
|
||||
def test_login_success() -> None:
|
||||
with TestClient(app) as client:
|
||||
# The TestClient context manager triggers startup/shutdown events
|
||||
response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "access_token" in data
|
||||
assert data["token_type"] == "bearer"
|
||||
assert "must_change_password" in data
|
||||
assert data["must_change_password"] is True
|
||||
|
||||
|
||||
def test_login_failure() -> None:
|
||||
with TestClient(app) as client:
|
||||
response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "wrongpassword"}
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "nonexistent", "password": "wrongpassword"}
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_change_password() -> None:
|
||||
with TestClient(app) as client:
|
||||
# First login to get token
|
||||
login_resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
token = login_resp.json()["access_token"]
|
||||
|
||||
# Try changing password with wrong old password
|
||||
resp1 = client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"old_password": "wrong", "new_password": "new_secure_password"},
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert resp1.status_code == 401
|
||||
|
||||
# Change password successfully
|
||||
resp2 = client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": "new_secure_password"},
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert resp2.status_code == 200
|
||||
|
||||
# Verify old password no longer works
|
||||
resp3 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
assert resp3.status_code == 401
|
||||
|
||||
# Verify new password works and must_change_password is False
|
||||
resp4 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": "new_secure_password"})
|
||||
assert resp4.status_code == 200
|
||||
assert resp4.json()["must_change_password"] is False
|
||||
|
||||
|
||||
def test_get_logs_unauthorized() -> None:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/api/v1/logs")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_get_logs_success() -> None:
|
||||
with TestClient(app) as client:
|
||||
login_response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||
)
|
||||
token = login_response.json()["access_token"]
|
||||
|
||||
response = client.get(
|
||||
"/api/v1/logs",
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "data" in data
|
||||
assert data["total"] >= 0
|
||||
assert isinstance(data["data"], list)
|
||||
|
||||
def test_get_stats_unauthorized() -> None:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/api/v1/stats")
|
||||
assert response.status_code == 401
|
||||
|
||||
def test_get_stats_success() -> None:
|
||||
with TestClient(app) as client:
|
||||
login_response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||
)
|
||||
token = login_response.json()["access_token"]
|
||||
|
||||
response = client.get(
|
||||
"/api/v1/stats",
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "total_logs" in data
|
||||
assert "unique_attackers" in data
|
||||
assert "active_deckies" in data
|
||||
@@ -1,110 +0,0 @@
|
||||
import os
|
||||
import pytest
|
||||
import json
|
||||
from typing import Generator, Any, Optional
|
||||
from fastapi.testclient import TestClient
|
||||
from hypothesis import given, strategies as st, settings, HealthCheck
|
||||
import httpx
|
||||
|
||||
from decnet.web.api import app
|
||||
from decnet.web.dependencies import repo
|
||||
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||
|
||||
# Re-use setup from test_web_api
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def setup_db() -> Generator[None, None, None]:
|
||||
repo.db_path = "test_fuzz_decnet.db"
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
repo.reinitialize()
|
||||
yield
|
||||
if os.path.exists(repo.db_path):
|
||||
os.remove(repo.db_path)
|
||||
|
||||
# bcrypt is intentionally slow, so we disable/extend the deadline
|
||||
_FUZZ_SETTINGS: dict[str, Any] = {
|
||||
"max_examples": 50,
|
||||
"deadline": None, # bcrypt hashing takes >200ms
|
||||
"suppress_health_check": [HealthCheck.function_scoped_fixture]
|
||||
}
|
||||
|
||||
@settings(**_FUZZ_SETTINGS)
|
||||
@given(
|
||||
username=st.text(min_size=0, max_size=2048),
|
||||
password=st.text(min_size=0, max_size=2048)
|
||||
)
|
||||
def test_fuzz_login(username: str, password: str) -> None:
|
||||
"""Fuzz the login endpoint with random strings (including non-ASCII)."""
|
||||
with TestClient(app) as _client:
|
||||
_payload: dict[str, str] = {"username": username, "password": password}
|
||||
try:
|
||||
_response: httpx.Response = _client.post("/api/v1/auth/login", json=_payload)
|
||||
# 200, 401, or 422 are acceptable. 500 is a failure.
|
||||
assert _response.status_code in (200, 401, 422)
|
||||
except (UnicodeEncodeError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
@settings(**_FUZZ_SETTINGS)
|
||||
@given(
|
||||
old_password=st.text(min_size=0, max_size=2048),
|
||||
new_password=st.text(min_size=0, max_size=2048)
|
||||
)
|
||||
def test_fuzz_change_password(old_password: str, new_password: str) -> None:
|
||||
"""Fuzz the change-password endpoint with random strings."""
|
||||
with TestClient(app) as _client:
|
||||
# Get valid token first
|
||||
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
_token: str = _login_resp.json()["access_token"]
|
||||
|
||||
_payload: dict[str, str] = {"old_password": old_password, "new_password": new_password}
|
||||
try:
|
||||
_response: httpx.Response = _client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json=_payload,
|
||||
headers={"Authorization": f"Bearer {_token}"}
|
||||
)
|
||||
assert _response.status_code in (200, 401, 422)
|
||||
except (UnicodeEncodeError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
@settings(**_FUZZ_SETTINGS)
|
||||
@given(
|
||||
limit=st.integers(min_value=-2000, max_value=5000),
|
||||
offset=st.integers(min_value=-2000, max_value=5000),
|
||||
search=st.one_of(st.none(), st.text(max_size=2048))
|
||||
)
|
||||
def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None:
|
||||
"""Fuzz the logs pagination and search."""
|
||||
with TestClient(app) as _client:
|
||||
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||
_token: str = _login_resp.json()["access_token"]
|
||||
|
||||
_params: dict[str, Any] = {"limit": limit, "offset": offset}
|
||||
if search is not None:
|
||||
_params["search"] = search
|
||||
|
||||
_response: httpx.Response = _client.get(
|
||||
"/api/v1/logs",
|
||||
params=_params,
|
||||
headers={"Authorization": f"Bearer {_token}"}
|
||||
)
|
||||
|
||||
assert _response.status_code in (200, 422)
|
||||
|
||||
@settings(**_FUZZ_SETTINGS)
|
||||
@given(
|
||||
token=st.text(min_size=0, max_size=4096)
|
||||
)
|
||||
def test_fuzz_auth_header(token: str) -> None:
|
||||
"""Fuzz the Authorization header with full unicode noise."""
|
||||
with TestClient(app) as _client:
|
||||
try:
|
||||
_response: httpx.Response = _client.get(
|
||||
"/api/v1/stats",
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert _response.status_code in (401, 422)
|
||||
except (UnicodeEncodeError, httpx.InvalidURL, httpx.CookieConflict):
|
||||
# Expected client-side rejection of invalid header characters
|
||||
pass
|
||||
Reference in New Issue
Block a user