From 0e5484648ffd1b846d9ed43d19bcc1a010b481f3 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 23:15:38 -0400 Subject: [PATCH] feat: forward decky.*.service.* on per-topology SSE stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /topologies/{id}/events SSE proxy now subscribes to two bus patterns concurrently and merges them through a bounded asyncio.Queue: * topology.{id}.> — lifecycle (status, mutation.*) — unchanged. * decky.> — per-decky events, filtered by payload.topology_id so a fleet decky sharing a name with a topology decky doesn't leak across. _sse_name_for routes 'decky..service.added' to the SSE event name 'decky.service.added' (kept the prefix so the frontend doesn't collide with topology lifecycle events that share leaf names like 'status'). useTopologyStream surfaces the two new event names; MazeNET.tsx's onStreamEvent optimistically patches the matching node's services list so a second tab reflects shape changes without a refetch. --- decnet/web/router/topology/api_events.py | 91 +++++++++++++------ decnet_web/src/components/MazeNET/MazeNET.tsx | 16 ++++ .../components/MazeNET/useTopologyStream.ts | 10 +- 3 files changed, 89 insertions(+), 28 deletions(-) diff --git a/decnet/web/router/topology/api_events.py b/decnet/web/router/topology/api_events.py index 6fcab719..bc360af7 100644 --- a/decnet/web/router/topology/api_events.py +++ b/decnet/web/router/topology/api_events.py @@ -102,38 +102,65 @@ async def api_topology_events( yield ": keepalive\n\n" return - sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>") - try: + # Two subscriptions, merged through an asyncio.Queue: + # + # topology..> — lifecycle (status, mutation.*). + # decky.> — per-decky events, filtered to this + # topology by the event's payload. + # + # Decky events carry ``topology_id`` in their payload (see + # decnet.engine.services_live._publish); we discard ones + # that don't belong to this stream so a fleet decky sharing + # a name with a topology decky doesn't leak across. + topo_sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>") + decky_sub = bus.subscribe(f"{_topics.DECKY}.>") + queue: asyncio.Queue = asyncio.Queue(maxsize=256) + + async def _pump(sub, *, only_topology: bool = False) -> None: async with sub: - sub_iter = sub.__aiter__() - while True: - if await request.is_disconnected(): - break - next_task = asyncio.ensure_future(sub_iter.__anext__()) + async for ev in sub: + if only_topology: + payload = ev.payload or {} + if payload.get("topology_id") != topology_id: + continue try: - event = await asyncio.wait_for(next_task, timeout=_KEEPALIVE_SECS) - except asyncio.TimeoutError: - next_task.cancel() - yield ": keepalive\n\n" - continue - except StopAsyncIteration: - break - # Map the bus event onto an SSE ``event:`` name that - # the frontend can switch on without parsing topics. - yield _format_sse( - _sse_name_for(event.topic), - { - "topic": event.topic, - "type": event.type, - "ts": event.ts, - "payload": event.payload, - }, + queue.put_nowait(ev) + except asyncio.QueueFull: + # Drop on overflow rather than backpressuring + # the bus; the snapshot + reconnect path will + # cover any gap a slow consumer creates. + pass + + topo_task = asyncio.create_task(_pump(topo_sub)) + decky_task = asyncio.create_task(_pump(decky_sub, only_topology=True)) + try: + while True: + if await request.is_disconnected(): + break + try: + event = await asyncio.wait_for( + queue.get(), timeout=_KEEPALIVE_SECS, ) + except asyncio.TimeoutError: + yield ": keepalive\n\n" + continue + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) except asyncio.CancelledError: pass except Exception: log.exception("topology events stream crashed topology_id=%s", topology_id) yield _format_sse("error", {"message": "Stream interrupted"}) + finally: + topo_task.cancel() + decky_task.cancel() return StreamingResponse( generator(), @@ -148,10 +175,20 @@ async def api_topology_events( def _sse_name_for(topic: str) -> str: """Derive an SSE ``event:`` name from a bus topic. - ``topology..mutation.applied`` → ``mutation.applied`` - ``topology..status`` → ``status`` + ``topology..mutation.applied`` → ``mutation.applied`` + ``topology..status`` → ``status`` + ``decky..service.added`` → ``decky.service.added`` + ``decky..service.removed`` → ``decky.service.removed`` Anything else is passed through unchanged so future topic families don't silently collapse onto a generic bucket. """ parts = topic.split(".", 2) - return parts[2] if len(parts) >= 3 else topic + if len(parts) < 3: + return topic + head, _ident, tail = parts + # Decky events: keep the ``decky.`` prefix so the frontend + # discriminates them from topology-lifecycle events that happen to + # share an event name (e.g. ``status``). + if head == _topics.DECKY: + return f"{_topics.DECKY}.{tail}" + return tail diff --git a/decnet_web/src/components/MazeNET/MazeNET.tsx b/decnet_web/src/components/MazeNET/MazeNET.tsx index 399caa18..be0b2602 100644 --- a/decnet_web/src/components/MazeNET/MazeNET.tsx +++ b/decnet_web/src/components/MazeNET/MazeNET.tsx @@ -650,6 +650,22 @@ const MazeNET: React.FC = () => { || event.name === 'status') { refetch(); } + // Live service mutations from another tab / admin: optimistically + // patch local state so the chip set reflects shape without a full + // re-hydrate. The post-mutation services list lives on the + // payload; same shape the actor's POST/DELETE response carries. + if (event.name === 'decky.service.added' + || event.name === 'decky.service.removed') { + const p = event.payload ?? {}; + const deckyName = typeof p.decky_name === 'string' ? p.decky_name : null; + const services = Array.isArray(p.services) ? p.services as string[] : null; + if (deckyName && services) { + setNodes((prev) => prev.map((n) => n.kind === 'decky' && n.name === deckyName + ? { ...n, services } : n)); + setStreamLive(true); + setLastEventAt(new Date()); + } + } }, [refetch]); const onStreamError = useCallback(() => { setStreamLive(false); }, []); useTopologyStream({ diff --git a/decnet_web/src/components/MazeNET/useTopologyStream.ts b/decnet_web/src/components/MazeNET/useTopologyStream.ts index 68d4a549..4e57beae 100644 --- a/decnet_web/src/components/MazeNET/useTopologyStream.ts +++ b/decnet_web/src/components/MazeNET/useTopologyStream.ts @@ -16,7 +16,13 @@ export type TopologyStreamEventName = | 'mutation.applying' | 'mutation.applied' | 'mutation.failed' - | 'status'; + | 'status' + // Live per-decky service mutations forwarded by the SSE proxy on the + // server. The payload carries decky_name + service_name + the + // post-mutation services list, so a second tab can reconcile shape + // without a refetch. + | 'decky.service.added' + | 'decky.service.removed'; export interface TopologyStreamEvent { name: TopologyStreamEventName | string; @@ -40,6 +46,8 @@ const NAMED_EVENTS: TopologyStreamEventName[] = [ 'mutation.applied', 'mutation.failed', 'status', + 'decky.service.added', + 'decky.service.removed', ]; export function useTopologyStream({