From fa0cdb3ab536b0f6d95dd9f3fbd9b3c01af935e0 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 19:31:31 -0400 Subject: [PATCH] feat(mutator): route mutate_decky through emit_decky_mutated with trigger Mutator now emits one decky_mutated event (RFC 5424 + bus) per successful mutation instead of the inline decky..state bus publish. The previous state topic published new_services only; mutation events carry old/new/trigger, which is what the correlation engine needs to interleave substrate-change markers into attacker traversals. - mutate_decky gains trigger: MutationTrigger = "operator" and captures old_services before the shuffle; replaces the inline _publish_safely(decky..state) with emit_decky_mutated(...). - mutate_all derives trigger internally: operator when force or only-filter is set (CLI --all, API mutate-now, UI bus request); scheduled on interval ticks. Passed through to each mutate_decky call. - Tests updated: the old decky..state assertion is replaced with decky..mutation topic + mutation payload shape; 3 new tests cover trigger derivation for scheduled / force / only paths. 26 tests in test_mutator.py green; 116 across mutator + topology + bus. --- decnet/mutator/engine.py | 25 +++++++++------ tests/test_mutator.py | 66 +++++++++++++++++++++++++++++++++++----- 2 files changed, 75 insertions(+), 16 deletions(-) diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index f518ffa2..3e839c03 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -29,6 +29,7 @@ from decnet.bus.publish import ( publish_safely as _publish_safely, run_health_heartbeat as _run_health_heartbeat, ) +from decnet.mutator.events import MutationTrigger, emit_decky_mutated from decnet.web.db.repository import BaseRepository log = get_logger("mutator") @@ -40,6 +41,7 @@ async def mutate_decky( decky_name: str, repo: BaseRepository, bus: BaseBus | None = None, + trigger: MutationTrigger = "operator", ) -> bool: """ Perform an Intra-Archetype Shuffle for a specific decky. @@ -73,6 +75,7 @@ async def mutate_decky( console.print(f"[yellow]No services available for mutating '{decky_name}'.[/]") return False + old_services = list(decky.services) current_services = set(decky.services) attempts = 0 @@ -103,15 +106,12 @@ async def mutate_decky( console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") return False - await _publish_safely( + await emit_decky_mutated( bus, - _topics.decky(decky_name, _topics.DECKY_STATE), - { - "name": decky_name, - "services": list(decky.services), - "last_mutated": decky.last_mutated, - }, - event_type=_topics.DECKY_STATE, + decky=decky_name, + old_services=old_services, + new_services=list(decky.services), + trigger=trigger, ) return True @@ -143,6 +143,11 @@ async def mutate_all( config = DecnetConfig(**state_dict["config"]) now = time.time() + # Trigger derivation: explicit force / targeted only-list come from + # an operator action (CLI --all, API mutate-now, UI bus request). + # Scheduled-interval ticks carry trigger=scheduled. + trigger: MutationTrigger = "operator" if (force or only is not None) else "scheduled" + mutated_count = 0 next_due_in: float | None = None for decky in config.deckies: @@ -162,7 +167,9 @@ async def mutate_all( next_due_in = remaining if due: - success = await mutate_decky(decky.name, repo=repo, bus=bus) + success = await mutate_decky( + decky.name, repo=repo, bus=bus, trigger=trigger, + ) if success: mutated_count += 1 diff --git a/tests/test_mutator.py b/tests/test_mutator.py index 12b55459..7e034742 100644 --- a/tests/test_mutator.py +++ b/tests/test_mutator.py @@ -199,32 +199,84 @@ class TestMutateAll: class TestMutateDeckyBusPublish: @pytest.mark.asyncio - async def test_publishes_decky_state_on_success(self, mock_repo): + async def test_publishes_decky_mutation_on_success(self, mock_repo, tmp_path): cfg = _make_config() mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} bus = AsyncMock() + log_path = tmp_path / "decnet.log" with patch("decnet.mutator.engine.write_compose"), \ - patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \ + patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)): ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) assert ok is True bus.publish.assert_awaited_once() topic = bus.publish.await_args.args[0] payload = bus.publish.await_args.args[1] - assert topic == "decky.decky-01.state" - assert payload["name"] == "decky-01" - assert isinstance(payload["services"], list) + assert topic == "decky.decky-01.mutation" + assert payload["decky"] == "decky-01" + assert payload["old_services"] == ["ssh"] + assert isinstance(payload["new_services"], list) + assert payload["trigger"] == "operator" # direct mutate_decky call @pytest.mark.asyncio - async def test_no_publish_on_compose_failure(self, mock_repo): + async def test_no_publish_on_compose_failure(self, mock_repo, tmp_path): cfg = _make_config() mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} bus = AsyncMock() + log_path = tmp_path / "decnet.log" with patch("decnet.mutator.engine.write_compose"), \ patch("anyio.to_thread.run_sync", - new_callable=AsyncMock, side_effect=RuntimeError("boom")): + new_callable=AsyncMock, side_effect=RuntimeError("boom")), \ + patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)): ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) assert ok is False bus.publish.assert_not_awaited() + # No syslog line either — mutation didn't land + assert not log_path.exists() + + @pytest.mark.asyncio + async def test_scheduled_trigger_on_interval_tick(self, mock_repo, tmp_path): + """mutate_all on an interval tick stamps trigger=scheduled.""" + old_ts = time.time() - 7200 # due + cfg = _make_config(deckies=[_make_decky(mutate_interval=30, last_mutated=old_ts)]) + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + bus = AsyncMock() + log_path = tmp_path / "decnet.log" + with patch("decnet.mutator.engine.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \ + patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)): + await mutate_all(repo=mock_repo, bus=bus, force=False) + bus.publish.assert_awaited_once() + assert bus.publish.await_args.args[1]["trigger"] == "scheduled" + + @pytest.mark.asyncio + async def test_operator_trigger_on_force(self, mock_repo, tmp_path): + """mutate_all(force=True) stamps trigger=operator.""" + cfg = _make_config() + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + bus = AsyncMock() + log_path = tmp_path / "decnet.log" + with patch("decnet.mutator.engine.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \ + patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)): + await mutate_all(repo=mock_repo, bus=bus, force=True) + bus.publish.assert_awaited_once() + assert bus.publish.await_args.args[1]["trigger"] == "operator" + + @pytest.mark.asyncio + async def test_operator_trigger_on_only_filter(self, mock_repo, tmp_path): + """mutate_all(only={'d1'}) is a targeted operator action.""" + now = time.time() + cfg = _make_config(deckies=[_make_decky("d1", mutate_interval=30, last_mutated=now)]) + mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} + bus = AsyncMock() + log_path = tmp_path / "decnet.log" + with patch("decnet.mutator.engine.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \ + patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)): + await mutate_all(repo=mock_repo, bus=bus, only={"d1"}) + bus.publish.assert_awaited_once() + assert bus.publish.await_args.args[1]["trigger"] == "operator" # ---------------------------------------------------------------------------