feat(deckies): live service add/remove without full redeploy

decnet.engine.services_live exposes add_service / remove_service for
both fleet and topology decky scopes.  The host's _compose() wrapper
already supported per-service targeting (up --no-deps -d <svc>,
stop, rm -f); what was missing was the orchestration around it:

* add: validate against decnet.services.registry (rejects unknown +
  fleet_singleton); persist the new services list; re-render the
  per-scope compose file (so future redeploys reflect the change);
  run docker compose up -d --no-deps --build <decky>-<svc>.
* remove: stop + rm -f the service container; persist; re-render
  compose so a future up -d doesn't bring it back.

Both publish decky.<name>.service.added / .removed on the bus, with
the post-mutation services list.  Topic constants added to
decnet.bus.topics; the matching wiki entry in wiki-checkout/Service-Bus.md
ships in a separate commit on the wiki repo (wiki-checkout/ is gitignored).

Four new admin endpoints:

* POST/DELETE /api/v1/deckies/{name}/services{,/svc}
* POST/DELETE /api/v1/topologies/{id}/deckies/{name}/services{,/svc}

ServiceMutationError messages are mapped at the API boundary to 404
(decky/topology missing), 409 (idempotency violation), 422 (unknown
or fleet_singleton service).
This commit is contained in:
2026-04-28 22:51:42 -04:00
parent 0bc4b05c73
commit 6ac8cac908
9 changed files with 965 additions and 0 deletions

View File

@@ -83,6 +83,13 @@ DECKY_MUTATE_REQUEST = "mutate_request"
# syslog sidechannel too) to interleave substrate-change markers into
# attacker traversals.
DECKY_MUTATION = "mutation"
# Per-service add/remove on a deployed decky (live; no full redeploy).
# Payload carries ``decky_name``, ``service_name``, optional
# ``topology_id``, and ``services`` (the post-mutation list). Consumers
# that watch substrate shape (correlator, dashboard, profiler) reconcile
# off these without waiting for the next decnet-state.json snapshot.
DECKY_SERVICE_ADDED = "service.added"
DECKY_SERVICE_REMOVED = "service.removed"
# Attacker event types (second token under the ``attacker`` root). First
# sighting, session boundary transitions, and score-threshold crossings

View File

