diff --git a/decnet/agent/topology_ops.py b/decnet/agent/topology_ops.py index 7a03233d..f83c6ddd 100644 --- a/decnet/agent/topology_ops.py +++ b/decnet/agent/topology_ops.py @@ -28,6 +28,7 @@ from decnet.engine.deployer import ( _compose_with_retry, _teardown_order, _topology_compose_path, + _topology_compose_project, ) from decnet.logging import get_logger from decnet.network import create_bridge_network, remove_bridge_network @@ -118,12 +119,16 @@ def _materialise(hydrated: dict[str, Any], topology_id: str) -> None: the base is the cheapest way to make this race impossible. """ compose_path = _topology_compose_path(topology_id) + compose_project = _topology_compose_project(topology_id) client = docker.from_env() for lan in hydrated["lans"]: net_name = _topology_network_name(topology_id, lan["name"]) create_bridge_network(client, net_name, lan["subnet"], internal=not lan["is_dmz"]) write_topology_compose(hydrated, compose_path) - _compose_with_retry("up", "--build", "-d", "--always-recreate-deps", compose_file=compose_path) + _compose_with_retry( + "up", "--build", "-d", "--always-recreate-deps", + compose_file=compose_path, project=compose_project, + ) async def apply( @@ -160,12 +165,16 @@ async def teardown( # LAN membership list via the hydrated blob if available. 
hydrated = row.hydrated if row and row.topology_id == topology_id else None compose_path = _topology_compose_path(topology_id) + compose_project = _topology_compose_project(topology_id) client = docker.from_env() def _dismantle() -> None: if compose_path.exists(): try: - _compose("down", "--remove-orphans", compose_file=compose_path) + _compose( + "down", "--remove-orphans", + compose_file=compose_path, project=compose_project, + ) except subprocess.CalledProcessError as exc: log.warning( "topology %s compose down failed (continuing): %s", diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index ba9a1295..08dcfac7 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -50,6 +50,7 @@ from decnet.topology.validate import ( log = get_logger("engine") console = Console() COMPOSE_FILE = Path("decnet-compose.yml") +FLEET_COMPOSE_PROJECT = "decnet" _CANONICAL_LOGGING = Path(__file__).parent.parent / "templates" / "syslog_bridge.py" _CANONICAL_INSTANCE_SEED = Path(__file__).parent.parent / "templates" / "instance_seed.py" _CANONICAL_SESSREC_DIR = Path(__file__).parent.parent / "templates" / "_shared" / "sessrec" @@ -222,7 +223,9 @@ def _sync_caddy_modules(config: DecnetConfig) -> None: _chown_tree(dest_child, src_dir) -def _compose_ps(compose_file: Path) -> list[dict[str, object]]: +def _compose_ps( + compose_file: Path, project: str = FLEET_COMPOSE_PROJECT, +) -> list[dict[str, object]]: """Return ``docker compose ps`` rows for *compose_file* as parsed JSON. Used for post-deploy verification: ``compose up -d`` returns 0 the @@ -232,7 +235,7 @@ def _compose_ps(compose_file: Path) -> list[dict[str, object]]: parse failure — caller treats that as 'unverifiable, don't gate'). 
""" cmd = [ - "docker", "compose", "-p", "decnet", "-f", str(compose_file), + "docker", "compose", "-p", project, "-f", str(compose_file), "ps", "--all", "--format", "json", ] try: @@ -264,13 +267,21 @@ def _compose_ps(compose_file: Path) -> list[dict[str, object]]: return rows -def _compose(*args: str, compose_file: Path = COMPOSE_FILE, env: dict | None = None) -> None: +def _compose( + *args: str, + compose_file: Path = COMPOSE_FILE, + env: dict | None = None, + project: str = FLEET_COMPOSE_PROJECT, +) -> None: import os - # -p decnet pins the compose project name. Without it, docker compose + # -p pins the compose project name. Without it, docker compose # derives the project from basename($PWD); when a daemon (systemd) runs # with WorkingDirectory=/ that basename is empty and compose aborts with - # "project name must not be empty". - cmd = ["docker", "compose", "-p", "decnet", "-f", str(compose_file), *args] + # "project name must not be empty". Each scope (fleet, individual + # topology) gets its OWN project so `--remove-orphans` only sweeps + # containers in that scope — without this, a fleet redeploy or a + # topology teardown blasts every other scope's containers as orphans. + cmd = ["docker", "compose", "-p", project, "-f", str(compose_file), *args] merged = {**os.environ, **(env or {})} result = subprocess.run(cmd, capture_output=True, text=True, env=merged) # nosec B603 if result.stdout: @@ -424,15 +435,13 @@ def _compose_with_retry( retries: int = 3, delay: float = 5.0, env: dict | None = None, + project: str = FLEET_COMPOSE_PROJECT, ) -> None: """Run a docker compose command, retrying on transient failures.""" import os last_exc: subprocess.CalledProcessError | None = None - # -p decnet pins the compose project name. Without it, docker compose - # derives the project from basename($PWD); when a daemon (systemd) runs - # with WorkingDirectory=/ that basename is empty and compose aborts with - # "project name must not be empty". 
- cmd = ["docker", "compose", "-p", "decnet", "-f", str(compose_file), *args] + # See ``_compose`` for the project-name rationale. + cmd = ["docker", "compose", "-p", project, "-f", str(compose_file), *args] merged = {**os.environ, **(env or {})} # Preflight: if buildx already looks wedged before the first attempt, @@ -825,6 +834,18 @@ def _topology_compose_path(topology_id: str) -> Path: return Path(f"decnet-topology-{topology_id[:8]}-compose.yml") +def _topology_compose_project(topology_id: str) -> str: + """Per-topology docker compose project name. + + Each topology is its OWN compose project so ``--remove-orphans`` + during teardown or rollback sweeps only that topology's containers, + not the flat fleet or sibling topologies. Sharing a project (the + historical default ``decnet``) meant any teardown blasted every + other scope's containers. + """ + return f"decnet-topo-{topology_id[:8]}" + + async def _resolve_swarm_host(repo, host_uuid: str) -> dict: host = await repo.get_swarm_host_by_uuid(host_uuid) if host is None: @@ -967,6 +988,7 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N lans = hydrated["lans"] compose_path = _topology_compose_path(topology_id) + compose_project = _topology_compose_project(topology_id) if dry_run: # Plan-only: don't touch repo status or Docker — write the compose @@ -1017,6 +1039,7 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N await anyio.to_thread.run_sync( lambda: _compose_with_retry( "up", "--build", "-d", compose_file=compose_path, + project=compose_project, ), ) compose_started = True @@ -1029,7 +1052,8 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N if compose_started or compose_path.exists(): try: _compose( - "down", "--remove-orphans", compose_file=compose_path + "down", "--remove-orphans", compose_file=compose_path, + project=compose_project, ) except Exception as rb_exc: # pragma: no cover log.warning( @@ -1063,7 
+1087,7 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N # container isn't running — operators see real state instead of an # optimistic flag. ps_rows = await anyio.to_thread.run_sync( - lambda: _compose_ps(compose_path), + lambda: _compose_ps(compose_path, project=compose_project), ) bad: list[str] = [] # Build the per-decky state map. The base container's compose @@ -1155,12 +1179,14 @@ async def teardown_topology(repo, topology_id: str) -> None: client = docker.from_env() compose_path = _topology_compose_path(topology_id) + compose_project = _topology_compose_project(topology_id) if compose_path.exists(): try: await anyio.to_thread.run_sync( lambda: _compose( "down", "--remove-orphans", compose_file=compose_path, + project=compose_project, ), ) except subprocess.CalledProcessError as exc: diff --git a/decnet/engine/services_live.py b/decnet/engine/services_live.py index cb5dc266..7aa90ac3 100644 --- a/decnet/engine/services_live.py +++ b/decnet/engine/services_live.py @@ -40,7 +40,12 @@ from decnet.web.db.repository import BaseRepository # pattern in decnet.canary.planter for the same reason. -def _compose(*args: str, compose_file: Optional[Path] = None, env=None) -> None: +def _compose( + *args: str, + compose_file: Optional[Path] = None, + env=None, + project: Optional[str] = None, +) -> None: """Indirection so tests can ``monkeypatch.setattr(services_live, '_compose', ...)``. Real implementation lives in :mod:`decnet.engine.deployer`; we @@ -48,10 +53,12 @@ def _compose(*args: str, compose_file: Optional[Path] = None, env=None) -> None: clean (see module docstring above). 
""" from decnet.engine.deployer import _compose as _real_compose - if compose_file is None: - _real_compose(*args, env=env) - else: - _real_compose(*args, compose_file=compose_file, env=env) + kwargs: dict[str, Any] = {"env": env} + if compose_file is not None: + kwargs["compose_file"] = compose_file + if project is not None: + kwargs["project"] = project + _real_compose(*args, **kwargs) def _topology_compose_path(topology_id: str) -> Path: @@ -59,6 +66,11 @@ def _topology_compose_path(topology_id: str) -> Path: return _real_path(topology_id) +def _topology_compose_project(topology_id: str) -> str: + from decnet.engine.deployer import _topology_compose_project as _real + return _real(topology_id) + + def _write_topology_compose(hydrated, path: Path) -> Path: from decnet.topology.compose import write_topology_compose return write_topology_compose(hydrated, path) @@ -262,12 +274,13 @@ async def _add_topology_service( await _resync_agent_topology(repo, topology_id) else: target = f"{decky_name}-{service_name}" + project = _topology_compose_project(topology_id) # Run compose in a worker thread so the API event loop stays # responsive — same pattern as engine/deployer.deploy_topology. await anyio.to_thread.run_sync( lambda: _compose( "up", "-d", "--no-deps", "--build", target, - compose_file=compose_path, + compose_file=compose_path, project=project, ), ) return services @@ -295,16 +308,21 @@ async def _remove_topology_service( services = [s for s in services if s != service_name] target = f"{decky_name}-{service_name}" compose_path = _topology_compose_path(topology_id) + project = _topology_compose_project(topology_id) agent_pinned = await _topology_is_agent_pinned(repo, topology_id) if not agent_pinned: # Stop + rm before persisting + re-rendering so a half-completed # mutation leaves the operator a clear state to retry from # (container still running; DB still says service is on). 
await anyio.to_thread.run_sync( - lambda: _compose("stop", target, compose_file=compose_path), + lambda: _compose( + "stop", target, compose_file=compose_path, project=project, + ), ) await anyio.to_thread.run_sync( - lambda: _compose("rm", "-f", target, compose_file=compose_path), + lambda: _compose( + "rm", "-f", target, compose_file=compose_path, project=project, + ), ) await repo.update_topology_decky(decky["uuid"], {"services": services}) await _rerender_topology_compose(repo, topology_id) @@ -568,10 +586,11 @@ async def _update_topology_service_config( await _resync_agent_topology(repo, topology_id) else: target = f"{decky_name}-{service_name}" + project = _topology_compose_project(topology_id) await anyio.to_thread.run_sync( lambda: _compose( "up", "-d", "--no-deps", "--force-recreate", "--build", target, - compose_file=compose_path, + compose_file=compose_path, project=project, ), ) diff --git a/tests/engine/test_services_live.py b/tests/engine/test_services_live.py index c0329d7a..a4b0f0e7 100644 --- a/tests/engine/test_services_live.py +++ b/tests/engine/test_services_live.py @@ -75,7 +75,7 @@ async def test_topology_add_service_persists_and_runs_compose_up( ) -> None: captured: list[tuple[str, ...]] = [] - def fake_compose(*args, compose_file=None, env=None): + def fake_compose(*args, compose_file=None, env=None, project=None): captured.append(args) monkeypatch.setattr(services_live, "_compose", fake_compose) diff --git a/tests/topology/test_compose_project_isolation.py b/tests/topology/test_compose_project_isolation.py new file mode 100644 index 00000000..02a3b9cb --- /dev/null +++ b/tests/topology/test_compose_project_isolation.py @@ -0,0 +1,122 @@ +"""Each topology runs under its own docker compose project. + +The shared ``-p decnet`` project meant that ``--remove-orphans`` on +either a fleet redeploy or a topology teardown swept every container in +the project — wiping sibling topologies and the flat fleet along with +the intended target. 
Each topology now gets ``decnet-topo-{topology_id[:8]}`` so +the orphan sweep is scoped to that one topology. +""" +from __future__ import annotations + +import subprocess +from unittest.mock import patch + +import pytest + +from decnet.engine.deployer import ( + _compose, + _compose_with_retry, + _topology_compose_project, + FLEET_COMPOSE_PROJECT, +) + + +def test_topology_project_name_is_per_topology(): + p1 = _topology_compose_project("abcdef12-3456-7890-aaaa-bbbbbbbbbbbb") + p2 = _topology_compose_project("cafef00d-1111-2222-3333-444444444444") + assert p1 == "decnet-topo-abcdef12" + assert p2 == "decnet-topo-cafef00d" + assert p1 != FLEET_COMPOSE_PROJECT + assert p2 != FLEET_COMPOSE_PROJECT + assert p1 != p2 + + +def _run_compose_capturing_cmd(**kwargs): + """Invoke _compose with subprocess.run mocked and return the argv.""" + fake = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") + with patch("decnet.engine.deployer.subprocess.run", return_value=fake) as mr: + _compose("down", **kwargs) + assert mr.called + return list(mr.call_args[0][0]) + + +def test_compose_defaults_to_fleet_project(): + cmd = _run_compose_capturing_cmd() + assert "-p" in cmd + assert cmd[cmd.index("-p") + 1] == FLEET_COMPOSE_PROJECT + + +def test_compose_accepts_topology_project(): + project = _topology_compose_project("deadbeef-0000-0000-0000-000000000000") + cmd = _run_compose_capturing_cmd(project=project) + assert "-p" in cmd + assert cmd[cmd.index("-p") + 1] == project + assert cmd[cmd.index("-p") + 1] != FLEET_COMPOSE_PROJECT + + +def test_compose_with_retry_uses_passed_project(): + project = _topology_compose_project("feedface-0000-0000-0000-000000000000") + fake = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") + with patch("decnet.engine.deployer.subprocess.run", return_value=fake) as mr: + _compose_with_retry("up", "-d", project=project) + cmd = list(mr.call_args[0][0]) + assert "-p" in cmd + assert cmd[cmd.index("-p") + 1] == project + + 
+@pytest.fixture +async def repo(tmp_path): + from decnet.web.db.factory import get_repository + r = get_repository(db_path=str(tmp_path / "iso.db")) + await r.initialize() + return r + + +@pytest.mark.anyio +async def test_teardown_topology_uses_per_topo_project(repo, tmp_path, monkeypatch): + """The real teardown path must pass the per-topology project so the + fleet (-p decnet) is untouched by the orphan sweep.""" + from decnet.engine.deployer import teardown_topology + from decnet.topology.generator import generate + from decnet.topology.persistence import persist, transition_status + from decnet.topology.status import TopologyStatus + from decnet.topology.config import TopologyConfig + + monkeypatch.chdir(tmp_path) + plan = generate(TopologyConfig( + name="iso", depth=2, branching_factor=2, + deckies_per_lan_min=1, deckies_per_lan_max=1, + cross_edge_probability=0.0, randomize_services=False, + services_explicit=["ssh"], seed=7, + )) + tid = await persist(repo, plan) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + + # Drop a compose file so teardown's `if compose_path.exists()` branch + # fires and we capture the project argument. 
+ from decnet.engine.deployer import _topology_compose_path + compose_path = _topology_compose_path(tid) + compose_path.write_text("services: {}\n") + + expected_project = _topology_compose_project(tid) + + class _StubClient: + def __init__(self): + self.networks = self + def list(self, names=None, filters=None): # noqa: ARG002 + return [] + + captured_projects: list[str] = [] + + def _fake_compose(*args, compose_file=None, env=None, project=FLEET_COMPOSE_PROJECT): # noqa: ARG001 + captured_projects.append(project) + + with patch("decnet.engine.deployer.docker.from_env", return_value=_StubClient()): + with patch("decnet.engine.deployer._compose", side_effect=_fake_compose): + await teardown_topology(repo, tid) + + assert captured_projects, "teardown should have invoked compose" + assert all(p == expected_project for p in captured_projects), ( + f"teardown leaked into another project: {captured_projects}" + )