From 6ac8cac908b92fd20ac3da6d6688be761a80a800 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 22:51:42 -0400 Subject: [PATCH] feat(deckies): live service add/remove without full redeploy decnet.engine.services_live exposes add_service / remove_service for both fleet and topology decky scopes. The host's _compose() wrapper already supported per-service targeting (up --no-deps -d {service}, stop {service}, rm -f {service}); what was missing was the orchestration around it: * add: validate against decnet.services.registry (rejects unknown + fleet_singleton); persist the new services list; re-render the per-scope compose file (so future redeploys reflect the change); run docker compose up -d --no-deps --build {decky}-{service}. * remove: stop + rm -f the service container; persist; re-render compose so a future up -d doesn't bring it back. Both publish decky.{name}.service.added / .removed on the bus, with the post-mutation services list. Topic constants added to decnet.bus.topics; the matching wiki entry in wiki-checkout/Service-Bus.md ships in a separate commit on the wiki repo (wiki-checkout/ is gitignored). Four new admin endpoints: * POST/DELETE /api/v1/deckies/{name}/services{,/svc} * POST/DELETE /api/v1/topologies/{id}/deckies/{name}/services{,/svc} ServiceMutationError messages are mapped at the API boundary to 404 (decky/topology missing), 409 (idempotency violation), 422 (unknown or fleet_singleton service). 
--- decnet/bus/topics.py | 7 + decnet/engine/services_live.py | 398 ++++++++++++++++++++++ decnet/web/db/models/__init__.py | 4 + decnet/web/db/models/decky.py | 21 ++ decnet/web/router/deckies/__init__.py | 10 + decnet/web/router/deckies/api_services.py | 165 +++++++++ tests/api/deckies/test_services_api.py | 161 +++++++++ tests/engine/__init__.py | 0 tests/engine/test_services_live.py | 199 +++++++++++ 9 files changed, 965 insertions(+) create mode 100644 decnet/engine/services_live.py create mode 100644 decnet/web/router/deckies/api_services.py create mode 100644 tests/api/deckies/test_services_api.py create mode 100644 tests/engine/__init__.py create mode 100644 tests/engine/test_services_live.py diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 3c89d7e4..d06492eb 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -83,6 +83,13 @@ DECKY_MUTATE_REQUEST = "mutate_request" # syslog sidechannel too) to interleave substrate-change markers into # attacker traversals. DECKY_MUTATION = "mutation" +# Per-service add/remove on a deployed decky (live; no full redeploy). +# Payload carries ``decky_name``, ``service_name``, optional +# ``topology_id``, and ``services`` (the post-mutation list). Consumers +# that watch substrate shape (correlator, dashboard, profiler) reconcile +# off these without waiting for the next decnet-state.json snapshot. +DECKY_SERVICE_ADDED = "service.added" +DECKY_SERVICE_REMOVED = "service.removed" # Attacker event types (second token under the ``attacker`` root). First # sighting, session boundary transitions, and score-threshold crossings diff --git a/decnet/engine/services_live.py b/decnet/engine/services_live.py new file mode 100644 index 00000000..c46f0ec5 --- /dev/null +++ b/decnet/engine/services_live.py @@ -0,0 +1,398 @@ +"""Add/remove a single service on a deployed decky without full redeploy. 
+ +The ``_compose()`` wrapper in :mod:`decnet.engine.deployer` already +supports per-service targeting (``up --no-deps -d ``, +``stop ``, ``rm -f ``). What was missing was the +orchestration: regenerate the compose file (so future redeploys reflect +the change), persist the new ``services`` list, and run the targeted +compose command. + +Two scopes: + +* **Topology** — source of truth is the ``topology_deckies`` table; the + compose file is per-topology (``decnet-topology--compose.yml``). +* **Fleet** — source of truth is ``decnet-state.json`` (with the + ``fleet_deckies`` table mirroring it); compose is the unihost + ``decnet-compose.yml``. + +Both publish ``decky..service.added`` / +``decky..service.removed`` on the bus. The new topic constants +are documented in ``wiki-checkout/Service-Bus.md``. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any, Literal, Optional + +import anyio + +from decnet.bus import topics +from decnet.logging import get_logger +from decnet.services.base import BaseService +from decnet.services.registry import get_service +from decnet.topology.persistence import hydrate +from decnet.web.db.repository import BaseRepository + +# Heavy imports (composer/deployer pull in decnet.network → docker) are +# deferred to call-sites via the ``_compose`` / ``_topology_compose_path`` +# / ``_load_state`` indirection helpers below. Mirrors the lazy-import +# pattern in decnet.canary.planter for the same reason. + + +def _compose(*args: str, compose_file: Optional[Path] = None, env=None) -> None: + """Indirection so tests can ``monkeypatch.setattr(services_live, '_compose', ...)``. + + Real implementation lives in :mod:`decnet.engine.deployer`; we + import-and-delegate at call time to keep this module's import graph + clean (see module docstring above). 
+ """ + from decnet.engine.deployer import _compose as _real_compose + if compose_file is None: + _real_compose(*args, env=env) + else: + _real_compose(*args, compose_file=compose_file, env=env) + + +def _topology_compose_path(topology_id: str) -> Path: + from decnet.engine.deployer import _topology_compose_path as _real_path + return _real_path(topology_id) + + +def _write_topology_compose(hydrated, path: Path) -> Path: + from decnet.topology.compose import write_topology_compose + return write_topology_compose(hydrated, path) + + +def _load_state(): + from decnet.config import load_state as _real_load_state + return _real_load_state() + + +def _save_state(config, compose_path) -> None: + from decnet.config import save_state as _real_save_state + _real_save_state(config, compose_path) + + +def _write_compose(config, compose_path) -> None: + from decnet.composer import write_compose as _real_write_compose + _real_write_compose(config, compose_path) + + +def _get_bus(): + from decnet.bus.factory import get_bus + return get_bus() + +log = get_logger("engine.services_live") + +DeckyKind = Literal["fleet", "topology"] + + +class ServiceMutationError(ValueError): + """Raised for caller-correctable failures (unknown service, idempotency + violation, missing decky). The API layer maps subclasses / message + contents to 4xx codes; everything else surfaces as 500. + """ + + +def _validate_service_for_per_decky(name: str) -> BaseService: + """Return the registered service or raise ``ServiceMutationError``. + + ``fleet_singleton`` services run once per fleet (e.g. an LLMNR + responder), not per-decky — we reject the per-decky add/remove + request rather than silently producing a no-op compose entry. 
+ """ + try: + svc = get_service(name) + except KeyError as exc: + raise ServiceMutationError(f"unknown service {name!r}") from exc + if svc.fleet_singleton: + raise ServiceMutationError( + f"service {name!r} is fleet_singleton; not addable per-decky" + ) + return svc + + +async def _publish(topic: str, payload: dict[str, Any]) -> None: + """Best-effort bus publish — same shape as the canary planter's helper.""" + try: + bus = _get_bus() + await bus.connect() + await bus.publish(topic, payload) + await bus.close() + except Exception as e: # noqa: BLE001 + log.warning("services_live bus publish failed topic=%s err=%s", topic, e) + + +# ---------------------------------------------------------- topology path + + +async def _topology_decky( + repo: BaseRepository, topology_id: str, decky_name: str, +) -> dict[str, Any]: + hydrated = await hydrate(repo, topology_id) + if hydrated is None: + raise ServiceMutationError(f"topology {topology_id!r} not found") + for d in hydrated["deckies"]: + cfg = d.get("decky_config") or {} + name = cfg.get("name") or d.get("name") + if name == decky_name: + return d + raise ServiceMutationError( + f"decky {decky_name!r} is not in topology {topology_id!r}" + ) + + +async def _rerender_topology_compose( + repo: BaseRepository, topology_id: str, +) -> Path: + """Re-hydrate + re-render the per-topology compose file. + + Called after a successful DB update so future deploys reflect the + change; without this the file would still describe the old service + set and a subsequent ``up -d`` would resurrect the removed service. 
+ """ + hydrated = await hydrate(repo, topology_id) + if hydrated is None: # pragma: no cover — narrow race + raise ServiceMutationError( + f"topology {topology_id!r} disappeared mid-mutation" + ) + path = _topology_compose_path(topology_id) + _write_topology_compose(hydrated, path) + return path + + +async def _add_topology_service( + repo: BaseRepository, + topology_id: str, + decky_name: str, + service_name: str, +) -> list[str]: + decky = await _topology_decky(repo, topology_id, decky_name) + services: list[str] = list(decky.get("services") or []) + if service_name in services: + raise ServiceMutationError( + f"service {service_name!r} already on decky {decky_name!r}" + ) + services.append(service_name) + await repo.update_topology_decky(decky["uuid"], {"services": services}) + + compose_path = await _rerender_topology_compose(repo, topology_id) + target = f"{decky_name}-{service_name}" + # Run compose in a worker thread so the API event loop stays + # responsive — same pattern as engine/deployer.deploy_topology. + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--build", target, + compose_file=compose_path, + ), + ) + return services + + +async def _remove_topology_service( + repo: BaseRepository, + topology_id: str, + decky_name: str, + service_name: str, +) -> list[str]: + decky = await _topology_decky(repo, topology_id, decky_name) + services: list[str] = list(decky.get("services") or []) + if service_name not in services: + raise ServiceMutationError( + f"service {service_name!r} not on decky {decky_name!r}" + ) + services = [s for s in services if s != service_name] + target = f"{decky_name}-{service_name}" + compose_path = _topology_compose_path(topology_id) + # Stop + rm before persisting + re-rendering so a half-completed + # mutation leaves the operator a clear state to retry from + # (container still running; DB still says service is on). 
+ await anyio.to_thread.run_sync( + lambda: _compose("stop", target, compose_file=compose_path), + ) + await anyio.to_thread.run_sync( + lambda: _compose("rm", "-f", target, compose_file=compose_path), + ) + await repo.update_topology_decky(decky["uuid"], {"services": services}) + await _rerender_topology_compose(repo, topology_id) + return services + + +# ---------------------------------------------------------- fleet path + + +def _fleet_state_or_raise() -> tuple[Any, Path]: + state = _load_state() + if state is None: + raise ServiceMutationError( + "no fleet state on disk — run `decnet up` first" + ) + return state + + +def _fleet_find_decky(config: Any, decky_name: str) -> Any: + for d in config.deckies: + if d.name == decky_name: + return d + raise ServiceMutationError(f"fleet decky {decky_name!r} not found") + + +async def _persist_fleet_change( + repo: BaseRepository, decky: Any, services: list[str], compose_path: Path, +) -> None: + """Persist the mutation to JSON state, compose file, and the DB row.""" + config, _ = _load_state() # type: ignore[misc] — checked earlier + target = _fleet_find_decky(config, decky.name) + target.services = services + _save_state(config, compose_path) + _write_compose(config, compose_path) + # Mirror to the DB row so DB-only consumers (dashboard, API) see the + # change without waiting for the reconciler. 
+ from decnet.web.db.models import LOCAL_HOST_SENTINEL + await repo.upsert_fleet_decky({ + "host_uuid": getattr(decky, "host_uuid", None) or LOCAL_HOST_SENTINEL, + "name": decky.name, + "services": services, + "decky_config": target.model_dump(mode="json"), + "decky_ip": decky.ip, + "state": "running", + }) + + +async def _add_fleet_service( + repo: BaseRepository, decky_name: str, service_name: str, +) -> list[str]: + config, compose_path = _fleet_state_or_raise() + decky = _fleet_find_decky(config, decky_name) + services: list[str] = list(decky.services or []) + if service_name in services: + raise ServiceMutationError( + f"service {service_name!r} already on decky {decky_name!r}" + ) + services.append(service_name) + await _persist_fleet_change(repo, decky, services, compose_path) + target = f"{decky_name}-{service_name}" + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--build", target, + compose_file=compose_path, + ), + ) + return services + + +async def _remove_fleet_service( + repo: BaseRepository, decky_name: str, service_name: str, +) -> list[str]: + config, compose_path = _fleet_state_or_raise() + decky = _fleet_find_decky(config, decky_name) + services: list[str] = list(decky.services or []) + if service_name not in services: + raise ServiceMutationError( + f"service {service_name!r} not on decky {decky_name!r}" + ) + services = [s for s in services if s != service_name] + target = f"{decky_name}-{service_name}" + await anyio.to_thread.run_sync( + lambda: _compose("stop", target, compose_file=compose_path), + ) + await anyio.to_thread.run_sync( + lambda: _compose("rm", "-f", target, compose_file=compose_path), + ) + await _persist_fleet_change(repo, decky, services, compose_path) + return services + + +# ---------------------------------------------------------- public api + + +async def add_service( + repo: BaseRepository, + *, + decky_kind: DeckyKind, + decky_name: str, + service_name: str, + topology_id: Optional[str] 
= None, +) -> list[str]: + """Add *service_name* to a deployed decky. + + Validates the service registry (rejects unknown / fleet_singleton + names), persists the change, regenerates the compose file, runs + ``up -d --no-deps --build -`` in a worker thread, + and publishes ``decky..service.added`` on the bus. + + Returns the post-mutation services list. + """ + _validate_service_for_per_decky(service_name) + if decky_kind == "topology": + if not topology_id: + raise ServiceMutationError( + "decky_kind=topology requires topology_id", + ) + services = await _add_topology_service( + repo, topology_id, decky_name, service_name, + ) + elif decky_kind == "fleet": + services = await _add_fleet_service(repo, decky_name, service_name) + else: # pragma: no cover — Literal narrows + raise ServiceMutationError(f"unknown decky_kind {decky_kind!r}") + + await _publish( + topics.decky(decky_name, topics.DECKY_SERVICE_ADDED), + { + "decky_name": decky_name, + "service_name": service_name, + "topology_id": topology_id, + "services": services, + }, + ) + log.info( + "services_live.add decky=%s topology=%s service=%s", + decky_name, topology_id, service_name, + ) + return services + + +async def remove_service( + repo: BaseRepository, + *, + decky_kind: DeckyKind, + decky_name: str, + service_name: str, + topology_id: Optional[str] = None, +) -> list[str]: + """Remove *service_name* from a deployed decky. + + Stops + removes the service container, persists the new services + list, re-renders the compose file (so the next ``up -d`` doesn't + bring it back), and publishes ``decky..service.removed``. + + Returns the post-mutation services list. 
+ """ + if decky_kind == "topology": + if not topology_id: + raise ServiceMutationError( + "decky_kind=topology requires topology_id", + ) + services = await _remove_topology_service( + repo, topology_id, decky_name, service_name, + ) + elif decky_kind == "fleet": + services = await _remove_fleet_service(repo, decky_name, service_name) + else: # pragma: no cover + raise ServiceMutationError(f"unknown decky_kind {decky_kind!r}") + + await _publish( + topics.decky(decky_name, topics.DECKY_SERVICE_REMOVED), + { + "decky_name": decky_name, + "service_name": service_name, + "topology_id": topology_id, + "services": services, + }, + ) + log.info( + "services_live.remove decky=%s topology=%s service=%s", + decky_name, topology_id, service_name, + ) + return services diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 79bb5b2c..9e1a3a17 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -66,6 +66,8 @@ from .deploy import ( from .decky import ( DeckyFileDeleteRequest, DeckyFileDropRequest, + DeckyServiceAddRequest, + DeckyServicesResponse, ) from .fleet import ( LOCAL_HOST_SENTINEL, @@ -228,6 +230,8 @@ __all__ = [ "LOCAL_HOST_SENTINEL", "DeckyFileDeleteRequest", "DeckyFileDropRequest", + "DeckyServiceAddRequest", + "DeckyServicesResponse", "FleetDecky", # health "ComponentHealth", diff --git a/decnet/web/db/models/decky.py b/decnet/web/db/models/decky.py index d53cee24..252ff4f2 100644 --- a/decnet/web/db/models/decky.py +++ b/decnet/web/db/models/decky.py @@ -44,6 +44,27 @@ class DeckyFileDropRequest(BaseModel): return v +class DeckyServiceAddRequest(BaseModel): + """Add a single service to an already-deployed decky. + + The service must be registered (see :mod:`decnet.services.registry`) + and must NOT be ``fleet_singleton`` — those run once fleet-wide, + not per-decky. Validation happens server-side in the engine layer + and surfaces as 422. 
+ """ + name: str = PydanticField(..., min_length=1) + + +class DeckyServicesResponse(BaseModel): + """Post-mutation services list, returned by the live add/remove API. + + Lets the dashboard reflect the new shape without a follow-up GET. + """ + decky_name: str + topology_id: Optional[str] = None + services: list[str] + + class DeckyFileDeleteRequest(BaseModel): """Best-effort ``rm -f`` of an absolute path inside a decky container.""" decky_name: str = PydanticField(..., min_length=1) diff --git a/decnet/web/router/deckies/__init__.py b/decnet/web/router/deckies/__init__.py index 6194f59f..1df6dbab 100644 --- a/decnet/web/router/deckies/__init__.py +++ b/decnet/web/router/deckies/__init__.py @@ -14,8 +14,18 @@ from __future__ import annotations from fastapi import APIRouter from .api_file_drop import router as file_drop_router +from .api_services import ( + fleet_services_router, + topology_services_router, +) deckies_router = APIRouter() deckies_router.include_router(file_drop_router) +deckies_router.include_router(fleet_services_router) +# Topology service routes live under /topologies/{id}/... — the prefix +# is set on the router itself. Mounted under the same `deckies_router` +# umbrella because the *operation* (add/remove a service on a deployed +# decky) is identical; only the addressing scheme differs. +deckies_router.include_router(topology_services_router) __all__ = ["deckies_router"] diff --git a/decnet/web/router/deckies/api_services.py b/decnet/web/router/deckies/api_services.py new file mode 100644 index 00000000..7cfd58ae --- /dev/null +++ b/decnet/web/router/deckies/api_services.py @@ -0,0 +1,165 @@ +"""POST/DELETE …/{decky}/services — live service add/remove. + +Two scopes mounted here: + +* fleet: ``/api/v1/deckies/{decky_name}/services`` +* topology: ``/api/v1/topologies/{topology_id}/deckies/{decky_name}/services`` + +Both return the post-mutation services list so the dashboard can +re-render without a follow-up GET. 
+ +Auth: ``require_admin`` everywhere (matches every other write op on +deckies — see :mod:`decnet.web.router.fleet.api_mutate_decky`). +""" +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException, Path + +from decnet.engine.services_live import ( + ServiceMutationError, + add_service, + remove_service, +) +from decnet.logging import get_logger +from decnet.web.db.models import ( + DeckyServiceAddRequest, + DeckyServicesResponse, +) +from decnet.web.dependencies import repo, require_admin + +log = get_logger("api.deckies.services") + + +fleet_services_router = APIRouter(tags=["Deckies"]) +topology_services_router = APIRouter(prefix="/topologies", tags=["Deckies"]) + + +def _map_mutation_error(exc: ServiceMutationError) -> HTTPException: + """Translate engine-layer errors into 4xx codes. + + Three cases the API reasonably distinguishes: + + * ``not found`` (decky / topology missing) → 404 + * ``already on`` / ``not on`` (idempotency violation) → 409 + * everything else (unknown service, fleet_singleton) → 422 + """ + msg = str(exc) + if "not found" in msg: + return HTTPException(status_code=404, detail=msg) + if "already on" in msg or "not on" in msg: + return HTTPException(status_code=409, detail=msg) + return HTTPException(status_code=422, detail=msg) + + +# ---------------------------------------------------------- fleet + +@fleet_services_router.post( + "/deckies/{decky_name}/services", + response_model=DeckyServicesResponse, + responses={ + 400: {"description": "Malformed request body"}, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Decky not found"}, + 409: {"description": "Service already on decky"}, + 422: {"description": "Unknown or fleet_singleton service"}, + }, +) +async def api_fleet_add_service( + req: DeckyServiceAddRequest, + decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"), + admin: dict = Depends(require_admin), +) 
-> DeckyServicesResponse: + try: + services = await add_service( + repo, decky_kind="fleet", + decky_name=decky_name, service_name=req.name, + ) + except ServiceMutationError as exc: + raise _map_mutation_error(exc) from exc + return DeckyServicesResponse(decky_name=decky_name, services=services) + + +@fleet_services_router.delete( + "/deckies/{decky_name}/services/{service_name}", + response_model=DeckyServicesResponse, + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Decky not found"}, + 409: {"description": "Service not on decky"}, + }, +) +async def api_fleet_remove_service( + decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"), + service_name: str = Path(..., pattern=r"^[a-z0-9_\-]{1,64}$"), + admin: dict = Depends(require_admin), +) -> DeckyServicesResponse: + try: + services = await remove_service( + repo, decky_kind="fleet", + decky_name=decky_name, service_name=service_name, + ) + except ServiceMutationError as exc: + raise _map_mutation_error(exc) from exc + return DeckyServicesResponse(decky_name=decky_name, services=services) + + +# ---------------------------------------------------------- topology + +@topology_services_router.post( + "/{topology_id}/deckies/{decky_name}/services", + response_model=DeckyServicesResponse, + responses={ + 400: {"description": "Malformed request body"}, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Topology or decky not found"}, + 409: {"description": "Service already on decky"}, + 422: {"description": "Unknown or fleet_singleton service"}, + }, +) +async def api_topology_add_service( + req: DeckyServiceAddRequest, + topology_id: str = Path(...), + decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"), + admin: dict = Depends(require_admin), +) -> DeckyServicesResponse: + try: + services = await add_service( + repo, 
decky_kind="topology", topology_id=topology_id, + decky_name=decky_name, service_name=req.name, + ) + except ServiceMutationError as exc: + raise _map_mutation_error(exc) from exc + return DeckyServicesResponse( + decky_name=decky_name, topology_id=topology_id, services=services, + ) + + +@topology_services_router.delete( + "/{topology_id}/deckies/{decky_name}/services/{service_name}", + response_model=DeckyServicesResponse, + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Topology or decky not found"}, + 409: {"description": "Service not on decky"}, + }, +) +async def api_topology_remove_service( + topology_id: str = Path(...), + decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"), + service_name: str = Path(..., pattern=r"^[a-z0-9_\-]{1,64}$"), + admin: dict = Depends(require_admin), +) -> DeckyServicesResponse: + try: + services = await remove_service( + repo, decky_kind="topology", topology_id=topology_id, + decky_name=decky_name, service_name=service_name, + ) + except ServiceMutationError as exc: + raise _map_mutation_error(exc) from exc + return DeckyServicesResponse( + decky_name=decky_name, topology_id=topology_id, services=services, + ) diff --git a/tests/api/deckies/test_services_api.py b/tests/api/deckies/test_services_api.py new file mode 100644 index 00000000..ba192dbb --- /dev/null +++ b/tests/api/deckies/test_services_api.py @@ -0,0 +1,161 @@ +"""End-to-end coverage for the live service add/remove endpoints. + +Covers both scopes: + +* fleet: POST/DELETE /api/v1/deckies/{decky}/services +* topology: POST/DELETE /api/v1/topologies/{id}/deckies/{decky}/services + +The engine layer's ``add_service``/``remove_service`` is patched so the +tests don't shell out to docker; the auth + routing + 4xx-mapping path +runs for real. 
+""" +from __future__ import annotations + +import httpx +import pytest + +from decnet.engine import services_live +from decnet.engine.services_live import ServiceMutationError + + +_FLEET_BASE = "/api/v1/deckies" +_TOPO_BASE = "/api/v1/topologies" + + +def _hdr(token: str) -> dict[str, str]: + return {"Authorization": f"Bearer {token}"} + + +# ---------------- fleet --------------------------------------------------- + + +@pytest.mark.asyncio +async def test_fleet_add_service_returns_post_mutation_list( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_add(repo, *, decky_kind, decky_name, service_name, topology_id=None): + assert decky_kind == "fleet" and topology_id is None + return ["http", service_name] + monkeypatch.setattr(services_live, "add_service", _fake_add) + + res = await client.post( + f"{_FLEET_BASE}/web1/services", + json={"name": "ssh"}, + headers=_hdr(auth_token), + ) + assert res.status_code == 200, res.text + body = res.json() + assert body["decky_name"] == "web1" + assert body["services"] == ["http", "ssh"] + assert body.get("topology_id") is None + + +@pytest.mark.asyncio +async def test_fleet_add_service_422_unknown_service( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_add(*a, **kw): + raise ServiceMutationError("unknown service 'bogus'") + monkeypatch.setattr(services_live, "add_service", _fake_add) + res = await client.post( + f"{_FLEET_BASE}/web1/services", + json={"name": "bogus"}, + headers=_hdr(auth_token), + ) + assert res.status_code == 422 + + +@pytest.mark.asyncio +async def test_fleet_add_service_409_already_present( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_add(*a, **kw): + raise ServiceMutationError("service 'ssh' already on decky 'web1'") + monkeypatch.setattr(services_live, "add_service", _fake_add) + res = await client.post( + f"{_FLEET_BASE}/web1/services", + json={"name": "ssh"}, + 
headers=_hdr(auth_token), + ) + assert res.status_code == 409 + + +@pytest.mark.asyncio +async def test_fleet_remove_service_returns_remaining( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_remove(repo, *, decky_kind, decky_name, service_name, topology_id=None): + return ["http"] + monkeypatch.setattr(services_live, "remove_service", _fake_remove) + res = await client.delete( + f"{_FLEET_BASE}/web1/services/ssh", + headers=_hdr(auth_token), + ) + assert res.status_code == 200 + assert res.json()["services"] == ["http"] + + +@pytest.mark.asyncio +async def test_fleet_remove_service_404_decky_missing( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_remove(*a, **kw): + raise ServiceMutationError("fleet decky 'ghost' not found") + monkeypatch.setattr(services_live, "remove_service", _fake_remove) + res = await client.delete( + f"{_FLEET_BASE}/ghost/services/ssh", + headers=_hdr(auth_token), + ) + assert res.status_code == 404 + + +# ---------------- topology ------------------------------------------------ + + +@pytest.mark.asyncio +async def test_topology_add_service_returns_post_mutation_list( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def _fake_add(repo, *, decky_kind, topology_id, decky_name, service_name): + assert decky_kind == "topology" + assert topology_id == "abc123" + return ["http", service_name] + monkeypatch.setattr(services_live, "add_service", _fake_add) + res = await client.post( + f"{_TOPO_BASE}/abc123/deckies/web1/services", + json={"name": "ssh"}, + headers=_hdr(auth_token), + ) + assert res.status_code == 200, res.text + body = res.json() + assert body["decky_name"] == "web1" + assert body["topology_id"] == "abc123" + assert body["services"] == ["http", "ssh"] + + +@pytest.mark.asyncio +async def test_topology_remove_service_round_trip( + client: httpx.AsyncClient, auth_token: str, monkeypatch +) -> None: + async def 
_fake_remove(repo, *, decky_kind, topology_id, decky_name, service_name):
+        return []
+    monkeypatch.setattr(services_live, "remove_service", _fake_remove)
+    res = await client.delete(
+        f"{_TOPO_BASE}/abc123/deckies/router/services/dns",
+        headers=_hdr(auth_token),
+    )
+    assert res.status_code == 200
+    assert res.json()["services"] == []
+
+
+# ---------------- auth ----------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_unauthenticated_service_mutation_rejected(
+    client: httpx.AsyncClient,
+) -> None:
+    res = await client.post(
+        f"{_FLEET_BASE}/web1/services", json={"name": "ssh"},
+    )
+    assert res.status_code in (401, 403)
diff --git a/tests/engine/__init__.py b/tests/engine/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/engine/test_services_live.py b/tests/engine/test_services_live.py
new file mode 100644
index 00000000..bdee7d73
--- /dev/null
+++ b/tests/engine/test_services_live.py
@@ -0,0 +1,199 @@
+"""Unit coverage for engine.services_live add/remove flows.
+
+We don't shell out to docker — :func:`engine.deployer._compose` is
+patched to a no-op recorder. The DB (SQLite) and the topology
+hydrator run for real so the persistence path is exercised end-to-end.
+"""
+from __future__ import annotations
+
+import json
+from typing import AsyncIterator
+
+import pytest
+import pytest_asyncio
+
+from decnet.bus.fake import FakeBus
+from decnet.engine import services_live
+from decnet.engine.services_live import ServiceMutationError
+from decnet.web.db.sqlite.repository import SQLiteRepository
+import decnet.web.db.models  # noqa: F401 — register tables
+
+
+@pytest_asyncio.fixture
+async def repo(tmp_path) -> AsyncIterator[SQLiteRepository]:
+    r = SQLiteRepository(str(tmp_path / "p.db"))
+    await r.initialize()
+    yield r  # no explicit teardown here; the DB file lives under pytest's tmp_path
+
+
+@pytest_asyncio.fixture
+async def fake_bus(monkeypatch) -> AsyncIterator[FakeBus]:
+    bus = FakeBus()
+    await bus.connect()
+    # services_live publishes via get_bus(); rebind to the fake.
+    from decnet.bus import factory
+    monkeypatch.setattr(factory, "get_bus", lambda: bus)
+    yield bus
+    await bus.close()
+
+
+@pytest_asyncio.fixture
+async def topology_with_decky(repo: SQLiteRepository) -> dict:
+    """Persist one topology + one running decky ("web1", services ["http"]) and return the IDs."""
+    topo_id = await repo.create_topology({
+        "name": "test-topo", "description": "",
+    })
+    decky_uuid = await repo.create_topology_decky({
+        "topology_id": topo_id,
+        "name": "web1",
+        "ip": "10.0.0.5",
+        "decky_config": {"name": "web1", "ips_by_lan": {}},
+        "services": ["http"],
+        "state": "running",
+    })
+    return {"topology_id": topo_id, "decky_uuid": decky_uuid}
+
+
+# ---------------- topology add --------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_topology_add_service_persists_and_runs_compose_up(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+    monkeypatch, tmp_path,
+) -> None:
+    captured: list[tuple[str, ...]] = []
+
+    def fake_compose(*args, compose_file=None, env=None):  # recorder stand-in; keeps positional args only
+        captured.append(args)
+
+    monkeypatch.setattr(services_live, "_compose", fake_compose)
+    # Avoid touching the real per-topology compose file path on disk.
+    monkeypatch.setattr(
+        services_live, "_topology_compose_path",
+        lambda topo_id: tmp_path / f"compose-{topo_id[:8]}.yml",
+    )
+    sub = fake_bus.subscribe("decky.>")  # presumably ">" is a multi-token wildcard — confirm against FakeBus
+    services = await services_live.add_service(
+        repo, decky_kind="topology",
+        topology_id=topology_with_decky["topology_id"],
+        decky_name="web1", service_name="ssh",
+    )
+    assert services == ["http", "ssh"]
+    # Compose up was called targeting just the new service container.
+    assert captured and captured[0][:5] == (
+        "up", "-d", "--no-deps", "--build", "web1-ssh",
+    )
+    # Persisted to the DB (the services column may come back as a JSON string or as a list).
+    row = await repo.get_topology_decky(topology_with_decky["decky_uuid"])
+    persisted_services = json.loads(row["services"]) if isinstance(row["services"], str) else row["services"]
+    assert "ssh" in persisted_services
+    # Bus event published.
+    import asyncio  # local import: only this test needs it
+    event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
+    assert event.topic == "decky.web1.service.added"
+    assert event.payload["service_name"] == "ssh"
+    assert event.payload["topology_id"] == topology_with_decky["topology_id"]
+
+
+@pytest.mark.asyncio
+async def test_topology_add_service_rejects_unknown(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+) -> None:
+    with pytest.raises(ServiceMutationError, match="unknown service"):
+        await services_live.add_service(
+            repo, decky_kind="topology",
+            topology_id=topology_with_decky["topology_id"],
+            decky_name="web1", service_name="not-a-real-service",
+        )
+
+
+@pytest.mark.asyncio
+async def test_topology_add_service_rejects_duplicate(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+    monkeypatch, tmp_path,
+) -> None:
+    monkeypatch.setattr(services_live, "_compose", lambda *a, **kw: None)  # no-op: never shell out to docker
+    monkeypatch.setattr(
+        services_live, "_topology_compose_path",
+        lambda topo_id: tmp_path / f"compose-{topo_id[:8]}.yml",
+    )
+    with pytest.raises(ServiceMutationError, match="already on"):
+        await services_live.add_service(
+            repo, decky_kind="topology",
+            topology_id=topology_with_decky["topology_id"],
+            decky_name="web1", service_name="http",  # already on
+        )
+
+
+@pytest.mark.asyncio
+async def test_topology_add_service_404_decky_not_in_topology(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+) -> None:
+    with pytest.raises(ServiceMutationError, match="not in topology"):
+        await services_live.add_service(
+            repo, decky_kind="topology",
+            topology_id=topology_with_decky["topology_id"],
+            decky_name="ghost", service_name="ssh",
+        )
+
+
+# ---------------- topology remove -----------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_topology_remove_service_runs_stop_then_rm(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+    monkeypatch, tmp_path,
+) -> None:
+    captured: list[tuple[str, ...]] = []
+    monkeypatch.setattr(
+        services_live, "_compose",
+        lambda *a, **kw: captured.append(a),  # record the positional args of each call
+    )
+    monkeypatch.setattr(
+        services_live, "_topology_compose_path",
+        lambda topo_id: tmp_path / f"compose-{topo_id[:8]}.yml",
+    )
+    services = await services_live.remove_service(
+        repo, decky_kind="topology",
+        topology_id=topology_with_decky["topology_id"],
+        decky_name="web1", service_name="http",
+    )
+    assert services == []
+    # Stop, then rm -f, in that order.
+    assert captured[0] == ("stop", "web1-http")
+    assert captured[1] == ("rm", "-f", "web1-http")
+
+
+@pytest.mark.asyncio
+async def test_topology_remove_service_rejects_when_absent(
+    repo: SQLiteRepository, topology_with_decky: dict, fake_bus: FakeBus,
+) -> None:
+    with pytest.raises(ServiceMutationError, match="not on"):
+        await services_live.remove_service(
+            repo, decky_kind="topology",
+            topology_id=topology_with_decky["topology_id"],
+            decky_name="web1", service_name="ssh",  # not on
+        )
+
+
+# ---------------- service registry validation -----------------------------
+
+
+def test_validate_rejects_fleet_singleton_services() -> None:
+    """``fleet_singleton`` services run once fleet-wide, not per-decky."""
+    from decnet.services.registry import all_services
+    singletons = [
+        name for name, svc in all_services().items() if svc.fleet_singleton
+    ]
+    if not singletons:
+        pytest.skip("no fleet_singleton services registered")
+    name = singletons[0]
+    with pytest.raises(ServiceMutationError, match="fleet_singleton"):
+        services_live._validate_service_for_per_decky(name)
+
+
+def test_validate_accepts_per_decky_service() -> None:
+    svc = services_live._validate_service_for_per_decky("ssh")  # on success, returns an object exposing .name
+    assert svc.name == "ssh"