feat(mutator): route mutate_decky through emit_decky_mutated with trigger

Mutator now emits one decky_mutated event (RFC 5424 + bus) per
successful mutation instead of the inline decky.<id>.state bus
publish.  The previous state topic published new_services only;
mutation events carry old/new/trigger, which is what the correlation
engine needs to interleave substrate-change markers into attacker
traversals.

- mutate_decky gains trigger: MutationTrigger = "operator" and
  captures old_services before the shuffle; replaces the inline
  _publish_safely(decky.<id>.state) with emit_decky_mutated(...).
- mutate_all derives trigger internally: operator when force or
  only-filter is set (CLI --all, API mutate-now, UI bus request);
  scheduled on interval ticks.  Passed through to each mutate_decky
  call.
- Tests updated: the old decky.<id>.state assertion is replaced
  with decky.<id>.mutation topic + mutation payload shape; 3 new
  tests cover trigger derivation for scheduled / force / only paths.
  26 tests in test_mutator.py green; 116 across mutator + topology
  + bus.
This commit is contained in:
2026-04-21 19:31:31 -04:00
parent f875350d75
commit fa0cdb3ab5
2 changed files with 75 additions and 16 deletions

View File

@@ -29,6 +29,7 @@ from decnet.bus.publish import (
publish_safely as _publish_safely, publish_safely as _publish_safely,
run_health_heartbeat as _run_health_heartbeat, run_health_heartbeat as _run_health_heartbeat,
) )
from decnet.mutator.events import MutationTrigger, emit_decky_mutated
from decnet.web.db.repository import BaseRepository from decnet.web.db.repository import BaseRepository
log = get_logger("mutator") log = get_logger("mutator")
@@ -40,6 +41,7 @@ async def mutate_decky(
decky_name: str, decky_name: str,
repo: BaseRepository, repo: BaseRepository,
bus: BaseBus | None = None, bus: BaseBus | None = None,
trigger: MutationTrigger = "operator",
) -> bool: ) -> bool:
""" """
Perform an Intra-Archetype Shuffle for a specific decky. Perform an Intra-Archetype Shuffle for a specific decky.
@@ -73,6 +75,7 @@ async def mutate_decky(
console.print(f"[yellow]No services available for mutating '{decky_name}'.[/]") console.print(f"[yellow]No services available for mutating '{decky_name}'.[/]")
return False return False
old_services = list(decky.services)
current_services = set(decky.services) current_services = set(decky.services)
attempts = 0 attempts = 0
@@ -103,15 +106,12 @@ async def mutate_decky(
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
return False return False
await _publish_safely( await emit_decky_mutated(
bus, bus,
_topics.decky(decky_name, _topics.DECKY_STATE), decky=decky_name,
{ old_services=old_services,
"name": decky_name, new_services=list(decky.services),
"services": list(decky.services), trigger=trigger,
"last_mutated": decky.last_mutated,
},
event_type=_topics.DECKY_STATE,
) )
return True return True
@@ -143,6 +143,11 @@ async def mutate_all(
config = DecnetConfig(**state_dict["config"]) config = DecnetConfig(**state_dict["config"])
now = time.time() now = time.time()
# Trigger derivation: explicit force / targeted only-list come from
# an operator action (CLI --all, API mutate-now, UI bus request).
# Scheduled-interval ticks carry trigger=scheduled.
trigger: MutationTrigger = "operator" if (force or only is not None) else "scheduled"
mutated_count = 0 mutated_count = 0
next_due_in: float | None = None next_due_in: float | None = None
for decky in config.deckies: for decky in config.deckies:
@@ -162,7 +167,9 @@ async def mutate_all(
next_due_in = remaining next_due_in = remaining
if due: if due:
success = await mutate_decky(decky.name, repo=repo, bus=bus) success = await mutate_decky(
decky.name, repo=repo, bus=bus, trigger=trigger,
)
if success: if success:
mutated_count += 1 mutated_count += 1

View File

@@ -199,32 +199,84 @@ class TestMutateAll:
class TestMutateDeckyBusPublish: class TestMutateDeckyBusPublish:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_publishes_decky_state_on_success(self, mock_repo): async def test_publishes_decky_mutation_on_success(self, mock_repo, tmp_path):
cfg = _make_config() cfg = _make_config()
mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"}
bus = AsyncMock() bus = AsyncMock()
log_path = tmp_path / "decnet.log"
with patch("decnet.mutator.engine.write_compose"), \ with patch("decnet.mutator.engine.write_compose"), \
patch("anyio.to_thread.run_sync", new_callable=AsyncMock): patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \
patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)):
ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus)
assert ok is True assert ok is True
bus.publish.assert_awaited_once() bus.publish.assert_awaited_once()
topic = bus.publish.await_args.args[0] topic = bus.publish.await_args.args[0]
payload = bus.publish.await_args.args[1] payload = bus.publish.await_args.args[1]
assert topic == "decky.decky-01.state" assert topic == "decky.decky-01.mutation"
assert payload["name"] == "decky-01" assert payload["decky"] == "decky-01"
assert isinstance(payload["services"], list) assert payload["old_services"] == ["ssh"]
assert isinstance(payload["new_services"], list)
assert payload["trigger"] == "operator" # direct mutate_decky call
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_no_publish_on_compose_failure(self, mock_repo): async def test_no_publish_on_compose_failure(self, mock_repo, tmp_path):
cfg = _make_config() cfg = _make_config()
mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"} mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"}
bus = AsyncMock() bus = AsyncMock()
log_path = tmp_path / "decnet.log"
with patch("decnet.mutator.engine.write_compose"), \ with patch("decnet.mutator.engine.write_compose"), \
patch("anyio.to_thread.run_sync", patch("anyio.to_thread.run_sync",
new_callable=AsyncMock, side_effect=RuntimeError("boom")): new_callable=AsyncMock, side_effect=RuntimeError("boom")), \
patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)):
ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus) ok = await mutate_decky("decky-01", repo=mock_repo, bus=bus)
assert ok is False assert ok is False
bus.publish.assert_not_awaited() bus.publish.assert_not_awaited()
# No syslog line either — mutation didn't land
assert not log_path.exists()
@pytest.mark.asyncio
async def test_scheduled_trigger_on_interval_tick(self, mock_repo, tmp_path):
"""mutate_all on an interval tick stamps trigger=scheduled."""
old_ts = time.time() - 7200 # due
cfg = _make_config(deckies=[_make_decky(mutate_interval=30, last_mutated=old_ts)])
mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"}
bus = AsyncMock()
log_path = tmp_path / "decnet.log"
with patch("decnet.mutator.engine.write_compose"), \
patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \
patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)):
await mutate_all(repo=mock_repo, bus=bus, force=False)
bus.publish.assert_awaited_once()
assert bus.publish.await_args.args[1]["trigger"] == "scheduled"
@pytest.mark.asyncio
async def test_operator_trigger_on_force(self, mock_repo, tmp_path):
"""mutate_all(force=True) stamps trigger=operator."""
cfg = _make_config()
mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"}
bus = AsyncMock()
log_path = tmp_path / "decnet.log"
with patch("decnet.mutator.engine.write_compose"), \
patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \
patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)):
await mutate_all(repo=mock_repo, bus=bus, force=True)
bus.publish.assert_awaited_once()
assert bus.publish.await_args.args[1]["trigger"] == "operator"
@pytest.mark.asyncio
async def test_operator_trigger_on_only_filter(self, mock_repo, tmp_path):
"""mutate_all(only={'d1'}) is a targeted operator action."""
now = time.time()
cfg = _make_config(deckies=[_make_decky("d1", mutate_interval=30, last_mutated=now)])
mock_repo.get_state.return_value = {"config": cfg.model_dump(), "compose_path": "c.yml"}
bus = AsyncMock()
log_path = tmp_path / "decnet.log"
with patch("decnet.mutator.engine.write_compose"), \
patch("anyio.to_thread.run_sync", new_callable=AsyncMock), \
patch("decnet.mutator.events.DECNET_INGEST_LOG_FILE", str(log_path)):
await mutate_all(repo=mock_repo, bus=bus, only={"d1"})
bus.publish.assert_awaited_once()
assert bus.publish.await_args.args[1]["trigger"] == "operator"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------