fix: harden startup security — require strong secrets, restrict CORS
- decnet/env.py: DECNET_JWT_SECRET and DECNET_ADMIN_PASSWORD are now
required env vars; startup raises ValueError if unset or set to a
known-bad default ("admin", "password", etc.)
- decnet/env.py: add DECNET_CORS_ORIGINS (comma-separated, defaults to
http://localhost:8080) replacing the previous allow_origins=["*"]
- decnet/web/api.py: use DECNET_CORS_ORIGINS and tighten allow_methods
and allow_headers to explicit lists
- tests/conftest.py: set required env vars at module level so test
collection works without real credentials
- tests/test_web_api.py, test_web_api_fuzz.py: use DECNET_ADMIN_PASSWORD
from env instead of hardcoded "admin"
Closes DEBT-001, DEBT-002, DEBT-004
This commit is contained in:
@@ -9,15 +9,38 @@ _ROOT: Path = Path(__file__).parent.parent.absolute()
|
|||||||
load_dotenv(_ROOT / ".env.local")
|
load_dotenv(_ROOT / ".env.local")
|
||||||
load_dotenv(_ROOT / ".env")
|
load_dotenv(_ROOT / ".env")
|
||||||
|
|
||||||
|
|
||||||
|
def _require_env(name: str) -> str:
|
||||||
|
"""Return the env var value or raise at startup if it is unset or a known-bad default."""
|
||||||
|
_KNOWN_BAD = {"fallback-secret-key-change-me", "admin", "secret", "password", "changeme"}
|
||||||
|
value = os.environ.get(name)
|
||||||
|
if not value:
|
||||||
|
raise ValueError(
|
||||||
|
f"Required environment variable '{name}' is not set. "
|
||||||
|
f"Set it in .env.local or export it before starting DECNET."
|
||||||
|
)
|
||||||
|
if value.lower() in _KNOWN_BAD:
|
||||||
|
raise ValueError(
|
||||||
|
f"Environment variable '{name}' is set to an insecure default ('{value}'). "
|
||||||
|
f"Choose a strong, unique value before starting DECNET."
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
# API Options
|
# API Options
|
||||||
DECNET_API_HOST: str = os.environ.get("DECNET_API_HOST", "0.0.0.0") # nosec B104
|
DECNET_API_HOST: str = os.environ.get("DECNET_API_HOST", "0.0.0.0") # nosec B104
|
||||||
DECNET_API_PORT: int = int(os.environ.get("DECNET_API_PORT", "8000"))
|
DECNET_API_PORT: int = int(os.environ.get("DECNET_API_PORT", "8000"))
|
||||||
DECNET_JWT_SECRET: str = os.environ.get("DECNET_JWT_SECRET", "fallback-secret-key-change-me")
|
DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
|
||||||
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
||||||
|
|
||||||
# Web Dashboard Options
|
# Web Dashboard Options
|
||||||
DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "0.0.0.0") # nosec B104
|
DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "0.0.0.0") # nosec B104
|
||||||
DECNET_WEB_PORT: int = int(os.environ.get("DECNET_WEB_PORT", "8080"))
|
DECNET_WEB_PORT: int = int(os.environ.get("DECNET_WEB_PORT", "8080"))
|
||||||
DECNET_ADMIN_USER: str = os.environ.get("DECNET_ADMIN_USER", "admin")
|
DECNET_ADMIN_USER: str = os.environ.get("DECNET_ADMIN_USER", "admin")
|
||||||
DECNET_ADMIN_PASSWORD: str = os.environ.get("DECNET_ADMIN_PASSWORD", "admin")
|
DECNET_ADMIN_PASSWORD: str = _require_env("DECNET_ADMIN_PASSWORD")
|
||||||
DECNET_DEVELOPER: bool = os.environ.get("DECNET_DEVELOPER", "False").lower() == "true"
|
DECNET_DEVELOPER: bool = os.environ.get("DECNET_DEVELOPER", "False").lower() == "true"
|
||||||
|
|
||||||
|
# CORS — comma-separated list of allowed origins for the web dashboard API.
|
||||||
|
# Example: DECNET_CORS_ORIGINS=http://localhost:8080,https://dashboard.example.com
|
||||||
|
_cors_raw: str = os.environ.get("DECNET_CORS_ORIGINS", "http://localhost:8080")
|
||||||
|
DECNET_CORS_ORIGINS: list[str] = [o.strip() for o in _cors_raw.split(",") if o.strip()]
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from typing import Any, AsyncGenerator, Optional
|
|||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
from decnet.env import DECNET_DEVELOPER
|
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER
|
||||||
from decnet.web.dependencies import repo
|
from decnet.web.dependencies import repo
|
||||||
from decnet.web.ingester import log_ingestion_worker
|
from decnet.web.ingester import log_ingestion_worker
|
||||||
from decnet.web.router import api_router
|
from decnet.web.router import api_router
|
||||||
@@ -47,10 +47,10 @@ app: FastAPI = FastAPI(
|
|||||||
|
|
||||||
app.add_middleware(
|
app.add_middleware(
|
||||||
CORSMiddleware,
|
CORSMiddleware,
|
||||||
allow_origins=["*"],
|
allow_origins=DECNET_CORS_ORIGINS,
|
||||||
allow_credentials=False,
|
allow_credentials=False,
|
||||||
allow_methods=["*"],
|
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
||||||
allow_headers=["*"],
|
allow_headers=["Authorization", "Content-Type"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Include the modular API router
|
# Include the modular API router
|
||||||
|
|||||||
11
tests/conftest.py
Normal file
11
tests/conftest.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
"""
|
||||||
|
Shared pytest configuration.
|
||||||
|
|
||||||
|
Env vars required by decnet.env must be set here, at module level, before
|
||||||
|
any test file imports decnet.* — pytest loads conftest.py first.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
|
||||||
|
os.environ.setdefault("DECNET_JWT_SECRET", "test-jwt-secret-not-for-production-use")
|
||||||
|
os.environ.setdefault("DECNET_ADMIN_PASSWORD", "test-admin-password-1234!")
|
||||||
|
os.environ.setdefault("DECNET_ADMIN_USER", "admin")
|
||||||
@@ -6,6 +6,7 @@ from fastapi.testclient import TestClient
|
|||||||
|
|
||||||
from decnet.web.api import app
|
from decnet.web.api import app
|
||||||
from decnet.web.dependencies import repo
|
from decnet.web.dependencies import repo
|
||||||
|
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@@ -29,7 +30,7 @@ def test_login_success() -> None:
|
|||||||
# The TestClient context manager triggers startup/shutdown events
|
# The TestClient context manager triggers startup/shutdown events
|
||||||
response = client.post(
|
response = client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"}
|
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = response.json()
|
data = response.json()
|
||||||
@@ -57,7 +58,7 @@ def test_login_failure() -> None:
|
|||||||
def test_change_password() -> None:
|
def test_change_password() -> None:
|
||||||
with TestClient(app) as client:
|
with TestClient(app) as client:
|
||||||
# First login to get token
|
# First login to get token
|
||||||
login_resp = client.post("/api/v1/auth/login", json={"username": "admin", "password": "admin"})
|
login_resp = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||||
token = login_resp.json()["access_token"]
|
token = login_resp.json()["access_token"]
|
||||||
|
|
||||||
# Try changing password with wrong old password
|
# Try changing password with wrong old password
|
||||||
@@ -71,17 +72,17 @@ def test_change_password() -> None:
|
|||||||
# Change password successfully
|
# Change password successfully
|
||||||
resp2 = client.post(
|
resp2 = client.post(
|
||||||
"/api/v1/auth/change-password",
|
"/api/v1/auth/change-password",
|
||||||
json={"old_password": "admin", "new_password": "new_secure_password"},
|
json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": "new_secure_password"},
|
||||||
headers={"Authorization": f"Bearer {token}"}
|
headers={"Authorization": f"Bearer {token}"}
|
||||||
)
|
)
|
||||||
assert resp2.status_code == 200
|
assert resp2.status_code == 200
|
||||||
|
|
||||||
# Verify old password no longer works
|
# Verify old password no longer works
|
||||||
resp3 = client.post("/api/v1/auth/login", json={"username": "admin", "password": "admin"})
|
resp3 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||||
assert resp3.status_code == 401
|
assert resp3.status_code == 401
|
||||||
|
|
||||||
# Verify new password works and must_change_password is False
|
# Verify new password works and must_change_password is False
|
||||||
resp4 = client.post("/api/v1/auth/login", json={"username": "admin", "password": "new_secure_password"})
|
resp4 = client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": "new_secure_password"})
|
||||||
assert resp4.status_code == 200
|
assert resp4.status_code == 200
|
||||||
assert resp4.json()["must_change_password"] is False
|
assert resp4.json()["must_change_password"] is False
|
||||||
|
|
||||||
@@ -96,7 +97,7 @@ def test_get_logs_success() -> None:
|
|||||||
with TestClient(app) as client:
|
with TestClient(app) as client:
|
||||||
login_response = client.post(
|
login_response = client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"}
|
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||||
)
|
)
|
||||||
token = login_response.json()["access_token"]
|
token = login_response.json()["access_token"]
|
||||||
|
|
||||||
@@ -119,7 +120,7 @@ def test_get_stats_success() -> None:
|
|||||||
with TestClient(app) as client:
|
with TestClient(app) as client:
|
||||||
login_response = client.post(
|
login_response = client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"}
|
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}
|
||||||
)
|
)
|
||||||
token = login_response.json()["access_token"]
|
token = login_response.json()["access_token"]
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import httpx
|
|||||||
|
|
||||||
from decnet.web.api import app
|
from decnet.web.api import app
|
||||||
from decnet.web.dependencies import repo
|
from decnet.web.dependencies import repo
|
||||||
|
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||||
|
|
||||||
# Re-use setup from test_web_api
|
# Re-use setup from test_web_api
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
@@ -53,7 +54,7 @@ def test_fuzz_change_password(old_password: str, new_password: str) -> None:
|
|||||||
"""Fuzz the change-password endpoint with random strings."""
|
"""Fuzz the change-password endpoint with random strings."""
|
||||||
with TestClient(app) as _client:
|
with TestClient(app) as _client:
|
||||||
# Get valid token first
|
# Get valid token first
|
||||||
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": "admin", "password": "admin"})
|
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||||
_token: str = _login_resp.json()["access_token"]
|
_token: str = _login_resp.json()["access_token"]
|
||||||
|
|
||||||
_payload: dict[str, str] = {"old_password": old_password, "new_password": new_password}
|
_payload: dict[str, str] = {"old_password": old_password, "new_password": new_password}
|
||||||
@@ -76,7 +77,7 @@ def test_fuzz_change_password(old_password: str, new_password: str) -> None:
|
|||||||
def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None:
|
def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None:
|
||||||
"""Fuzz the logs pagination and search."""
|
"""Fuzz the logs pagination and search."""
|
||||||
with TestClient(app) as _client:
|
with TestClient(app) as _client:
|
||||||
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": "admin", "password": "admin"})
|
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
|
||||||
_token: str = _login_resp.json()["access_token"]
|
_token: str = _login_resp.json()["access_token"]
|
||||||
|
|
||||||
_params: dict[str, Any] = {"limit": limit, "offset": offset}
|
_params: dict[str, Any] = {"limit": limit, "offset": offset}
|
||||||
|
|||||||
Reference in New Issue
Block a user