@@ -0,0 +1,398 @@
"""Add/remove a single service on a deployed decky without full redeploy.
The ``_compose()`` wrapper in :mod:`decnet.engine.deployer` already
supports per-service targeting (``up --no-deps -d <svc>``,
``stop <svc>``, ``rm -f <svc>``). What was missing was the
orchestration: regenerate the compose file (so future redeploys reflect
the change), persist the new ``services`` list, and run the targeted
compose command.
Two scopes:
* **Topology** — source of truth is the ``topology_deckies`` table; the
compose file is per-topology (``decnet-topology-<id8>-compose.yml``).
* **Fleet** — source of truth is ``decnet-state.json`` (with the
``fleet_deckies`` table mirroring it); compose is the unihost
``decnet-compose.yml``.
Both publish ``decky.<name>.service.added`` /
``decky.<name>.service.removed`` on the bus. The new topic constants
are documented in ``wiki-checkout/Service-Bus.md``.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Literal, Optional
import anyio
from decnet.bus import topics
from decnet.logging import get_logger
from decnet.services.base import BaseService
from decnet.services.registry import get_service
from decnet.topology.persistence import hydrate
from decnet.web.db.repository import BaseRepository
# Heavy imports (composer/deployer pull in decnet.network → docker) are
# deferred to call-sites via the ``_compose`` / ``_topology_compose_path``
# / ``_load_state`` indirection helpers below. Mirrors the lazy-import
# pattern in decnet.canary.planter for the same reason.
def _compose(*args: str, compose_file: Optional[Path] = None, env=None) -> None:
"""Indirection so tests can ``monkeypatch.setattr(services_live, '_compose', ...)``.
Real implementation lives in :mod:`decnet.engine.deployer`; we
import-and-delegate at call time to keep this module's import graph
clean (see module docstring above).
"""
from decnet.engine.deployer import _compose as _real_compose
if compose_file is None:
_real_compose(*args, env=env)
else:
_real_compose(*args, compose_file=compose_file, env=env)
def _topology_compose_path(topology_id: str) -> Path:
from decnet.engine.deployer import _topology_compose_path as _real_path
return _real_path(topology_id)
def _write_topology_compose(hydrated, path: Path) -> Path:
from decnet.topology.compose import write_topology_compose
return write_topology_compose(hydrated, path)
def _load_state():
from decnet.config import load_state as _real_load_state
return _real_load_state()
def _save_state(config, compose_path) -> None:
from decnet.config import save_state as _real_save_state
_real_save_state(config, compose_path)
def _write_compose(config, compose_path) -> None:
from decnet.composer import write_compose as _real_write_compose
_real_write_compose(config, compose_path)
def _get_bus():
from decnet.bus.factory import get_bus
return get_bus()
log = get_logger("engine.services_live")
DeckyKind = Literal["fleet", "topology"]
class ServiceMutationError(ValueError):
"""Raised for caller-correctable failures (unknown service, idempotency
violation, missing decky). The API layer maps subclasses / message
contents to 4xx codes; everything else surfaces as 500.
"""
def _validate_service_for_per_decky(name: str) -> BaseService:
"""Return the registered service or raise ``ServiceMutationError``.
``fleet_singleton`` services run once per fleet (e.g. an LLMNR
responder), not per-decky — we reject the per-decky add/remove
request rather than silently producing a no-op compose entry.
"""
try:
svc = get_service(name)
except KeyError as exc:
raise ServiceMutationError(f"unknown service {name!r}") from exc
if svc.fleet_singleton:
raise ServiceMutationError(
f"service {name!r} is fleet_singleton; not addable per-decky"
)
return svc
async def _publish(topic: str, payload: dict[str, Any]) -> None:
"""Best-effort bus publish — same shape as the canary planter's helper."""
try:
bus = _get_bus()
await bus.connect()
await bus.publish(topic, payload)
await bus.close()
except Exception as e: # noqa: BLE001
log.warning("services_live bus publish failed topic=%s err=%s", topic, e)
# ---------------------------------------------------------- topology path
async def _topology_decky(
repo: BaseRepository, topology_id: str, decky_name: str,
) -> dict[str, Any]:
hydrated = await hydrate(repo, topology_id)
if hydrated is None:
raise ServiceMutationError(f"topology {topology_id!r} not found")
for d in hydrated["deckies"]:
cfg = d.get("decky_config") or {}
name = cfg.get("name") or d.get("name")
if name == decky_name:
return d
raise ServiceMutationError(
f"decky {decky_name!r} is not in topology {topology_id!r}"
)
async def _rerender_topology_compose(
repo: BaseRepository, topology_id: str,
) -> Path:
"""Re-hydrate + re-render the per-topology compose file.
Called after a successful DB update so future deploys reflect the
change; without this the file would still describe the old service
set and a subsequent ``up -d`` would resurrect the removed service.
"""
hydrated = await hydrate(repo, topology_id)
if hydrated is None: # pragma: no cover — narrow race
raise ServiceMutationError(
f"topology {topology_id!r} disappeared mid-mutation"
)
path = _topology_compose_path(topology_id)
_write_topology_compose(hydrated, path)
return path
async def _add_topology_service(
repo: BaseRepository,
topology_id: str,
decky_name: str,
service_name: str,
) -> list[str]:
decky = await _topology_decky(repo, topology_id, decky_name)
services: list[str] = list(decky.get("services") or [])
if service_name in services:
raise ServiceMutationError(
f"service {service_name!r} already on decky {decky_name!r}"
)
services.append(service_name)
await repo.update_topology_decky(decky["uuid"], {"services": services})
compose_path = await _rerender_topology_compose(repo, topology_id)
target = f"{decky_name}-{service_name}"
# Run compose in a worker thread so the API event loop stays
# responsive — same pattern as engine/deployer.deploy_topology.
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
return services
async def _remove_topology_service(
repo: BaseRepository,
topology_id: str,
decky_name: str,
service_name: str,
) -> list[str]:
decky = await _topology_decky(repo, topology_id, decky_name)
services: list[str] = list(decky.get("services") or [])
if service_name not in services:
raise ServiceMutationError(
f"service {service_name!r} not on decky {decky_name!r}"
)
services = [s for s in services if s != service_name]
target = f"{decky_name}-{service_name}"
compose_path = _topology_compose_path(topology_id)
# Stop + rm before persisting + re-rendering so a half-completed
# mutation leaves the operator a clear state to retry from
# (container still running; DB still says service is on).
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
await repo.update_topology_decky(decky["uuid"], {"services": services})
await _rerender_topology_compose(repo, topology_id)
return services
# ---------------------------------------------------------- fleet path
def _fleet_state_or_raise() -> tuple[Any, Path]:
state = _load_state()
if state is None:
raise ServiceMutationError(
"no fleet state on disk — run `decnet up` first"
)
return state
def _fleet_find_decky(config: Any, decky_name: str) -> Any:
for d in config.deckies:
if d.name == decky_name:
return d
raise ServiceMutationError(f"fleet decky {decky_name!r} not found")
async def _persist_fleet_change(
repo: BaseRepository, decky: Any, services: list[str], compose_path: Path,
) -> None:
"""Persist the mutation to JSON state, compose file, and the DB row."""
config, _ = _load_state() # type: ignore[misc] — checked earlier
target = _fleet_find_decky(config, decky.name)
target.services = services
_save_state(config, compose_path)
_write_compose(config, compose_path)
# Mirror to the DB row so DB-only consumers (dashboard, API) see the
# change without waiting for the reconciler.
from decnet.web.db.models import LOCAL_HOST_SENTINEL
await repo.upsert_fleet_decky({
"host_uuid": getattr(decky, "host_uuid", None) or LOCAL_HOST_SENTINEL,
"name": decky.name,
"services": services,
"decky_config": target.model_dump(mode="json"),
"decky_ip": decky.ip,
"state": "running",
})
async def _add_fleet_service(
repo: BaseRepository, decky_name: str, service_name: str,
) -> list[str]:
config, compose_path = _fleet_state_or_raise()
decky = _fleet_find_decky(config, decky_name)
services: list[str] = list(decky.services or [])
if service_name in services:
raise ServiceMutationError(
f"service {service_name!r} already on decky {decky_name!r}"
)
services.append(service_name)
await _persist_fleet_change(repo, decky, services, compose_path)
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
return services
async def _remove_fleet_service(
repo: BaseRepository, decky_name: str, service_name: str,
) -> list[str]:
config, compose_path = _fleet_state_or_raise()
decky = _fleet_find_decky(config, decky_name)
services: list[str] = list(decky.services or [])
if service_name not in services:
raise ServiceMutationError(
f"service {service_name!r} not on decky {decky_name!r}"
)
services = [s for s in services if s != service_name]
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
await _persist_fleet_change(repo, decky, services, compose_path)
return services
# ---------------------------------------------------------- public api
async def add_service(
repo: BaseRepository,
*,
decky_kind: DeckyKind,
decky_name: str,
service_name: str,
topology_id: Optional[str] = None,
) -> list[str]:
"""Add *service_name* to a deployed decky.
Validates the service registry (rejects unknown / fleet_singleton
names), persists the change, regenerates the compose file, runs
``up -d --no-deps --build <decky>-<service>`` in a worker thread,
and publishes ``decky.<name>.service.added`` on the bus.
Returns the post-mutation services list.
"""
_validate_service_for_per_decky(service_name)
if decky_kind == "topology":
if not topology_id:
raise ServiceMutationError(
"decky_kind=topology requires topology_id",
)
services = await _add_topology_service(
repo, topology_id, decky_name, service_name,
)
elif decky_kind == "fleet":
services = await _add_fleet_service(repo, decky_name, service_name)
else: # pragma: no cover — Literal narrows
raise ServiceMutationError(f"unknown decky_kind {decky_kind!r}")
await _publish(
topics.decky(decky_name, topics.DECKY_SERVICE_ADDED),
{
"decky_name": decky_name,
"service_name": service_name,
"topology_id": topology_id,
"services": services,
},
)
log.info(
"services_live.add decky=%s topology=%s service=%s",
decky_name, topology_id, service_name,
)
return services
async def remove_service(
repo: BaseRepository,
*,
decky_kind: DeckyKind,
decky_name: str,
service_name: str,
topology_id: Optional[str] = None,
) -> list[str]:
"""Remove *service_name* from a deployed decky.
Stops + removes the service container, persists the new services
list, re-renders the compose file (so the next ``up -d`` doesn't
bring it back), and publishes ``decky.<name>.service.removed``.
Returns the post-mutation services list.
"""
if decky_kind == "topology":
if not topology_id:
raise ServiceMutationError(
"decky_kind=topology requires topology_id",
)
services = await _remove_topology_service(
repo, topology_id, decky_name, service_name,
)
elif decky_kind == "fleet":
services = await _remove_fleet_service(repo, decky_name, service_name)
else: # pragma: no cover
raise ServiceMutationError(f"unknown decky_kind {decky_kind!r}")
await _publish(
topics.decky(decky_name, topics.DECKY_SERVICE_REMOVED),
{
"decky_name": decky_name,
"service_name": service_name,
"topology_id": topology_id,
"services": services,
},
)
log.info(
"services_live.remove decky=%s topology=%s service=%s",
decky_name, topology_id, service_name,
)
return services

