diff --git a/decnet/bus/app.py b/decnet/bus/app.py new file mode 100644 index 00000000..ed8e8e87 --- /dev/null +++ b/decnet/bus/app.py @@ -0,0 +1,71 @@ +"""Process-wide bus singleton for request-serving workers (API, SSE routes). + +A single connected :class:`~decnet.bus.base.BaseBus` shared across request +handlers — opening a UNIX socket per request would be wasteful and add +latency to the hot path. The API lifespan is responsible for calling +:func:`close_app_bus` on shutdown; connect is lazy so tests and +contract-test mode that never hit a publish/subscribe code path don't +pay for a bus connection they'll never use. + +Failures during :meth:`BaseBus.connect` are swallowed and logged — a +dead bus must never break request serving. Publishers should treat a +``None`` return from :func:`get_app_bus` as "skip this notification", +same as ``DECNET_BUS_ENABLED=false``. +""" +from __future__ import annotations + +import asyncio + +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.logging import get_logger + +log = get_logger("bus.app") + +_lock = asyncio.Lock() +_shared: BaseBus | None = None +_tried = False + + +async def get_app_bus() -> BaseBus | None: + """Return the process-wide connected bus, or ``None`` if unavailable. + + On first call, constructs a client via :func:`get_bus` and awaits + ``connect()``. Subsequent calls return the cached instance. If the + initial connect raises, we remember the failure and return ``None`` + from here on — callers are expected to fall back cleanly. + """ + global _shared, _tried + if _shared is not None: + return _shared + if _tried: + return None + async with _lock: + if _shared is not None: + return _shared + if _tried: + return None + _tried = True + try: + candidate = get_bus(client_name="api") + await candidate.connect() + _shared = candidate + except Exception as exc: # noqa: BLE001 + log.warning("app bus unavailable: %s", exc) + return None + return _shared + + +async def close_app_bus() -> None: + """Close the shared bus if one is open; reset the tried-once guard. + + Call from the API lifespan shutdown. Safe to call multiple times. + """ + global _shared, _tried + bus, _shared = _shared, None + _tried = False + if bus is not None: + try: + await bus.close() + except Exception as exc: # noqa: BLE001 + log.warning("app bus close raised: %s", exc) diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index c9364b0d..4f3f9192 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -20,12 +20,37 @@ from decnet.telemetry import traced as _traced from pathlib import Path import anyio import asyncio +import contextlib + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus from decnet.web.db.repository import BaseRepository log = get_logger("mutator") console = Console() +async def _publish_safely( + bus: BaseBus | None, + topic: str, + payload: dict, + event_type: str = "", +) -> None: + """Fire-and-forget bus publish. + + A bus failure must never break the reconciler — the DB write already + happened before we got here, so losing the notification is at most a + few seconds of UI latency (the next poll tick picks it up). + """ + if bus is None: + return + try: + await bus.publish(topic, payload, event_type=event_type) + except Exception as exc: # noqa: BLE001 + log.warning("bus publish failed topic=%s: %s", topic, exc) + + @_traced("mutator.mutate_decky") async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: """ @@ -134,7 +159,9 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None: @_traced("mutator.reconcile_topologies") -async def reconcile_topologies(repo: BaseRepository) -> int: +async def reconcile_topologies( + repo: BaseRepository, bus: BaseBus | None = None, +) -> int: """Drain pending ``topology_mutations`` rows against live topologies. For every topology in ``active|degraded`` with at least one pending @@ -161,6 +188,12 @@ async def reconcile_topologies(repo: BaseRepository) -> int: mut = await repo.claim_next_mutation(tid) if mut is None: break # no more work for this topology this tick. + await _publish_safely( + bus, + _topics.topology_mutation(tid, _topics.MUTATION_APPLYING), + {"mutation_id": mut["id"], "op": mut["op"], "payload": mut["payload"]}, + event_type=_topics.MUTATION_APPLYING, + ) try: await _op_dispatch(repo, tid, mut["op"], mut["payload"]) await repo.mark_mutation_applied(mut["id"]) @@ -169,6 +202,12 @@ async def reconcile_topologies(repo: BaseRepository) -> int: "topology %s mutation %s applied op=%s", tid, mut["id"], mut["op"], ) + await _publish_safely( + bus, + _topics.topology_mutation(tid, _topics.MUTATION_APPLIED), + {"mutation_id": mut["id"], "op": mut["op"]}, + event_type=_topics.MUTATION_APPLIED, + ) except (MutationError, Exception) as exc: # noqa: BLE001 reason = f"{type(exc).__name__}: {exc}" await repo.mark_mutation_failed(mut["id"], reason) @@ -176,10 +215,22 @@ async def reconcile_topologies(repo: BaseRepository) -> int: "topology %s mutation %s failed: %s", tid, mut["id"], reason, ) + await _publish_safely( + bus, + _topics.topology_mutation(tid, _topics.MUTATION_FAILED), + {"mutation_id": mut["id"], "op": mut["op"], "reason": reason}, + event_type=_topics.MUTATION_FAILED, + ) try: await transition_status( repo, tid, TopologyStatus.DEGRADED, reason=reason, ) + await _publish_safely( + bus, + _topics.topology_status(tid), + {"state": TopologyStatus.DEGRADED, "reason": reason}, + event_type=_topics.TOPOLOGY_STATUS, + ) except TopologyStatusError: # Already degraded / in a state that can't degrade # further — leave as is. @@ -239,6 +290,21 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> """ log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs) console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]") + + # Connect to the bus for publish + wake-on-enqueue. Failure here is + # non-fatal: a mutator without a bus still works, it just runs at + # poll-interval latency and doesn't push notifications to UI clients. + bus: BaseBus | None = None + wake = asyncio.Event() + wake_task: asyncio.Task | None = None + try: + candidate = get_bus(client_name="mutator") + await candidate.connect() + bus = candidate + wake_task = asyncio.create_task(_wake_on_enqueue(bus, wake)) + except Exception as exc: # noqa: BLE001 + log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc) + try: while True: await mutate_all(force=False, repo=repo) @@ -246,7 +312,7 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> # entering the dispatch body when there's nothing to do. try: if await repo.has_pending_topology_mutation(): - await reconcile_topologies(repo) + await reconcile_topologies(repo, bus=bus) except NotImplementedError: # Backend without MazeNET support — nothing to reconcile. pass @@ -256,7 +322,42 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> pass except Exception: log.exception("reconcile_agent_resyncs tick raised") - await asyncio.sleep(poll_interval_secs) + # Wait until either poll_interval_secs elapses OR an enqueued + # mutation wakes us early. Clearing before the next tick + # means a second wake during the tick will re-fire after. + try: + await asyncio.wait_for(wake.wait(), timeout=poll_interval_secs) + except asyncio.TimeoutError: + pass + wake.clear() except KeyboardInterrupt: log.info("mutator watch loop stopped") console.print("\n[dim]Mutator watcher stopped.[/]") + finally: + if wake_task is not None: + wake_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await wake_task + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _wake_on_enqueue(bus: BaseBus, wake: asyncio.Event) -> None: + """Flip *wake* every time a ``mutation.enqueued`` event lands. + + Subscribes to the wildcard ``topology.*.mutation.enqueued`` — a single + subscription covers every topology on the host. Runs until cancelled + or the bus closes (NullBus yields nothing and returns immediately, + which is fine: the poll-interval fallback still ticks). + """ + pattern = f"{_topics.TOPOLOGY}.*.mutation.{_topics.MUTATION_ENQUEUED}" + try: + sub = bus.subscribe(pattern) + async with sub: + async for _event in sub: + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning("mutator: wake subscriber died (%s); falling back to poll", exc) diff --git a/decnet/web/api.py b/decnet/web/api.py index d861b570..20151387 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -127,6 +127,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: pass except Exception as exc: log.warning("Task shutdown error: %s", exc) + from decnet.bus.app import close_app_bus + await close_app_bus() from decnet.telemetry import shutdown_tracing shutdown_tracing() log.info("API shutdown complete") diff --git a/decnet/web/router/topology/__init__.py b/decnet/web/router/topology/__init__.py index e93ccf6a..32dbe864 100644 --- a/decnet/web/router/topology/__init__.py +++ b/decnet/web/router/topology/__init__.py @@ -16,6 +16,7 @@ from .api_decky_crud import router as _decky_router from .api_delete_topology import router as _delete_router from .api_deploy_topology import router as _deploy_router from .api_edge_crud import router as _edge_router +from .api_events import router as _events_router from .api_get_topology import router as _get_router from .api_lan_crud import router as _lan_router from .api_list_topologies import router as _list_router @@ -40,6 +41,7 @@ topology_router.include_router(_lan_router) topology_router.include_router(_decky_router) topology_router.include_router(_edge_router) topology_router.include_router(_mutations_router) +topology_router.include_router(_events_router) topology_router.include_router(_get_router) diff --git a/decnet/web/router/topology/api_events.py b/decnet/web/router/topology/api_events.py new file mode 100644 index 00000000..759ffd91 --- /dev/null +++ b/decnet/web/router/topology/api_events.py @@ -0,0 +1,149 @@ +"""SSE stream of topology lifecycle events — one connection per editor. + +Subscribes to ``topology..>`` on the :class:`~decnet.bus.base.BaseBus` +for the duration of the request and forwards each matching bus event as +a Server-Sent Event to the browser. Emits a one-shot snapshot on connect +(current status + any in-flight mutations) so the client doesn't need a +separate fetch to initialise the "pending" buffer. + +Authorization matches :mod:`decnet.web.router.stream.api_stream_events` +— a JWT passed via the ``?token=`` query parameter (EventSource can't +set arbitrary headers) + ``require_stream_viewer`` role gate. The +per-topology 404 is enforced after auth so existence probes can't leak +a topology id to an unauthenticated caller. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +import orjson +from fastapi import APIRouter, Depends, Request +from fastapi.responses import StreamingResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_stream_viewer + +from ._guards import get_topology_or_404 + +log = get_logger("api.topology.events") + +router = APIRouter() + +_KEEPALIVE_SECS = 15.0 +_IN_FLIGHT_STATES = ("pending", "applying") + + +def _format_sse(event_name: str, data: dict) -> str: + """Build one SSE frame: ``event: \\ndata: \\n\\n``.""" + return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n" + + +@router.get( + "/{topology_id}/events", + tags=["MazeNET Topologies"], + responses={ + 200: { + "content": {"text/event-stream": {}}, + "description": "SSE stream of mutation and status events for one topology", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Topology not found"}, + }, +) +@_traced("api.topology.events") +async def api_topology_events( + topology_id: str, + request: Request, + _user: dict = Depends(require_stream_viewer), +) -> StreamingResponse: + topo = await get_topology_or_404(topology_id) + snapshot_status = topo["status"] + in_flight: list[dict] = [] + for state in _IN_FLIGHT_STATES: + in_flight.extend(await repo.list_topology_mutations(topology_id, state=state)) + + async def generator() -> AsyncGenerator[str, None]: + # Flush headers immediately so the browser's EventSource sees a + # live connection before the first real event arrives. + yield ": keepalive\n\n" + + # One-shot snapshot — pair the current topology status with any + # mutations the mutator is still holding, so the client buffer + # can render an accurate "already in flight" state. + yield _format_sse("snapshot", { + "topology_id": topology_id, + "status": snapshot_status, + "in_flight": in_flight, + }) + + bus = await get_app_bus() + if bus is None: + # Bus disabled (NullBus) or unreachable. The snapshot is + # still useful; we idle on keepalives so the client stays + # connected and will re-poll on its own timers. + while not await request.is_disconnected(): + try: + await asyncio.sleep(_KEEPALIVE_SECS) + except asyncio.CancelledError: + break + yield ": keepalive\n\n" + return + + sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>") + try: + async with sub: + sub_iter = sub.__aiter__() + while True: + if await request.is_disconnected(): + break + next_task = asyncio.ensure_future(sub_iter.__anext__()) + try: + event = await asyncio.wait_for(next_task, timeout=_KEEPALIVE_SECS) + except asyncio.TimeoutError: + next_task.cancel() + yield ": keepalive\n\n" + continue + except StopAsyncIteration: + break + # Map the bus event onto an SSE ``event:`` name that + # the frontend can switch on without parsing topics. + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) + except asyncio.CancelledError: + pass + except Exception: + log.exception("topology events stream crashed topology_id=%s", topology_id) + yield _format_sse("error", {"message": "Stream interrupted"}) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +def _sse_name_for(topic: str) -> str: + """Derive an SSE ``event:`` name from a bus topic. + + ``topology..mutation.applied`` → ``mutation.applied`` + ``topology..status`` → ``status`` + Anything else is passed through unchanged so future topic families + don't silently collapse onto a generic bucket. + """ + parts = topic.split(".", 2) + return parts[2] if len(parts) >= 3 else topic diff --git a/decnet/web/router/topology/api_mutations.py b/decnet/web/router/topology/api_mutations.py index 9415dc11..d0f4cf4b 100644 --- a/decnet/web/router/topology/api_mutations.py +++ b/decnet/web/router/topology/api_mutations.py @@ -13,6 +13,9 @@ from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, status +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger from decnet.telemetry import traced as _traced from decnet.topology.status import ( TopologyStatus, @@ -27,6 +30,8 @@ from decnet.web.dependencies import repo, require_admin, require_viewer from ._guards import get_topology_or_404, map_repo_exception +_log = get_logger("api.topology.mutations") + router = APIRouter() _MUTATABLE: frozenset[str] = frozenset( @@ -80,6 +85,20 @@ async def api_enqueue_mutation( except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc + # Fire-and-forget bus publish so the mutator can wake immediately and + # the SSE route can notify connected editors. Bus failure here must + # never mask a successful enqueue — the DB row is authoritative. + bus = await get_app_bus() + if bus is not None: + try: + await bus.publish( + _topics.topology_mutation(topology_id, _topics.MUTATION_ENQUEUED), + {"mutation_id": mutation_id, "op": body.op, "payload": body.payload}, + event_type=_topics.MUTATION_ENQUEUED, + ) + except Exception as exc: # noqa: BLE001 + _log.warning("bus publish (enqueued) failed: %s", exc) + return MutationEnqueueResponse(mutation_id=mutation_id, state="pending")