diff --git a/decnet_web/src/components/MazeNET/MazeNET.tsx b/decnet_web/src/components/MazeNET/MazeNET.tsx
index a15aa5fa..925bebf3 100644
--- a/decnet_web/src/components/MazeNET/MazeNET.tsx
+++ b/decnet_web/src/components/MazeNET/MazeNET.tsx
@@ -17,6 +17,7 @@ import type { Net, MazeNode, Edge, DeckyNode } from './types';
import { useMazeApi } from './useMazeApi';
import { useMazeInteraction, type PaletteDrag } from './useMazeInteraction';
import { useLayoutPersistor } from './useMazeLayoutStore';
+import { useTopologyStream, type TopologyStreamEvent } from './useTopologyStream';
import { ARCHETYPES as DEFAULT_ARCHETYPES } from './data';
/* Short unique suffix for default names — avoids the DB uniqueness
@@ -424,6 +425,29 @@ const MazeNET: React.FC = () => {
useEffect(() => { refetch(); }, [refetch]);
+ /* Live topology stream. Open only when the topology is deployed —
+ * pending topologies have no mutator loop and would just idle on
+ * keepalives. On any state-transition event we refetch; DB is the
+ * source of truth and the bus is at-most-once. */
+ const [streamLive, setStreamLive] = useState(false);
+ const streamEnabled = topoStatus === 'active' || topoStatus === 'degraded';
+ const onStreamEvent = useCallback((event: TopologyStreamEvent) => {
+ setStreamLive(true);
+ if (event.name === 'mutation.applied'
+ || event.name === 'mutation.failed'
+ || event.name === 'status') {
+ refetch();
+ }
+ }, [refetch]);
+ const onStreamError = useCallback(() => { setStreamLive(false); }, []);
+ useTopologyStream({
+ topologyId: streamEnabled ? topologyId : null,
+ enabled: streamEnabled,
+ onEvent: onStreamEvent,
+ onError: onStreamError,
+ });
+ useEffect(() => { if (!streamEnabled) setStreamLive(false); }, [streamEnabled]);
+
const onDeploy = async () => {
if (!topologyId) return;
setDeploying(true);
@@ -455,6 +479,11 @@ const MazeNET: React.FC = () => {
NETWORK OF NETWORKS · {topoStatus.toUpperCase()} · v{topoVersion} ·{' '}
{nets.length} NETS · {nodes.length} NODES · {edges.length} PATHS
+ {streamEnabled && (
+
+ {' '}· {streamLive ? 'LIVE' : 'CONNECTING…'}
+
+ )}
{loadErr && · {loadErr}}
{actionErr && · {actionErr}}
diff --git a/decnet_web/src/components/MazeNET/useMazeApi.ts b/decnet_web/src/components/MazeNET/useMazeApi.ts
index 45ecc6f4..72e89b9b 100644
--- a/decnet_web/src/components/MazeNET/useMazeApi.ts
+++ b/decnet_web/src/components/MazeNET/useMazeApi.ts
@@ -193,6 +193,15 @@ export interface CreateDeckyBody {
decky_config?: Record;
}
+export type MutationOp =
+ | 'add_lan' | 'remove_lan' | 'update_lan'
+ | 'attach_decky' | 'detach_decky' | 'remove_decky' | 'update_decky';
+
+export interface EnqueueMutationResponse {
+ mutation_id: string;
+ state: string;
+}
+
export interface MazeApi {
listTopologies: () => Promise;
createBlankTopology: (name: string) => Promise;
@@ -213,6 +222,13 @@ export interface MazeApi {
attachEdge: (topologyId: string, body: { decky_uuid: string; lan_id: string; is_bridge?: boolean; forwards_l3?: boolean }) => Promise;
detachEdge: (topologyId: string, edgeId: string) => Promise;
+ enqueueMutation: (
+ topologyId: string,
+ op: MutationOp,
+ payload: Record,
+ expectedVersion?: number,
+ ) => Promise;
+
deployTopology: (topologyId: string) => Promise;
}
@@ -356,6 +372,24 @@ export function useMazeApi(): MazeApi {
[],
);
+ const enqueueMutation = useCallback(
+ async (
+ topologyId: string,
+ op: MutationOp,
+ payload: Record,
+ expectedVersion?: number,
+ ): Promise => {
+ const body: { op: MutationOp; payload: Record; expected_version?: number } = { op, payload };
+ if (expectedVersion !== undefined) body.expected_version = expectedVersion;
+ const { data } = await api.post(
+ `/topologies/${topologyId}/mutations`,
+ body,
+ );
+ return data;
+ },
+ [],
+ );
+
return useMemo(
() => ({
listTopologies, createBlankTopology, getTopology, getServices, getArchetypes,
@@ -363,6 +397,7 @@ export function useMazeApi(): MazeApi {
createLan, updateLan, deleteLan,
createDecky, updateDecky, deleteDecky,
attachEdge, detachEdge,
+ enqueueMutation,
deployTopology,
}),
[
@@ -371,6 +406,7 @@ export function useMazeApi(): MazeApi {
createLan, updateLan, deleteLan,
createDecky, updateDecky, deleteDecky,
attachEdge, detachEdge,
+ enqueueMutation,
deployTopology,
],
);
diff --git a/decnet_web/src/components/MazeNET/useTopologyStream.ts b/decnet_web/src/components/MazeNET/useTopologyStream.ts
new file mode 100644
index 00000000..68d4a549
--- /dev/null
+++ b/decnet_web/src/components/MazeNET/useTopologyStream.ts
@@ -0,0 +1,107 @@
+/**
+ * Topology event stream — opens an SSE connection to
+ * `/topologies/{id}/events` and dispatches typed events to the caller.
+ *
+ * Mirrors the reconnect shape used by the dashboard's `/stream` consumer:
+ * on any error we close the current EventSource and retry after 3s. The
+ * hook is inert until `topologyId` is non-empty and `enabled` is true —
+ * typical usage is to gate on `topoStatus === 'active' || 'degraded'` so
+ * pending topologies don't open a useless channel.
+ */
+import { useEffect, useRef } from 'react';
+
+export type TopologyStreamEventName =
+ | 'snapshot'
+ | 'mutation.enqueued'
+ | 'mutation.applying'
+ | 'mutation.applied'
+ | 'mutation.failed'
+ | 'status';
+
+export interface TopologyStreamEvent {
+ name: TopologyStreamEventName | string;
+ topic?: string;
+ type?: string;
+ ts?: string;
+ payload: Record;
+}
+
+export interface UseTopologyStreamOptions {
+ topologyId: string | null;
+ enabled: boolean;
+ onEvent: (event: TopologyStreamEvent) => void;
+ onError?: () => void;
+}
+
+const NAMED_EVENTS: TopologyStreamEventName[] = [
+ 'snapshot',
+ 'mutation.enqueued',
+ 'mutation.applying',
+ 'mutation.applied',
+ 'mutation.failed',
+ 'status',
+];
+
+export function useTopologyStream({
+ topologyId,
+ enabled,
+ onEvent,
+ onError,
+}: UseTopologyStreamOptions): void {
+ const esRef = useRef(null);
+ const reconnectRef = useRef | null>(null);
+ // Keep the latest callbacks in refs so reconnect logic doesn't tear
+ // down and rebuild the connection every time the consumer rerenders.
+ const onEventRef = useRef(onEvent);
+ const onErrorRef = useRef(onError);
+ useEffect(() => { onEventRef.current = onEvent; }, [onEvent]);
+ useEffect(() => { onErrorRef.current = onError; }, [onError]);
+
+ useEffect(() => {
+ if (!enabled || !topologyId) return;
+
+ const connect = () => {
+ if (esRef.current) esRef.current.close();
+ const token = localStorage.getItem('token') ?? '';
+ const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
+ const url = `${baseUrl}/topologies/${topologyId}/events?token=${encodeURIComponent(token)}`;
+
+ const es = new EventSource(url);
+ esRef.current = es;
+
+ const dispatch = (name: string) => (event: MessageEvent) => {
+ try {
+ const parsed = JSON.parse(event.data) as Partial;
+ onEventRef.current({
+ name,
+ topic: parsed.topic,
+ type: parsed.type,
+ ts: parsed.ts,
+ payload: (parsed.payload ?? {}) as Record,
+ });
+ } catch (err) {
+ console.error('useTopologyStream: parse failed', err);
+ }
+ };
+
+ for (const name of NAMED_EVENTS) {
+ es.addEventListener(name, dispatch(name) as EventListener);
+ }
+
+ es.onerror = () => {
+ es.close();
+ esRef.current = null;
+ onErrorRef.current?.();
+ reconnectRef.current = setTimeout(connect, 3000);
+ };
+ };
+
+ connect();
+
+ return () => {
+ if (reconnectRef.current) clearTimeout(reconnectRef.current);
+ if (esRef.current) esRef.current.close();
+ esRef.current = null;
+ };
+ }, [topologyId, enabled]);
+}