From 5b990743db284e93d4d0582789692143305ffae0 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 7 Apr 2026 14:54:36 -0400 Subject: [PATCH] feat: implement Auth endpoints for web dashboard --- decnet/web/api.py | 78 +++++++++++++++++++++++++++++++++++++++++++ decnet/web/auth.py | 33 ++++++++++++++++++ tests/test_web_api.py | 47 ++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 decnet/web/api.py create mode 100644 decnet/web/auth.py create mode 100644 tests/test_web_api.py diff --git a/decnet/web/api.py b/decnet/web/api.py new file mode 100644 index 0000000..13955f2 --- /dev/null +++ b/decnet/web/api.py @@ -0,0 +1,78 @@ +import uuid +from contextlib import asynccontextmanager +from datetime import timedelta +from typing import Any, AsyncGenerator + +from fastapi import FastAPI, HTTPException, status +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from decnet.web.auth import ( + ACCESS_TOKEN_EXPIRE_MINUTES, + create_access_token, + get_password_hash, + verify_password, +) +from decnet.web.sqlite_repository import SQLiteRepository + +repo: SQLiteRepository = SQLiteRepository() + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + await repo.initialize() + # Create default admin if no users exist + admin_user: dict[str, Any] | None = await repo.get_user_by_username("admin") + if not admin_user: + await repo.create_user( + { + "uuid": str(uuid.uuid4()), + "username": "admin", + "password_hash": get_password_hash("admin"), + "role": "admin", + } + ) + yield + + +app: FastAPI = FastAPI( + title="DECNET Web Dashboard API", + version="1.0.0", + lifespan=lifespan +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +class Token(BaseModel): + access_token: str + token_type: str + + +class LoginRequest(BaseModel): + username: str + password: str + + +@app.post("/api/v1/auth/login", response_model=Token) +async def login(request: LoginRequest) -> dict[str, str]: + user: dict[str, Any] | None = await repo.get_user_by_username(request.username) + if not user or not verify_password(request.password, user["password_hash"]): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + + access_token_expires: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + # Token uses uuid instead of sub + access_token: str = create_access_token( + data={"uuid": user["uuid"]}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} diff --git a/decnet/web/auth.py b/decnet/web/auth.py new file mode 100644 index 0000000..a4737cf --- /dev/null +++ b/decnet/web/auth.py @@ -0,0 +1,33 @@ +import os +from datetime import datetime, timedelta, timezone +from typing import Optional, Any +import jwt +from passlib.context import CryptContext + +SECRET_KEY: str = os.environ.get("DECNET_SECRET_KEY", "super-secret-key-change-me") +ALGORITHM: str = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES: int = 1440 + +pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +def verify_password(plain_password: str, hashed_password: str) -> bool: + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password: str) -> str: + return pwd_context.hash(password) + + +def create_access_token(data: dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: + to_encode: dict[str, Any] = data.copy() + expire: datetime + if expires_delta: + expire = datetime.now(timezone.utc) + expires_delta + else: + expire = datetime.now(timezone.utc) + timedelta(minutes=15) + + to_encode.update({"exp": expire}) + to_encode.update({"iat": datetime.now(timezone.utc)}) + encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt diff --git a/tests/test_web_api.py b/tests/test_web_api.py new file mode 100644 index 0000000..b22e039 --- /dev/null +++ b/tests/test_web_api.py @@ -0,0 +1,47 @@ +import os +from typing import Generator +import pytest +from fastapi.testclient import TestClient +from decnet.web.api import app, repo + + +@pytest.fixture(autouse=True) +def setup_db() -> Generator[None, None, None]: + repo.db_path = "test_decnet.db" + if os.path.exists(repo.db_path): + os.remove(repo.db_path) + + # Yield control to the test function + yield + + # Teardown + if os.path.exists(repo.db_path): + os.remove(repo.db_path) + + +def test_login_success() -> None: + with TestClient(app) as client: + # The TestClient context manager triggers startup/shutdown events + response = client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "admin"} + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert data["token_type"] == "bearer" + + +def test_login_failure() -> None: + with TestClient(app) as client: + response = client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "wrongpassword"} + ) + assert response.status_code == 401 + + response = client.post( + "/api/v1/auth/login", + json={"username": "nonexistent", "password": "wrongpassword"} + ) + assert response.status_code == 401