diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index fd349d26..52fb9881 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -2,6 +2,7 @@ Deploy, teardown, and status via Docker SDK + subprocess docker compose. """ +import asyncio import shutil import subprocess # nosec B404 import time @@ -147,6 +148,39 @@ def _compose_with_retry( raise last_exc +def _emit_lifecycle_event( + *, + decky_name: str, + old_services: list[str], + new_services: list[str], + trigger: str, +) -> None: + """Fire a ``decky_mutated`` event from a sync code path. + + Deploy/teardown are sync functions; ``emit_decky_mutated`` is async + because its bus half awaits. Bus is ``None`` here (CLI has no live + client), so only the syslog side actually does work — but running + the coroutine keeps the emission site a single call regardless. + Soft-fails: a missing log path or broken bus must not abort the + deploy. The import is lazy to dodge the circular dependency between + ``decnet.mutator`` (which imports engine helpers) and this module. + """ + try: + from decnet.mutator.events import emit_decky_mutated + asyncio.run( + emit_decky_mutated( + bus=None, + decky=decky_name, + old_services=old_services, + new_services=new_services, + trigger=trigger, # type: ignore[arg-type] + ) + ) + except Exception as exc: # noqa: BLE001 + log.warning("lifecycle event emission failed decky=%s trigger=%s: %s", + decky_name, trigger, exc) + + @_traced("engine.deploy") def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None: log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run) @@ -192,6 +226,17 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, save_state(config, compose_path) + # Emit one creation event per decky so the correlation graph has a + # well-formed lifecycle start (old_services=[] ⇒ new_services=). + # Bus is None here — the syslog line is what the correlator consumes. + for decky in config.deckies: + _emit_lifecycle_event( + decky_name=decky.name, + old_services=[], + new_services=list(decky.services), + trigger="creation", + ) + # Pre-up cleanup: a prior half-failed `up` can leave containers still # holding the IPs/ports this run wants, which surfaces as the recurring # "Address already in use" from Docker's IPAM. Best-effort — ignore @@ -242,9 +287,22 @@ def teardown(decky_id: str | None = None) -> None: if not svc_names: log.warning("teardown: decky %s has no services to stop", decky_id) return + _emit_lifecycle_event( + decky_name=decky.name, + old_services=list(decky.services), + new_services=[], + trigger="retirement", + ) _compose("stop", *svc_names, compose_file=compose_path) _compose("rm", "-f", *svc_names, compose_file=compose_path) else: + for decky in config.deckies: + _emit_lifecycle_event( + decky_name=decky.name, + old_services=list(decky.services), + new_services=[], + trigger="retirement", + ) _compose("down", compose_file=compose_path) ip_list = [d.ip for d in config.deckies] diff --git a/tests/test_deployer.py b/tests/test_deployer.py index d220a2d0..bf244310 100644 --- a/tests/test_deployer.py +++ b/tests/test_deployer.py @@ -168,6 +168,57 @@ class TestDeploy: mock_save.assert_called_once() mock_retry.assert_called() + @patch("decnet.engine.deployer._emit_lifecycle_event") + @patch("decnet.engine.deployer._print_status") + @patch("decnet.engine.deployer._compose_with_retry") + @patch("decnet.engine.deployer.save_state") + @patch("decnet.engine.deployer.write_compose", return_value=Path("test.yml")) + @patch("decnet.engine.deployer._sync_logging_helper") + @patch("decnet.engine.deployer.setup_host_macvlan") + @patch("decnet.engine.deployer.create_macvlan_network") + @patch("decnet.engine.deployer.get_host_ip", return_value="192.168.1.2") + @patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32") + @patch("decnet.engine.deployer.docker.from_env") + def test_emits_creation_event_per_decky( + self, mock_docker, mock_range, mock_hip, mock_create, mock_setup, + mock_sync, mock_compose, mock_save, mock_retry, mock_print, mock_emit, + ): + from decnet.engine.deployer import deploy + deckies = [ + _decky(name="decky-01", ip="192.168.1.10", services=["ssh"]), + _decky(name="decky-02", ip="192.168.1.11", services=["http", "ftp"]), + ] + deploy(_config(deckies=deckies)) + assert mock_emit.call_count == 2 + triggers = [c.kwargs["trigger"] for c in mock_emit.call_args_list] + assert triggers == ["creation", "creation"] + names = [c.kwargs["decky_name"] for c in mock_emit.call_args_list] + assert names == ["decky-01", "decky-02"] + # empty-set symmetry: creation has old=[] ⇒ new= + for call in mock_emit.call_args_list: + assert call.kwargs["old_services"] == [] + assert mock_emit.call_args_list[0].kwargs["new_services"] == ["ssh"] + assert mock_emit.call_args_list[1].kwargs["new_services"] == ["http", "ftp"] + + @patch("decnet.engine.deployer._emit_lifecycle_event") + @patch("decnet.engine.deployer._print_status") + @patch("decnet.engine.deployer._compose_with_retry") + @patch("decnet.engine.deployer.save_state") + @patch("decnet.engine.deployer.write_compose", return_value=Path("test.yml")) + @patch("decnet.engine.deployer._sync_logging_helper") + @patch("decnet.engine.deployer.setup_host_macvlan") + @patch("decnet.engine.deployer.create_macvlan_network") + @patch("decnet.engine.deployer.get_host_ip", return_value="192.168.1.2") + @patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32") + @patch("decnet.engine.deployer.docker.from_env") + def test_dry_run_skips_creation_events( + self, mock_docker, mock_range, mock_hip, mock_create, mock_setup, + mock_sync, mock_compose, mock_save, mock_retry, mock_print, mock_emit, + ): + from decnet.engine.deployer import deploy + deploy(_config(), dry_run=True) + mock_emit.assert_not_called() + @patch("decnet.engine.deployer._print_status") @patch("decnet.engine.deployer._compose_with_retry") @patch("decnet.engine.deployer.save_state") @@ -296,6 +347,53 @@ class TestTeardown: for name in svc_names: assert "[" not in name and "'" not in name + @patch("decnet.engine.deployer._emit_lifecycle_event") + @patch("decnet.engine.deployer.clear_state") + @patch("decnet.engine.deployer.remove_macvlan_network") + @patch("decnet.engine.deployer.teardown_host_macvlan") + @patch("decnet.engine.deployer._compose") + @patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32") + @patch("decnet.engine.deployer.docker.from_env") + @patch("decnet.engine.deployer.load_state") + def test_full_teardown_emits_retirement_per_decky( + self, mock_load, mock_docker, mock_range, mock_compose, + mock_td_macvlan, mock_rm_net, mock_clear, mock_emit, + ): + deckies = [ + _decky(name="decky-01", ip="192.168.1.10", services=["ssh"]), + _decky(name="decky-02", ip="192.168.1.11", services=["http"]), + ] + mock_load.return_value = (_config(deckies=deckies), Path("test.yml")) + from decnet.engine.deployer import teardown + teardown() + assert mock_emit.call_count == 2 + for call in mock_emit.call_args_list: + assert call.kwargs["trigger"] == "retirement" + assert call.kwargs["new_services"] == [] + assert mock_emit.call_args_list[0].kwargs["old_services"] == ["ssh"] + assert mock_emit.call_args_list[1].kwargs["old_services"] == ["http"] + + @patch("decnet.engine.deployer._emit_lifecycle_event") + @patch("decnet.engine.deployer._compose") + @patch("decnet.engine.deployer.docker.from_env") + @patch("decnet.engine.deployer.load_state") + def test_single_decky_teardown_emits_one_retirement( + self, mock_load, mock_docker, mock_compose, mock_emit, + ): + deckies = [ + _decky(name="decky-01", ip="192.168.1.10", services=["ssh", "ftp"]), + _decky(name="decky-02", ip="192.168.1.11", services=["http"]), + ] + mock_load.return_value = (_config(deckies=deckies), Path("test.yml")) + from decnet.engine.deployer import teardown + teardown(decky_id="decky-01") + assert mock_emit.call_count == 1 + call = mock_emit.call_args_list[0] + assert call.kwargs["decky_name"] == "decky-01" + assert call.kwargs["trigger"] == "retirement" + assert call.kwargs["old_services"] == ["ssh", "ftp"] + assert call.kwargs["new_services"] == [] + @patch("decnet.engine.deployer._compose") @patch("decnet.engine.deployer.docker.from_env") @patch("decnet.engine.deployer.load_state")