View File

@@ -66,6 +66,8 @@ from .deploy import (
from .decky import (
DeckyFileDeleteRequest,
DeckyFileDropRequest,
DeckyServiceAddRequest,
DeckyServicesResponse,
)
from .fleet import (
LOCAL_HOST_SENTINEL,
@@ -228,6 +230,8 @@ __all__ = [
"LOCAL_HOST_SENTINEL",
"DeckyFileDeleteRequest",
"DeckyFileDropRequest",
"DeckyServiceAddRequest",
"DeckyServicesResponse",
"FleetDecky",
# health
"ComponentHealth",

View File

@@ -44,6 +44,27 @@ class DeckyFileDropRequest(BaseModel):
return v
class DeckyServiceAddRequest(BaseModel):
"""Add a single service to an already-deployed decky.
The service must be registered (see :mod:`decnet.services.registry`)
and must NOT be ``fleet_singleton`` — those run once fleet-wide,
not per-decky. Validation happens server-side in the engine layer
and surfaces as 422.
"""
name: str = PydanticField(..., min_length=1)
class DeckyServicesResponse(BaseModel):
"""Post-mutation services list, returned by the live add/remove API.
Lets the dashboard reflect the new shape without a follow-up GET.
"""
decky_name: str
topology_id: Optional[str] = None
services: list[str]
class DeckyFileDeleteRequest(BaseModel):
"""Best-effort ``rm -f`` of an absolute path inside a decky container."""
decky_name: str = PydanticField(..., min_length=1)

View File

@@ -14,8 +14,18 @@ from __future__ import annotations
from fastapi import APIRouter
from .api_file_drop import router as file_drop_router
from .api_services import (
fleet_services_router,
topology_services_router,
)
deckies_router = APIRouter()
deckies_router.include_router(file_drop_router)
deckies_router.include_router(fleet_services_router)
# Topology service routes live under /topologies/{id}/... — the prefix
# is set on the router itself. Mounted under the same `deckies_router`
# umbrella because the *operation* (add/remove a service on a deployed
# decky) is identical; only the addressing scheme differs.
deckies_router.include_router(topology_services_router)
__all__ = ["deckies_router"]

View File

@@ -0,0 +1,165 @@
"""POST/DELETE …/{decky}/services — live service add/remove.
Two scopes mounted here:
* fleet: ``/api/v1/deckies/{decky_name}/services``
* topology: ``/api/v1/topologies/{topology_id}/deckies/{decky_name}/services``
Both return the post-mutation services list so the dashboard can
re-render without a follow-up GET.
Auth: ``require_admin`` everywhere (matches every other write op on
deckies — see :mod:`decnet.web.router.fleet.api_mutate_decky`).
"""
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Path
from decnet.engine.services_live import (
ServiceMutationError,
add_service,
remove_service,
)
from decnet.logging import get_logger
from decnet.web.db.models import (
DeckyServiceAddRequest,
DeckyServicesResponse,
)
from decnet.web.dependencies import repo, require_admin
log = get_logger("api.deckies.services")
fleet_services_router = APIRouter(tags=["Deckies"])
topology_services_router = APIRouter(prefix="/topologies", tags=["Deckies"])
def _map_mutation_error(exc: ServiceMutationError) -> HTTPException:
"""Translate engine-layer errors into 4xx codes.
Three cases the API reasonably distinguishes:
* ``not found`` (decky / topology missing) → 404
* ``already on`` / ``not on`` (idempotency violation) → 409
* everything else (unknown service, fleet_singleton) → 422
"""
msg = str(exc)
if "not found" in msg:
return HTTPException(status_code=404, detail=msg)
if "already on" in msg or "not on" in msg:
return HTTPException(status_code=409, detail=msg)
return HTTPException(status_code=422, detail=msg)
# ---------------------------------------------------------- fleet
@fleet_services_router.post(
"/deckies/{decky_name}/services",
response_model=DeckyServicesResponse,
responses={
400: {"description": "Malformed request body"},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Decky not found"},
409: {"description": "Service already on decky"},
422: {"description": "Unknown or fleet_singleton service"},
},
)
async def api_fleet_add_service(
req: DeckyServiceAddRequest,
decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"),
admin: dict = Depends(require_admin),
) -> DeckyServicesResponse:
try:
services = await add_service(
repo, decky_kind="fleet",
decky_name=decky_name, service_name=req.name,
)
except ServiceMutationError as exc:
raise _map_mutation_error(exc) from exc
return DeckyServicesResponse(decky_name=decky_name, services=services)
@fleet_services_router.delete(
"/deckies/{decky_name}/services/{service_name}",
response_model=DeckyServicesResponse,
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Decky not found"},
409: {"description": "Service not on decky"},
},
)
async def api_fleet_remove_service(
decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"),
service_name: str = Path(..., pattern=r"^[a-z0-9_\-]{1,64}$"),
admin: dict = Depends(require_admin),
) -> DeckyServicesResponse:
try:
services = await remove_service(
repo, decky_kind="fleet",
decky_name=decky_name, service_name=service_name,
)
except ServiceMutationError as exc:
raise _map_mutation_error(exc) from exc
return DeckyServicesResponse(decky_name=decky_name, services=services)
# ---------------------------------------------------------- topology
@topology_services_router.post(
"/{topology_id}/deckies/{decky_name}/services",
response_model=DeckyServicesResponse,
responses={
400: {"description": "Malformed request body"},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Topology or decky not found"},
409: {"description": "Service already on decky"},
422: {"description": "Unknown or fleet_singleton service"},
},
)
async def api_topology_add_service(
req: DeckyServiceAddRequest,
topology_id: str = Path(...),
decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"),
admin: dict = Depends(require_admin),
) -> DeckyServicesResponse:
try:
services = await add_service(
repo, decky_kind="topology", topology_id=topology_id,
decky_name=decky_name, service_name=req.name,
)
except ServiceMutationError as exc:
raise _map_mutation_error(exc) from exc
return DeckyServicesResponse(
decky_name=decky_name, topology_id=topology_id, services=services,
)
@topology_services_router.delete(
"/{topology_id}/deckies/{decky_name}/services/{service_name}",
response_model=DeckyServicesResponse,
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Topology or decky not found"},
409: {"description": "Service not on decky"},
},
)
async def api_topology_remove_service(
topology_id: str = Path(...),
decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"),
service_name: str = Path(..., pattern=r"^[a-z0-9_\-]{1,64}$"),
admin: dict = Depends(require_admin),
) -> DeckyServicesResponse:
try:
services = await remove_service(
repo, decky_kind="topology", topology_id=topology_id,
decky_name=decky_name, service_name=service_name,
)
except ServiceMutationError as exc:
raise _map_mutation_error(exc) from exc
return DeckyServicesResponse(
decky_name=decky_name, topology_id=topology_id, services=services,
)