feat: add /health endpoint for microservice monitoring

Checks database, background workers (ingestion, collector, attacker,
sniffer), and Docker daemon. Reports healthy/degraded/unhealthy status
with per-component details. Returns 503 when required services fail,
200 for healthy or degraded (only optional services down).
This commit is contained in:
2026-04-14 16:56:20 -04:00
parent 3eab6e8773
commit a2ba7a7f3c
7 changed files with 305 additions and 1 deletions

View File

@@ -24,6 +24,16 @@ attacker_task: Optional[asyncio.Task[Any]] = None
sniffer_task: Optional[asyncio.Task[Any]] = None
def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
    """Return the worker task handles keyed by component name.

    The module-level handles are looked up each time the function is
    called, so the mapping reflects whatever is assigned to them at that
    moment. An entry is ``None`` when the corresponding module global has
    not been assigned a task.
    """
    handles: dict[str, Optional[asyncio.Task[Any]]] = {}
    handles["ingestion_worker"] = ingestion_task
    handles["collector_worker"] = collector_task
    handles["attacker_worker"] = attacker_task
    handles["sniffer_worker"] = sniffer_task
    return handles
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
global ingestion_task, collector_task, attacker_task, sniffer_task

View File

@@ -1,5 +1,5 @@
from datetime import datetime, timezone
from typing import Optional, Any, List, Annotated
from typing import Literal, Optional, Any, List, Annotated
from sqlmodel import SQLModel, Field
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
from decnet.models import IniContent
@@ -121,3 +121,47 @@ class DeployIniRequest(BaseModel):
# This field now enforces strict INI structure during Pydantic initialization.
# The OpenAPI schema correctly shows it as a required string.
ini_content: IniContent = PydanticField(..., description="A valid INI formatted string")
# --- Configuration Models ---
class CreateUserRequest(BaseModel):
    # Payload model for a create-user request.
    # Required; between 1 and 64 characters.
    username: str = PydanticField(..., min_length=1, max_length=64)
    # Required; between 8 and 72 characters.
    # NOTE(review): 72 presumably mirrors bcrypt's 72-byte input limit; confirm
    # against the hashing code (max_length counts characters, not bytes).
    password: str = PydanticField(..., min_length=8, max_length=72)
    # Only "admin" or "viewer" are accepted; defaults to "viewer" when omitted.
    role: Literal["admin", "viewer"] = "viewer"
class UpdateUserRoleRequest(BaseModel):
    # Payload model for a role change; the role is required (no default) and
    # must be exactly "admin" or "viewer".
    role: Literal["admin", "viewer"]
class ResetUserPasswordRequest(BaseModel):
    # Payload model for a password reset.
    # Required; between 8 and 72 characters, the same bounds as
    # CreateUserRequest.password.
    new_password: str = PydanticField(..., min_length=8, max_length=72)
class DeploymentLimitRequest(BaseModel):
    # Payload model for changing the deployment limit.
    # Required integer in the inclusive range 1..500.
    deployment_limit: int = PydanticField(..., ge=1, le=500)
class GlobalMutationIntervalRequest(BaseModel):
    # Payload model for changing the global mutation interval.
    # Required string: a positive integer without leading zeros followed by
    # exactly one unit letter out of m, d, M, y, Y (e.g. "30m", "7d").
    # NOTE(review): the meaning of each unit letter (and whether "m" and "M"
    # differ) is not visible here; confirm against the interval parser.
    global_mutation_interval: str = PydanticField(..., pattern=r"^[1-9]\d*[mdMyY]$")
class UserResponse(BaseModel):
    # Response shape describing a single user; all four fields are required.
    uuid: str
    username: str
    # Plain string here, unlike the Literal["admin", "viewer"] used by the
    # request models above.
    role: str
    must_change_password: bool
class ConfigResponse(BaseModel):
    # Response shape for configuration data; all fields are required.
    # NOTE(review): role is presumably the role of the requesting user; confirm
    # against the config endpoint.
    role: str
    deployment_limit: int
    global_mutation_interval: str
class AdminConfigResponse(ConfigResponse):
    # Extends ConfigResponse (role, deployment_limit, global_mutation_interval)
    # with a required list of user records.
    users: List[UserResponse]
class ComponentHealth(BaseModel):
    # Health of one monitored component.
    # Either "ok" or "failing"; no other value validates.
    status: Literal["ok", "failing"]
    # Optional free-text explanation; defaults to None.
    detail: Optional[str] = None
class HealthResponse(BaseModel):
    # Aggregate health report.
    # Overall verdict: one of "healthy", "degraded" or "unhealthy".
    status: Literal["healthy", "degraded", "unhealthy"]
    # Per-component results keyed by component name.
    components: dict[str, ComponentHealth]

View File

