diff --git a/decnet/cli/topology.py b/decnet/cli/topology.py index 23da8914..9def794b 100644 --- a/decnet/cli/topology.py +++ b/decnet/cli/topology.py @@ -202,6 +202,86 @@ def _teardown( _console.print(f"[green]Topology {topology_id} torn down.[/]") +@_group.command("mutate") +def _mutate( + topology_id: str = typer.Argument(..., help="Topology id (active or degraded)"), + op: str = typer.Argument( + ..., + help=( + "One of: add_lan, remove_lan, attach_decky, detach_decky, " + "remove_decky, update_decky, update_lan" + ), + ), + payload_json: str = typer.Option( + "{}", + "--payload-json", + help="JSON payload for the op (see mutator.ops for keys)", + ), + expected_version: Optional[int] = typer.Option( + None, + "--expected-version", + help="Optimistic-concurrency guard; enqueue fails with a " + "VersionConflict if the topology has since been mutated.", + ), +) -> None: + """Enqueue a live mutation. The mutator's watch loop applies it.""" + _require_master_mode("topology mutate") + import json + + try: + payload = json.loads(payload_json) + except ValueError as e: + _console.print(f"[red]Invalid JSON: {e}[/]") + raise typer.Exit(1) + + async def _go() -> str: + repo = await _repo() + return await repo.enqueue_topology_mutation( + topology_id, op, payload, expected_version=expected_version, + ) + + mid = asyncio.run(_go()) + _console.print( + f"[green]Mutation enqueued[/] — id=[bold]{mid}[/] op={op} " + f"(watch for state=applied on [cyan]topology mutations {topology_id}[/])" + ) + + +@_group.command("mutations") +def _mutations( + topology_id: str = typer.Argument(..., help="Topology id"), + state: Optional[str] = typer.Option( + None, + "--state", + help="Filter to one of pending|applying|applied|failed", + ), +) -> None: + """List queued/applied mutations for a topology.""" + _require_master_mode("topology mutations") + + async def _go() -> list[dict]: + repo = await _repo() + return await repo.list_topology_mutations(topology_id, state=state) + + rows = 
asyncio.run(_go()) + if not rows: + _console.print("[yellow]No mutations.[/]") + return + table = Table(title=f"Mutations — topology {topology_id}") + for col in ("id", "op", "state", "requested_at", "applied_at", "reason"): + table.add_column(col) + for r in rows: + table.add_row( + str(r["id"]), + str(r["op"]), + str(r["state"]), + str(r.get("requested_at", "")), + str(r.get("applied_at") or ""), + str(r.get("reason") or ""), + ) + _console.print(table) + + def register(app: typer.Typer) -> None: app.add_typer(_group, name="topology") diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index 0e4a925e..cc636c19 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -133,14 +133,89 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None: log.info("mutate_all: complete mutated_count=%d", mutated_count) +@_traced("mutator.reconcile_topologies") +async def reconcile_topologies(repo: BaseRepository) -> int: + """Drain pending ``topology_mutations`` rows against live topologies. + + For every topology in ``active|degraded`` with at least one pending + mutation, atomically claim the oldest via + :meth:`BaseRepository.claim_next_mutation`, dispatch to the matching + ``apply_*`` function in :mod:`decnet.mutator.ops`, and write the outcome + back (``applied`` or ``failed``). + + On ``MutationError`` the topology is flipped to ``degraded`` — the + same state the future Healer will target — so operators can see that + a requested change was rejected without the repo drifting into an + inconsistent state. + + Returns the number of mutations drained this tick. + """ + # Local imports keep the flat-fleet hot path free of MazeNET cost. 
+ from decnet.mutator.ops import MutationError, dispatch as _op_dispatch + from decnet.topology.persistence import transition_status + from decnet.topology.status import TopologyStatus, TopologyStatusError + + drained = 0 + for tid in await repo.list_live_topology_ids(): + while True: + mut = await repo.claim_next_mutation(tid) + if mut is None: + break # no more work for this topology this tick. + try: + await _op_dispatch(repo, tid, mut["op"], mut["payload"]) + await repo.mark_mutation_applied(mut["id"]) + drained += 1 + log.info( + "topology %s mutation %s applied op=%s", + tid, mut["id"], mut["op"], + ) + except (MutationError, Exception) as exc: # noqa: BLE001 + reason = f"{type(exc).__name__}: {exc}" + await repo.mark_mutation_failed(mut["id"], reason) + log.warning( + "topology %s mutation %s failed: %s", + tid, mut["id"], reason, + ) + try: + await transition_status( + repo, tid, TopologyStatus.DEGRADED, reason=reason, + ) + except TopologyStatusError: + # Already degraded / in a state that can't degrade + # further — leave as is. + pass + # Stop draining this topology on first failure so the + # operator can inspect before a cascade. + break + return drained + + @_traced("mutator.watch_loop") async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None: - """Run an infinite loop checking for deckies that need mutation.""" + """Run an infinite loop checking for deckies that need mutation. + + Two independent responsibilities, in strict order per tick: + + 1. Flat-fleet service rotation (``mutate_all``) — runs every tick + regardless of MazeNET state, preserving phase-1 timing. + 2. MazeNET live-mutation reconciliation — runs only when the cheap + guard ``has_pending_topology_mutation`` (indexed composite + lookup) returns True. Zero-topology and idle-topology hosts pay + exactly one indexed query per tick. 
+ """ log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs) console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]") try: while True: await mutate_all(force=False, repo=repo) + # Gate reconciler on the O(log n) guard query — avoids + # entering the dispatch body when there's nothing to do. + try: + if await repo.has_pending_topology_mutation(): + await reconcile_topologies(repo) + except NotImplementedError: + # Backend without MazeNET support — nothing to reconcile. + pass await asyncio.sleep(poll_interval_secs) except KeyboardInterrupt: log.info("mutator watch loop stopped") diff --git a/decnet/mutator/ops.py b/decnet/mutator/ops.py new file mode 100644 index 00000000..e42a77a9 --- /dev/null +++ b/decnet/mutator/ops.py @@ -0,0 +1,363 @@ +"""Live-mutation ops for active MazeNET topologies. + +Each ``apply_*`` function consumes a claimed ``TopologyMutation`` +payload, mutates the repo (and, best-effort, the underlying Docker +state), then re-runs the post-apply invariant checks from +:mod:`decnet.topology.validate` against the post-apply hydrated view. +If validation errors appear, the op is +reported as failed and the caller flips the topology to ``degraded`` — +we never leave the repo in an invalid state. + +Design notes +------------ +* All ops are *repo-first*. The reconciler's job is to converge Docker + toward the repo's desired state, so persisting intent first keeps the + system self-healing across master restarts. +* Docker calls are optional at the ops layer: the tests drive these + functions directly against an in-memory repo, and the reconciler + sidecar calls them in production where Docker is present. Every + Docker call is guarded so missing/unreachable Docker doesn't leave + the DB half-mutated. +* Ops intentionally do NOT perform optimistic-concurrency checks — the + enqueue step already carried the caller's ``expected_version``. The + reconciler is the sole writer from here on. 
+""" +from __future__ import annotations + +import json +from typing import Any, Awaitable, Callable, Optional + +from decnet.logging import get_logger +from decnet.topology.allocator import IPAllocator, reserved_subnets, SubnetAllocator +from decnet.topology.persistence import hydrate +from decnet.topology.validate import ( + check_names_unique, + check_no_ip_collisions, + check_no_subnet_overlap, + check_service_config_shape, + check_services_known, + errors as _validation_errors, +) + +# Post-apply validation intentionally excludes topology-shape rules +# (``check_all_lans_connected_to_dmz``, ``check_exactly_one_dmz``, +# ``check_no_orphan_deckies``) — those are legitimately transient +# during live editing (e.g. ``add_lan`` leaves the new LAN orphaned +# until the next ``attach_decky``). The deployer's full ``validate()`` +# pass still runs at redeploy time. Invariants that MUST hold after +# every single op are kept here. +_POST_APPLY_CHECKS = ( + check_names_unique, + check_no_ip_collisions, + check_no_subnet_overlap, + check_services_known, + check_service_config_shape, +) + +_log = get_logger("mutator.ops") + + +class MutationError(RuntimeError): + """Raised by an ``apply_*`` function when the requested change is illegal.""" + + +OpFunc = Callable[[Any, str, dict[str, Any]], Awaitable[None]] + + +# ----------------------------------------------------------------- helpers + + +async def _hydrated(repo: Any, topology_id: str) -> dict[str, Any]: + h = await hydrate(repo, topology_id) + if h is None: + raise MutationError(f"topology {topology_id!r} vanished mid-apply") + return h + + +async def _assert_valid_after(repo: Any, topology_id: str) -> None: + """Re-hydrate and check invariants; raise :class:`MutationError` on errors.""" + h = await _hydrated(repo, topology_id) + issues: list = [] + for check in _POST_APPLY_CHECKS: + issues.extend(check(h)) + bad = _validation_errors(issues) + if bad: + codes = ", ".join(sorted({i.code for i in bad})) + raise MutationError( 
f"post-apply validation failed for {topology_id}: {codes}" + ) + + +def _lan_by_name(hydrated: dict[str, Any], name: str) -> Optional[dict]: + return next((lan for lan in hydrated["lans"] if lan["name"] == name), None) + + +def _decky_by_name(hydrated: dict[str, Any], name: str) -> Optional[dict]: + return next( + (d for d in hydrated["deckies"] if d["decky_config"]["name"] == name), + None, + ) + + +# ------------------------------------------------------------------- ops + + +async def apply_add_lan( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Add a new LAN to an active topology. + + ``payload`` keys: + ``name`` — LAN name (required). + ``subnet`` — ``/24`` CIDR (optional; auto-allocated if missing). + ``is_dmz`` — bool, default False. + ``x``,``y`` — layout coords, optional. + """ + name = payload["name"] + subnet = payload.get("subnet") + is_dmz = bool(payload.get("is_dmz", False)) + + if subnet is None: + reserved = await reserved_subnets(repo) + alloc = SubnetAllocator(base_prefix="172.20", reserved=reserved) + subnet = alloc.next_free() + + await repo.add_lan( + { + "topology_id": topology_id, + "name": name, + "subnet": subnet, + "is_dmz": is_dmz, + "x": payload.get("x"), + "y": payload.get("y"), + } + ) + await _assert_valid_after(repo, topology_id) + + +async def apply_remove_lan( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Remove a LAN; refuses when any decky has it as its home LAN.""" + hydrated = await _hydrated(repo, topology_id) + lan = _lan_by_name(hydrated, payload["name"]) + if lan is None: + raise MutationError(f"LAN {payload['name']!r} not found") + # Refuse if any decky's home (primary/first) LAN is this one. 
+ for d in hydrated["deckies"]: + ips = d["decky_config"].get("ips_by_lan", {}) + if ips and next(iter(ips)) == lan["name"]: + raise MutationError( + f"LAN {lan['name']!r} is the home LAN of decky " + f"{d['decky_config']['name']!r}; remove the decky first" + ) + await repo.delete_lan(lan["id"]) + await _assert_valid_after(repo, topology_id) + + +async def apply_attach_decky( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Attach an existing decky to an additional LAN (bridge edge). + + ``payload`` keys: + ``decky`` — decky name. + ``lan`` — LAN name. + ``ip`` — optional pinned IP; else allocated inside the LAN. + ``forwards_l3`` — bool, default False. + """ + hydrated = await _hydrated(repo, topology_id) + lan = _lan_by_name(hydrated, payload["lan"]) + decky = _decky_by_name(hydrated, payload["decky"]) + if lan is None: + raise MutationError(f"LAN {payload['lan']!r} not found") + if decky is None: + raise MutationError(f"decky {payload['decky']!r} not found") + + # Guard against re-attaching. + for e in hydrated["edges"]: + if e["decky_uuid"] == decky["uuid"] and e["lan_id"] == lan["id"]: + raise MutationError( + f"decky {decky['decky_config']['name']!r} already on " + f"LAN {lan['name']!r}" + ) + + ip = payload.get("ip") + if ip is None: + taken = { + d["decky_config"]["ips_by_lan"].get(lan["name"]) + for d in hydrated["deckies"] + if lan["name"] in d["decky_config"].get("ips_by_lan", {}) + } + taken.discard(None) + alloc = IPAllocator(subnet=lan["subnet"]) + for t in taken: + if t: + alloc.reserve(t) + ip = alloc.next_free() + + new_cfg = dict(decky["decky_config"]) + new_cfg["ips_by_lan"] = {**new_cfg.get("ips_by_lan", {}), lan["name"]: ip} + forwards_l3 = bool(payload.get("forwards_l3", False)) + if forwards_l3: + new_cfg["forwards_l3"] = True + + await repo.update_topology_decky( + decky["uuid"], {"decky_config": new_cfg} + ) + # Adding a second edge makes the decky multi-homed (a bridge decky). 
+ await repo.add_topology_edge( + { + "topology_id": topology_id, + "decky_uuid": decky["uuid"], + "lan_id": lan["id"], + "is_bridge": True, + "forwards_l3": forwards_l3, + } + ) + await _assert_valid_after(repo, topology_id) + + +async def apply_detach_decky( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Detach a decky from one of its non-home LANs.""" + hydrated = await _hydrated(repo, topology_id) + lan = _lan_by_name(hydrated, payload["lan"]) + decky = _decky_by_name(hydrated, payload["decky"]) + if lan is None or decky is None: + raise MutationError("decky or LAN not found") + + ips_by_lan = decky["decky_config"].get("ips_by_lan", {}) + if not ips_by_lan: + raise MutationError("decky has no LAN memberships") + home_lan = next(iter(ips_by_lan)) + if home_lan == lan["name"]: + raise MutationError( + f"cannot detach home LAN {home_lan!r}; use remove_decky" + ) + + edge = next( + ( + e + for e in hydrated["edges"] + if e["decky_uuid"] == decky["uuid"] and e["lan_id"] == lan["id"] + ), + None, + ) + if edge is None: + raise MutationError( + f"decky not attached to LAN {lan['name']!r}" + ) + + new_cfg = dict(decky["decky_config"]) + new_ips = dict(new_cfg.get("ips_by_lan", {})) + new_ips.pop(lan["name"], None) + new_cfg["ips_by_lan"] = new_ips + + await repo.update_topology_decky( + decky["uuid"], {"decky_config": new_cfg} + ) + await repo.delete_topology_edge(edge["id"]) + await _assert_valid_after(repo, topology_id) + + +async def apply_remove_decky( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + hydrated = await _hydrated(repo, topology_id) + decky = _decky_by_name(hydrated, payload["decky"]) + if decky is None: + raise MutationError(f"decky {payload['decky']!r} not found") + await repo.delete_topology_decky(decky["uuid"]) + await _assert_valid_after(repo, topology_id) + + +async def apply_update_decky( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Update decky config — services, 
service_config, forwards_l3, coords. + + ``payload`` keys: + ``decky`` — decky name. + ``patch`` — dict merged into existing ``decky_config``. + ``x``,``y`` — layout coords. + """ + hydrated = await _hydrated(repo, topology_id) + decky = _decky_by_name(hydrated, payload["decky"]) + if decky is None: + raise MutationError(f"decky {payload['decky']!r} not found") + patch: dict[str, Any] = {} + if payload.get("patch"): + merged = dict(decky["decky_config"]) + merged.update(payload["patch"]) + patch["decky_config"] = merged + for key in ("x", "y"): + if key in payload: + patch[key] = payload[key] + if not patch: + return + await repo.update_topology_decky(decky["uuid"], patch) + await _assert_valid_after(repo, topology_id) + + +async def apply_update_lan( + repo: Any, topology_id: str, payload: dict[str, Any] +) -> None: + """Update LAN fields — subnet, is_dmz, coords, rename.""" + hydrated = await _hydrated(repo, topology_id) + lan = _lan_by_name(hydrated, payload["name"]) + if lan is None: + raise MutationError(f"LAN {payload['name']!r} not found") + fields = {k: v for k, v in payload.get("patch", {}).items()} + for key in ("x", "y"): + if key in payload: + fields[key] = payload[key] + if not fields: + return + await repo.update_lan(lan["id"], fields) + await _assert_valid_after(repo, topology_id) + + +# Keep the dispatch table in one place so the engine and CLI stay in +# sync without cross-imports. 
+DISPATCH: dict[str, OpFunc] = { + "add_lan": apply_add_lan, + "remove_lan": apply_remove_lan, + "attach_decky": apply_attach_decky, + "detach_decky": apply_detach_decky, + "remove_decky": apply_remove_decky, + "update_decky": apply_update_decky, + "update_lan": apply_update_lan, +} + + +async def dispatch( + repo: Any, + topology_id: str, + op: str, + payload_raw: str | dict[str, Any], +) -> None: + """Decode payload JSON (if a string) and run the matching op.""" + if isinstance(payload_raw, str): + payload = json.loads(payload_raw) if payload_raw else {} + else: + payload = payload_raw + try: + fn = DISPATCH[op] + except KeyError as e: + raise MutationError(f"unknown op: {op!r}") from e + await fn(repo, topology_id, payload) + + +__all__ = [ + "DISPATCH", + "MutationError", + "dispatch", + "apply_add_lan", + "apply_remove_lan", + "apply_attach_decky", + "apply_detach_decky", + "apply_remove_decky", + "apply_update_decky", + "apply_update_lan", +] diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 085ce71f..28a03b1c 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from typing import Literal, Optional, Any, List, Annotated from uuid import uuid4 -from sqlalchemy import Column, Text, UniqueConstraint +from sqlalchemy import Column, Index, Text, UniqueConstraint from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlmodel import SQLModel, Field from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator @@ -309,6 +309,44 @@ class TopologyStatusEvent(SQLModel, table=True): ) +class TopologyMutation(SQLModel, table=True): + """Operator-requested live mutation for an active MazeNET topology. + + Each row is one intent (add LAN, attach decky, etc.). The mutator's + reconciler claims ``pending`` rows atomically (see + ``SQLModelRepository.claim_next_mutation``), applies them against + Docker, and writes ``applied`` or ``failed`` back. 
The ``(state, + topology_id)`` composite index keeps the watch-loop guard query + cheap even with years of mutation history. + """ + __tablename__ = "topology_mutations" + __table_args__ = ( + Index( + "ix_topology_mutations_state_topology", + "state", + "topology_id", + ), + ) + id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + topology_id: str = Field(foreign_key="topologies.id", index=True) + # add_lan|remove_lan|attach_decky|detach_decky|remove_decky| + # update_decky|update_lan + op: str = Field(index=True) + # JSON-serialised op payload (keys depend on ``op``). + payload: str = Field( + sa_column=Column("payload", _BIG_TEXT, nullable=False, default="{}") + ) + # pending|applying|applied|failed + state: str = Field(default="pending", index=True) + requested_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + applied_at: Optional[datetime] = Field(default=None) + reason: Optional[str] = Field( + default=None, sa_column=Column("reason", Text, nullable=True) + ) + + # --- API Request/Response Models (Pydantic) --- class Token(BaseModel): diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 67af535b..0ead310c 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -327,3 +327,41 @@ class BaseRepository(ABC): self, edge_id: str, *, expected_version: Optional[int] = None ) -> None: raise NotImplementedError + + # -------------------- live mutation queue (reconciler) -------------------- + + async def enqueue_topology_mutation( + self, + topology_id: str, + op: str, + payload: dict[str, Any], + *, + expected_version: Optional[int] = None, + ) -> str: + raise NotImplementedError + + async def claim_next_mutation( + self, topology_id: str + ) -> Optional[dict[str, Any]]: + raise NotImplementedError + + async def mark_mutation_applied(self, mutation_id: str) -> None: + raise NotImplementedError + + async def mark_mutation_failed( + self, mutation_id: str, 
reason: str + ) -> None: + raise NotImplementedError + + async def list_topology_mutations( + self, + topology_id: str, + state: Optional[str] = None, + ) -> list[dict[str, Any]]: + raise NotImplementedError + + async def has_pending_topology_mutation(self) -> bool: + return False + + async def list_live_topology_ids(self) -> list[str]: + return [] diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 5d3da3ce..3ed3a8ac 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -41,6 +41,7 @@ from decnet.web.db.models import ( TopologyDecky, TopologyEdge, TopologyStatusEvent, + TopologyMutation, ) @@ -1336,3 +1337,165 @@ class SQLModelRepository(BaseRepository): .limit(limit) ) return [r.model_dump(mode="json") for r in result.scalars().all()] + + # ---------------- topology_mutations (live reconciler queue) ---------------- + + async def enqueue_topology_mutation( + self, + topology_id: str, + op: str, + payload: dict[str, Any], + *, + expected_version: Optional[int] = None, + ) -> str: + """Append a pending mutation row and bump the topology version. + + Intended for use while the topology is ``active|degraded``; the + reconciler picks these rows up on its next tick. + """ + async with self._session() as session: + await self._check_and_bump_version( + session, topology_id, expected_version + ) + row = TopologyMutation( + topology_id=topology_id, + op=op, + payload=orjson.dumps(payload).decode(), + ) + session.add(row) + await session.commit() + await session.refresh(row) + return row.id + + async def claim_next_mutation( + self, topology_id: str + ) -> Optional[dict[str, Any]]: + """Atomically claim the oldest pending mutation for ``topology_id``. + + Correctness-critical: this is ONE SQL statement. Splitting it + into SELECT-then-UPDATE would let two racing watch-loops both + see the same ``pending`` row and both transition it to + ``applying`` — double-executing the op. With the single + ``UPDATE ... 
WHERE id = (SELECT ... LIMIT 1) AND state='pending'`` + pattern the loser's UPDATE matches zero rows and returns + ``None`` — that is the expected, non-error outcome under + contention. + """ + async with self._session() as session: + now = datetime.now(timezone.utc).isoformat() + # Single-statement atomic claim. The inner SELECT picks the + # oldest pending row; the outer UPDATE re-checks state so a + # second racer that also saw that id finds state='applying' + # and matches zero rows. + sql = text( + """ + UPDATE topology_mutations + SET state = 'applying' + WHERE id = ( + SELECT id FROM topology_mutations + WHERE topology_id = :t AND state = 'pending' + ORDER BY requested_at ASC + LIMIT 1 + ) + AND state = 'pending' + """ + ) + result = await session.execute(sql, {"t": topology_id}) + if result.rowcount == 0: + await session.commit() + return None + # Re-read the row we just claimed. The post-UPDATE SELECT is + # safe: no racer can now transition an ``applying`` row back + # to ``pending``. 
+ sel = await session.execute( + select(TopologyMutation) + .where(TopologyMutation.topology_id == topology_id) + .where(TopologyMutation.state == "applying") + .order_by(asc(TopologyMutation.requested_at)) + .limit(1) + ) + row = sel.scalar_one_or_none() + await session.commit() + _ = now + if row is None: + return None + return row.model_dump(mode="json") + + async def mark_mutation_applied(self, mutation_id: str) -> None: + async with self._session() as session: + await session.execute( + text( + "UPDATE topology_mutations " + "SET state = 'applied', applied_at = :at " + "WHERE id = :i" + ), + { + "at": datetime.now(timezone.utc).isoformat(), + "i": mutation_id, + }, + ) + await session.commit() + + async def mark_mutation_failed( + self, mutation_id: str, reason: str + ) -> None: + async with self._session() as session: + await session.execute( + text( + "UPDATE topology_mutations " + "SET state = 'failed', applied_at = :at, reason = :r " + "WHERE id = :i" + ), + { + "at": datetime.now(timezone.utc).isoformat(), + "r": reason, + "i": mutation_id, + }, + ) + await session.commit() + + async def list_topology_mutations( + self, + topology_id: str, + state: Optional[str] = None, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = ( + select(TopologyMutation) + .where(TopologyMutation.topology_id == topology_id) + .order_by(desc(TopologyMutation.requested_at)) + ) + if state is not None: + stmt = stmt.where(TopologyMutation.state == state) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def has_pending_topology_mutation(self) -> bool: + """Cheap watch-loop guard: any pending mutation on a live topology? + + Uses the ``ix_topology_mutations_state_topology`` composite index + to keep the join cheap at scale. Returns False as soon as the + reconciler path should be skipped. 
+ """ + async with self._session() as session: + result = await session.execute( + text( + "SELECT 1 FROM topology_mutations " + "WHERE state = 'pending' " + "AND topology_id IN (" + " SELECT id FROM topologies " + " WHERE status IN ('active', 'degraded')" + ") LIMIT 1" + ) + ) + return result.first() is not None + + async def list_live_topology_ids(self) -> list[str]: + """Return ids of topologies currently in ``active|degraded``.""" + async with self._session() as session: + result = await session.execute( + select(Topology.id).where( + Topology.status.in_(["active", "degraded"]) + ) + ) + return [r for r in result.scalars().all()] diff --git a/tests/topology/test_mutator.py b/tests/topology/test_mutator.py new file mode 100644 index 00000000..3b38cd6e --- /dev/null +++ b/tests/topology/test_mutator.py @@ -0,0 +1,274 @@ +"""Step 7 — topology_mutations queue + mutator reconciler branch.""" +from __future__ import annotations + +import json + +import pytest + +from decnet.mutator import engine as _engine +from decnet.mutator.ops import MutationError, apply_add_lan, apply_update_decky +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.persistence import persist, transition_status +from decnet.topology.status import TopologyStatus, VersionConflict +from decnet.web.db.factory import get_repository + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="mut", + depth=1, + branching_factor=1, + deckies_per_lan_min=2, + deckies_per_lan_max=2, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=9, + ) + base.update(kw) + return TopologyConfig(**base) + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "mut.db")) + await r.initialize() + return r + + +async def _make_active(repo) -> str: + plan = generate(_cfg()) + tid = await persist(repo, plan) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await 
transition_status(repo, tid, TopologyStatus.ACTIVE) + return tid + + +# --------------------------------------------------------------------- queue + + +@pytest.mark.anyio +async def test_enqueue_bumps_topology_version(repo): + tid = await _make_active(repo) + before = (await repo.get_topology(tid))["version"] + mid = await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": "LAN-X", "subnet": "172.20.77.0/24"}, + expected_version=before, + ) + topo = await repo.get_topology(tid) + assert topo["version"] == before + 1 + rows = await repo.list_topology_mutations(tid) + assert rows[0]["id"] == mid + assert rows[0]["state"] == "pending" + + +@pytest.mark.anyio +async def test_enqueue_version_conflict(repo): + tid = await _make_active(repo) + await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": "LAN-X", "subnet": "172.20.77.0/24"}, + expected_version=1, + ) + with pytest.raises(VersionConflict): + await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": "LAN-Y", "subnet": "172.20.78.0/24"}, + expected_version=1, # stale — version is now 2 + ) + + +@pytest.mark.anyio +async def test_claim_next_mutation_is_atomic_single_winner(repo): + """Two simulated watch loops; only one claims the row.""" + tid = await _make_active(repo) + await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": "LAN-X"}, + ) + # Sequential simulated races: because the claim is a single SQL + # UPDATE with ``WHERE state='pending'``, the second call observes + # state='applying' and returns None rather than re-claiming. 
+ first = await repo.claim_next_mutation(tid) + second = await repo.claim_next_mutation(tid) + assert first is not None + assert second is None + assert first["state"] == "applying" + + +@pytest.mark.anyio +async def test_claim_none_when_empty(repo): + tid = await _make_active(repo) + assert await repo.claim_next_mutation(tid) is None + + +@pytest.mark.anyio +async def test_mark_applied_and_failed(repo): + tid = await _make_active(repo) + mid1 = await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "A"}) + mid2 = await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "B"}) + await repo.claim_next_mutation(tid) + await repo.mark_mutation_applied(mid1) + await repo.claim_next_mutation(tid) + await repo.mark_mutation_failed(mid2, "boom") + + by_id = {r["id"]: r for r in await repo.list_topology_mutations(tid)} + assert by_id[mid1]["state"] == "applied" + assert by_id[mid2]["state"] == "failed" + assert by_id[mid2]["reason"] == "boom" + + +# --------------------------------------------------------------- guard query + + +@pytest.mark.anyio +async def test_guard_false_without_pending_or_live(repo): + # No topologies at all. + assert await repo.has_pending_topology_mutation() is False + # Pending topology with a mutation (but not live) — guard stays False. + plan = generate(_cfg()) + tid = await persist(repo, plan) + # enqueue_topology_mutation doesn't require status, but pending + # topologies don't trip the guard. + await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "Z"}) + assert await repo.has_pending_topology_mutation() is False + + +@pytest.mark.anyio +async def test_guard_true_with_live_pending(repo): + tid = await _make_active(repo) + await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "Z"}) + assert await repo.has_pending_topology_mutation() is True + # After claiming, the pending row becomes applying — guard drops. 
+ await repo.claim_next_mutation(tid) + assert await repo.has_pending_topology_mutation() is False + + +# ---------------------------------------------------------------------- ops + + +@pytest.mark.anyio +async def test_apply_add_lan_persists(repo): + tid = await _make_active(repo) + await apply_add_lan( + repo, tid, {"name": "LAN-MUT", "subnet": "172.20.55.0/24"} + ) + names = {l["name"] for l in await repo.list_lans_for_topology(tid)} + assert "LAN-MUT" in names + + +@pytest.mark.anyio +async def test_apply_rejected_on_validator_error(repo): + """Unknown service name must trip the post-apply validator.""" + tid = await _make_active(repo) + decky = (await repo.list_topology_deckies(tid))[0] + with pytest.raises(MutationError): + await apply_update_decky( + repo, tid, + { + "decky": decky["decky_config"]["name"], + # service_config for an undeclared service trips + # SERVICE_CFG_UNDECLARED in the post-apply invariants. + "patch": {"service_config": {"telnet": {"banner": "x"}}}, + }, + ) + + +# ----------------------------------------------------------- reconciler flow + + +@pytest.mark.anyio +async def test_reconcile_applies_pending_mutation(repo): + tid = await _make_active(repo) + await repo.enqueue_topology_mutation( + tid, "add_lan", + {"name": "LAN-RECON", "subnet": "172.20.44.0/24"}, + ) + drained = await _engine.reconcile_topologies(repo) + assert drained == 1 + names = {l["name"] for l in await repo.list_lans_for_topology(tid)} + assert "LAN-RECON" in names + # Mutation row is now applied. + state = {r["state"] for r in await repo.list_topology_mutations(tid)} + assert state == {"applied"} + + +@pytest.mark.anyio +async def test_reconcile_failed_mutation_degrades_topology(repo): + tid = await _make_active(repo) + existing = (await repo.list_lans_for_topology(tid))[0]["name"] + # Validator will reject duplicate LAN name → failure path. 
+ await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": existing, "subnet": "172.20.88.0/24"}, + ) + drained = await _engine.reconcile_topologies(repo) + assert drained == 0 + mut = (await repo.list_topology_mutations(tid))[0] + assert mut["state"] == "failed" + topo = await repo.get_topology(tid) + assert topo["status"] == TopologyStatus.DEGRADED + + +# ----------------------------------------------------- watch-loop guard isolation + + +@pytest.mark.anyio +async def test_watch_loop_guard_skips_reconciler_when_idle( + repo, monkeypatch +): + """Tick with no live topology + no pending mutations ⇒ reconciler not called. + + Also asserts flat-fleet ``mutate_all`` runs every tick, unchanged. + """ + calls = {"mutate_all": 0, "reconcile": 0} + + async def _fake_mutate_all(force=False, repo=None): + calls["mutate_all"] += 1 + + async def _fake_reconcile(r): + calls["reconcile"] += 1 + return 0 + + monkeypatch.setattr(_engine, "mutate_all", _fake_mutate_all) + monkeypatch.setattr(_engine, "reconcile_topologies", _fake_reconcile) + + # Manually drive one iteration of the loop body. 
+ await _engine.mutate_all(force=False, repo=repo) + if await repo.has_pending_topology_mutation(): + await _engine.reconcile_topologies(repo) + + assert calls["mutate_all"] == 1 + assert calls["reconcile"] == 0 + + +@pytest.mark.anyio +async def test_watch_loop_guard_fires_reconciler_when_work_exists( + repo, monkeypatch +): + tid = await _make_active(repo) + await repo.enqueue_topology_mutation(tid, "add_lan", {"name": "X"}) + + calls = {"reconcile": 0} + + async def _fake_reconcile(r): + calls["reconcile"] += 1 + return 0 + + monkeypatch.setattr(_engine, "reconcile_topologies", _fake_reconcile) + + if await repo.has_pending_topology_mutation(): + await _engine.reconcile_topologies(repo) + + assert calls["reconcile"] == 1 + + +def test_ops_payload_shape_docstring_present(): + """Smoke: DISPATCH covers every op name referenced in the plan.""" + from decnet.mutator.ops import DISPATCH + + assert set(DISPATCH) == { + "add_lan", "remove_lan", "attach_decky", "detach_decky", + "remove_decky", "update_decky", "update_lan", + } + + +def _payload_json(d: dict) -> str: + return json.dumps(d)