Files
DECNET/decnet/web/api.py

58 lines
1.5 KiB
Python

import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from decnet.env import DECNET_DEVELOPER
from decnet.web.dependencies import repo
from decnet.web.ingester import log_ingestion_worker
from decnet.web.router import api_router
# Handle to the background log-ingestion task. It is None until lifespan()
# creates the task at application startup; lifespan() cancels it at shutdown.
ingestion_task: Optional[asyncio.Task[Any]] = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Run application startup and shutdown around the serving phase.

    Startup:
        * Initialise the repository, retrying a few times because the
          database may be temporarily locked (common in tests).
        * Start the background log-ingestion worker unless one is already
          running.

    Shutdown:
        * Cancel the ingestion worker and wait for it to finish unwinding.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    # Function-scope import so this block does not need a new file-level import.
    import logging

    global ingestion_task
    log = logging.getLogger(__name__)

    max_attempts = 5
    retry_delay_seconds = 0.5

    for attempt in range(1, max_attempts + 1):
        try:
            await repo.initialize()
            break
        except Exception:
            # Keep retrying, but leave a trace instead of failing silently.
            log.warning(
                "Repository initialization failed (attempt %d/%d)",
                attempt,
                max_attempts,
                exc_info=True,
            )
            # No point in sleeping after the final attempt.
            if attempt < max_attempts:
                await asyncio.sleep(retry_delay_seconds)
    else:
        # Every attempt failed. Startup still proceeds (best-effort, as
        # before), but the failure is now visible in the logs.
        log.error(
            "Repository initialization failed after %d attempts; "
            "continuing startup anyway",
            max_attempts,
        )

    # Start the background ingestion task unless a live one already exists.
    if ingestion_task is None or ingestion_task.done():
        ingestion_task = asyncio.create_task(log_ingestion_worker(repo))

    try:
        yield
    finally:
        # Runs even if an exception is thrown into the context manager, so
        # the worker is never left running after the app stops.
        if ingestion_task is not None:
            ingestion_task.cancel()
            try:
                # Wait for the cancellation to be processed so the worker's
                # own cleanup completes before the event loop closes.
                await ingestion_task
            except asyncio.CancelledError:
                # Expected result of cancelling the worker.
                pass
            except Exception:
                # The worker had already stopped with an error of its own.
                log.exception("Log ingestion worker exited with an error")
# The ASGI application object. The OpenAPI schema and both documentation UIs
# are served only when DECNET_DEVELOPER is truthy; otherwise their URLs are
# None, which disables those endpoints.
app: FastAPI = FastAPI(
    lifespan=lifespan,
    title="DECNET Web Dashboard API",
    version="1.0.0",
    openapi_url=("/openapi.json" if DECNET_DEVELOPER else None),
    docs_url=("/docs" if DECNET_DEVELOPER else None),
    redoc_url=("/redoc" if DECNET_DEVELOPER else None),
)
# CORS: accept any origin, method and header, but never send credentials
# (cookies / HTTP auth) on cross-origin requests.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=False,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount every route from the modular router under the versioned prefix.
app.include_router(api_router, prefix="/api/v1")