From f775223a83349f290a78df9d56473ccdb5383302 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 21:14:48 -0400 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20reconciler=20converges=20JSON=20?= =?UTF-8?q?=E2=86=94=20DB=20=E2=86=94=20docker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds decnet.fleet.reconciler — a pure async function plus a long-lived worker — that periodically reconciles the three sources of truth on a DECNET host: 1. decnet-state.json (CLI-canonical fleet record) 2. fleet_deckies table (DB mirror, written by engine.deployer) 3. docker inspect (actual per-container runtime state) Drift handling: * JSON has X, DB doesn't → INSERT (deploy ran with DB offline) * DB has X (this host), JSON doesn't → DELETE (teardown ran with DB offline) * Both have X, docker disagrees → flip state to running/failed/degraded * Docker socket unreachable → leave existing state alone (don't torch every row to torn_down) Cross-host safety: deletions are scoped to host_uuid for the local host; a master that runs both a local fleet and swarm workers will never clobber a peer's slice. CLI: decnet reconcile --once # one-shot, prints counts decnet reconcile [--interval N] # long-lived worker, mirrors # orchestrator's lifecycle (control # listener + heartbeat + tick loop) Promotes decnet/fleet.py → decnet/fleet/ package so the reconciler can live alongside it without name collision (build_deckies_from_ini and all_service_names re-exported unchanged via __init__.py). 14 new tests cover state aggregation rules, all four drift directions, host_uuid scoping, docker-unreachable safety, and worker shutdown via the bus control event. --- decnet/cli/__init__.py | 3 +- decnet/cli/reconciler.py | 62 ++++++ decnet/{fleet.py => fleet/__init__.py} | 0 decnet/fleet/reconciler.py | 171 +++++++++++++++ decnet/fleet/reconciler_worker.py | 84 ++++++++ tests/fleet/test_reconciler.py | 285 +++++++++++++++++++++++++ tests/fleet/test_reconciler_worker.py | 72 +++++++ 7 files changed, 676 insertions(+), 1 deletion(-) create mode 100644 decnet/cli/reconciler.py rename decnet/{fleet.py => fleet/__init__.py} (100%) create mode 100644 decnet/fleet/reconciler.py create mode 100644 decnet/fleet/reconciler_worker.py create mode 100644 tests/fleet/test_reconciler.py create mode 100644 tests/fleet/test_reconciler_worker.py diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 45a703e5..2231bea1 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -32,6 +32,7 @@ from . import ( listener, orchestrator, profiler, + reconciler, sniffer, swarm, swarmctl, @@ -55,7 +56,7 @@ for _mod in ( api, swarmctl, agent, updater, listener, forwarder, swarm, deploy, lifecycle, workers, inventory, - web, profiler, orchestrator, sniffer, db, + web, profiler, orchestrator, reconciler, sniffer, db, topology, bus, geoip, init, webhook, ): _mod.register(app) diff --git a/decnet/cli/reconciler.py b/decnet/cli/reconciler.py new file mode 100644 index 00000000..0eba674a --- /dev/null +++ b/decnet/cli/reconciler.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import typer + +from . import utils as _utils +from .utils import console, log + + +def register(app: typer.Typer) -> None: + @app.command(name="reconcile") + def reconcile_cmd( + once: bool = typer.Option( + False, "--once", + help="Run a single reconcile pass and exit (no daemon loop).", + ), + interval: int = typer.Option( + 30, "--interval", "-i", + help="Seconds between reconcile passes (ignored with --once).", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process (long-lived only).", + ), + ) -> None: + """Converge fleet state across decnet-state.json, the DB, and docker.""" + import asyncio + from decnet.web.dependencies import repo + + if once: + from decnet.fleet.reconciler import reconcile_once + + async def _one() -> None: + await repo.initialize() + counts = await reconcile_once(repo) + console.print( + f"[bold cyan]reconcile:[/] " + f"inserted={counts['inserted']} " + f"deleted={counts['deleted']} " + f"state_updated={counts['state_updated']}" + ) + asyncio.run(_one()) + return + + from decnet.fleet.reconciler_worker import fleet_reconciler_worker + + if daemon: + log.info("reconciler daemonizing interval=%d", interval) + _utils._daemonize() + + log.info("reconciler starting interval=%d", interval) + console.print( + f"[bold cyan]Fleet reconciler starting[/] (interval: {interval}s)" + ) + + async def _run() -> None: + await repo.initialize() + await fleet_reconciler_worker(repo, interval=interval) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Reconciler stopped.[/]") diff --git a/decnet/fleet.py b/decnet/fleet/__init__.py similarity index 100% rename from decnet/fleet.py rename to decnet/fleet/__init__.py diff --git a/decnet/fleet/reconciler.py b/decnet/fleet/reconciler.py new file mode 100644 index 00000000..e54d2e99 --- /dev/null +++ b/decnet/fleet/reconciler.py @@ -0,0 +1,171 @@ +"""Fleet reconciler — converges JSON ↔ DB ↔ docker. + +Three sources of truth on a DECNET host can disagree: + +1. ``decnet-state.json`` — written by ``engine.deployer.deploy/teardown``; + the canonical record for offline / no-API consumers (``decnet status``, + ``decnet teardown``, sniffer, collector). +2. ``fleet_deckies`` table — DB mirror written by the same deployer; what + the orchestrator, web dashboard, and REST API see. +3. ``docker inspect`` — actual per-container runtime state. + +Drift sources we accept and correct: + +* CLI deploy on a host whose DB was unreachable → JSON ahead of DB. +* CLI teardown on a host whose DB was unreachable → DB ahead of JSON. +* Operator hand-edited ``decnet-state.json`` → JSON ahead of DB. +* Container crashed / was killed externally → DB state stale until docker + is observed. + +Resolution: + +* JSON has X, DB doesn't → INSERT. +* DB has X (this host), JSON doesn't → DELETE. +* Both have X → state := docker-aggregated state. + +Cross-host safety: deletions are scoped to ``host_uuid == this host``. +A multi-host master that runs swarm workers (each with their own +reconciler) must never delete a peer's rows. + +The reconciler intentionally does NOT publish bus events for state +changes today — the dashboard reads the DB on every render. A +``fleet.{name}.state`` topic is a natural follow-up if SSE consumers +appear, but is out of scope for this PR. +""" +from __future__ import annotations + +import asyncio +from typing import Any, Callable, Optional + +from decnet.config import DecnetConfig, load_state as _real_load_state +from decnet.logging import get_logger +from decnet.web.db.models import LOCAL_HOST_SENTINEL +from decnet.web.db.repository import BaseRepository + +logger = get_logger("fleet.reconciler") + + +# ── docker observation ──────────────────────────────────────────────────────── + +def _collect_container_states( + docker_client_factory: Optional[Callable[[], Any]] = None, +) -> Optional[dict[str, str]]: + """Return ``{container_name: status}`` or ``None`` if docker is unreachable. + + ``None`` is the explicit "unknown" signal — callers must NOT treat + docker failure as "every container is gone" (that would torch every + fleet row to ``torn_down`` whenever the docker socket is busy). + """ + if docker_client_factory is None: + try: + import docker # local import — keeps tests import-clean + docker_client_factory = docker.from_env + except ImportError: + return None + try: + client = docker_client_factory() + return { + c.name: c.status + for c in client.containers.list(all=True, ignore_removed=True) + } + except Exception as exc: # noqa: BLE001 + logger.debug("reconciler: docker query failed: %s", exc) + return None + + +def _aggregate_decky_state( + decky_name: str, + services: list[str], + container_states: dict[str, str], +) -> str: + """Aggregate per-decky state from per-service container statuses. + + ``running`` — every expected service container is ``running``. + ``failed`` — every observed container is non-running. + ``degraded`` — partial: some running, some not (or some missing). + ``torn_down`` — no expected container observed at all. + """ + expected = {f"{decky_name}-{svc.replace('_', '-')}" for svc in services} + seen = {n: s for n, s in container_states.items() if n in expected} + if not seen: + return "torn_down" + statuses = set(seen.values()) + if statuses == {"running"} and len(seen) == len(expected): + return "running" + if "running" not in statuses: + return "failed" + return "degraded" + + +# ── reconcile pass ──────────────────────────────────────────────────────────── + +async def reconcile_once( + repo: BaseRepository, + *, + host_uuid: str = LOCAL_HOST_SENTINEL, + load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state, + docker_client_factory: Optional[Callable[[], Any]] = None, +) -> dict[str, int]: + """Single reconciliation pass. Returns counts of work done.""" + counts = {"inserted": 0, "deleted": 0, "state_updated": 0} + + state = await asyncio.to_thread(load_state_fn) + json_deckies: list[Any] = list(state[0].deckies) if state else [] + + db_rows = await repo.list_fleet_deckies(host_uuid=host_uuid) + db_by_name = {r["name"]: r for r in db_rows} + + container_states = await asyncio.to_thread( + _collect_container_states, docker_client_factory, + ) + docker_known = container_states is not None + + json_names = {d.name for d in json_deckies} + + # 1. INSERT: present in JSON, absent from DB. + for d in json_deckies: + if d.name in db_by_name: + continue + new_state = ( + _aggregate_decky_state(d.name, list(d.services), container_states) + if docker_known else "running" + ) + await repo.upsert_fleet_decky({ + "host_uuid": d.host_uuid or host_uuid, + "name": d.name, + "services": list(d.services), + "decky_config": d.model_dump(mode="json"), + "decky_ip": d.ip, + "state": new_state, + }) + counts["inserted"] += 1 + + # 2. DELETE: present in DB (this host), absent from JSON. + # Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above — + # peer-host rows are never visible here, so we can't accidentally + # clobber another worker's slice. + for r in db_rows: + if r["name"] not in json_names: + await repo.delete_fleet_decky( + host_uuid=r["host_uuid"], name=r["name"], + ) + counts["deleted"] += 1 + + # 3. STATE: present in both, docker says something fresh. + if docker_known: + for d in json_deckies: + existing = db_by_name.get(d.name) + if existing is None: + continue # already handled in step 1 + new_state = _aggregate_decky_state( + d.name, list(d.services), container_states, + ) + if existing.get("state") != new_state: + await repo.update_fleet_decky_state( + host_uuid=existing["host_uuid"], + name=d.name, + state=new_state, + ) + counts["state_updated"] += 1 + + return counts diff --git a/decnet/fleet/reconciler_worker.py b/decnet/fleet/reconciler_worker.py new file mode 100644 index 00000000..e2aeae80 --- /dev/null +++ b/decnet/fleet/reconciler_worker.py @@ -0,0 +1,84 @@ +"""Long-lived periodic reconciler worker. + +Modeled on :mod:`decnet.orchestrator.worker`: same control listener, same +heartbeat helper, same shutdown semantics. One tick = one +:func:`reconcile_once` pass. + +Default interval is short (30s) because reconciliation is cheap when +nothing has drifted (three reads, no writes), and a short cadence keeps +the dashboard's view of crashed containers fresh. +""" +from __future__ import annotations + +import asyncio +import contextlib + +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + run_control_listener, + run_health_heartbeat, +) +from decnet.fleet.reconciler import reconcile_once +from decnet.logging import get_logger +from decnet.web.db.models import LOCAL_HOST_SENTINEL +from decnet.web.db.repository import BaseRepository + +logger = get_logger("fleet.reconciler") + + +async def fleet_reconciler_worker( + repo: BaseRepository, + *, + interval: int = 30, + host_uuid: str = LOCAL_HOST_SENTINEL, +) -> None: + """Periodically converge JSON ↔ DB ↔ docker for the local host. + + Honours the bus control topic (``system.reconciler.control``) for + graceful shutdown — same lifecycle contract as every other DECNET + worker. + """ + logger.info("fleet reconciler started interval=%ds host=%s", interval, host_uuid) + + bus = None + try: + bus = get_bus(client_name="reconciler") + await bus.connect() + except Exception as exc: # noqa: BLE001 + logger.warning( + "reconciler: bus unavailable, continuing without publish: %s", exc, + ) + bus = None + + shutdown = asyncio.Event() + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "reconciler")) + control_task = asyncio.create_task( + run_control_listener(bus, "reconciler", shutdown), + ) + + try: + while not shutdown.is_set(): + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + except asyncio.TimeoutError: + pass # normal tick + if shutdown.is_set(): + break + try: + counts = await reconcile_once(repo, host_uuid=host_uuid) + if any(counts.values()): + logger.info( + "reconcile inserted=%d deleted=%d state_updated=%d", + counts["inserted"], counts["deleted"], + counts["state_updated"], + ) + except Exception as exc: # noqa: BLE001 + logger.error("reconcile tick failed: %s", exc) + finally: + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() diff --git a/tests/fleet/test_reconciler.py b/tests/fleet/test_reconciler.py new file mode 100644 index 00000000..abfdf53e --- /dev/null +++ b/tests/fleet/test_reconciler.py @@ -0,0 +1,285 @@ +"""Tests for decnet.fleet.reconciler — pure-function reconcile pass. + +Uses a fake repository (in-memory dict) and a stub docker client so the +suite never touches MySQL/SQLite or a real docker socket. +""" +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from decnet.config import DeckyConfig, DecnetConfig +from decnet.fleet.reconciler import ( + _aggregate_decky_state, + reconcile_once, +) + + +# ── Fakes ───────────────────────────────────────────────────────────────────── + +class FakeRepo: + """Minimal in-memory stand-in for the fleet portion of BaseRepository.""" + + def __init__(self, rows: list[dict[str, Any]] | None = None): + self.rows = list(rows or []) + self.upserts: list[dict] = [] + self.deletes: list[tuple[str, str]] = [] + self.state_updates: list[dict] = [] + + async def list_fleet_deckies(self, *, host_uuid: str | None = None): + return [ + r for r in self.rows + if host_uuid is None or r.get("host_uuid") == host_uuid + ] + + async def upsert_fleet_decky(self, data: dict[str, Any]) -> None: + self.upserts.append(data) + # Reflect into rows so subsequent calls see it + self.rows = [ + r for r in self.rows + if not (r["host_uuid"] == data["host_uuid"] and r["name"] == data["name"]) + ] + self.rows.append(data) + + async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None: + self.deletes.append((host_uuid, name)) + self.rows = [ + r for r in self.rows + if not (r["host_uuid"] == host_uuid and r["name"] == name) + ] + + async def update_fleet_decky_state( + self, *, host_uuid: str, name: str, state: str, + last_error: str | None = None, + ) -> None: + self.state_updates.append({ + "host_uuid": host_uuid, "name": name, "state": state, + }) + for r in self.rows: + if r["host_uuid"] == host_uuid and r["name"] == name: + r["state"] = state + + +def _decky(name: str = "decky-01", ip: str = "10.0.0.10", + services: list[str] | None = None) -> DeckyConfig: + return DeckyConfig( + name=name, ip=ip, services=services or ["ssh"], + distro="debian", base_image="debian", hostname="h", + build_base="debian:bookworm-slim", nmap_os="linux", + ) + + +def _config(deckies: list[DeckyConfig]) -> DecnetConfig: + return DecnetConfig( + mode="unihost", interface="eth0", subnet="10.0.0.0/24", + gateway="10.0.0.1", deckies=deckies, ipvlan=False, + ) + + +def _state_loader(deckies: list[DeckyConfig] | None): + """Return a fake load_state callable.""" + if deckies is None: + return lambda: None + return lambda: (_config(deckies), None) + + +def _docker_factory(container_states: dict[str, str]): + """Return a docker client factory that yields the given container states. + + The factory's product mimics ``docker.from_env()`` enough that + ``_collect_container_states`` can iterate ``client.containers.list(...)``. + """ + containers = [ + type("C", (), {"name": name, "status": status})() + for name, status in container_states.items() + ] + client = MagicMock() + client.containers.list.return_value = containers + return lambda: client + + +# ── _aggregate_decky_state ──────────────────────────────────────────────────── + +class TestAggregate: + def test_all_running(self): + s = _aggregate_decky_state("d", ["ssh", "http"], { + "d-ssh": "running", "d-http": "running", + }) + assert s == "running" + + def test_partial_running_is_degraded(self): + s = _aggregate_decky_state("d", ["ssh", "http"], { + "d-ssh": "running", "d-http": "exited", + }) + assert s == "degraded" + + def test_one_service_missing_is_degraded(self): + s = _aggregate_decky_state("d", ["ssh", "http"], { + "d-ssh": "running", # d-http never started + }) + assert s == "degraded" + + def test_all_dead_is_failed(self): + s = _aggregate_decky_state("d", ["ssh"], {"d-ssh": "exited"}) + assert s == "failed" + + def test_no_containers_is_torn_down(self): + assert _aggregate_decky_state("d", ["ssh"], {}) == "torn_down" + + def test_underscore_in_service_name_normalized_to_dash(self): + # The deployer creates container "-" with underscores + # rewritten to dashes (see deployer.status()). Aggregate must + # follow the same convention or it'll never match. + s = _aggregate_decky_state("d", ["smtp_relay"], { + "d-smtp-relay": "running", + }) + assert s == "running" + + +# ── reconcile_once ──────────────────────────────────────────────────────────── + +@pytest.mark.anyio +@pytest.fixture +def anyio_backend(): + return "asyncio" + + +class TestReconcileOnce: + @pytest.mark.anyio + async def test_inserts_when_json_has_decky_db_does_not(self): + repo = FakeRepo() # DB empty + d = _decky(name="solo", ip="10.0.0.5", services=["ssh"]) + counts = await reconcile_once( + repo, + load_state_fn=_state_loader([d]), + docker_client_factory=_docker_factory({"solo-ssh": "running"}), + ) + assert counts == {"inserted": 1, "deleted": 0, "state_updated": 0} + assert len(repo.upserts) == 1 + u = repo.upserts[0] + assert u["host_uuid"] == "local" + assert u["name"] == "solo" + assert u["services"] == ["ssh"] + assert u["decky_ip"] == "10.0.0.5" + assert u["state"] == "running" + + @pytest.mark.anyio + async def test_deletes_when_db_has_decky_json_does_not(self): + repo = FakeRepo([ + {"host_uuid": "local", "name": "ghost", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.99"}, + ]) + counts = await reconcile_once( + repo, + load_state_fn=lambda: None, # no JSON state + docker_client_factory=_docker_factory({}), + ) + assert counts == {"inserted": 0, "deleted": 1, "state_updated": 0} + assert repo.deletes == [("local", "ghost")] + + @pytest.mark.anyio + async def test_updates_state_when_docker_disagrees(self): + repo = FakeRepo([ + {"host_uuid": "local", "name": "d1", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.10"}, + ]) + d = _decky(name="d1", services=["ssh"]) + counts = await reconcile_once( + repo, + load_state_fn=_state_loader([d]), + docker_client_factory=_docker_factory({"d1-ssh": "exited"}), + ) + assert counts == {"inserted": 0, "deleted": 0, "state_updated": 1} + assert repo.state_updates[0]["state"] == "failed" + + @pytest.mark.anyio + async def test_no_writes_when_already_converged(self): + repo = FakeRepo([ + {"host_uuid": "local", "name": "d1", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.10"}, + ]) + d = _decky(name="d1", services=["ssh"]) + counts = await reconcile_once( + repo, + load_state_fn=_state_loader([d]), + docker_client_factory=_docker_factory({"d1-ssh": "running"}), + ) + assert counts == {"inserted": 0, "deleted": 0, "state_updated": 0} + assert repo.upserts == [] and repo.deletes == [] + assert repo.state_updates == [] + + @pytest.mark.anyio + async def test_skips_state_updates_when_docker_unreachable(self): + """Docker socket failure must not torch every row to torn_down — + the reconciler returns ``None`` from _collect_container_states and + leaves existing DB state alone.""" + repo = FakeRepo([ + {"host_uuid": "local", "name": "d1", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.10"}, + ]) + d = _decky(name="d1", services=["ssh"]) + + def broken_factory(): + raise RuntimeError("docker socket unreachable") + + counts = await reconcile_once( + repo, + load_state_fn=_state_loader([d]), + docker_client_factory=broken_factory, + ) + assert counts == {"inserted": 0, "deleted": 0, "state_updated": 0} + assert repo.state_updates == [] + + @pytest.mark.anyio + async def test_host_uuid_scoping_protects_peer_rows(self): + """A reconcile on host A must NOT delete rows belonging to host B.""" + repo = FakeRepo([ + {"host_uuid": "host-a", "name": "d1", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.10"}, + {"host_uuid": "host-b", "name": "d2", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.1.10"}, + ]) + # Reconciling on host-a with no JSON state + counts = await reconcile_once( + repo, + host_uuid="host-a", + load_state_fn=lambda: None, + docker_client_factory=_docker_factory({}), + ) + assert counts["deleted"] == 1 + # Only host-a's row was touched + assert repo.deletes == [("host-a", "d1")] + # host-b's row survives + assert any(r["host_uuid"] == "host-b" for r in repo.rows) + + @pytest.mark.anyio + async def test_combined_drift_in_one_pass(self): + """JSON has new decky AND DB has stale decky AND third decky's + container died — all three converge in a single tick.""" + repo = FakeRepo([ + {"host_uuid": "local", "name": "stale", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.99"}, + {"host_uuid": "local", "name": "d-existing", "services": ["ssh"], + "state": "running", "decky_ip": "10.0.0.20"}, + ]) + json_deckies = [ + _decky(name="d-new", ip="10.0.0.30", services=["http"]), + _decky(name="d-existing", ip="10.0.0.20", services=["ssh"]), + ] + counts = await reconcile_once( + repo, + load_state_fn=_state_loader(json_deckies), + docker_client_factory=_docker_factory({ + "d-new-http": "running", + "d-existing-ssh": "exited", # crashed + }), + ) + assert counts == {"inserted": 1, "deleted": 1, "state_updated": 1} + names_inserted = [u["name"] for u in repo.upserts] + assert "d-new" in names_inserted + assert ("local", "stale") in repo.deletes + assert any(s["name"] == "d-existing" and s["state"] == "failed" + for s in repo.state_updates) diff --git a/tests/fleet/test_reconciler_worker.py b/tests/fleet/test_reconciler_worker.py new file mode 100644 index 00000000..a7b97429 --- /dev/null +++ b/tests/fleet/test_reconciler_worker.py @@ -0,0 +1,72 @@ +"""Worker shutdown smoke test for fleet_reconciler_worker. + +The reconcile logic itself is exercised in test_reconciler.py. This file +just verifies the worker's lifecycle wrapper (control listener + heartbeat ++ tick loop) exits cleanly when the bus shutdown signal fires. +""" +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from decnet.fleet.reconciler_worker import fleet_reconciler_worker + + +class _FakeRepo: + async def list_fleet_deckies(self, *, host_uuid=None): + return [] + async def upsert_fleet_decky(self, data): pass + async def delete_fleet_decky(self, **kw): pass + async def update_fleet_decky_state(self, **kw): pass + + +@pytest.mark.anyio +async def test_worker_exits_on_shutdown_event(monkeypatch): + # Patch the bus + control listener so the worker doesn't try to bind + # to a real socket. The control_task will set `shutdown` once we fire it. + fake_bus = AsyncMock() + monkeypatch.setattr( + "decnet.fleet.reconciler_worker.get_bus", + lambda **kw: fake_bus, + ) + + captured: dict = {} + + async def _capturing_control_listener(bus, name, shutdown_event): + captured["shutdown_event"] = shutdown_event + # Hold the event loop briefly so the worker enters its tick wait, + # then trigger shutdown. + await asyncio.sleep(0.05) + shutdown_event.set() + + async def _noop_heartbeat(bus, name): + await asyncio.sleep(3600) # never returns naturally + + monkeypatch.setattr( + "decnet.fleet.reconciler_worker.run_control_listener", + _capturing_control_listener, + ) + monkeypatch.setattr( + "decnet.fleet.reconciler_worker.run_health_heartbeat", + _noop_heartbeat, + ) + # Skip docker observation entirely — we just need the loop to exit. + monkeypatch.setattr( + "decnet.fleet.reconciler._real_load_state", + lambda: None, + ) + with patch("decnet.fleet.reconciler._collect_container_states", + return_value=None): + # interval=10 (long) so we exit via shutdown, not via tick completion + await asyncio.wait_for( + fleet_reconciler_worker(_FakeRepo(), interval=10), + timeout=2.0, + ) + assert captured["shutdown_event"].is_set() + + +@pytest.fixture +def anyio_backend(): + return "asyncio"