@@ -14,6 +14,11 @@ from .stream.api_stream_events import router as stream_router
from .attackers.api_get_attackers import router as attackers_router
from .attackers.api_get_attacker_detail import router as attacker_detail_router
from .attackers.api_get_attacker_commands import router as attacker_commands_router
from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router
from .config.api_manage_users import router as config_users_router
from .config.api_reinit import router as config_reinit_router
from .health.api_get_health import router as health_router
api_router = APIRouter()
@@ -42,3 +47,10 @@ api_router.include_router(attacker_commands_router)
# Observability: the stats and stream routers plus the router that serves
# the /health endpoint.
api_router.include_router(stats_router)
api_router.include_router(stream_router)
api_router.include_router(health_router)
# Configuration: routers imported from the config package (get config,
# update config, manage users, reinit).
api_router.include_router(config_get_router)
api_router.include_router(config_update_router)
api_router.include_router(config_users_router)
api_router.include_router(config_reinit_router)

View File

View File

@@ -0,0 +1,80 @@
from typing import Any
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from decnet.web.dependencies import require_viewer, repo
from decnet.web.db.models import HealthResponse, ComponentHealth
# Router that carries the /health route defined below.
router = APIRouter()
# Components whose failure only downgrades the overall status to "degraded";
# a failure of any component not listed here makes the system "unhealthy".
_OPTIONAL_SERVICES = {"sniffer_worker"}
def _describe_worker(task: Any) -> ComponentHealth:
    """Translate one background task handle into a component health entry."""
    if task is None:
        return ComponentHealth(status="failing", detail="not started")
    if not task.done():
        return ComponentHealth(status="ok")
    if task.cancelled():
        return ComponentHealth(status="failing", detail="cancelled")
    # The task is finished and was not cancelled, so exception() returns the
    # raised exception (or None) instead of raising itself.
    error = task.exception()
    detail = f"exited: {error}" if error else "exited unexpectedly"
    return ComponentHealth(status="failing", detail=detail)


def _ping_docker() -> None:
    """Ping the Docker daemon, always closing the client afterwards."""
    # Imported lazily so a missing SDK is reported as a failing component by
    # the caller instead of breaking module import.
    import docker

    client = docker.from_env()
    try:
        client.ping()
    finally:
        # Close even when ping() raises; otherwise every failed probe leaks
        # the client's connection pool.
        client.close()


@router.get(
    "/health",
    response_model=HealthResponse,
    tags=["Observability"],
    responses={
        401: {"description": "Could not validate credentials"},
        503: {"model": HealthResponse, "description": "System unhealthy"},
    },
)
async def get_health(user: dict = Depends(require_viewer)) -> Any:
    """Report the health of the database, background workers and Docker.

    Each component is probed independently and recorded as "ok" or
    "failing" (with a detail string on failure). The overall status is
    "unhealthy" when any component outside ``_OPTIONAL_SERVICES`` fails,
    "degraded" when only optional components fail, and "healthy" otherwise.

    Args:
        user: Result of the ``require_viewer`` dependency. It is not read
            here; the parameter exists so the dependency runs.

    Returns:
        A ``JSONResponse`` carrying the serialized ``HealthResponse``, with
        status code 503 when the overall status is "unhealthy" and 200
        otherwise.
    """
    from fastapi.concurrency import run_in_threadpool

    components: dict[str, ComponentHealth] = {}

    # 1. Database
    try:
        await repo.get_total_logs()
    except Exception as exc:
        components["database"] = ComponentHealth(status="failing", detail=str(exc))
    else:
        components["database"] = ComponentHealth(status="ok")

    # 2. Background workers
    # Imported at call time so the lookup happens on every request rather
    # than once when this module is imported.
    from decnet.web.api import get_background_tasks

    for name, task in get_background_tasks().items():
        components[name] = _describe_worker(task)

    # 3. Docker daemon
    # The Docker SDK is synchronous; run it in a worker thread so a slow or
    # unreachable daemon does not stall the event loop.
    try:
        await run_in_threadpool(_ping_docker)
    except Exception as exc:
        components["docker"] = ComponentHealth(status="failing", detail=str(exc))
    else:
        components["docker"] = ComponentHealth(status="ok")

    # Compute overall status
    failing = {name for name, comp in components.items() if comp.status == "failing"}
    if failing - _OPTIONAL_SERVICES:
        overall = "unhealthy"
    elif failing:
        overall = "degraded"
    else:
        overall = "healthy"

    result = HealthResponse(status=overall, components=components)
    status_code = 503 if overall == "unhealthy" else 200
    return JSONResponse(content=result.model_dump(), status_code=status_code)

View File

View File

@@ -0,0 +1,158 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
@pytest.mark.anyio
async def test_health_requires_auth(client: httpx.AsyncClient) -> None:
    """A request without an Authorization header is answered with 401."""
    unauthenticated = await client.get("/api/v1/health")
    assert unauthenticated.status_code == 401
