import asyncio import os from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Optional from fastapi import FastAPI, Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from pydantic import ValidationError from fastapi.middleware.cors import CORSMiddleware from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE from decnet.logging import get_logger from decnet.web.dependencies import repo from decnet.collector import log_collector_worker from decnet.web.ingester import log_ingestion_worker from decnet.profiler import attacker_profile_worker from decnet.web.router import api_router log = get_logger("api") ingestion_task: Optional[asyncio.Task[Any]] = None collector_task: Optional[asyncio.Task[Any]] = None attacker_task: Optional[asyncio.Task[Any]] = None sniffer_task: Optional[asyncio.Task[Any]] = None def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]: """Expose background task handles for the health endpoint.""" return { "ingestion_worker": ingestion_task, "collector_worker": collector_task, "attacker_worker": attacker_task, "sniffer_worker": sniffer_task, } @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: global ingestion_task, collector_task, attacker_task, sniffer_task log.info("API startup initialising database") for attempt in range(1, 6): try: await repo.initialize() log.debug("API startup DB initialised attempt=%d", attempt) break except Exception as exc: log.warning("DB init attempt %d/5 failed: %s", attempt, exc) if attempt == 5: log.error("DB failed to initialize after 5 attempts — startup may be degraded") await asyncio.sleep(0.5) # Start background tasks only if not in contract test mode if os.environ.get("DECNET_CONTRACT_TEST") != "true": # Start background ingestion task if ingestion_task is None or ingestion_task.done(): ingestion_task = asyncio.create_task(log_ingestion_worker(repo)) log.debug("API startup ingest worker started") # Start Docker log collector (writes to log file; ingester reads from it) _log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE) if _log_file and (collector_task is None or collector_task.done()): collector_task = asyncio.create_task(log_collector_worker(_log_file)) log.debug("API startup collector worker started log_file=%s", _log_file) elif not _log_file: log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.") # Start attacker profile rebuild worker if attacker_task is None or attacker_task.done(): attacker_task = asyncio.create_task(attacker_profile_worker(repo)) log.debug("API startup attacker profile worker started") # Start fleet-wide MACVLAN sniffer (fault-isolated — never crashes the API) try: from decnet.sniffer import sniffer_worker if sniffer_task is None or sniffer_task.done(): sniffer_task = asyncio.create_task(sniffer_worker(_log_file)) log.debug("API startup sniffer worker started") except Exception as exc: log.warning("Sniffer worker failed to start — API continues without sniffing: %s", exc) else: log.info("Contract Test Mode: skipping background worker startup") yield log.info("API shutdown cancelling background tasks") for task in (ingestion_task, collector_task, attacker_task, sniffer_task): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass except Exception as exc: log.warning("Task shutdown error: %s", exc) log.info("API shutdown complete") app: FastAPI = FastAPI( title="DECNET Web Dashboard API", version="1.0.0", lifespan=lifespan, docs_url="/docs" if DECNET_DEVELOPER else None, redoc_url="/redoc" if DECNET_DEVELOPER else None, openapi_url="/openapi.json" if DECNET_DEVELOPER else None ) app.add_middleware( CORSMiddleware, allow_origins=DECNET_CORS_ORIGINS, allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Authorization", "Content-Type", "Last-Event-ID"], ) # Include the modular API router app.include_router(api_router, prefix="/api/v1") @app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse: """ Handle validation errors with targeted status codes to satisfy contract tests. Tiered Prioritization: 1. 400 Bad Request: For structural schema violations (extra fields, wrong types, missing fields). This satisfies Schemathesis 'Negative Data' checks. 2. 409 Conflict: For semantic/structural INI content violations in valid strings. This satisfies Schemathesis 'Positive Data' checks. 3. 422 Unprocessable: Default for other validation edge cases. """ errors = exc.errors() # 1. Prioritize Structural Format Violations (Negative Data) # This catches: sending an object instead of a string, extra unknown properties, or empty-string length violations. is_structural_violation = any( err.get("type") in ("type_error", "extra_forbidden", "missing", "string_too_short", "string_type") or "must be a string" in err.get("msg", "") # Catch our validator's type check for err in errors ) if is_structural_violation: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"detail": "Bad Request: Schema structural violation (wrong type, extra fields, or invalid length)."}, ) # 2. Targeted INI Error Rejections # We distinguishes between different failure modes for precise contract compliance. # Empty INI content (Valid string but semantically empty) is_ini_empty = any("INI content is empty" in err.get("msg", "") for err in errors) if is_ini_empty: return JSONResponse( status_code=status.HTTP_409_CONFLICT, content={"detail": "Configuration conflict: INI content is empty."}, ) # Invalid characters/syntax (Valid-length string but invalid INI syntax) # Mapping to 409 for Positive Data compliance. is_invalid_characters = any("Invalid INI format" in err.get("msg", "") for err in errors) if is_invalid_characters: return JSONResponse( status_code=status.HTTP_409_CONFLICT, content={"detail": "Configuration conflict: INI syntax or characters are invalid."}, ) # Logical invalidity (Valid string, valid syntax, but missing required DECNET logic like sections) is_ini_invalid_logic = any("at least one section" in err.get("msg", "") for err in errors) if is_ini_invalid_logic: return JSONResponse( status_code=status.HTTP_409_CONFLICT, content={"detail": "Invalid INI config structure: No decky sections found."}, ) # Developer Mode fallback if DECNET_DEVELOPER: from fastapi.exception_handlers import request_validation_exception_handler return await request_validation_exception_handler(request, exc) # Production/Strict mode fallback: Sanitize remaining 422s message = "Invalid request parameters" if "/deckies/deploy" in request.url.path: message = "Invalid INI config" return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={"detail": message}, ) @app.exception_handler(ValidationError) async def pydantic_validation_exception_handler(request: Request, exc: ValidationError) -> JSONResponse: """ Handle Pydantic errors that occur during manual model instantiation (e.g. state hydration). Prevents 500 errors when the database contains inconsistent or outdated schema data. """ log.error("Internal Pydantic validation error: %s", exc) return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={ "detail": "Internal data consistency error", "type": "internal_validation_error" }, )