diff --git a/decnet/web/router/topology/api_events.py b/decnet/web/router/topology/api_events.py index 6fcab719..bc360af7 100644 --- a/decnet/web/router/topology/api_events.py +++ b/decnet/web/router/topology/api_events.py @@ -102,38 +102,65 @@ async def api_topology_events( yield ": keepalive\n\n" return - sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>") - try: + # Two subscriptions, merged through an asyncio.Queue: + # + # topology..> — lifecycle (status, mutation.*). + # decky.> — per-decky events, filtered to this + # topology by the event's payload. + # + # Decky events carry ``topology_id`` in their payload (see + # decnet.engine.services_live._publish); we discard ones + # that don't belong to this stream so a fleet decky sharing + # a name with a topology decky doesn't leak across. + topo_sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>") + decky_sub = bus.subscribe(f"{_topics.DECKY}.>") + queue: asyncio.Queue = asyncio.Queue(maxsize=256) + + async def _pump(sub, *, only_topology: bool = False) -> None: async with sub: - sub_iter = sub.__aiter__() - while True: - if await request.is_disconnected(): - break - next_task = asyncio.ensure_future(sub_iter.__anext__()) + async for ev in sub: + if only_topology: + payload = ev.payload or {} + if payload.get("topology_id") != topology_id: + continue try: - event = await asyncio.wait_for(next_task, timeout=_KEEPALIVE_SECS) - except asyncio.TimeoutError: - next_task.cancel() - yield ": keepalive\n\n" - continue - except StopAsyncIteration: - break - # Map the bus event onto an SSE ``event:`` name that - # the frontend can switch on without parsing topics. - yield _format_sse( - _sse_name_for(event.topic), - { - "topic": event.topic, - "type": event.type, - "ts": event.ts, - "payload": event.payload, - }, + queue.put_nowait(ev) + except asyncio.QueueFull: + # Drop on overflow rather than backpressuring + # the bus; the snapshot + reconnect path will + # cover any gap a slow consumer creates. + pass + + topo_task = asyncio.create_task(_pump(topo_sub)) + decky_task = asyncio.create_task(_pump(decky_sub, only_topology=True)) + try: + while True: + if await request.is_disconnected(): + break + try: + event = await asyncio.wait_for( + queue.get(), timeout=_KEEPALIVE_SECS, ) + except asyncio.TimeoutError: + yield ": keepalive\n\n" + continue + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) except asyncio.CancelledError: pass except Exception: log.exception("topology events stream crashed topology_id=%s", topology_id) yield _format_sse("error", {"message": "Stream interrupted"}) + finally: + topo_task.cancel() + decky_task.cancel() return StreamingResponse( generator(), @@ -148,10 +175,20 @@ async def api_topology_events( def _sse_name_for(topic: str) -> str: """Derive an SSE ``event:`` name from a bus topic. - ``topology..mutation.applied`` → ``mutation.applied`` - ``topology..status`` → ``status`` + ``topology..mutation.applied`` → ``mutation.applied`` + ``topology..status`` → ``status`` + ``decky..service.added`` → ``decky.service.added`` + ``decky..service.removed`` → ``decky.service.removed`` Anything else is passed through unchanged so future topic families don't silently collapse onto a generic bucket. """ parts = topic.split(".", 2) - return parts[2] if len(parts) >= 3 else topic + if len(parts) < 3: + return topic + head, _ident, tail = parts + # Decky events: keep the ``decky.`` prefix so the frontend + # discriminates them from topology-lifecycle events that happen to + # share an event name (e.g. ``status``). + if head == _topics.DECKY: + return f"{_topics.DECKY}.{tail}" + return tail diff --git a/decnet_web/src/components/MazeNET/MazeNET.tsx b/decnet_web/src/components/MazeNET/MazeNET.tsx index 399caa18..be0b2602 100644 --- a/decnet_web/src/components/MazeNET/MazeNET.tsx +++ b/decnet_web/src/components/MazeNET/MazeNET.tsx @@ -650,6 +650,22 @@ const MazeNET: React.FC = () => { || event.name === 'status') { refetch(); } + // Live service mutations from another tab / admin: optimistically + // patch local state so the chip set reflects shape without a full + // re-hydrate. The post-mutation services list lives on the + // payload; same shape the actor's POST/DELETE response carries. + if (event.name === 'decky.service.added' + || event.name === 'decky.service.removed') { + const p = event.payload ?? {}; + const deckyName = typeof p.decky_name === 'string' ? p.decky_name : null; + const services = Array.isArray(p.services) ? p.services as string[] : null; + if (deckyName && services) { + setNodes((prev) => prev.map((n) => n.kind === 'decky' && n.name === deckyName + ? { ...n, services } : n)); + setStreamLive(true); + setLastEventAt(new Date()); + } + } }, [refetch]); const onStreamError = useCallback(() => { setStreamLive(false); }, []); useTopologyStream({ diff --git a/decnet_web/src/components/MazeNET/useTopologyStream.ts b/decnet_web/src/components/MazeNET/useTopologyStream.ts index 68d4a549..4e57beae 100644 --- a/decnet_web/src/components/MazeNET/useTopologyStream.ts +++ b/decnet_web/src/components/MazeNET/useTopologyStream.ts @@ -16,7 +16,13 @@ export type TopologyStreamEventName = | 'mutation.applying' | 'mutation.applied' | 'mutation.failed' - | 'status'; + | 'status' + // Live per-decky service mutations forwarded by the SSE proxy on the + // server. The payload carries decky_name + service_name + the + // post-mutation services list, so a second tab can reconcile shape + // without a refetch. + | 'decky.service.added' + | 'decky.service.removed'; export interface TopologyStreamEvent { name: TopologyStreamEventName | string; @@ -40,6 +46,8 @@ const NAMED_EVENTS: TopologyStreamEventName[] = [ 'mutation.applied', 'mutation.failed', 'status', + 'decky.service.added', + 'decky.service.removed', ]; export function useTopologyStream({