feat(mutator,web): live topology mutation pipeline backend (DEBT-030)

Wire the mutator and web API into the service bus so live-topology
edits flow sub-second from enqueue to UI:

- Mutator publishes every state transition on the bus (mutation.applying
  /applied/failed + topology.status). Fire-and-forget; DB stays source
  of truth.
- Mutator watch loop subscribes to topology.*.mutation.enqueued and
  wakes early via asyncio.Event — the 10s poll becomes a fallback
  heartbeat, not the primary dispatch trigger.
- POST /topologies/{id}/mutations publishes mutation.enqueued after
  the DB write succeeds.
- New GET /topologies/{id}/events SSE route: snapshot on connect
  (status + in-flight mutations), live forwards topology.{id}.>
  bus events, 15s keepalive. ?token= auth mirrors /stream.
- New decnet/bus/app.py — process-wide lazy bus singleton for the
  API, closed cleanly on lifespan shutdown.
This commit is contained in:
2026-04-21 14:38:25 -04:00
parent f0349632c3
commit f611e7363b
6 changed files with 347 additions and 3 deletions

71
decnet/bus/app.py Normal file
View File

@@ -0,0 +1,71 @@
"""Process-wide bus singleton for request-serving workers (API, SSE routes).
A single connected :class:`~decnet.bus.base.BaseBus` shared across request
handlers — opening a UNIX socket per request would be wasteful and add
latency to the hot path. The API lifespan is responsible for calling
:func:`close_app_bus` on shutdown; connect is lazy so tests and
contract-test mode that never hit a publish/subscribe code path don't
pay for a bus connection they'll never use.
Failures during :meth:`BaseBus.connect` are swallowed and logged — a
dead bus must never break request serving. Publishers should treat a
``None`` return from :func:`get_app_bus` as "skip this notification",
same as ``DECNET_BUS_ENABLED=false``.
"""
from __future__ import annotations
import asyncio
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.logging import get_logger
log = get_logger("bus.app")
_lock = asyncio.Lock()
_shared: BaseBus | None = None
_tried = False
async def get_app_bus() -> BaseBus | None:
"""Return the process-wide connected bus, or ``None`` if unavailable.
On first call, constructs a client via :func:`get_bus` and awaits
``connect()``. Subsequent calls return the cached instance. If the
initial connect raises, we remember the failure and return ``None``
from here on — callers are expected to fall back cleanly.
"""
global _shared, _tried
if _shared is not None:
return _shared
if _tried:
return None
async with _lock:
if _shared is not None:
return _shared
if _tried:
return None
_tried = True
try:
candidate = get_bus(client_name="api")
await candidate.connect()
_shared = candidate
except Exception as exc: # noqa: BLE001
log.warning("app bus unavailable: %s", exc)
return None
return _shared
async def close_app_bus() -> None:
"""Close the shared bus if one is open; reset the tried-once guard.
Call from the API lifespan shutdown. Safe to call multiple times.
"""
global _shared, _tried
bus, _shared = _shared, None
_tried = False
if bus is not None:
try:
await bus.close()
except Exception as exc: # noqa: BLE001
log.warning("app bus close raised: %s", exc)

View File

