feat(engine): emit creation/retirement mutation events on deploy/teardown

Close the lifecycle loop for the correlation graph: every decky now
enters the substrate with an explicit `trigger=creation` event
(old_services=[] ⇒ new_services=<initial>) and leaves it with
`trigger=retirement` (old=<current> ⇒ new=[]).  With scheduled/operator
mutations already flowing through emit_decky_mutated, the entire decky
lifecycle is now a well-formed sequence of mutation events — the
correlator can fold substrate_state(t) at any T by replaying them.

Lazy-imports mutator.events to dodge the engine↔mutator circular
dependency.  Bus is None at CLI sites; the syslog write is what the
correlator consumes.  Emission is soft-failing so a broken log path
never aborts a deploy.
This commit is contained in:
2026-04-21 19:35:05 -04:00
parent fa0cdb3ab5
commit bf5ed7abbb
2 changed files with 156 additions and 0 deletions

View File

@@ -2,6 +2,7 @@
Deploy, teardown, and status via Docker SDK + subprocess docker compose.
"""
import asyncio
import shutil
import subprocess # nosec B404
import time
@@ -147,6 +148,39 @@ def _compose_with_retry(
raise last_exc
def _emit_lifecycle_event(
*,
decky_name: str,
old_services: list[str],
new_services: list[str],
trigger: str,
) -> None:
"""Fire a ``decky_mutated`` event from a sync code path.
Deploy/teardown are sync functions; ``emit_decky_mutated`` is async
because its bus half awaits. Bus is ``None`` here (CLI has no live
client), so only the syslog side actually does work — but running
the coroutine keeps the emission site a single call regardless.
Soft-fails: a missing log path or broken bus must not abort the
deploy. The import is lazy to dodge the circular dependency between
``decnet.mutator`` (which imports engine helpers) and this module.
"""
try:
from decnet.mutator.events import emit_decky_mutated
asyncio.run(
emit_decky_mutated(
bus=None,
decky=decky_name,
old_services=old_services,
new_services=new_services,
trigger=trigger, # type: ignore[arg-type]
)
)
except Exception as exc: # noqa: BLE001
log.warning("lifecycle event emission failed decky=%s trigger=%s: %s",
decky_name, trigger, exc)
@_traced("engine.deploy")
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
@@ -192,6 +226,17 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
save_state(config, compose_path)
# Emit one creation event per decky so the correlation graph has a
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
# Bus is None here — the syslog line is what the correlator consumes.
for decky in config.deckies:
_emit_lifecycle_event(
decky_name=decky.name,
old_services=[],
new_services=list(decky.services),
trigger="creation",
)
# Pre-up cleanup: a prior half-failed `up` can leave containers still
# holding the IPs/ports this run wants, which surfaces as the recurring
# "Address already in use" from Docker's IPAM. Best-effort — ignore
@@ -242,9 +287,22 @@ def teardown(decky_id: str | None = None) -> None:
if not svc_names:
log.warning("teardown: decky %s has no services to stop", decky_id)
return
_emit_lifecycle_event(
decky_name=decky.name,
old_services=list(decky.services),
new_services=[],
trigger="retirement",
)
_compose("stop", *svc_names, compose_file=compose_path)
_compose("rm", "-f", *svc_names, compose_file=compose_path)
else:
for decky in config.deckies:
_emit_lifecycle_event(
decky_name=decky.name,
old_services=list(decky.services),
new_services=[],
trigger="retirement",
)
_compose("down", compose_file=compose_path)
ip_list = [d.ip for d in config.deckies]

View File

@@ -168,6 +168,57 @@ class TestDeploy:
mock_save.assert_called_once()
mock_retry.assert_called()
@patch("decnet.engine.deployer._emit_lifecycle_event")
@patch("decnet.engine.deployer._print_status")
@patch("decnet.engine.deployer._compose_with_retry")
@patch("decnet.engine.deployer.save_state")
@patch("decnet.engine.deployer.write_compose", return_value=Path("test.yml"))
@patch("decnet.engine.deployer._sync_logging_helper")
@patch("decnet.engine.deployer.setup_host_macvlan")
@patch("decnet.engine.deployer.create_macvlan_network")
@patch("decnet.engine.deployer.get_host_ip", return_value="192.168.1.2")
@patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32")
@patch("decnet.engine.deployer.docker.from_env")
def test_emits_creation_event_per_decky(
self, mock_docker, mock_range, mock_hip, mock_create, mock_setup,
mock_sync, mock_compose, mock_save, mock_retry, mock_print, mock_emit,
):
from decnet.engine.deployer import deploy
deckies = [
_decky(name="decky-01", ip="192.168.1.10", services=["ssh"]),
_decky(name="decky-02", ip="192.168.1.11", services=["http", "ftp"]),
]
deploy(_config(deckies=deckies))
assert mock_emit.call_count == 2
triggers = [c.kwargs["trigger"] for c in mock_emit.call_args_list]
assert triggers == ["creation", "creation"]
names = [c.kwargs["decky_name"] for c in mock_emit.call_args_list]
assert names == ["decky-01", "decky-02"]
# empty-set symmetry: creation has old=[] ⇒ new=<initial>
for call in mock_emit.call_args_list:
assert call.kwargs["old_services"] == []
assert mock_emit.call_args_list[0].kwargs["new_services"] == ["ssh"]
assert mock_emit.call_args_list[1].kwargs["new_services"] == ["http", "ftp"]
@patch("decnet.engine.deployer._emit_lifecycle_event")
@patch("decnet.engine.deployer._print_status")
@patch("decnet.engine.deployer._compose_with_retry")
@patch("decnet.engine.deployer.save_state")
@patch("decnet.engine.deployer.write_compose", return_value=Path("test.yml"))
@patch("decnet.engine.deployer._sync_logging_helper")
@patch("decnet.engine.deployer.setup_host_macvlan")
@patch("decnet.engine.deployer.create_macvlan_network")
@patch("decnet.engine.deployer.get_host_ip", return_value="192.168.1.2")
@patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32")
@patch("decnet.engine.deployer.docker.from_env")
def test_dry_run_skips_creation_events(
self, mock_docker, mock_range, mock_hip, mock_create, mock_setup,
mock_sync, mock_compose, mock_save, mock_retry, mock_print, mock_emit,
):
from decnet.engine.deployer import deploy
deploy(_config(), dry_run=True)
mock_emit.assert_not_called()
@patch("decnet.engine.deployer._print_status")
@patch("decnet.engine.deployer._compose_with_retry")
@patch("decnet.engine.deployer.save_state")
@@ -296,6 +347,53 @@ class TestTeardown:
for name in svc_names:
assert "[" not in name and "'" not in name
@patch("decnet.engine.deployer._emit_lifecycle_event")
@patch("decnet.engine.deployer.clear_state")
@patch("decnet.engine.deployer.remove_macvlan_network")
@patch("decnet.engine.deployer.teardown_host_macvlan")
@patch("decnet.engine.deployer._compose")
@patch("decnet.engine.deployer.ips_to_range", return_value="192.168.1.10/32")
@patch("decnet.engine.deployer.docker.from_env")
@patch("decnet.engine.deployer.load_state")
def test_full_teardown_emits_retirement_per_decky(
self, mock_load, mock_docker, mock_range, mock_compose,
mock_td_macvlan, mock_rm_net, mock_clear, mock_emit,
):
deckies = [
_decky(name="decky-01", ip="192.168.1.10", services=["ssh"]),
_decky(name="decky-02", ip="192.168.1.11", services=["http"]),
]
mock_load.return_value = (_config(deckies=deckies), Path("test.yml"))
from decnet.engine.deployer import teardown
teardown()
assert mock_emit.call_count == 2
for call in mock_emit.call_args_list:
assert call.kwargs["trigger"] == "retirement"
assert call.kwargs["new_services"] == []
assert mock_emit.call_args_list[0].kwargs["old_services"] == ["ssh"]
assert mock_emit.call_args_list[1].kwargs["old_services"] == ["http"]
@patch("decnet.engine.deployer._emit_lifecycle_event")
@patch("decnet.engine.deployer._compose")
@patch("decnet.engine.deployer.docker.from_env")
@patch("decnet.engine.deployer.load_state")
def test_single_decky_teardown_emits_one_retirement(
self, mock_load, mock_docker, mock_compose, mock_emit,
):
deckies = [
_decky(name="decky-01", ip="192.168.1.10", services=["ssh", "ftp"]),
_decky(name="decky-02", ip="192.168.1.11", services=["http"]),
]
mock_load.return_value = (_config(deckies=deckies), Path("test.yml"))
from decnet.engine.deployer import teardown
teardown(decky_id="decky-01")
assert mock_emit.call_count == 1
call = mock_emit.call_args_list[0]
assert call.kwargs["decky_name"] == "decky-01"
assert call.kwargs["trigger"] == "retirement"
assert call.kwargs["old_services"] == ["ssh", "ftp"]
assert call.kwargs["new_services"] == []
@patch("decnet.engine.deployer._compose")
@patch("decnet.engine.deployer.docker.from_env")
@patch("decnet.engine.deployer.load_state")