Files
DECNET/decnet/web/api.py

180 lines
5.3 KiB
Python

import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Any, AsyncGenerator, Optional
import jwt
from fastapi import Depends, FastAPI, HTTPException, Query, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from decnet.web.auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
ALGORITHM,
SECRET_KEY,
create_access_token,
get_password_hash,
verify_password,
)
from decnet.web.sqlite_repository import SQLiteRepository
from decnet.web.ingester import log_ingestion_worker
import asyncio
# Module-wide repository instance shared by the lifespan hook and the route handlers.
repo: SQLiteRepository = SQLiteRepository()
# Handle to the background log-ingestion task; stays None until lifespan() assigns it at startup.
ingestion_task: Optional[asyncio.Task[Any]] = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Run startup and shutdown work around the application's lifetime.

    Startup initializes the repository, creates the bootstrap ``admin``
    account when no user with that username exists, and launches the log
    ingestion worker as a background task. Shutdown cancels that task and
    waits for the cancellation to complete.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            by the body).

    Yields:
        None, while the application is serving requests.
    """
    global ingestion_task

    await repo.initialize()

    # Create the default admin account if no user named "admin" exists yet.
    _admin_user: Optional[dict[str, Any]] = await repo.get_user_by_username("admin")
    if not _admin_user:
        await repo.create_user(
            {
                "uuid": str(uuid.uuid4()),
                "username": "admin",
                # Well-known bootstrap password; must_change_password marks
                # the account so the password is expected to be rotated.
                "password_hash": get_password_hash("admin"),
                "role": "admin",
                "must_change_password": True,
            }
        )

    # Start background ingestion task
    ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
    try:
        yield
    finally:
        # Shutdown ingestion task. The finally clause guarantees this also
        # runs when an exception is thrown into the generator at the yield.
        if ingestion_task is not None:
            ingestion_task.cancel()
            # cancel() only *requests* cancellation; wait for the task to
            # actually finish so it is not left pending when the loop closes.
            # return_exceptions=True collects the CancelledError (or an
            # earlier worker failure) instead of raising during shutdown.
            await asyncio.gather(ingestion_task, return_exceptions=True)
# The ASGI application object; startup/shutdown work is delegated to lifespan().
app: FastAPI = FastAPI(
    title="DECNET Web Dashboard API",
    version="1.0.0",
    lifespan=lifespan,
)
# Allow cross-origin requests from any origin, with any method and any header.
# NOTE(review): a wildcard origin together with allow_credentials=True is a
# very permissive CORS policy; consider listing the dashboard origin(s)
# explicitly — confirm what the frontend needs before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Bearer-token extractor that get_current_user depends on; tokenUrl names the login route.
# NOTE(review): the login route takes a JSON body (LoginRequest), not an OAuth2 form —
# confirm whether form-based clients of tokenUrl are expected to work.
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
    """Resolve the bearer token to the UUID of the user it was issued for.

    Args:
        token: Raw bearer token supplied by ``oauth2_scheme``.

    Returns:
        The value of the token's ``uuid`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded/verified or
            carries no ``uuid`` claim.
    """
    unauthorized: HTTPException = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise a PyJWT error, so keep the try narrow.
    try:
        claims: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise unauthorized

    user_uuid: Optional[str] = claims.get("uuid")
    if user_uuid is None:
        raise unauthorized
    return user_uuid
# Response body of the login route.
class Token(BaseModel):
    # Encoded access token produced by create_access_token().
    access_token: str
    # Token scheme; the login route always sends "bearer".
    token_type: str
    # Copied from the user's must_change_password field; defaults to False.
    must_change_password: bool = False
# Request body of the login route.
class LoginRequest(BaseModel):
    # Username looked up via repo.get_user_by_username().
    username: str
    # Plain-text password checked against the stored hash.
    password: str
# Request body of the change-password route.
class ChangePasswordRequest(BaseModel):
    # Current password; must verify against the stored hash.
    old_password: str
    # Replacement password; it is hashed before being stored.
    new_password: str
# Response body of the logs route: one page of log rows plus paging metadata.
class LogsResponse(BaseModel):
    # Value returned by repo.get_total_logs() for the same search filter.
    total: int
    # Page size that was requested.
    limit: int
    # Offset that was requested.
    offset: int
    # Log rows as returned by repo.get_logs().
    data: list[dict[str, Any]]
# Authenticate a username/password pair and issue an access token.
# Responds 401 when the user is unknown or the password does not verify.
@app.post("/api/v1/auth/login", response_model=Token)
async def login(request: LoginRequest) -> dict[str, Any]:
    account: Optional[dict[str, Any]] = await repo.get_user_by_username(request.username)

    # Short-circuits: the hash is only read when a user record was found.
    authenticated = bool(account) and verify_password(
        request.password, account["password_hash"]
    )
    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Token uses uuid instead of sub
    lifetime: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token: str = create_access_token(
        data={"uuid": account["uuid"]},
        expires_delta=lifetime,
    )
    must_change: bool = bool(account.get("must_change_password", False))
    return {
        "access_token": token,
        "token_type": "bearer",
        "must_change_password": must_change,
    }
# Change the authenticated caller's password.
#
# Verifies old_password against the stored hash, then stores the hash of
# new_password and clears the must_change_password flag.
# Responds 401 when the user cannot be found or old_password is wrong, and
# 400 when new_password equals old_password — otherwise a required password
# rotation could be "completed" without actually changing the password.
@app.post("/api/v1/auth/change-password")
async def change_password(
    request: ChangePasswordRequest,
    current_user: str = Depends(get_current_user),
) -> dict[str, str]:
    _user: Optional[dict[str, Any]] = await repo.get_user_by_uuid(current_user)
    if not _user or not verify_password(request.old_password, _user["password_hash"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect old password",
        )

    # Checked only after the old password has been verified, so this reveals
    # nothing to a caller who does not already know the current password.
    if request.new_password == request.old_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="New password must be different from the old password",
        )

    _new_hash: str = get_password_hash(request.new_password)
    await repo.update_user_password(current_user, _new_hash, must_change_password=False)
    return {"message": "Password updated successfully"}
# Return one page of logs plus the total count for the same search filter.
# limit is constrained to 1..1000 and offset to >= 0 by the Query validators;
# current_user is only declared so that the bearer token is validated.
@app.get("/api/v1/logs", response_model=LogsResponse)
async def get_logs(
    limit: int = Query(50, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    search: Optional[str] = None,
    current_user: str = Depends(get_current_user),
) -> dict[str, Any]:
    page: list[dict[str, Any]] = await repo.get_logs(
        limit=limit,
        offset=offset,
        search=search,
    )
    matching: int = await repo.get_total_logs(search=search)
    return {"total": matching, "limit": limit, "offset": offset, "data": page}
# Response body of the stats route; the values come from repo.get_stats_summary().
# NOTE(review): exact meaning of each counter is defined by the repository — confirm there.
class StatsResponse(BaseModel):
    total_logs: int
    unique_attackers: int
    active_deckies: int
    deployed_deckies: int
# Return the repository's stats summary; current_user is only declared so
# that the bearer token is validated.
@app.get("/api/v1/stats", response_model=StatsResponse)
async def get_stats(current_user: str = Depends(get_current_user)) -> dict[str, Any]:
    summary = await repo.get_stats_summary()
    return summary
# Return the deckies known to the repository; current_user is only declared
# so that the bearer token is validated.
@app.get("/api/v1/deckies")
async def get_deckies(current_user: str = Depends(get_current_user)) -> list[dict[str, Any]]:
    deckies = await repo.get_deckies()
    return deckies