fix/merge-testing-to-main #4

Merged
anti merged 138 commits from fix/merge-testing-to-main into main 2026-04-12 10:10:19 +02:00
3 changed files with 158 additions and 0 deletions
Showing only changes of commit 5b990743db - Show all commits

78
decnet/web/api.py Normal file
View File

@@ -0,0 +1,78 @@
import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Any, AsyncGenerator
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from decnet.web.auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
create_access_token,
get_password_hash,
verify_password,
)
from decnet.web.sqlite_repository import SQLiteRepository
repo: SQLiteRepository = SQLiteRepository()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
await repo.initialize()
# Create default admin if no users exist
admin_user: dict[str, Any] | None = await repo.get_user_by_username("admin")
if not admin_user:
await repo.create_user(
{
"uuid": str(uuid.uuid4()),
"username": "admin",
"password_hash": get_password_hash("admin"),
"role": "admin",
}
)
yield
app: FastAPI = FastAPI(
title="DECNET Web Dashboard API",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Token(BaseModel):
access_token: str
token_type: str
class LoginRequest(BaseModel):
username: str
password: str
@app.post("/api/v1/auth/login", response_model=Token)
async def login(request: LoginRequest) -> dict[str, str]:
user: dict[str, Any] | None = await repo.get_user_by_username(request.username)
if not user or not verify_password(request.password, user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# Token uses uuid instead of sub
access_token: str = create_access_token(
data={"uuid": user["uuid"]}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

33
decnet/web/auth.py Normal file
View File

@@ -0,0 +1,33 @@
import os
from datetime import datetime, timedelta, timezone
from typing import Optional, Any
import jwt
from passlib.context import CryptContext
SECRET_KEY: str = os.environ.get("DECNET_SECRET_KEY", "super-secret-key-change-me")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 1440
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
to_encode: dict[str, Any] = data.copy()
expire: datetime
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
to_encode.update({"iat": datetime.now(timezone.utc)})
encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

47
tests/test_web_api.py Normal file
View File

@@ -0,0 +1,47 @@
import os
from typing import Generator
import pytest
from fastapi.testclient import TestClient
from decnet.web.api import app, repo
@pytest.fixture(autouse=True)
def setup_db() -> Generator[None, None, None]:
repo.db_path = "test_decnet.db"
if os.path.exists(repo.db_path):
os.remove(repo.db_path)
# Yield control to the test function
yield
# Teardown
if os.path.exists(repo.db_path):
os.remove(repo.db_path)
def test_login_success() -> None:
with TestClient(app) as client:
# The TestClient context manager triggers startup/shutdown events
response = client.post(
"/api/v1/auth/login",
json={"username": "admin", "password": "admin"}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
def test_login_failure() -> None:
with TestClient(app) as client:
response = client.post(
"/api/v1/auth/login",
json={"username": "admin", "password": "wrongpassword"}
)
assert response.status_code == 401
response = client.post(
"/api/v1/auth/login",
json={"username": "nonexistent", "password": "wrongpassword"}
)
assert response.status_code == 401