From 8ecb9e6c2d1bbe5b7653187f02de31f25583bbba Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 14:38:58 -0400 Subject: [PATCH] feat(web/mazenet): subscribe to topology SSE stream in editor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the MazeNET editor to the new /topologies/{id}/events SSE route so live (active|degraded) topologies reflect mutator state transitions without reload: - useTopologyStream hook opens an EventSource against /topologies/{id}/events?token=, with 3s reconnect matching the dashboard's /stream consumer. Callback refs avoid tearing down the connection on consumer rerenders. - useMazeApi gains enqueueMutation(topologyId, op, payload, expectedVersion?) — thin wrapper over POST /mutations. - MazeNET.tsx opens the stream only when topoStatus is active|degraded (pending editors have nothing to stream) and refetches on mutation.applied|failed|status events. Header shows a LIVE / CONNECTING… indicator. Phase A slice — Apply (N changes) with an optimistic staged buffer lands in a follow-up; the hooks + API method it'll need are already here. --- decnet_web/src/components/MazeNET/MazeNET.tsx | 29 +++++ .../src/components/MazeNET/useMazeApi.ts | 36 ++++++ .../components/MazeNET/useTopologyStream.ts | 107 ++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 decnet_web/src/components/MazeNET/useTopologyStream.ts diff --git a/decnet_web/src/components/MazeNET/MazeNET.tsx b/decnet_web/src/components/MazeNET/MazeNET.tsx index a15aa5fa..925bebf3 100644 --- a/decnet_web/src/components/MazeNET/MazeNET.tsx +++ b/decnet_web/src/components/MazeNET/MazeNET.tsx @@ -17,6 +17,7 @@ import type { Net, MazeNode, Edge, DeckyNode } from './types'; import { useMazeApi } from './useMazeApi'; import { useMazeInteraction, type PaletteDrag } from './useMazeInteraction'; import { useLayoutPersistor } from './useMazeLayoutStore'; +import { useTopologyStream, type TopologyStreamEvent } from './useTopologyStream'; import { ARCHETYPES as DEFAULT_ARCHETYPES } from './data'; /* Short unique suffix for default names — avoids the DB uniqueness @@ -424,6 +425,29 @@ const MazeNET: React.FC = () => { useEffect(() => { refetch(); }, [refetch]); + /* Live topology stream. Open only when the topology is deployed — + * pending topologies have no mutator loop and would just idle on + * keepalives. On any state-transition event we refetch; DB is the + * source of truth and the bus is at-most-once. */ + const [streamLive, setStreamLive] = useState(false); + const streamEnabled = topoStatus === 'active' || topoStatus === 'degraded'; + const onStreamEvent = useCallback((event: TopologyStreamEvent) => { + setStreamLive(true); + if (event.name === 'mutation.applied' + || event.name === 'mutation.failed' + || event.name === 'status') { + refetch(); + } + }, [refetch]); + const onStreamError = useCallback(() => { setStreamLive(false); }, []); + useTopologyStream({ + topologyId: streamEnabled ? topologyId : null, + enabled: streamEnabled, + onEvent: onStreamEvent, + onError: onStreamError, + }); + useEffect(() => { if (!streamEnabled) setStreamLive(false); }, [streamEnabled]); + const onDeploy = async () => { if (!topologyId) return; setDeploying(true); @@ -455,6 +479,11 @@ const MazeNET: React.FC = () => {
NETWORK OF NETWORKS · {topoStatus.toUpperCase()} · v{topoVersion} ·{' '} {nets.length} NETS · {nodes.length} NODES · {edges.length} PATHS + {streamEnabled && ( + + {' '}· {streamLive ? 'LIVE' : 'CONNECTING…'} + + )} {loadErr && · {loadErr}} {actionErr && · {actionErr}}
diff --git a/decnet_web/src/components/MazeNET/useMazeApi.ts b/decnet_web/src/components/MazeNET/useMazeApi.ts index 45ecc6f4..72e89b9b 100644 --- a/decnet_web/src/components/MazeNET/useMazeApi.ts +++ b/decnet_web/src/components/MazeNET/useMazeApi.ts @@ -193,6 +193,15 @@ export interface CreateDeckyBody { decky_config?: Record; } +export type MutationOp = + | 'add_lan' | 'remove_lan' | 'update_lan' + | 'attach_decky' | 'detach_decky' | 'remove_decky' | 'update_decky'; + +export interface EnqueueMutationResponse { + mutation_id: string; + state: string; +} + export interface MazeApi { listTopologies: () => Promise; createBlankTopology: (name: string) => Promise; @@ -213,6 +222,13 @@ export interface MazeApi { attachEdge: (topologyId: string, body: { decky_uuid: string; lan_id: string; is_bridge?: boolean; forwards_l3?: boolean }) => Promise; detachEdge: (topologyId: string, edgeId: string) => Promise; + enqueueMutation: ( + topologyId: string, + op: MutationOp, + payload: Record, + expectedVersion?: number, + ) => Promise; + deployTopology: (topologyId: string) => Promise; } @@ -356,6 +372,24 @@ export function useMazeApi(): MazeApi { [], ); + const enqueueMutation = useCallback( + async ( + topologyId: string, + op: MutationOp, + payload: Record, + expectedVersion?: number, + ): Promise => { + const body: { op: MutationOp; payload: Record; expected_version?: number } = { op, payload }; + if (expectedVersion !== undefined) body.expected_version = expectedVersion; + const { data } = await api.post( + `/topologies/${topologyId}/mutations`, + body, + ); + return data; + }, + [], + ); + return useMemo( () => ({ listTopologies, createBlankTopology, getTopology, getServices, getArchetypes, @@ -363,6 +397,7 @@ export function useMazeApi(): MazeApi { createLan, updateLan, deleteLan, createDecky, updateDecky, deleteDecky, attachEdge, detachEdge, + enqueueMutation, deployTopology, }), [ @@ -371,6 +406,7 @@ export function useMazeApi(): MazeApi { createLan, updateLan, deleteLan, createDecky, updateDecky, deleteDecky, attachEdge, detachEdge, + enqueueMutation, deployTopology, ], ); diff --git a/decnet_web/src/components/MazeNET/useTopologyStream.ts b/decnet_web/src/components/MazeNET/useTopologyStream.ts new file mode 100644 index 00000000..68d4a549 --- /dev/null +++ b/decnet_web/src/components/MazeNET/useTopologyStream.ts @@ -0,0 +1,107 @@ +/** + * Topology event stream — opens an SSE connection to + * `/topologies/{id}/events` and dispatches typed events to the caller. + * + * Mirrors the reconnect shape used by the dashboard's `/stream` consumer: + * on any error we close the current EventSource and retry after 3s. The + * hook is inert until `topologyId` is non-empty and `enabled` is true — + * typical usage is to gate on `topoStatus === 'active' || 'degraded'` so + * pending topologies don't open a useless channel. + */ +import { useEffect, useRef } from 'react'; + +export type TopologyStreamEventName = + | 'snapshot' + | 'mutation.enqueued' + | 'mutation.applying' + | 'mutation.applied' + | 'mutation.failed' + | 'status'; + +export interface TopologyStreamEvent { + name: TopologyStreamEventName | string; + topic?: string; + type?: string; + ts?: string; + payload: Record; +} + +export interface UseTopologyStreamOptions { + topologyId: string | null; + enabled: boolean; + onEvent: (event: TopologyStreamEvent) => void; + onError?: () => void; +} + +const NAMED_EVENTS: TopologyStreamEventName[] = [ + 'snapshot', + 'mutation.enqueued', + 'mutation.applying', + 'mutation.applied', + 'mutation.failed', + 'status', +]; + +export function useTopologyStream({ + topologyId, + enabled, + onEvent, + onError, +}: UseTopologyStreamOptions): void { + const esRef = useRef(null); + const reconnectRef = useRef | null>(null); + // Keep the latest callbacks in refs so reconnect logic doesn't tear + // down and rebuild the connection every time the consumer rerenders. + const onEventRef = useRef(onEvent); + const onErrorRef = useRef(onError); + useEffect(() => { onEventRef.current = onEvent; }, [onEvent]); + useEffect(() => { onErrorRef.current = onError; }, [onError]); + + useEffect(() => { + if (!enabled || !topologyId) return; + + const connect = () => { + if (esRef.current) esRef.current.close(); + const token = localStorage.getItem('token') ?? ''; + const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1'; + const url = `${baseUrl}/topologies/${topologyId}/events?token=${encodeURIComponent(token)}`; + + const es = new EventSource(url); + esRef.current = es; + + const dispatch = (name: string) => (event: MessageEvent) => { + try { + const parsed = JSON.parse(event.data) as Partial; + onEventRef.current({ + name, + topic: parsed.topic, + type: parsed.type, + ts: parsed.ts, + payload: (parsed.payload ?? {}) as Record, + }); + } catch (err) { + console.error('useTopologyStream: parse failed', err); + } + }; + + for (const name of NAMED_EVENTS) { + es.addEventListener(name, dispatch(name) as EventListener); + } + + es.onerror = () => { + es.close(); + esRef.current = null; + onErrorRef.current?.(); + reconnectRef.current = setTimeout(connect, 3000); + }; + }; + + connect(); + + return () => { + if (reconnectRef.current) clearTimeout(reconnectRef.current); + if (esRef.current) esRef.current.close(); + esRef.current = null; + }; + }, [topologyId, enabled]); +}