Files
DECNET/decnet/web/api.py

378 lines
13 KiB
Python

import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Any, AsyncGenerator, Optional
import jwt
from fastapi import Depends, FastAPI, HTTPException, Query, status, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, Field
from decnet.web.auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
ALGORITHM,
SECRET_KEY,
create_access_token,
get_password_hash,
verify_password,
)
from decnet.web.sqlite_repository import SQLiteRepository
from decnet.web.ingester import log_ingestion_worker
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
import asyncio
repo: SQLiteRepository = SQLiteRepository()
ingestion_task: Optional[asyncio.Task[Any]] = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
global ingestion_task
# Retry initialization a few times if DB is locked (common in tests)
for _ in range(5):
try:
await repo.initialize()
break
except Exception:
await asyncio.sleep(0.5)
# Create default admin if no users exist
try:
_admin_user: Optional[dict[str, Any]] = await repo.get_user_by_username(DECNET_ADMIN_USER)
if not _admin_user:
await repo.create_user(
{
"uuid": str(uuid.uuid4()),
"username": DECNET_ADMIN_USER,
"password_hash": get_password_hash(DECNET_ADMIN_PASSWORD),
"role": "admin",
"must_change_password": True # nosec B105
}
)
except Exception: # nosec B110
pass
# Start background ingestion task
if ingestion_task is None or ingestion_task.done():
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
yield
# Shutdown ingestion task
if ingestion_task:
ingestion_task.cancel()
app: FastAPI = FastAPI(
title="DECNET Web Dashboard API",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(request: Request) -> str:
_credentials_exception: HTTPException = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Extract token from header or query param
token: str | None = None
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
elif request.query_params.get("token"):
token = request.query_params.get("token")
if not token:
raise _credentials_exception
try:
_payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
_user_uuid: Optional[str] = _payload.get("uuid")
if _user_uuid is None:
raise _credentials_exception
return _user_uuid
except jwt.PyJWTError:
raise _credentials_exception
class Token(BaseModel):
access_token: str
token_type: str
must_change_password: bool = False
class LoginRequest(BaseModel):
username: str
password: str = Field(..., max_length=72)
class ChangePasswordRequest(BaseModel):
old_password: str = Field(..., max_length=72)
new_password: str = Field(..., max_length=72)
class LogsResponse(BaseModel):
total: int
limit: int
offset: int
data: list[dict[str, Any]]
@app.post("/api/v1/auth/login", response_model=Token)
async def login(request: LoginRequest) -> dict[str, Any]:
_user: Optional[dict[str, Any]] = await repo.get_user_by_username(request.username)
if not _user or not verify_password(request.password, _user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
_access_token_expires: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# Token uses uuid instead of sub
_access_token: str = create_access_token(
data={"uuid": _user["uuid"]}, expires_delta=_access_token_expires
)
return {
"access_token": _access_token,
"token_type": "bearer", # nosec B105
"must_change_password": bool(_user.get("must_change_password", False))
}
@app.post("/api/v1/auth/change-password")
async def change_password(request: ChangePasswordRequest, current_user: str = Depends(get_current_user)) -> dict[str, str]:
_user: Optional[dict[str, Any]] = await repo.get_user_by_uuid(current_user)
if not _user or not verify_password(request.old_password, _user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect old password",
)
_new_hash: str = get_password_hash(request.new_password)
await repo.update_user_password(current_user, _new_hash, must_change_password=False)
return {"message": "Password updated successfully"}
@app.get("/api/v1/logs", response_model=LogsResponse)
async def get_logs(
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0),
search: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
current_user: str = Depends(get_current_user)
) -> dict[str, Any]:
_logs: list[dict[str, Any]] = await repo.get_logs(limit=limit, offset=offset, search=search, start_time=start_time, end_time=end_time)
_total: int = await repo.get_total_logs(search=search, start_time=start_time, end_time=end_time)
return {
"total": _total,
"limit": limit,
"offset": offset,
"data": _logs
}
@app.get("/api/v1/logs/histogram")
async def get_logs_histogram(
search: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
interval_minutes: int = Query(15, ge=1),
current_user: str = Depends(get_current_user)
) -> list[dict[str, Any]]:
return await repo.get_log_histogram(search=search, start_time=start_time, end_time=end_time, interval_minutes=interval_minutes)
class StatsResponse(BaseModel):
total_logs: int
unique_attackers: int
active_deckies: int
deployed_deckies: int
@app.get("/api/v1/stats", response_model=StatsResponse)
async def get_stats(current_user: str = Depends(get_current_user)) -> dict[str, Any]:
return await repo.get_stats_summary()
@app.get("/api/v1/deckies")
async def get_deckies(current_user: str = Depends(get_current_user)) -> list[dict[str, Any]]:
return await repo.get_deckies()
class MutateIntervalRequest(BaseModel):
mutate_interval: int | None
@app.post("/api/v1/deckies/{decky_name}/mutate")
async def api_mutate_decky(decky_name: str, current_user: str = Depends(get_current_user)) -> dict[str, str]:
from decnet.mutator import mutate_decky
success = mutate_decky(decky_name)
if success:
return {"message": f"Successfully mutated {decky_name}"}
raise HTTPException(status_code=404, detail=f"Decky {decky_name} not found or failed to mutate")
@app.put("/api/v1/deckies/{decky_name}/mutate-interval")
async def api_update_mutate_interval(decky_name: str, req: MutateIntervalRequest, current_user: str = Depends(get_current_user)) -> dict[str, str]:
from decnet.config import load_state, save_state
state = load_state()
if not state:
raise HTTPException(status_code=500, detail="No active deployment")
config, compose_path = state
decky = next((d for d in config.deckies if d.name == decky_name), None)
if not decky:
raise HTTPException(status_code=404, detail="Decky not found")
decky.mutate_interval = req.mutate_interval
save_state(config, compose_path)
return {"message": "Mutation interval updated"}
@app.get("/api/v1/stream")
async def stream_events(
request: Request,
last_event_id: int = Query(0, alias="lastEventId"),
search: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
current_user: str = Depends(get_current_user)
) -> StreamingResponse:
import json
import asyncio
async def event_generator() -> AsyncGenerator[str, None]:
# Start tracking from the provided ID, or current max if 0
last_id = last_event_id
if last_id == 0:
last_id = await repo.get_max_log_id()
stats_interval_sec = 10
loops_since_stats = 0
while True:
if await request.is_disconnected():
break
# Poll for new logs
new_logs = await repo.get_logs_after_id(last_id, limit=50, search=search, start_time=start_time, end_time=end_time)
if new_logs:
# Update last_id to the max id in the fetched batch
last_id = max(log["id"] for log in new_logs)
payload = json.dumps({"type": "logs", "data": new_logs})
yield f"event: message\ndata: {payload}\n\n"
# If we have new logs, stats probably changed, so force a stats update
loops_since_stats = stats_interval_sec
# Periodically poll for stats
if loops_since_stats >= stats_interval_sec:
stats = await repo.get_stats_summary()
payload = json.dumps({"type": "stats", "data": stats})
yield f"event: message\ndata: {payload}\n\n"
# Also yield histogram
histogram = await repo.get_log_histogram(search=search, start_time=start_time, end_time=end_time, interval_minutes=15)
hist_payload = json.dumps({"type": "histogram", "data": histogram})
yield f"event: message\ndata: {hist_payload}\n\n"
loops_since_stats = 0
loops_since_stats += 1
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
class DeployIniRequest(BaseModel):
ini_content: str = Field(..., min_length=5, max_length=512 * 1024)
@app.post("/api/v1/deckies/deploy")
async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(get_current_user)) -> dict[str, str]:
from decnet.ini_loader import load_ini_from_string
from decnet.cli import _build_deckies_from_ini
from decnet.config import load_state, DecnetConfig, DEFAULT_MUTATE_INTERVAL
from decnet.network import detect_interface, detect_subnet, get_host_ip
from decnet.deployer import deploy as _deploy
import logging
import os
try:
ini = load_ini_from_string(req.ini_content)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse INI: {e}")
state = load_state()
ingest_log_file = os.environ.get("DECNET_INGEST_LOG_FILE")
if state:
config, _ = state
subnet_cidr = ini.subnet or config.subnet
gateway = ini.gateway or config.gateway
host_ip = get_host_ip(config.interface)
randomize_services = False
# Always sync config log_file with current API ingestion target
if ingest_log_file:
config.log_file = ingest_log_file
else:
# If no state exists, we need to infer network details
iface = ini.interface or detect_interface()
subnet_cidr, gateway = ini.subnet, ini.gateway
if not subnet_cidr or not gateway:
detected_subnet, detected_gateway = detect_subnet(iface)
subnet_cidr = subnet_cidr or detected_subnet
gateway = gateway or detected_gateway
host_ip = get_host_ip(iface)
randomize_services = False
config = DecnetConfig(
mode="unihost",
interface=iface,
subnet=subnet_cidr,
gateway=gateway,
deckies=[],
log_target=ini.log_target,
log_file=ingest_log_file,
ipvlan=False,
mutate_interval=ini.mutate_interval or DEFAULT_MUTATE_INTERVAL
)
try:
new_decky_configs = _build_deckies_from_ini(
ini, subnet_cidr, gateway, host_ip, randomize_services, cli_mutate_interval=None
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Merge deckies
existing_deckies_map = {d.name: d for d in config.deckies}
for new_decky in new_decky_configs:
existing_deckies_map[new_decky.name] = new_decky
config.deckies = list(existing_deckies_map.values())
# We call deploy(config) which regenerates docker-compose and runs `up -d --remove-orphans`.
try:
_deploy(config)
except Exception as e:
logging.getLogger("decnet.web.api").error(f"Deployment failed: {e}")
raise HTTPException(status_code=500, detail=f"Deployment failed: {e}")
return {"message": "Deckies deployed successfully"}