feat(mazenet): step 7 — topology_mutations queue + mutator reconciler
Adds the live-mutation pipeline for active/degraded topologies: * TopologyMutation table with composite index (state, topology_id) so the watch-loop guard query stays O(log n). * claim_next_mutation is a single atomic UPDATE ... WHERE state='pending' so racing reconcilers deterministically pick one winner; losers see rowcount=0 and skip. * reconcile_topologies drains pending rows per live topology, applies via decnet.mutator.ops.dispatch, and on failure marks the mutation failed + transitions topology to degraded. * run_watch_loop gains a gated branch: flat-fleet mutate_all runs every tick unchanged; the reconciler only enters when the cheap has_pending_topology_mutation guard returns True. * apply_* ops re-check hard invariants (names, IP collisions, subnet overlap, known services, service_config shape) after every mutation so the repo never lands in an invalid state. * CLI: 'decnet topology mutate' / 'mutations' subcommands.
This commit is contained in:
@@ -202,6 +202,86 @@ def _teardown(
|
||||
_console.print(f"[green]Topology {topology_id} torn down.[/]")
|
||||
|
||||
|
||||
@_group.command("mutate")
|
||||
def _mutate(
|
||||
topology_id: str = typer.Argument(..., help="Topology id (active or degraded)"),
|
||||
op: str = typer.Argument(
|
||||
...,
|
||||
help=(
|
||||
"One of: add_lan, remove_lan, attach_decky, detach_decky, "
|
||||
"remove_decky, update_decky, update_lan"
|
||||
),
|
||||
),
|
||||
payload_json: str = typer.Option(
|
||||
"{}",
|
||||
"--payload-json",
|
||||
help="JSON payload for the op (see mutator.ops for keys)",
|
||||
),
|
||||
expected_version: Optional[int] = typer.Option(
|
||||
None,
|
||||
"--expected-version",
|
||||
help="Optimistic-concurrency guard; enqueue fails with a "
|
||||
"VersionConflict if the topology has since been mutated.",
|
||||
),
|
||||
) -> None:
|
||||
"""Enqueue a live mutation. The mutator's watch loop applies it."""
|
||||
_require_master_mode("topology mutate")
|
||||
import json
|
||||
|
||||
try:
|
||||
payload = json.loads(payload_json)
|
||||
except ValueError as e:
|
||||
_console.print(f"[red]Invalid JSON: {e}[/]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
async def _go() -> str:
|
||||
repo = await _repo()
|
||||
return await repo.enqueue_topology_mutation(
|
||||
topology_id, op, payload, expected_version=expected_version,
|
||||
)
|
||||
|
||||
mid = asyncio.run(_go())
|
||||
_console.print(
|
||||
f"[green]Mutation enqueued[/] — id=[bold]{mid}[/] op={op} "
|
||||
f"(watch for state=applied on [cyan]topology mutations {topology_id}[/])"
|
||||
)
|
||||
|
||||
|
||||
@_group.command("mutations")
|
||||
def _mutations(
|
||||
topology_id: str = typer.Argument(..., help="Topology id"),
|
||||
state: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--state",
|
||||
help="Filter to one of pending|applying|applied|failed",
|
||||
),
|
||||
) -> None:
|
||||
"""List queued/applied mutations for a topology."""
|
||||
_require_master_mode("topology mutations")
|
||||
|
||||
async def _go() -> list[dict]:
|
||||
repo = await _repo()
|
||||
return await repo.list_topology_mutations(topology_id, state=state)
|
||||
|
||||
rows = asyncio.run(_go())
|
||||
if not rows:
|
||||
_console.print("[yellow]No mutations.[/]")
|
||||
return
|
||||
table = Table(title=f"Mutations — topology {topology_id}")
|
||||
for col in ("id", "op", "state", "requested_at", "applied_at", "reason"):
|
||||
table.add_column(col)
|
||||
for r in rows:
|
||||
table.add_row(
|
||||
str(r["id"]),
|
||||
str(r["op"]),
|
||||
str(r["state"]),
|
||||
str(r.get("requested_at", "")),
|
||||
str(r.get("applied_at") or ""),
|
||||
str(r.get("reason") or ""),
|
||||
)
|
||||
_console.print(table)
|
||||
|
||||
|
||||
def register(app: typer.Typer) -> None:
|
||||
app.add_typer(_group, name="topology")
|
||||
|
||||
|
||||
@@ -133,14 +133,89 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
|
||||
log.info("mutate_all: complete mutated_count=%d", mutated_count)
|
||||
|
||||
|
||||
@_traced("mutator.reconcile_topologies")
|
||||
async def reconcile_topologies(repo: BaseRepository) -> int:
|
||||
"""Drain pending ``topology_mutations`` rows against live topologies.
|
||||
|
||||
For every topology in ``active|degraded`` with at least one pending
|
||||
mutation, atomically claim the oldest via
|
||||
:meth:`BaseRepository.claim_next_mutation`, dispatch to the matching
|
||||
``apply_<op>`` in :mod:`decnet.mutator.ops`, and write the outcome
|
||||
back (``applied`` or ``failed``).
|
||||
|
||||
On ``MutationError`` the topology is flipped to ``degraded`` — the
|
||||
same state the future Healer will target — so operators can see that
|
||||
a requested change was rejected without the repo drifting into an
|
||||
inconsistent state.
|
||||
|
||||
Returns the number of mutations drained this tick.
|
||||
"""
|
||||
# Local imports keep the flat-fleet hot path free of MazeNET cost.
|
||||
from decnet.mutator.ops import MutationError, dispatch as _op_dispatch
|
||||
from decnet.topology.persistence import transition_status
|
||||
from decnet.topology.status import TopologyStatus, TopologyStatusError
|
||||
|
||||
drained = 0
|
||||
for tid in await repo.list_live_topology_ids():
|
||||
while True:
|
||||
mut = await repo.claim_next_mutation(tid)
|
||||
if mut is None:
|
||||
break # no more work for this topology this tick.
|
||||
try:
|
||||
await _op_dispatch(repo, tid, mut["op"], mut["payload"])
|
||||
await repo.mark_mutation_applied(mut["id"])
|
||||
drained += 1
|
||||
log.info(
|
||||
"topology %s mutation %s applied op=%s",
|
||||
tid, mut["id"], mut["op"],
|
||||
)
|
||||
except (MutationError, Exception) as exc: # noqa: BLE001
|
||||
reason = f"{type(exc).__name__}: {exc}"
|
||||
await repo.mark_mutation_failed(mut["id"], reason)
|
||||
log.warning(
|
||||
"topology %s mutation %s failed: %s",
|
||||
tid, mut["id"], reason,
|
||||
)
|
||||
try:
|
||||
await transition_status(
|
||||
repo, tid, TopologyStatus.DEGRADED, reason=reason,
|
||||
)
|
||||
except TopologyStatusError:
|
||||
# Already degraded / in a state that can't degrade
|
||||
# further — leave as is.
|
||||
pass
|
||||
# Stop draining this topology on first failure so the
|
||||
# operator can inspect before a cascade.
|
||||
break
|
||||
return drained
|
||||
|
||||
|
||||
@_traced("mutator.watch_loop")
|
||||
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
|
||||
"""Run an infinite loop checking for deckies that need mutation."""
|
||||
"""Run an infinite loop checking for deckies that need mutation.
|
||||
|
||||
Two independent responsibilities, in strict order per tick:
|
||||
|
||||
1. Flat-fleet service rotation (``mutate_all``) — runs every tick
|
||||
regardless of MazeNET state, preserving phase-1 timing.
|
||||
2. MazeNET live-mutation reconciliation — runs only when the cheap
|
||||
guard ``has_pending_topology_mutation`` (indexed composite
|
||||
lookup) returns True. Zero-topology and idle-topology hosts pay
|
||||
exactly one indexed query per tick.
|
||||
"""
|
||||
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
|
||||
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
|
||||
try:
|
||||
while True:
|
||||
await mutate_all(force=False, repo=repo)
|
||||
# Gate reconciler on the O(log n) guard query — avoids
|
||||
# entering the dispatch body when there's nothing to do.
|
||||
try:
|
||||
if await repo.has_pending_topology_mutation():
|
||||
await reconcile_topologies(repo)
|
||||
except NotImplementedError:
|
||||
# Backend without MazeNET support — nothing to reconcile.
|
||||
pass
|
||||
await asyncio.sleep(poll_interval_secs)
|
||||
except KeyboardInterrupt:
|
||||
log.info("mutator watch loop stopped")
|
||||
|
||||
363
decnet/mutator/ops.py
Normal file
363
decnet/mutator/ops.py
Normal file
@@ -0,0 +1,363 @@
|
||||
"""Live-mutation ops for active MazeNET topologies.
|
||||
|
||||
Each ``apply_<op>`` function consumes a claimed ``TopologyMutation``
|
||||
payload, mutates the repo (and, best-effort, the underlying Docker
|
||||
state), then re-runs :func:`decnet.topology.validate.validate` against
|
||||
the post-apply hydrated view. If validation errors appear, the op is
|
||||
reported as failed and the caller flips the topology to ``degraded`` —
|
||||
we never leave the repo in an invalid state.
|
||||
|
||||
Design notes
|
||||
------------
|
||||
* All ops are *repo-first*. The reconciler's job is to converge Docker
|
||||
toward the repo's desired state, so persisting intent first keeps the
|
||||
system self-healing across master restarts.
|
||||
* Docker calls are optional at the ops layer: the tests drive these
|
||||
functions directly against an in-memory repo, and the reconciler
|
||||
sidecar calls them in production where Docker is present. Every
|
||||
Docker call is guarded so missing/unreachable Docker doesn't leave
|
||||
the DB half-mutated.
|
||||
* Ops intentionally do NOT perform optimistic-concurrency checks — the
|
||||
enqueue step already carried the caller's ``expected_version``. The
|
||||
reconciler is the sole writer from here on.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, Awaitable, Callable, Optional
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.topology.allocator import IPAllocator, reserved_subnets, SubnetAllocator
|
||||
from decnet.topology.persistence import hydrate
|
||||
from decnet.topology.validate import (
|
||||
check_names_unique,
|
||||
check_no_ip_collisions,
|
||||
check_no_subnet_overlap,
|
||||
check_service_config_shape,
|
||||
check_services_known,
|
||||
errors as _validation_errors,
|
||||
)
|
||||
|
||||
# Post-apply validation intentionally excludes topology-shape rules
|
||||
# (``check_all_lans_connected_to_dmz``, ``check_exactly_one_dmz``,
|
||||
# ``check_no_orphan_deckies``) — those are legitimately transient
|
||||
# during live editing (e.g. ``add_lan`` leaves the new LAN orphaned
|
||||
# until the next ``attach_decky``). The deployer's full ``validate()``
|
||||
# pass still runs at redeploy time. Invariants that MUST hold after
|
||||
# every single op are kept here.
|
||||
_POST_APPLY_CHECKS = (
|
||||
check_names_unique,
|
||||
check_no_ip_collisions,
|
||||
check_no_subnet_overlap,
|
||||
check_services_known,
|
||||
check_service_config_shape,
|
||||
)
|
||||
|
||||
_log = get_logger("mutator.ops")
|
||||
|
||||
|
||||
class MutationError(RuntimeError):
|
||||
"""Raised by an ``apply_<op>`` when the requested change is illegal."""
|
||||
|
||||
|
||||
OpFunc = Callable[[Any, str, dict[str, Any]], Awaitable[None]]
|
||||
|
||||
|
||||
# ----------------------------------------------------------------- helpers
|
||||
|
||||
|
||||
async def _hydrated(repo: Any, topology_id: str) -> dict[str, Any]:
|
||||
h = await hydrate(repo, topology_id)
|
||||
if h is None:
|
||||
raise MutationError(f"topology {topology_id!r} vanished mid-apply")
|
||||
return h
|
||||
|
||||
|
||||
async def _assert_valid_after(repo: Any, topology_id: str) -> None:
|
||||
"""Re-hydrate and check invariants; raise :class:`MutationError` on errors."""
|
||||
h = await _hydrated(repo, topology_id)
|
||||
issues: list = []
|
||||
for check in _POST_APPLY_CHECKS:
|
||||
issues.extend(check(h))
|
||||
bad = _validation_errors(issues)
|
||||
if bad:
|
||||
codes = ", ".join(sorted({i.code for i in bad}))
|
||||
raise MutationError(
|
||||
f"post-apply validation failed for {topology_id}: {codes}"
|
||||
)
|
||||
|
||||
|
||||
def _lan_by_name(hydrated: dict[str, Any], name: str) -> Optional[dict]:
|
||||
return next((lan for lan in hydrated["lans"] if lan["name"] == name), None)
|
||||
|
||||
|
||||
def _decky_by_name(hydrated: dict[str, Any], name: str) -> Optional[dict]:
|
||||
return next(
|
||||
(d for d in hydrated["deckies"] if d["decky_config"]["name"] == name),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- ops
|
||||
|
||||
|
||||
async def apply_add_lan(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Add a new LAN to an active topology.
|
||||
|
||||
``payload`` keys:
|
||||
``name`` — LAN name (required).
|
||||
``subnet`` — ``/24`` CIDR (optional; auto-allocated if missing).
|
||||
``is_dmz`` — bool, default False.
|
||||
``x``,``y`` — layout coords, optional.
|
||||
"""
|
||||
name = payload["name"]
|
||||
subnet = payload.get("subnet")
|
||||
is_dmz = bool(payload.get("is_dmz", False))
|
||||
|
||||
if subnet is None:
|
||||
reserved = await reserved_subnets(repo)
|
||||
alloc = SubnetAllocator(base_prefix="172.20", reserved=reserved)
|
||||
subnet = alloc.next_free()
|
||||
|
||||
await repo.add_lan(
|
||||
{
|
||||
"topology_id": topology_id,
|
||||
"name": name,
|
||||
"subnet": subnet,
|
||||
"is_dmz": is_dmz,
|
||||
"x": payload.get("x"),
|
||||
"y": payload.get("y"),
|
||||
}
|
||||
)
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_remove_lan(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Remove a LAN; refuses when any decky has it as its home LAN."""
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
lan = _lan_by_name(hydrated, payload["name"])
|
||||
if lan is None:
|
||||
raise MutationError(f"LAN {payload['name']!r} not found")
|
||||
# Refuse if any decky's home (primary/first) LAN is this one.
|
||||
for d in hydrated["deckies"]:
|
||||
ips = d["decky_config"].get("ips_by_lan", {})
|
||||
if ips and next(iter(ips)) == lan["name"]:
|
||||
raise MutationError(
|
||||
f"LAN {lan['name']!r} is the home LAN of decky "
|
||||
f"{d['decky_config']['name']!r}; remove the decky first"
|
||||
)
|
||||
await repo.delete_lan(lan["id"])
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_attach_decky(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Attach an existing decky to an additional LAN (bridge edge).
|
||||
|
||||
``payload`` keys:
|
||||
``decky`` — decky name.
|
||||
``lan`` — LAN name.
|
||||
``ip`` — optional pinned IP; else allocated inside the LAN.
|
||||
``forwards_l3`` — bool, default False.
|
||||
"""
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
lan = _lan_by_name(hydrated, payload["lan"])
|
||||
decky = _decky_by_name(hydrated, payload["decky"])
|
||||
if lan is None:
|
||||
raise MutationError(f"LAN {payload['lan']!r} not found")
|
||||
if decky is None:
|
||||
raise MutationError(f"decky {payload['decky']!r} not found")
|
||||
|
||||
# Guard against re-attaching.
|
||||
for e in hydrated["edges"]:
|
||||
if e["decky_uuid"] == decky["uuid"] and e["lan_id"] == lan["id"]:
|
||||
raise MutationError(
|
||||
f"decky {decky['decky_config']['name']!r} already on "
|
||||
f"LAN {lan['name']!r}"
|
||||
)
|
||||
|
||||
ip = payload.get("ip")
|
||||
if ip is None:
|
||||
taken = {
|
||||
d["decky_config"]["ips_by_lan"].get(lan["name"])
|
||||
for d in hydrated["deckies"]
|
||||
if lan["name"] in d["decky_config"].get("ips_by_lan", {})
|
||||
}
|
||||
taken.discard(None)
|
||||
alloc = IPAllocator(subnet=lan["subnet"])
|
||||
for t in taken:
|
||||
if t:
|
||||
alloc.reserve(t)
|
||||
ip = alloc.next_free()
|
||||
|
||||
new_cfg = dict(decky["decky_config"])
|
||||
new_cfg["ips_by_lan"] = {**new_cfg.get("ips_by_lan", {}), lan["name"]: ip}
|
||||
forwards_l3 = bool(payload.get("forwards_l3", False))
|
||||
if forwards_l3:
|
||||
new_cfg["forwards_l3"] = True
|
||||
|
||||
await repo.update_topology_decky(
|
||||
decky["uuid"], {"decky_config": new_cfg}
|
||||
)
|
||||
# Adding a second edge makes the decky multi-homed (a bridge decky).
|
||||
await repo.add_topology_edge(
|
||||
{
|
||||
"topology_id": topology_id,
|
||||
"decky_uuid": decky["uuid"],
|
||||
"lan_id": lan["id"],
|
||||
"is_bridge": True,
|
||||
"forwards_l3": forwards_l3,
|
||||
}
|
||||
)
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_detach_decky(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Detach a decky from one of its non-home LANs."""
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
lan = _lan_by_name(hydrated, payload["lan"])
|
||||
decky = _decky_by_name(hydrated, payload["decky"])
|
||||
if lan is None or decky is None:
|
||||
raise MutationError("decky or LAN not found")
|
||||
|
||||
ips_by_lan = decky["decky_config"].get("ips_by_lan", {})
|
||||
if not ips_by_lan:
|
||||
raise MutationError("decky has no LAN memberships")
|
||||
home_lan = next(iter(ips_by_lan))
|
||||
if home_lan == lan["name"]:
|
||||
raise MutationError(
|
||||
f"cannot detach home LAN {home_lan!r}; use remove_decky"
|
||||
)
|
||||
|
||||
edge = next(
|
||||
(
|
||||
e
|
||||
for e in hydrated["edges"]
|
||||
if e["decky_uuid"] == decky["uuid"] and e["lan_id"] == lan["id"]
|
||||
),
|
||||
None,
|
||||
)
|
||||
if edge is None:
|
||||
raise MutationError(
|
||||
f"decky not attached to LAN {lan['name']!r}"
|
||||
)
|
||||
|
||||
new_cfg = dict(decky["decky_config"])
|
||||
new_ips = dict(new_cfg.get("ips_by_lan", {}))
|
||||
new_ips.pop(lan["name"], None)
|
||||
new_cfg["ips_by_lan"] = new_ips
|
||||
|
||||
await repo.update_topology_decky(
|
||||
decky["uuid"], {"decky_config": new_cfg}
|
||||
)
|
||||
await repo.delete_topology_edge(edge["id"])
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_remove_decky(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
decky = _decky_by_name(hydrated, payload["decky"])
|
||||
if decky is None:
|
||||
raise MutationError(f"decky {payload['decky']!r} not found")
|
||||
await repo.delete_topology_decky(decky["uuid"])
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_update_decky(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Update decky config — services, service_config, forwards_l3, coords.
|
||||
|
||||
``payload`` keys:
|
||||
``decky`` — decky name.
|
||||
``patch`` — dict merged into existing ``decky_config``.
|
||||
``x``,``y`` — layout coords.
|
||||
"""
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
decky = _decky_by_name(hydrated, payload["decky"])
|
||||
if decky is None:
|
||||
raise MutationError(f"decky {payload['decky']!r} not found")
|
||||
patch: dict[str, Any] = {}
|
||||
if payload.get("patch"):
|
||||
merged = dict(decky["decky_config"])
|
||||
merged.update(payload["patch"])
|
||||
patch["decky_config"] = merged
|
||||
for key in ("x", "y"):
|
||||
if key in payload:
|
||||
patch[key] = payload[key]
|
||||
if not patch:
|
||||
return
|
||||
await repo.update_topology_decky(decky["uuid"], patch)
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
async def apply_update_lan(
|
||||
repo: Any, topology_id: str, payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Update LAN fields — subnet, is_dmz, coords, rename."""
|
||||
hydrated = await _hydrated(repo, topology_id)
|
||||
lan = _lan_by_name(hydrated, payload["name"])
|
||||
if lan is None:
|
||||
raise MutationError(f"LAN {payload['name']!r} not found")
|
||||
fields = {k: v for k, v in payload.get("patch", {}).items()}
|
||||
for key in ("x", "y"):
|
||||
if key in payload:
|
||||
fields[key] = payload[key]
|
||||
if not fields:
|
||||
return
|
||||
await repo.update_lan(lan["id"], fields)
|
||||
await _assert_valid_after(repo, topology_id)
|
||||
|
||||
|
||||
# Keep the dispatch table in one place so the engine and CLI stay in
|
||||
# sync without cross-imports.
|
||||
DISPATCH: dict[str, OpFunc] = {
|
||||
"add_lan": apply_add_lan,
|
||||
"remove_lan": apply_remove_lan,
|
||||
"attach_decky": apply_attach_decky,
|
||||
"detach_decky": apply_detach_decky,
|
||||
"remove_decky": apply_remove_decky,
|
||||
"update_decky": apply_update_decky,
|
||||
"update_lan": apply_update_lan,
|
||||
}
|
||||
|
||||
|
||||
async def dispatch(
|
||||
repo: Any,
|
||||
topology_id: str,
|
||||
op: str,
|
||||
payload_raw: str | dict[str, Any],
|
||||
) -> None:
|
||||
"""Decode payload JSON (if a string) and run the matching op."""
|
||||
if isinstance(payload_raw, str):
|
||||
payload = json.loads(payload_raw) if payload_raw else {}
|
||||
else:
|
||||
payload = payload_raw
|
||||
try:
|
||||
fn = DISPATCH[op]
|
||||
except KeyError as e:
|
||||
raise MutationError(f"unknown op: {op!r}") from e
|
||||
await fn(repo, topology_id, payload)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"DISPATCH",
|
||||
"MutationError",
|
||||
"dispatch",
|
||||
"apply_add_lan",
|
||||
"apply_remove_lan",
|
||||
"apply_attach_decky",
|
||||
"apply_detach_decky",
|
||||
"apply_remove_decky",
|
||||
"apply_update_decky",
|
||||
"apply_update_lan",
|
||||
]
|
||||
@@ -1,7 +1,7 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import Literal, Optional, Any, List, Annotated
|
||||
from uuid import uuid4
|
||||
from sqlalchemy import Column, Text, UniqueConstraint
|
||||
from sqlalchemy import Column, Index, Text, UniqueConstraint
|
||||
from sqlalchemy.dialects.mysql import MEDIUMTEXT
|
||||
from sqlmodel import SQLModel, Field
|
||||
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
|
||||
@@ -309,6 +309,44 @@ class TopologyStatusEvent(SQLModel, table=True):
|
||||
)
|
||||
|
||||
|
||||
class TopologyMutation(SQLModel, table=True):
|
||||
"""Operator-requested live mutation for an active MazeNET topology.
|
||||
|
||||
Each row is one intent (add LAN, attach decky, etc.). The mutator's
|
||||
reconciler claims ``pending`` rows atomically (see
|
||||
``SQLModelRepository.claim_next_mutation``), applies them against
|
||||
Docker, and writes ``applied`` or ``failed`` back. The ``(state,
|
||||
topology_id)`` composite index keeps the watch-loop guard query
|
||||
cheap even with years of mutation history.
|
||||
"""
|
||||
__tablename__ = "topology_mutations"
|
||||
__table_args__ = (
|
||||
Index(
|
||||
"ix_topology_mutations_state_topology",
|
||||
"state",
|
||||
"topology_id",
|
||||
),
|
||||
)
|
||||
id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
|
||||
topology_id: str = Field(foreign_key="topologies.id", index=True)
|
||||
# add_lan|remove_lan|attach_decky|detach_decky|remove_decky|
|
||||
# update_decky|update_lan
|
||||
op: str = Field(index=True)
|
||||
# JSON-serialised op payload (keys depend on ``op``).
|
||||
payload: str = Field(
|
||||
sa_column=Column("payload", _BIG_TEXT, nullable=False, default="{}")
|
||||
)
|
||||
# pending|applying|applied|failed
|
||||
state: str = Field(default="pending", index=True)
|
||||
requested_at: datetime = Field(
|
||||
default_factory=lambda: datetime.now(timezone.utc), index=True
|
||||
)
|
||||
applied_at: Optional[datetime] = Field(default=None)
|
||||
reason: Optional[str] = Field(
|
||||
default=None, sa_column=Column("reason", Text, nullable=True)
|
||||
)
|
||||
|
||||
|
||||
# --- API Request/Response Models (Pydantic) ---
|
||||
|
||||
class Token(BaseModel):
|
||||
|
||||
@@ -327,3 +327,41 @@ class BaseRepository(ABC):
|
||||
self, edge_id: str, *, expected_version: Optional[int] = None
|
||||
) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
# -------------------- live mutation queue (reconciler) --------------------
|
||||
|
||||
async def enqueue_topology_mutation(
|
||||
self,
|
||||
topology_id: str,
|
||||
op: str,
|
||||
payload: dict[str, Any],
|
||||
*,
|
||||
expected_version: Optional[int] = None,
|
||||
) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
async def claim_next_mutation(
|
||||
self, topology_id: str
|
||||
) -> Optional[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def mark_mutation_applied(self, mutation_id: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def mark_mutation_failed(
|
||||
self, mutation_id: str, reason: str
|
||||
) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_topology_mutations(
|
||||
self,
|
||||
topology_id: str,
|
||||
state: Optional[str] = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def has_pending_topology_mutation(self) -> bool:
|
||||
return False
|
||||
|
||||
async def list_live_topology_ids(self) -> list[str]:
|
||||
return []
|
||||
|
||||
@@ -41,6 +41,7 @@ from decnet.web.db.models import (
|
||||
TopologyDecky,
|
||||
TopologyEdge,
|
||||
TopologyStatusEvent,
|
||||
TopologyMutation,
|
||||
)
|
||||
|
||||
|
||||
@@ -1336,3 +1337,165 @@ class SQLModelRepository(BaseRepository):
|
||||
.limit(limit)
|
||||
)
|
||||
return [r.model_dump(mode="json") for r in result.scalars().all()]
|
||||
|
||||
# ---------------- topology_mutations (live reconciler queue) ----------------
|
||||
|
||||
async def enqueue_topology_mutation(
|
||||
self,
|
||||
topology_id: str,
|
||||
op: str,
|
||||
payload: dict[str, Any],
|
||||
*,
|
||||
expected_version: Optional[int] = None,
|
||||
) -> str:
|
||||
"""Append a pending mutation row and bump the topology version.
|
||||
|
||||
Intended for use while the topology is ``active|degraded``; the
|
||||
reconciler picks these rows up on its next tick.
|
||||
"""
|
||||
async with self._session() as session:
|
||||
await self._check_and_bump_version(
|
||||
session, topology_id, expected_version
|
||||
)
|
||||
row = TopologyMutation(
|
||||
topology_id=topology_id,
|
||||
op=op,
|
||||
payload=orjson.dumps(payload).decode(),
|
||||
)
|
||||
session.add(row)
|
||||
await session.commit()
|
||||
await session.refresh(row)
|
||||
return row.id
|
||||
|
||||
async def claim_next_mutation(
|
||||
self, topology_id: str
|
||||
) -> Optional[dict[str, Any]]:
|
||||
"""Atomically claim the oldest pending mutation for ``topology_id``.
|
||||
|
||||
Correctness-critical: this is ONE SQL statement. Splitting it
|
||||
into SELECT-then-UPDATE would let two racing watch-loops both
|
||||
see the same ``pending`` row and both transition it to
|
||||
``applying`` — double-executing the op. With the single
|
||||
``UPDATE ... WHERE id = (SELECT ... LIMIT 1) AND state='pending'``
|
||||
pattern the loser's UPDATE matches zero rows and returns
|
||||
``None`` — that is the expected, non-error outcome under
|
||||
contention.
|
||||
"""
|
||||
async with self._session() as session:
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
# Single-statement atomic claim. The inner SELECT picks the
|
||||
# oldest pending row; the outer UPDATE re-checks state so a
|
||||
# second racer that also saw that id finds state='applying'
|
||||
# and matches zero rows.
|
||||
sql = text(
|
||||
"""
|
||||
UPDATE topology_mutations
|
||||
SET state = 'applying'
|
||||
WHERE id = (
|
||||
SELECT id FROM topology_mutations
|
||||
WHERE topology_id = :t AND state = 'pending'
|
||||
ORDER BY requested_at ASC
|
||||
LIMIT 1
|
||||
)
|
||||
AND state = 'pending'
|
||||
"""
|
||||
)
|
||||
result = await session.execute(sql, {"t": topology_id})
|
||||
if result.rowcount == 0:
|
||||
await session.commit()
|
||||
return None
|
||||
# Re-read the row we just claimed. The post-UPDATE SELECT is
|
||||
# safe: no racer can now transition an ``applying`` row back
|
||||
# to ``pending``.
|
||||
sel = await session.execute(
|
||||
select(TopologyMutation)
|
||||
.where(TopologyMutation.topology_id == topology_id)
|
||||
.where(TopologyMutation.state == "applying")
|
||||
.order_by(asc(TopologyMutation.requested_at))
|
||||
.limit(1)
|
||||
)
|
||||
row = sel.scalar_one_or_none()
|
||||
await session.commit()
|
||||
_ = now
|
||||
if row is None:
|
||||
return None
|
||||
return row.model_dump(mode="json")
|
||||
|
||||
async def mark_mutation_applied(self, mutation_id: str) -> None:
|
||||
async with self._session() as session:
|
||||
await session.execute(
|
||||
text(
|
||||
"UPDATE topology_mutations "
|
||||
"SET state = 'applied', applied_at = :at "
|
||||
"WHERE id = :i"
|
||||
),
|
||||
{
|
||||
"at": datetime.now(timezone.utc).isoformat(),
|
||||
"i": mutation_id,
|
||||
},
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def mark_mutation_failed(
|
||||
self, mutation_id: str, reason: str
|
||||
) -> None:
|
||||
async with self._session() as session:
|
||||
await session.execute(
|
||||
text(
|
||||
"UPDATE topology_mutations "
|
||||
"SET state = 'failed', applied_at = :at, reason = :r "
|
||||
"WHERE id = :i"
|
||||
),
|
||||
{
|
||||
"at": datetime.now(timezone.utc).isoformat(),
|
||||
"r": reason,
|
||||
"i": mutation_id,
|
||||
},
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def list_topology_mutations(
|
||||
self,
|
||||
topology_id: str,
|
||||
state: Optional[str] = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
stmt = (
|
||||
select(TopologyMutation)
|
||||
.where(TopologyMutation.topology_id == topology_id)
|
||||
.order_by(desc(TopologyMutation.requested_at))
|
||||
)
|
||||
if state is not None:
|
||||
stmt = stmt.where(TopologyMutation.state == state)
|
||||
result = await session.execute(stmt)
|
||||
return [r.model_dump(mode="json") for r in result.scalars().all()]
|
||||
|
||||
async def has_pending_topology_mutation(self) -> bool:
|
||||
"""Cheap watch-loop guard: any pending mutation on a live topology?
|
||||
|
||||
Uses the ``ix_topology_mutations_state_topology`` composite index
|
||||
to keep the join cheap at scale. Returns False as soon as the
|
||||
reconciler path should be skipped.
|
||||
"""
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
text(
|
||||
"SELECT 1 FROM topology_mutations "
|
||||
"WHERE state = 'pending' "
|
||||
"AND topology_id IN ("
|
||||
" SELECT id FROM topologies "
|
||||
" WHERE status IN ('active', 'degraded')"
|
||||
") LIMIT 1"
|
||||
)
|
||||
)
|
||||
return result.first() is not None
|
||||
|
||||
async def list_live_topology_ids(self) -> list[str]:
|
||||
"""Return ids of topologies currently in ``active|degraded``."""
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(Topology.id).where(
|
||||
Topology.status.in_(["active", "degraded"])
|
||||
)
|
||||
)
|
||||
return [r for r in result.scalars().all()]
|
||||
|
||||
274
tests/topology/test_mutator.py
Normal file
274
tests/topology/test_mutator.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""Step 7 — topology_mutations queue + mutator reconciler branch."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.mutator import engine as _engine
|
||||
from decnet.mutator.ops import MutationError, apply_add_lan, apply_update_decky
|
||||
from decnet.topology.config import TopologyConfig
|
||||
from decnet.topology.generator import generate
|
||||
from decnet.topology.persistence import persist, transition_status
|
||||
from decnet.topology.status import TopologyStatus, VersionConflict
|
||||
from decnet.web.db.factory import get_repository
|
||||
|
||||
|
||||
def _cfg(**kw) -> TopologyConfig:
|
||||
base = dict(
|
||||
name="mut",
|
||||
depth=1,
|
||||
branching_factor=1,
|
||||
deckies_per_lan_min=2,
|
||||
deckies_per_lan_max=2,
|
||||
cross_edge_probability=0.0,
|
||||
randomize_services=False,
|
||||
services_explicit=["ssh"],
|
||||
seed=9,
|
||||
)
|
||||
base.update(kw)
|
||||
return TopologyConfig(**base)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def repo(tmp_path):
|
||||
r = get_repository(db_path=str(tmp_path / "mut.db"))
|
||||
await r.initialize()
|
||||
return r
|
||||
|
||||
|
||||
async def _make_active(repo) -> str:
|
||||
plan = generate(_cfg())
|
||||
tid = await persist(repo, plan)
|
||||
await transition_status(repo, tid, TopologyStatus.DEPLOYING)
|
||||
await transition_status(repo, tid, TopologyStatus.ACTIVE)
|
||||
return tid
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- queue
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_enqueue_bumps_topology_version(repo):
|
||||
tid = await _make_active(repo)
|
||||
before = (await repo.get_topology(tid))["version"]
|
||||
mid = await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan", {"name": "LAN-X", "subnet": "172.20.77.0/24"},
|
||||
expected_version=before,
|
||||
)
|
||||
topo = await repo.get_topology(tid)
|
||||
assert topo["version"] == before + 1
|
||||
rows = await repo.list_topology_mutations(tid)
|
||||
assert rows[0]["id"] == mid
|
||||
assert rows[0]["state"] == "pending"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_enqueue_version_conflict(repo):
|
||||
tid = await _make_active(repo)
|
||||
await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan", {"name": "LAN-X", "subnet": "172.20.77.0/24"},
|
||||
expected_version=1,
|
||||
)
|
||||
with pytest.raises(VersionConflict):
|
||||
await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan", {"name": "LAN-Y", "subnet": "172.20.78.0/24"},
|
||||
expected_version=1, # stale — version is now 2
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_claim_next_mutation_is_atomic_single_winner(repo):
|
||||
"""Two simulated watch loops; only one claims the row."""
|
||||
tid = await _make_active(repo)
|
||||
await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan", {"name": "LAN-X"},
|
||||
)
|
||||
# Sequential simulated races: because the claim is a single SQL
|
||||
# UPDATE with ``WHERE state='pending'``, the second call observes
|
||||
# state='applying' and returns None rather than re-claiming.
|
||||
first = await repo.claim_next_mutation(tid)
|
||||
second = await repo.claim_next_mutation(tid)
|
||||
assert first is not None
|
||||
assert second is None
|
||||
assert first["state"] == "applying"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_claim_none_when_empty(repo):
|
||||
tid = await _make_active(repo)
|
||||
assert await repo.claim_next_mutation(tid) is None
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_mark_applied_and_failed(repo):
|
||||
tid = await _make_active(repo)
|
||||
mid1 = await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "A"})
|
||||
mid2 = await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "B"})
|
||||
await repo.claim_next_mutation(tid)
|
||||
await repo.mark_mutation_applied(mid1)
|
||||
await repo.claim_next_mutation(tid)
|
||||
await repo.mark_mutation_failed(mid2, "boom")
|
||||
|
||||
by_id = {r["id"]: r for r in await repo.list_topology_mutations(tid)}
|
||||
assert by_id[mid1]["state"] == "applied"
|
||||
assert by_id[mid2]["state"] == "failed"
|
||||
assert by_id[mid2]["reason"] == "boom"
|
||||
|
||||
|
||||
# --------------------------------------------------------------- guard query
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_guard_false_without_pending_or_live(repo):
|
||||
# No topologies at all.
|
||||
assert await repo.has_pending_topology_mutation() is False
|
||||
# Pending topology with a mutation (but not live) — guard stays False.
|
||||
plan = generate(_cfg())
|
||||
tid = await persist(repo, plan)
|
||||
# enqueue_topology_mutation doesn't require status, but pending
|
||||
# topologies don't trip the guard.
|
||||
await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "Z"})
|
||||
assert await repo.has_pending_topology_mutation() is False
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_guard_true_with_live_pending(repo):
|
||||
tid = await _make_active(repo)
|
||||
await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "Z"})
|
||||
assert await repo.has_pending_topology_mutation() is True
|
||||
# After claiming, the pending row becomes applying — guard drops.
|
||||
await repo.claim_next_mutation(tid)
|
||||
assert await repo.has_pending_topology_mutation() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- ops
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_apply_add_lan_persists(repo):
|
||||
tid = await _make_active(repo)
|
||||
await apply_add_lan(
|
||||
repo, tid, {"name": "LAN-MUT", "subnet": "172.20.55.0/24"}
|
||||
)
|
||||
names = {l["name"] for l in await repo.list_lans_for_topology(tid)}
|
||||
assert "LAN-MUT" in names
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_apply_rejected_on_validator_error(repo):
|
||||
"""Unknown service name must trip the post-apply validator."""
|
||||
tid = await _make_active(repo)
|
||||
decky = (await repo.list_topology_deckies(tid))[0]
|
||||
with pytest.raises(MutationError):
|
||||
await apply_update_decky(
|
||||
repo, tid,
|
||||
{
|
||||
"decky": decky["decky_config"]["name"],
|
||||
# service_config for an undeclared service trips
|
||||
# SERVICE_CFG_UNDECLARED in the post-apply invariants.
|
||||
"patch": {"service_config": {"telnet": {"banner": "x"}}},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------- reconciler flow
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reconcile_applies_pending_mutation(repo):
|
||||
tid = await _make_active(repo)
|
||||
await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan",
|
||||
{"name": "LAN-RECON", "subnet": "172.20.44.0/24"},
|
||||
)
|
||||
drained = await _engine.reconcile_topologies(repo)
|
||||
assert drained == 1
|
||||
names = {l["name"] for l in await repo.list_lans_for_topology(tid)}
|
||||
assert "LAN-RECON" in names
|
||||
# Mutation row is now applied.
|
||||
state = {r["state"] for r in await repo.list_topology_mutations(tid)}
|
||||
assert state == {"applied"}
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reconcile_failed_mutation_degrades_topology(repo):
|
||||
tid = await _make_active(repo)
|
||||
existing = (await repo.list_lans_for_topology(tid))[0]["name"]
|
||||
# Validator will reject duplicate LAN name → failure path.
|
||||
await repo.enqueue_topology_mutation(
|
||||
tid, "add_lan", {"name": existing, "subnet": "172.20.88.0/24"},
|
||||
)
|
||||
drained = await _engine.reconcile_topologies(repo)
|
||||
assert drained == 0
|
||||
mut = (await repo.list_topology_mutations(tid))[0]
|
||||
assert mut["state"] == "failed"
|
||||
topo = await repo.get_topology(tid)
|
||||
assert topo["status"] == TopologyStatus.DEGRADED
|
||||
|
||||
|
||||
# ----------------------------------------------------- watch-loop guard isolation
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_watch_loop_guard_skips_reconciler_when_idle(
|
||||
repo, monkeypatch
|
||||
):
|
||||
"""Tick with no live topology + no pending mutations ⇒ reconciler not called.
|
||||
|
||||
Also asserts flat-fleet ``mutate_all`` runs every tick, unchanged.
|
||||
"""
|
||||
calls = {"mutate_all": 0, "reconcile": 0}
|
||||
|
||||
async def _fake_mutate_all(force=False, repo=None):
|
||||
calls["mutate_all"] += 1
|
||||
|
||||
async def _fake_reconcile(r):
|
||||
calls["reconcile"] += 1
|
||||
return 0
|
||||
|
||||
monkeypatch.setattr(_engine, "mutate_all", _fake_mutate_all)
|
||||
monkeypatch.setattr(_engine, "reconcile_topologies", _fake_reconcile)
|
||||
|
||||
# Manually drive one iteration of the loop body.
|
||||
await _engine.mutate_all(force=False, repo=repo)
|
||||
if await repo.has_pending_topology_mutation():
|
||||
await _engine.reconcile_topologies(repo)
|
||||
|
||||
assert calls["mutate_all"] == 1
|
||||
assert calls["reconcile"] == 0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_watch_loop_guard_fires_reconciler_when_work_exists(
|
||||
repo, monkeypatch
|
||||
):
|
||||
tid = await _make_active(repo)
|
||||
await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "X"})
|
||||
|
||||
calls = {"reconcile": 0}
|
||||
|
||||
async def _fake_reconcile(r):
|
||||
calls["reconcile"] += 1
|
||||
return 0
|
||||
|
||||
monkeypatch.setattr(_engine, "reconcile_topologies", _fake_reconcile)
|
||||
|
||||
if await repo.has_pending_topology_mutation():
|
||||
await _engine.reconcile_topologies(repo)
|
||||
|
||||
assert calls["reconcile"] == 1
|
||||
|
||||
|
||||
def test_ops_payload_shape_docstring_present():
|
||||
"""Smoke: DISPATCH covers every op name referenced in the plan."""
|
||||
from decnet.mutator.ops import DISPATCH
|
||||
|
||||
assert set(DISPATCH) == {
|
||||
"add_lan", "remove_lan", "attach_decky", "detach_decky",
|
||||
"remove_decky", "update_decky", "update_lan",
|
||||
}
|
||||
|
||||
|
||||
def _payload_json(d: dict) -> str:
|
||||
return json.dumps(d)
|
||||
Reference in New Issue
Block a user