From e23c6c4ee489539ae34f66e693feef8f989ba2e1 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 19:28:01 -0400 Subject: [PATCH] feat(mutator): bus-wake on decky mutate_request; adaptive sleep; heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flat-fleet mutator was DB-poll-only and noisy — it logged "no active deployment found" every 10s on idle hosts and ran mutate_all at a fixed tick regardless of when the next decky was due. - mutate_all returns seconds-until-next-due; watch loop sleeps min(next_due, poll_interval_secs) with a 1s floor. - "No deployment" is now idle, not an error: edge-triggered log on present<->absent transition instead of every tick. - mutate_decky publishes decky.{name}.state on successful compose so UIs react in real time. - New decky.*.mutate_request subscription lets API/CLI/UI force an immediate mutation of a specific decky without waiting for its interval; target name feeds mutate_all(only={...}). - system.mutator.health heartbeat via run_health_heartbeat helper, bringing the mutator in line with DEBT-031 workers. Tests: next_due return, only= filter, decky.{name}.state publish on success, no publish on compose failure. Full mutator+topology-mutator+bus suite (109) green. --- decnet/bus/topics.py | 5 ++ decnet/mutator/engine.py | 166 +++++++++++++++++++++++++++++++++------ tests/test_mutator.py | 61 ++++++++++++++ 3 files changed, 206 insertions(+), 26 deletions(-) diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 602a15fd..4a5f505c 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -50,6 +50,11 @@ TOPOLOGY_STATUS = "status" # Decky-level event types (second token). DECKY_STATE = "state" DECKY_TRAFFIC = "traffic" +# On-demand mutation request — published by the API/CLI/UI, consumed by +# the mutator's watch loop to force an immediate mutation of one decky +# without waiting for its scheduled interval. 
Underscored (not dotted) +# to stay a single NATS token so the builder's validator accepts it. +DECKY_MUTATE_REQUEST = "mutate_request" # Attacker event types (second token under the ``attacker`` root). First # sighting, session boundary transitions, and score-threshold crossings diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index f0c22ede..f518ffa2 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -25,7 +25,10 @@ import contextlib from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus -from decnet.bus.publish import publish_safely as _publish_safely +from decnet.bus.publish import ( + publish_safely as _publish_safely, + run_health_heartbeat as _run_health_heartbeat, +) from decnet.web.db.repository import BaseRepository log = get_logger("mutator") @@ -33,7 +36,11 @@ console = Console() @_traced("mutator.mutate_decky") -async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: +async def mutate_decky( + decky_name: str, + repo: BaseRepository, + bus: BaseBus | None = None, +) -> bool: """ Perform an Intra-Archetype Shuffle for a specific decky. Returns True if mutation succeeded, False otherwise. @@ -96,47 +103,74 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") return False + await _publish_safely( + bus, + _topics.decky(decky_name, _topics.DECKY_STATE), + { + "name": decky_name, + "services": list(decky.services), + "last_mutated": decky.last_mutated, + }, + event_type=_topics.DECKY_STATE, + ) return True @_traced("mutator.mutate_all") -async def mutate_all(repo: BaseRepository, force: bool = False) -> None: +async def mutate_all( + repo: BaseRepository, + force: bool = False, + bus: BaseBus | None = None, + only: set[str] | None = None, +) -> float | None: + """Mutate all deckies that are due (or *only* the named ones). 
+ + Returns the number of seconds until the next scheduled mutation, or + ``None`` if no deployment exists / no decky has an interval set. The + watch loop uses this to adaptively sleep instead of hard-polling at a + fixed cadence. + + A missing ``deployment`` state row is *not* an error any more — the + host may simply not have run ``decnet deploy`` yet. The watch loop + edge-triggers the user-facing log for that state. """ - Check all deckies and mutate those that are due. - If force=True, mutates all deckies regardless of schedule. - """ - log.debug("mutate_all: start force=%s", force) + log.debug("mutate_all: start force=%s only=%s", force, only) state_dict = await repo.get_state("deployment") if state_dict is None: - log.error("mutate_all: no active deployment found") - console.print("[red]No active deployment found.[/]") - return + log.debug("mutate_all: no active deployment found") + return None config = DecnetConfig(**state_dict["config"]) now = time.time() mutated_count = 0 + next_due_in: float | None = None for decky in config.deckies: + if only is not None and decky.name not in only: + continue interval_mins = decky.mutate_interval or config.mutate_interval if interval_mins is None and not force: continue - if force: + if force or only is not None: due = True else: elapsed_secs = now - decky.last_mutated due = elapsed_secs >= (interval_mins * 60) + remaining = (interval_mins * 60) - elapsed_secs + if not due and (next_due_in is None or remaining < next_due_in): + next_due_in = remaining if due: - success = await mutate_decky(decky.name, repo=repo) + success = await mutate_decky(decky.name, repo=repo, bus=bus) if success: mutated_count += 1 - if mutated_count == 0 and not force: - log.debug("mutate_all: no deckies due for mutation") - console.print("[dim]No deckies are due for mutation.[/]") - else: + if mutated_count: log.info("mutate_all: complete mutated_count=%d", mutated_count) + else: + log.debug("mutate_all: no deckies due for mutation") + return 
next_due_in @_traced("mutator.reconcile_topologies") @@ -277,18 +311,51 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> # poll-interval latency and doesn't push notifications to UI clients. bus: BaseBus | None = None wake = asyncio.Event() - wake_task: asyncio.Task | None = None + mutate_requests: set[str] = set() + wake_tasks: list[asyncio.Task] = [] + heartbeat_task: asyncio.Task | None = None try: candidate = get_bus(client_name="mutator") await candidate.connect() bus = candidate - wake_task = asyncio.create_task(_wake_on_enqueue(bus, wake)) + wake_tasks.append(asyncio.create_task(_wake_on_enqueue(bus, wake))) + wake_tasks.append(asyncio.create_task( + _wake_on_mutate_request(bus, wake, mutate_requests), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, "mutator"), + ) except Exception as exc: # noqa: BLE001 log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc) + # Edge-triggered "no deployment" state so we don't spam the console + # every 10 seconds on a host that hasn't deployed yet. Start as None + # so the first observation fires exactly one line. 
+ deployment_present: bool | None = None + try: while True: - await mutate_all(force=False, repo=repo) + requested = mutate_requests.copy() + mutate_requests.clear() + + next_due = await mutate_all( + repo=repo, + force=False, + bus=bus, + only=requested or None, + ) + has_deployment = ( + next_due is not None or await repo.get_state("deployment") is not None + ) + if has_deployment and deployment_present is not True: + log.info("mutator: active deployment observed — entering normal cadence") + console.print("[green]Active deployment observed.[/]") + deployment_present = True + elif not has_deployment and deployment_present is not False: + log.info("mutator: no active deployment — idling until one lands") + console.print("[dim]No active deployment; mutator idling.[/]") + deployment_present = False + # Gate reconciler on the O(log n) guard query — avoids # entering the dispatch body when there's nothing to do. try: @@ -303,11 +370,17 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> pass except Exception: log.exception("reconcile_agent_resyncs tick raised") - # Wait until either poll_interval_secs elapses OR an enqueued - # mutation wakes us early. Clearing before the next tick - # means a second wake during the tick will re-fire after. + + # Adaptive sleep: wake at the earlier of (next decky due) or + # (poll_interval_secs), bounded below by 1s so a thrashing + # schedule can't spin the loop. A bus wake (enqueue or + # mutate_request) short-circuits the wait. 
+ if next_due is None or next_due > poll_interval_secs: + timeout = float(poll_interval_secs) + else: + timeout = max(1.0, next_due) try: - await asyncio.wait_for(wake.wait(), timeout=poll_interval_secs) + await asyncio.wait_for(wake.wait(), timeout=timeout) except asyncio.TimeoutError: pass wake.clear() @@ -315,10 +388,15 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> log.info("mutator watch loop stopped") console.print("\n[dim]Mutator watcher stopped.[/]") finally: - if wake_task is not None: - wake_task.cancel() + for t in wake_tasks: + t.cancel() + if heartbeat_task is not None: + heartbeat_task.cancel() + for t in (*wake_tasks, heartbeat_task): + if t is None: + continue with contextlib.suppress(asyncio.CancelledError, Exception): - await wake_task + await t if bus is not None: with contextlib.suppress(Exception): await bus.close() @@ -342,3 +420,39 @@ async def _wake_on_enqueue(bus: BaseBus, wake: asyncio.Event) -> None: raise except Exception as exc: # noqa: BLE001 log.warning("mutator: wake subscriber died (%s); falling back to poll", exc) + + +async def _wake_on_mutate_request( + bus: BaseBus, + wake: asyncio.Event, + pending: set[str], +) -> None: + """Collect on-demand ``decky.{name}.mutate_request`` events. + + API/CLI/UI callers publish to ``decky.{name}.mutate_request`` to force + an immediate mutation without waiting for the scheduled interval. We + stash the target decky name in *pending* so the next tick can feed it + to ``mutate_all(only=...)``, then flip *wake* to short-circuit the + sleep. Payload is optional — the topic's second token is the name. 
+ """ + pattern = f"{_topics.DECKY}.*.{_topics.DECKY_MUTATE_REQUEST}" + try: + sub = bus.subscribe(pattern) + async with sub: + async for event in sub: + topic = getattr(event, "topic", "") or "" + parts = topic.split(".") + name = parts[1] if len(parts) >= 3 else "" + payload = getattr(event, "payload", None) or {} + if not name and isinstance(payload, dict): + name = payload.get("name", "") or "" + if name: + pending.add(name) + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "mutator: mutate_request subscriber died (%s); falling back to poll", + exc, + ) diff --git a/tests/test_mutator.py b/tests/test_mutator.py index f45c7586..3df8931b 100644 --- a/tests/test_mutator.py +++ b/tests/test_mutator.py @@ -162,6 +162,67 @@ class TestMutateAll: await mutate_all(repo=mock_repo, force=False) mock_mutate.assert_called_once() + async def test_no_state_returns_none_not_error(self, mock_repo): + """Missing deployment is idle, not an error — must return None.""" + mock_repo.get_state.return_value = None + assert await mutate_all(repo=mock_repo) is None + + async def test_returns_seconds_until_next_due(self, mock_repo): + # Two deckies: one 10 min to go, one 25 min to go → min is ~600s + now = time.time() + cfg = _make_config(deckies=[ + _make_decky("d1", mutate_interval=30, last_mutated=now - 20 * 60), + _make_decky("d2", mutate_interval=30, last_mutated=now - 5 * 60), + ]) + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + with patch("decnet.mutator.engine.mutate_decky", new_callable=AsyncMock): + next_due = await mutate_all(repo=mock_repo, force=False) + assert next_due is not None + assert 590 < next_due < 610 # ~10 min + + async def test_only_filter_forces_named_decky(self, mock_repo): + """only={'d1'} mutates d1 regardless of schedule, skips others.""" + now = time.time() + cfg = _make_config(deckies=[ + _make_decky("d1", mutate_interval=30, last_mutated=now), # not 
due + _make_decky("d2", mutate_interval=30, last_mutated=now), # not due + ]) + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + with patch("decnet.mutator.engine.mutate_decky", new_callable=AsyncMock, return_value=True) as mock_mutate: + await mutate_all(repo=mock_repo, force=False, only={"d1"}) + assert mock_mutate.call_count == 1 + assert mock_mutate.call_args.args[0] == "d1" + + +class TestMutateDeckyBusPublish: + @pytest.mark.asyncio + async def test_publishes_decky_state_on_success(self, mock_repo): + cfg = _make_config() + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + bus = AsyncMock() + with patch("decnet.mutator.engine.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) + assert ok is True + bus.publish.assert_awaited_once() + topic = bus.publish.await_args.args[0] + payload = bus.publish.await_args.args[1] + assert topic == "decky.decky-01.state" + assert payload["name"] == "decky-01" + assert isinstance(payload["services"], list) + + @pytest.mark.asyncio + async def test_no_publish_on_compose_failure(self, mock_repo): + cfg = _make_config() + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + bus = AsyncMock() + with patch("decnet.mutator.engine.write_compose"), \ + patch("anyio.to_thread.run_sync", + new_callable=AsyncMock, side_effect=RuntimeError("boom")): + ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) + assert ok is False + bus.publish.assert_not_awaited() + # --------------------------------------------------------------------------- # _compose_with_retry (Sync tests, keep as is or minimal update)