@@ -20,12 +20,37 @@ from decnet.telemetry import traced as _traced
from pathlib import Path
import anyio
import asyncio
import contextlib
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.web.db.repository import BaseRepository
log = get_logger("mutator")
console = Console()
async def _publish_safely(
bus: BaseBus | None,
topic: str,
payload: dict,
event_type: str = "",
) -> None:
"""Fire-and-forget bus publish.
A bus failure must never break the reconciler — the DB write already
happened before we got here, so losing the notification is at most a
few seconds of UI latency (the next poll tick picks it up).
"""
if bus is None:
return
try:
await bus.publish(topic, payload, event_type=event_type)
except Exception as exc: # noqa: BLE001
log.warning("bus publish failed topic=%s: %s", topic, exc)
@_traced("mutator.mutate_decky")
async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
"""
@@ -134,7 +159,9 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
@_traced("mutator.reconcile_topologies")
async def reconcile_topologies(repo: BaseRepository) -> int:
async def reconcile_topologies(
repo: BaseRepository, bus: BaseBus | None = None,
) -> int:
"""Drain pending ``topology_mutations`` rows against live topologies.
For every topology in ``active|degraded`` with at least one pending
@@ -161,6 +188,12 @@ async def reconcile_topologies(repo: BaseRepository) -> int:
mut = await repo.claim_next_mutation(tid)
if mut is None:
break # no more work for this topology this tick.
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_APPLYING),
{"mutation_id": mut["id"], "op": mut["op"], "payload": mut["payload"]},
event_type=_topics.MUTATION_APPLYING,
)
try:
await _op_dispatch(repo, tid, mut["op"], mut["payload"])
await repo.mark_mutation_applied(mut["id"])
@@ -169,6 +202,12 @@ async def reconcile_topologies(repo: BaseRepository) -> int:
"topology %s mutation %s applied op=%s",
tid, mut["id"], mut["op"],
)
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_APPLIED),
{"mutation_id": mut["id"], "op": mut["op"]},
event_type=_topics.MUTATION_APPLIED,
)
except (MutationError, Exception) as exc: # noqa: BLE001
reason = f"{type(exc).__name__}: {exc}"
await repo.mark_mutation_failed(mut["id"], reason)
@@ -176,10 +215,22 @@ async def reconcile_topologies(repo: BaseRepository) -> int:
"topology %s mutation %s failed: %s",
tid, mut["id"], reason,
)
await _publish_safely(
bus,
_topics.topology_mutation(tid, _topics.MUTATION_FAILED),
{"mutation_id": mut["id"], "op": mut["op"], "reason": reason},
event_type=_topics.MUTATION_FAILED,
)
try:
await transition_status(
repo, tid, TopologyStatus.DEGRADED, reason=reason,
)
await _publish_safely(
bus,
_topics.topology_status(tid),
{"state": TopologyStatus.DEGRADED, "reason": reason},
event_type=_topics.TOPOLOGY_STATUS,
)
except TopologyStatusError:
# Already degraded / in a state that can't degrade
# further — leave as is.
@@ -239,6 +290,21 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
"""
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
# Connect to the bus for publish + wake-on-enqueue. Failure here is
# non-fatal: a mutator without a bus still works, it just runs at
# poll-interval latency and doesn't push notifications to UI clients.
bus: BaseBus | None = None
wake = asyncio.Event()
wake_task: asyncio.Task | None = None
try:
candidate = get_bus(client_name="mutator")
await candidate.connect()
bus = candidate
wake_task = asyncio.create_task(_wake_on_enqueue(bus, wake))
except Exception as exc: # noqa: BLE001
log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc)
try:
while True:
await mutate_all(force=False, repo=repo)
@@ -246,7 +312,7 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
# entering the dispatch body when there's nothing to do.
try:
if await repo.has_pending_topology_mutation():
await reconcile_topologies(repo)
await reconcile_topologies(repo, bus=bus)
except NotImplementedError:
# Backend without MazeNET support — nothing to reconcile.
pass
@@ -256,7 +322,42 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
pass
except Exception:
log.exception("reconcile_agent_resyncs tick raised")
await asyncio.sleep(poll_interval_secs)
# Wait until either poll_interval_secs elapses OR an enqueued
# mutation wakes us early. Clearing before the next tick
# means a second wake during the tick will re-fire after.
try:
await asyncio.wait_for(wake.wait(), timeout=poll_interval_secs)
except asyncio.TimeoutError:
pass
wake.clear()
except KeyboardInterrupt:
log.info("mutator watch loop stopped")
console.print("\n[dim]Mutator watcher stopped.[/]")
finally:
if wake_task is not None:
wake_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await wake_task
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
async def _wake_on_enqueue(bus: BaseBus, wake: asyncio.Event) -> None:
"""Flip *wake* every time a ``mutation.enqueued`` event lands.
Subscribes to the wildcard ``topology.*.mutation.enqueued`` — a single
subscription covers every topology on the host. Runs until cancelled
or the bus closes (NullBus yields nothing and returns immediately,
which is fine: the poll-interval fallback still ticks).
"""
pattern = f"{_topics.TOPOLOGY}.*.mutation.{_topics.MUTATION_ENQUEUED}"
try:
sub = bus.subscribe(pattern)
async with sub:
async for _event in sub:
wake.set()
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
log.warning("mutator: wake subscriber died (%s); falling back to poll", exc)

View File

@@ -127,6 +127,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
pass
except Exception as exc:
log.warning("Task shutdown error: %s", exc)
from decnet.bus.app import close_app_bus
await close_app_bus()
from decnet.telemetry import shutdown_tracing
shutdown_tracing()
log.info("API shutdown complete")

View File

@@ -16,6 +16,7 @@ from .api_decky_crud import router as _decky_router
from .api_delete_topology import router as _delete_router
from .api_deploy_topology import router as _deploy_router
from .api_edge_crud import router as _edge_router
from .api_events import router as _events_router
from .api_get_topology import router as _get_router
from .api_lan_crud import router as _lan_router
from .api_list_topologies import router as _list_router
@@ -40,6 +41,7 @@ topology_router.include_router(_lan_router)
topology_router.include_router(_decky_router)
topology_router.include_router(_edge_router)
topology_router.include_router(_mutations_router)
topology_router.include_router(_events_router)
topology_router.include_router(_get_router)

View File

@@ -0,0 +1,149 @@
"""SSE stream of topology lifecycle events — one connection per editor.
Subscribes to ``topology.<id>.>`` on the :class:`~decnet.bus.base.BaseBus`
for the duration of the request and forwards each matching bus event as
a Server-Sent Event to the browser. Emits a one-shot snapshot on connect
(current status + any in-flight mutations) so the client doesn't need a
separate fetch to initialise the "pending" buffer.
Authorization matches :mod:`decnet.web.router.stream.api_stream_events`
— a JWT passed via the ``?token=`` query parameter (EventSource can't
set arbitrary headers) + ``require_stream_viewer`` role gate. The
per-topology 404 is enforced after auth so existence probes can't leak
a topology id to an unauthenticated caller.
"""
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
import orjson
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_stream_viewer
from ._guards import get_topology_or_404
log = get_logger("api.topology.events")
router = APIRouter()
_KEEPALIVE_SECS = 15.0
_IN_FLIGHT_STATES = ("pending", "applying")
def _format_sse(event_name: str, data: dict) -> str:
"""Build one SSE frame: ``event: <name>\\ndata: <json>\\n\\n``."""
return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n"
@router.get(
"/{topology_id}/events",
tags=["MazeNET Topologies"],
responses={
200: {
"content": {"text/event-stream": {}},
"description": "SSE stream of mutation and status events for one topology",
},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Topology not found"},
},
)
@_traced("api.topology.events")
async def api_topology_events(
topology_id: str,
request: Request,
_user: dict = Depends(require_stream_viewer),
) -> StreamingResponse:
topo = await get_topology_or_404(topology_id)
snapshot_status = topo["status"]
in_flight: list[dict] = []
for state in _IN_FLIGHT_STATES:
in_flight.extend(await repo.list_topology_mutations(topology_id, state=state))
async def generator() -> AsyncGenerator[str, None]:
# Flush headers immediately so the browser's EventSource sees a
# live connection before the first real event arrives.
yield ": keepalive\n\n"
# One-shot snapshot — pair the current topology status with any
# mutations the mutator is still holding, so the client buffer
# can render an accurate "already in flight" state.
yield _format_sse("snapshot", {
"topology_id": topology_id,
"status": snapshot_status,
"in_flight": in_flight,
})
bus = await get_app_bus()
if bus is None:
# Bus disabled (NullBus) or unreachable. The snapshot is
# still useful; we idle on keepalives so the client stays
# connected and will re-poll on its own timers.
while not await request.is_disconnected():
try:
await asyncio.sleep(_KEEPALIVE_SECS)
except asyncio.CancelledError:
break
yield ": keepalive\n\n"
return
sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>")
try:
async with sub:
sub_iter = sub.__aiter__()
while True:
if await request.is_disconnected():
break
next_task = asyncio.ensure_future(sub_iter.__anext__())
try:
event = await asyncio.wait_for(next_task, timeout=_KEEPALIVE_SECS)
except asyncio.TimeoutError:
next_task.cancel()
yield ": keepalive\n\n"
continue
except StopAsyncIteration:
break
# Map the bus event onto an SSE ``event:`` name that
# the frontend can switch on without parsing topics.
yield _format_sse(
_sse_name_for(event.topic),
{
"topic": event.topic,
"type": event.type,
"ts": event.ts,
"payload": event.payload,
},
)
except asyncio.CancelledError:
pass
except Exception:
log.exception("topology events stream crashed topology_id=%s", topology_id)
yield _format_sse("error", {"message": "Stream interrupted"})
return StreamingResponse(
generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
def _sse_name_for(topic: str) -> str:
"""Derive an SSE ``event:`` name from a bus topic.
``topology.<id>.mutation.applied`` → ``mutation.applied``
``topology.<id>.status`` → ``status``
Anything else is passed through unchanged so future topic families
don't silently collapse onto a generic bucket.
"""
parts = topic.split(".", 2)
return parts[2] if len(parts) >= 3 else topic

View File

@@ -13,6 +13,9 @@ from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.topology.status import (
TopologyStatus,
@@ -27,6 +30,8 @@ from decnet.web.dependencies import repo, require_admin, require_viewer
from ._guards import get_topology_or_404, map_repo_exception
_log = get_logger("api.topology.mutations")
router = APIRouter()
_MUTATABLE: frozenset[str] = frozenset(
@@ -80,6 +85,20 @@ async def api_enqueue_mutation(
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
# Fire-and-forget bus publish so the mutator can wake immediately and
# the SSE route can notify connected editors. Bus failure here must
# never mask a successful enqueue — the DB row is authoritative.
bus = await get_app_bus()
if bus is not None:
try:
await bus.publish(
_topics.topology_mutation(topology_id, _topics.MUTATION_ENQUEUED),
{"mutation_id": mutation_id, "op": body.op, "payload": body.payload},
event_type=_topics.MUTATION_ENQUEUED,
)
except Exception as exc: # noqa: BLE001
_log.warning("bus publish (enqueued) failed: %s", exc)
return MutationEnqueueResponse(mutation_id=mutation_id, state="pending")