@pytest.mark.anyio
async def test_health_response_schema(client: httpx.AsyncClient, auth_token: str) -> None:
    """The health payload has a status plus one entry per known component."""
    with patch("decnet.web.api.get_background_tasks") as mock_tasks, \
            patch("docker.from_env") as mock_docker:
        # All workers running. Use the shared helper so each worker gets its
        # own stub, instead of rebuilding a throwaway mock in a loop and
        # handing the last one to every worker.
        _make_all_running(mock_tasks)
        mock_docker.return_value = MagicMock()
        resp = await client.get("/api/v1/health", headers={"Authorization": f"Bearer {auth_token}"})

    data = resp.json()
    assert "status" in data
    assert data["status"] in ("healthy", "degraded", "unhealthy")
    assert "components" in data
    expected_components = {
        "database",
        "ingestion_worker",
        "collector_worker",
        "attacker_worker",
        "sniffer_worker",
        "docker",
    }
    assert set(data["components"].keys()) == expected_components
    for comp in data["components"].values():
        assert comp["status"] in ("ok", "failing")
@pytest.mark.anyio
async def test_health_database_ok(client: httpx.AsyncClient, auth_token: str) -> None:
    """With the repository left unpatched, the database component is "ok"."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch:
        _make_all_running(tasks_patch)
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    database = response.json()["components"]["database"]
    assert database["status"] == "ok"
@pytest.mark.anyio
async def test_health_all_healthy(client: httpx.AsyncClient, auth_token: str) -> None:
    """All workers running and a mocked Docker client yield 200 "healthy"."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch:
        _make_all_running(tasks_patch)
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"
@pytest.mark.anyio
async def test_health_degraded_sniffer_only(client: httpx.AsyncClient, auth_token: str) -> None:
    """Only the sniffer being absent yields 200 with a "degraded" status."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch:
        worker_handles = _make_running_tasks()
        worker_handles["sniffer_worker"] = None  # sniffer not started
        tasks_patch.return_value = worker_handles
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "degraded"
    assert payload["components"]["sniffer_worker"]["status"] == "failing"
@pytest.mark.anyio
async def test_health_unhealthy_returns_503(client: httpx.AsyncClient, auth_token: str) -> None:
    """A missing ingestion worker yields 503 with an "unhealthy" status."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch:
        worker_handles = _make_running_tasks()
        worker_handles["ingestion_worker"] = None  # required worker down
        tasks_patch.return_value = worker_handles
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    assert response.status_code == 503
    assert response.json()["status"] == "unhealthy"
@pytest.mark.anyio
async def test_health_docker_failing(client: httpx.AsyncClient, auth_token: str) -> None:
    """An error from docker.from_env is surfaced in the docker component."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    docker_error = Exception("connection refused")
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env", side_effect=docker_error):
        _make_all_running(tasks_patch)
        response = await client.get("/api/v1/health", headers=auth_headers)
    docker_component = response.json()["components"]["docker"]
    assert docker_component["status"] == "failing"
    assert "connection refused" in docker_component["detail"]
@pytest.mark.anyio
async def test_health_database_failing(client: httpx.AsyncClient, auth_token: str) -> None:
    """An error from repo.get_total_logs is surfaced in the database component."""
    from decnet.web.dependencies import repo as real_repo

    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    broken_query = AsyncMock(side_effect=Exception("disk full"))
    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch, \
            patch.object(real_repo, "get_total_logs", new=broken_query):
        _make_all_running(tasks_patch)
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    database = response.json()["components"]["database"]
    assert database["status"] == "failing"
    assert "disk full" in database["detail"]
@pytest.mark.anyio
async def test_health_worker_exited_with_exception(client: httpx.AsyncClient, auth_token: str) -> None:
    """A finished worker task reports its exception text in the detail field."""
    auth_headers = {"Authorization": f"Bearer {auth_token}"}

    # Stub for a task that finished (not cancelled) by raising.
    crashed = MagicMock(spec=asyncio.Task)
    crashed.done.return_value = True
    crashed.cancelled.return_value = False
    crashed.exception.return_value = RuntimeError("segfault")

    with patch("decnet.web.api.get_background_tasks") as tasks_patch, \
            patch("docker.from_env") as from_env_patch:
        worker_handles = _make_running_tasks()
        worker_handles["collector_worker"] = crashed
        tasks_patch.return_value = worker_handles
        from_env_patch.return_value = MagicMock()
        response = await client.get("/api/v1/health", headers=auth_headers)
    collector = response.json()["components"]["collector_worker"]
    assert collector["status"] == "failing"
    assert "segfault" in collector["detail"]
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_running_tasks() -> dict[str, MagicMock]:
    """Build one not-yet-finished task stub per background worker name."""

    def _running_stub() -> MagicMock:
        # A Task-shaped mock whose done() reports that it is still running.
        stub = MagicMock(spec=asyncio.Task)
        stub.done.return_value = False
        return stub

    worker_names = ("ingestion_worker", "collector_worker", "attacker_worker", "sniffer_worker")
    return {worker: _running_stub() for worker in worker_names}
def _make_all_running(mock_tasks: MagicMock) -> None:
    """Make the patched task getter return a full set of running stubs."""
    running = _make_running_tasks()
    mock_tasks.return_value = running