Files
DECNET/decnet/mutator/engine.py
anti a76b9ecdf9 feat(mazenet): step 7 — topology_mutations queue + mutator reconciler
Adds the live-mutation pipeline for active/degraded topologies:

* TopologyMutation table with composite index (state, topology_id)
  so the watch-loop guard query stays O(log n).
* claim_next_mutation is a single atomic UPDATE ... WHERE
  state='pending' so racing reconcilers deterministically pick one
  winner; losers see rowcount=0 and skip.
* reconcile_topologies drains pending rows per live topology, applies
  via decnet.mutator.ops.dispatch, and on failure marks the mutation
  failed + transitions topology to degraded.
* run_watch_loop gains a gated branch: flat-fleet mutate_all runs
  every tick unchanged; the reconciler only enters when the cheap
  has_pending_topology_mutation guard returns True.
* apply_* ops re-check hard invariants (names, IP collisions, subnet
  overlap, known services, service_config shape) after every mutation
  so the repo never lands in an invalid state.
* CLI: 'decnet topology mutate' / 'mutations' subcommands.
2026-04-20 18:02:37 -04:00

223 lines
8.2 KiB
Python

"""
Mutation Engine for DECNET.
Handles dynamic rotation of exposed honeypot services over time.
"""
import random
import time
from typing import Optional
from rich.console import Console
from decnet.archetypes import get_archetype
from decnet.fleet import all_service_names
from decnet.composer import write_compose
from decnet.config import DeckyConfig, DecnetConfig
from decnet.engine import _compose_with_retry
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from pathlib import Path
import anyio
import asyncio
from decnet.web.db.repository import BaseRepository
log = get_logger("mutator")
console = Console()
@_traced("mutator.mutate_decky")
async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
"""
Perform an Intra-Archetype Shuffle for a specific decky.
Returns True if mutation succeeded, False otherwise.
"""
log.debug("mutate_decky: start decky=%s", decky_name)
state_dict = await repo.get_state("deployment")
if state_dict is None:
log.error("mutate_decky: no active deployment found in database")
console.print("[red]No active deployment found in database.[/]")
return False
config = DecnetConfig(**state_dict["config"])
compose_path = Path(state_dict["compose_path"])
decky: Optional[DeckyConfig] = next((d for d in config.deckies if d.name == decky_name), None)
if not decky:
console.print(f"[red]Decky '{decky_name}' not found in state.[/]")
return False
if decky.archetype:
try:
arch = get_archetype(decky.archetype)
svc_pool = list(arch.services)
except ValueError:
svc_pool = all_service_names()
else:
svc_pool = all_service_names()
if not svc_pool:
console.print(f"[yellow]No services available for mutating '{decky_name}'.[/]")
return False
current_services = set(decky.services)
attempts = 0
while True:
count = random.randint(1, min(3, len(svc_pool))) # nosec B311
chosen = set(random.sample(svc_pool, count)) # nosec B311
attempts += 1
if chosen != current_services or attempts > 20:
break
decky.services = list(chosen)
decky.last_mutated = time.time()
# Save to DB
await repo.set_state("deployment", {"config": config.model_dump(), "compose_path": str(compose_path)})
# Still writes files for Docker to use
write_compose(config, compose_path)
log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services))
console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]")
try:
# Wrap blocking call in thread
await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path)
except Exception as e:
log.error("mutation failed decky=%s error=%s", decky_name, e)
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
return False
return True
@_traced("mutator.mutate_all")
async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
"""
Check all deckies and mutate those that are due.
If force=True, mutates all deckies regardless of schedule.
"""
log.debug("mutate_all: start force=%s", force)
state_dict = await repo.get_state("deployment")
if state_dict is None:
log.error("mutate_all: no active deployment found")
console.print("[red]No active deployment found.[/]")
return
config = DecnetConfig(**state_dict["config"])
now = time.time()
mutated_count = 0
for decky in config.deckies:
interval_mins = decky.mutate_interval or config.mutate_interval
if interval_mins is None and not force:
continue
if force:
due = True
else:
elapsed_secs = now - decky.last_mutated
due = elapsed_secs >= (interval_mins * 60)
if due:
success = await mutate_decky(decky.name, repo=repo)
if success:
mutated_count += 1
if mutated_count == 0 and not force:
log.debug("mutate_all: no deckies due for mutation")
console.print("[dim]No deckies are due for mutation.[/]")
else:
log.info("mutate_all: complete mutated_count=%d", mutated_count)
@_traced("mutator.reconcile_topologies")
async def reconcile_topologies(repo: BaseRepository) -> int:
"""Drain pending ``topology_mutations`` rows against live topologies.
For every topology in ``active|degraded`` with at least one pending
mutation, atomically claim the oldest via
:meth:`BaseRepository.claim_next_mutation`, dispatch to the matching
``apply_<op>`` in :mod:`decnet.mutator.ops`, and write the outcome
back (``applied`` or ``failed``).
On ``MutationError`` the topology is flipped to ``degraded`` — the
same state the future Healer will target — so operators can see that
a requested change was rejected without the repo drifting into an
inconsistent state.
Returns the number of mutations drained this tick.
"""
# Local imports keep the flat-fleet hot path free of MazeNET cost.
from decnet.mutator.ops import MutationError, dispatch as _op_dispatch
from decnet.topology.persistence import transition_status
from decnet.topology.status import TopologyStatus, TopologyStatusError
drained = 0
for tid in await repo.list_live_topology_ids():
while True:
mut = await repo.claim_next_mutation(tid)
if mut is None:
break # no more work for this topology this tick.
try:
await _op_dispatch(repo, tid, mut["op"], mut["payload"])
await repo.mark_mutation_applied(mut["id"])
drained += 1
log.info(
"topology %s mutation %s applied op=%s",
tid, mut["id"], mut["op"],
)
except (MutationError, Exception) as exc: # noqa: BLE001
reason = f"{type(exc).__name__}: {exc}"
await repo.mark_mutation_failed(mut["id"], reason)
log.warning(
"topology %s mutation %s failed: %s",
tid, mut["id"], reason,
)
try:
await transition_status(
repo, tid, TopologyStatus.DEGRADED, reason=reason,
)
except TopologyStatusError:
# Already degraded / in a state that can't degrade
# further — leave as is.
pass
# Stop draining this topology on first failure so the
# operator can inspect before a cascade.
break
return drained
@_traced("mutator.watch_loop")
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
"""Run an infinite loop checking for deckies that need mutation.
Two independent responsibilities, in strict order per tick:
1. Flat-fleet service rotation (``mutate_all``) — runs every tick
regardless of MazeNET state, preserving phase-1 timing.
2. MazeNET live-mutation reconciliation — runs only when the cheap
guard ``has_pending_topology_mutation`` (indexed composite
lookup) returns True. Zero-topology and idle-topology hosts pay
exactly one indexed query per tick.
"""
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
try:
while True:
await mutate_all(force=False, repo=repo)
# Gate reconciler on the O(log n) guard query — avoids
# entering the dispatch body when there's nothing to do.
try:
if await repo.has_pending_topology_mutation():
await reconcile_topologies(repo)
except NotImplementedError:
# Backend without MazeNET support — nothing to reconcile.
pass
await asyncio.sleep(poll_interval_secs)
except KeyboardInterrupt:
log.info("mutator watch loop stopped")
console.print("\n[dim]Mutator watcher stopped.[/]")