Files
DECNET/decnet/mutator/engine.py
anti e23c6c4ee4 feat(mutator): bus-wake on decky mutate_request; adaptive sleep; heartbeat
The flat-fleet mutator was DB-poll-only and noisy — it logged
"no active deployment found" every 10s on idle hosts and ran
mutate_all at a fixed tick regardless of when the next decky
was due.

- mutate_all returns seconds-until-next-due; watch loop sleeps
  min(next_due, poll_interval_secs) with a 1s floor.
- "No deployment" is now idle, not an error: edge-triggered log
  on present<->absent transition instead of every tick.
- mutate_decky publishes decky.<name>.state on successful compose
  so UIs react in real time.
- New decky.*.mutate_request subscription lets API/CLI/UI force
  an immediate mutation of a specific decky without waiting for
  its interval; target name feeds mutate_all(only={...}).
- system.mutator.health heartbeat via run_health_heartbeat helper,
  bringing the mutator in line with DEBT-031 workers.

Tests: next_due return, only= filter, decky.<name>.state publish
on success, no publish on compose failure. Full mutator+topology-
mutator+bus suite (109) green.
2026-04-21 19:28:01 -04:00

459 lines
17 KiB
Python

"""
Mutation Engine for DECNET.
Handles dynamic rotation of exposed honeypot services over time.
"""
import random
import time
from typing import Optional
from rich.console import Console
from decnet.archetypes import get_archetype
from decnet.fleet import all_service_names
from decnet.composer import write_compose
from decnet.config import DeckyConfig, DecnetConfig
from decnet.engine import _compose_with_retry
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from pathlib import Path
import anyio
import asyncio
import contextlib
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
publish_safely as _publish_safely,
run_health_heartbeat as _run_health_heartbeat,
)
from decnet.web.db.repository import BaseRepository
log = get_logger("mutator")
console = Console()
@_traced("mutator.mutate_decky")
async def mutate_decky(
decky_name: str,
repo: BaseRepository,
bus: BaseBus | None = None,
) -> bool:
"""
Perform an Intra-Archetype Shuffle for a specific decky.
Returns True if mutation succeeded, False otherwise.
"""
log.debug("mutate_decky: start decky=%s", decky_name)
state_dict = await repo.get_state("deployment")
if state_dict is None:
log.error("mutate_decky: no active deployment found in database")
console.print("[red]No active deployment found in database.[/]")
return False
config = DecnetConfig(**state_dict["config"])
compose_path = Path(state_dict["compose_path"])
decky: Optional[DeckyConfig] = next((d for d in config.deckies if d.name == decky_name), None)
if not decky:
console.print(f"[red]Decky '{decky_name}' not found in state.[/]")
return False
if decky.archetype:
try:
arch = get_archetype(decky.archetype)
svc_pool = list(arch.services)
except ValueError:
svc_pool = all_service_names()
else:
svc_pool = all_service_names()
if not svc_pool:
console.print(f"[yellow]No services available for mutating '{decky_name}'.[/]")
return False
current_services = set(decky.services)
attempts = 0
while True:
count = random.randint(1, min(3, len(svc_pool))) # nosec B311
chosen = set(random.sample(svc_pool, count)) # nosec B311
attempts += 1
if chosen != current_services or attempts > 20:
break
decky.services = list(chosen)
decky.last_mutated = time.time()
# Save to DB
await repo.set_state("deployment", {"config": config.model_dump(), "compose_path": str(compose_path)})
# Still writes files for Docker to use
write_compose(config, compose_path)
log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services))
console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]")
try:
# Wrap blocking call in thread
await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path)
except Exception as e:
log.error("mutation failed decky=%s error=%s", decky_name, e)
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
return False
await _publish_safely(
bus,
_topics.decky(decky_name, _topics.DECKY_STATE),
{
"name": decky_name,
"services": list(decky.services),
"last_mutated": decky.last_mutated,
},
event_type=_topics.DECKY_STATE,
)
return True
@_traced("mutator.mutate_all")
async def mutate_all(
repo: BaseRepository,
force: bool = False,
bus: BaseBus | None = None,
only: set[str] | None = None,
) -> float | None:
"""Mutate all deckies that are due (or *only* the named ones).
Returns the number of seconds until the next scheduled mutation, or
``None`` if no deployment exists / no decky has an interval set. The
watch loop uses this to adaptively sleep instead of hard-polling at a
fixed cadence.
A missing ``deployment`` state row is *not* an error any more — the
host may simply not have run ``decnet deploy`` yet. The watch loop
edge-triggers the user-facing log for that state.
"""
log.debug("mutate_all: start force=%s only=%s", force, only)
state_dict = await repo.get_state("deployment")
if state_dict is None:
log.debug("mutate_all: no active deployment found")
return None
config = DecnetConfig(**state_dict["config"])
now = time.time()
mutated_count = 0
next_due_in: float | None = None
for decky in config.deckies:
if only is not None and decky.name not in only:
continue
interval_mins = decky.mutate_interval or config.mutate_interval
if interval_mins is None and not force:
continue
if force or only is not None:
due = True
else:
elapsed_secs = now - decky.last_mutated
due = elapsed_secs >= (interval_mins * 60)
remaining = (interval_mins * 60) - elapsed_secs
if not due and (next_due_in is None or remaining < next_due_in):
next_due_in = remaining
if due:
success = await mutate_decky(decky.name, repo=repo, bus=bus)
if success:
mutated_count += 1
if mutated_count:
log.info("mutate_all: complete mutated_count=%d", mutated_count)
else:
log.debug("mutate_all: no deckies due for mutation")
return next_due_in
@_traced("mutator.reconcile_topologies")
async def reconcile_topologies(
repo: BaseRepository, bus: BaseBus | None = None,
) -> int:
"""Drain pending ``topology_mutations`` rows against live topologies.
For every topology in ``active|degraded`` with at least one pending
mutation, atomically claim the oldest via
:meth:`BaseRepository.claim_next_mutation`, dispatch to the matching
``apply_<op>`` in :mod:`decnet.mutator.ops`, and write the outcome
back (``applied`` or ``failed``).
On ``MutationError`` the topology is flipped to ``degraded`` — the
same state the future Healer will target — so operators can see that
a requested change was rejected without the repo drifting into an
inconsistent state.
Returns the number of mutations drained this tick.
"""
# Local imports keep the flat-fleet hot path free of MazeNET cost.
from decnet.mutator.ops import MutationError, dispatch as _op_dispatch
from decnet.topology.persistence import transition_status
from decnet.topology.status import TopologyStatus, TopologyStatusError
drained = 0
for tid in await repo.list_live_topology_ids():
while True:
mut = await repo.claim_next_mutation(tid)
if mut is None:
break # no more work for this topology this tick.
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_APPLYING),
{"mutation_id": mut["id"], "op": mut["op"], "payload": mut["payload"]},
event_type=_topics.MUTATION_APPLYING,
)
try:
await _op_dispatch(repo, tid, mut["op"], mut["payload"])
await repo.mark_mutation_applied(mut["id"])
drained += 1
log.info(
"topology %s mutation %s applied op=%s",
tid, mut["id"], mut["op"],
)
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_APPLIED),
{"mutation_id": mut["id"], "op": mut["op"]},
event_type=_topics.MUTATION_APPLIED,
)
except (MutationError, Exception) as exc: # noqa: BLE001
reason = f"{type(exc).__name__}: {exc}"
await repo.mark_mutation_failed(mut["id"], reason)
log.warning(
"topology %s mutation %s failed: %s",
tid, mut["id"], reason,
)
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_FAILED),
{"mutation_id": mut["id"], "op": mut["op"], "reason": reason},
event_type=_topics.MUTATION_FAILED,
)
try:
await transition_status(
repo, tid, TopologyStatus.DEGRADED, reason=reason,
)
await _publish_safely(
bus,
_topics.topology_status(tid),
{"state": TopologyStatus.DEGRADED, "reason": reason},
event_type=_topics.TOPOLOGY_STATUS,
)
except TopologyStatusError:
# Already degraded / in a state that can't degrade
# further — leave as is.
pass
# Stop draining this topology on first failure so the
# operator can inspect before a cascade.
break
return drained
@_traced("mutator.reconcile_agent_resyncs")
async def reconcile_agent_resyncs(repo: BaseRepository) -> int:
"""Re-push agent-targeted topologies flagged by the heartbeat handler.
The heartbeat sets ``needs_resync=True`` when an agent's reported
applied_version_hash diverges from master's expectation. Here we
re-run the agent branch of ``deploy_topology`` which pushes the
current hydrated blob back down over mTLS and clears the flag on
success. Any push failure leaves the flag set so the next tick
retries — it also logs loudly so ops can see that a specific agent
is stuck.
"""
from decnet.engine import deployer as _deployer
try:
pending = await repo.list_topologies_needing_resync()
except NotImplementedError:
return 0
drained = 0
for topo in pending:
tid = topo["id"]
try:
await _deployer.resync_agent_topology(repo, tid)
await repo.set_topology_resync(tid, False)
drained += 1
log.info("topology %s resynced to agent %s",
tid, topo.get("target_host_uuid"))
except Exception as exc: # noqa: BLE001
log.warning(
"topology %s resync failed (will retry): %s", tid, exc,
)
return drained
@_traced("mutator.watch_loop")
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
"""Run an infinite loop checking for deckies that need mutation.
Two independent responsibilities, in strict order per tick:
1. Flat-fleet service rotation (``mutate_all``) — runs every tick
regardless of MazeNET state, preserving phase-1 timing.
2. MazeNET live-mutation reconciliation — runs only when the cheap
guard ``has_pending_topology_mutation`` (indexed composite
lookup) returns True. Zero-topology and idle-topology hosts pay
exactly one indexed query per tick.
"""
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
# Connect to the bus for publish + wake-on-enqueue. Failure here is
# non-fatal: a mutator without a bus still works, it just runs at
# poll-interval latency and doesn't push notifications to UI clients.
bus: BaseBus | None = None
wake = asyncio.Event()
mutate_requests: set[str] = set()
wake_tasks: list[asyncio.Task] = []
heartbeat_task: asyncio.Task | None = None
try:
candidate = get_bus(client_name="mutator")
await candidate.connect()
bus = candidate
wake_tasks.append(asyncio.create_task(_wake_on_enqueue(bus, wake)))
wake_tasks.append(asyncio.create_task(
_wake_on_mutate_request(bus, wake, mutate_requests),
))
heartbeat_task = asyncio.create_task(
_run_health_heartbeat(bus, "mutator"),
)
except Exception as exc: # noqa: BLE001
log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc)
# Edge-triggered "no deployment" state so we don't spam the console
# every 10 seconds on a host that hasn't deployed yet. Start as None
# so the first observation fires exactly one line.
deployment_present: bool | None = None
try:
while True:
requested = mutate_requests.copy()
mutate_requests.clear()
next_due = await mutate_all(
repo=repo,
force=False,
bus=bus,
only=requested or None,
)
has_deployment = (
next_due is not None or await repo.get_state("deployment") is not None
)
if has_deployment and deployment_present is not True:
log.info("mutator: active deployment observed — entering normal cadence")
console.print("[green]Active deployment observed.[/]")
deployment_present = True
elif not has_deployment and deployment_present is not False:
log.info("mutator: no active deployment — idling until one lands")
console.print("[dim]No active deployment; mutator idling.[/]")
deployment_present = False
# Gate reconciler on the O(log n) guard query — avoids
# entering the dispatch body when there's nothing to do.
try:
if await repo.has_pending_topology_mutation():
await reconcile_topologies(repo, bus=bus)
except NotImplementedError:
# Backend without MazeNET support — nothing to reconcile.
pass
try:
await reconcile_agent_resyncs(repo)
except NotImplementedError:
pass
except Exception:
log.exception("reconcile_agent_resyncs tick raised")
# Adaptive sleep: wake at the earlier of (next decky due) or
# (poll_interval_secs), bounded below by 1s so a thrashing
# schedule can't spin the loop. A bus wake (enqueue or
# mutate_request) short-circuits the wait.
if next_due is None or next_due > poll_interval_secs:
timeout = float(poll_interval_secs)
else:
timeout = max(1.0, next_due)
try:
await asyncio.wait_for(wake.wait(), timeout=timeout)
except asyncio.TimeoutError:
pass
wake.clear()
except KeyboardInterrupt:
log.info("mutator watch loop stopped")
console.print("\n[dim]Mutator watcher stopped.[/]")
finally:
for t in wake_tasks:
t.cancel()
if heartbeat_task is not None:
heartbeat_task.cancel()
for t in (*wake_tasks, heartbeat_task):
if t is None:
continue
with contextlib.suppress(asyncio.CancelledError, Exception):
await t
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
async def _wake_on_enqueue(bus: BaseBus, wake: asyncio.Event) -> None:
"""Flip *wake* every time a ``mutation.enqueued`` event lands.
Subscribes to the wildcard ``topology.*.mutation.enqueued`` — a single
subscription covers every topology on the host. Runs until cancelled
or the bus closes (NullBus yields nothing and returns immediately,
which is fine: the poll-interval fallback still ticks).
"""
pattern = f"{_topics.TOPOLOGY}.*.mutation.{_topics.MUTATION_ENQUEUED}"
try:
sub = bus.subscribe(pattern)
async with sub:
async for _event in sub:
wake.set()
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
log.warning("mutator: wake subscriber died (%s); falling back to poll", exc)
async def _wake_on_mutate_request(
bus: BaseBus,
wake: asyncio.Event,
pending: set[str],
) -> None:
"""Collect on-demand ``decky.<name>.mutate_request`` events.
API/CLI/UI callers publish to ``decky.{name}.mutate_request`` to force
an immediate mutation without waiting for the scheduled interval. We
stash the target decky name in *pending* so the next tick can feed it
to ``mutate_all(only=...)``, then flip *wake* to short-circuit the
sleep. Payload is optional — the topic's second token is the name.
"""
pattern = f"{_topics.DECKY}.*.{_topics.DECKY_MUTATE_REQUEST}"
try:
sub = bus.subscribe(pattern)
async with sub:
async for event in sub:
topic = getattr(event, "topic", "") or ""
parts = topic.split(".")
name = parts[1] if len(parts) >= 3 else ""
payload = getattr(event, "payload", None) or {}
if not name and isinstance(payload, dict):
name = payload.get("name", "") or ""
if name:
pending.add(name)
wake.set()
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
log.warning(
"mutator: mutate_request subscriber died (%s); falling back to poll",
exc,
)