feat: forward decky.*.service.* on per-topology SSE stream
The /topologies/{id}/events SSE proxy now subscribes to two bus
patterns concurrently and merges them through a bounded asyncio.Queue:
* topology.{id}.> — lifecycle (status, mutation.*) — unchanged.
* decky.> — per-decky events, filtered by payload.topology_id
so a fleet decky sharing a name with a topology
decky doesn't leak across.
_sse_name_for routes 'decky.<name>.service.added' to the SSE event
name 'decky.service.added' (kept the prefix so the frontend doesn't
collide with topology lifecycle events that share leaf names like
'status').
useTopologyStream surfaces the two new event names; MazeNET.tsx's
onStreamEvent optimistically patches the matching node's services
list so a second tab reflects shape changes without a refetch.
This commit is contained in:
@@ -102,38 +102,65 @@ async def api_topology_events(
|
|||||||
yield ": keepalive\n\n"
|
yield ": keepalive\n\n"
|
||||||
return
|
return
|
||||||
|
|
||||||
sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>")
|
# Two subscriptions, merged through an asyncio.Queue:
|
||||||
try:
|
#
|
||||||
|
# topology.<id>.> — lifecycle (status, mutation.*).
|
||||||
|
# decky.> — per-decky events, filtered to this
|
||||||
|
# topology by the event's payload.
|
||||||
|
#
|
||||||
|
# Decky events carry ``topology_id`` in their payload (see
|
||||||
|
# decnet.engine.services_live._publish); we discard ones
|
||||||
|
# that don't belong to this stream so a fleet decky sharing
|
||||||
|
# a name with a topology decky doesn't leak across.
|
||||||
|
topo_sub = bus.subscribe(f"{_topics.TOPOLOGY}.{topology_id}.>")
|
||||||
|
decky_sub = bus.subscribe(f"{_topics.DECKY}.>")
|
||||||
|
queue: asyncio.Queue = asyncio.Queue(maxsize=256)
|
||||||
|
|
||||||
|
async def _pump(sub, *, only_topology: bool = False) -> None:
|
||||||
async with sub:
|
async with sub:
|
||||||
sub_iter = sub.__aiter__()
|
async for ev in sub:
|
||||||
while True:
|
if only_topology:
|
||||||
if await request.is_disconnected():
|
payload = ev.payload or {}
|
||||||
break
|
if payload.get("topology_id") != topology_id:
|
||||||
next_task = asyncio.ensure_future(sub_iter.__anext__())
|
continue
|
||||||
try:
|
try:
|
||||||
event = await asyncio.wait_for(next_task, timeout=_KEEPALIVE_SECS)
|
queue.put_nowait(ev)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.QueueFull:
|
||||||
next_task.cancel()
|
# Drop on overflow rather than backpressuring
|
||||||
yield ": keepalive\n\n"
|
# the bus; the snapshot + reconnect path will
|
||||||
continue
|
# cover any gap a slow consumer creates.
|
||||||
except StopAsyncIteration:
|
pass
|
||||||
break
|
|
||||||
# Map the bus event onto an SSE ``event:`` name that
|
topo_task = asyncio.create_task(_pump(topo_sub))
|
||||||
# the frontend can switch on without parsing topics.
|
decky_task = asyncio.create_task(_pump(decky_sub, only_topology=True))
|
||||||
yield _format_sse(
|
try:
|
||||||
_sse_name_for(event.topic),
|
while True:
|
||||||
{
|
if await request.is_disconnected():
|
||||||
"topic": event.topic,
|
break
|
||||||
"type": event.type,
|
try:
|
||||||
"ts": event.ts,
|
event = await asyncio.wait_for(
|
||||||
"payload": event.payload,
|
queue.get(), timeout=_KEEPALIVE_SECS,
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
yield ": keepalive\n\n"
|
||||||
|
continue
|
||||||
|
yield _format_sse(
|
||||||
|
_sse_name_for(event.topic),
|
||||||
|
{
|
||||||
|
"topic": event.topic,
|
||||||
|
"type": event.type,
|
||||||
|
"ts": event.ts,
|
||||||
|
"payload": event.payload,
|
||||||
|
},
|
||||||
|
)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("topology events stream crashed topology_id=%s", topology_id)
|
log.exception("topology events stream crashed topology_id=%s", topology_id)
|
||||||
yield _format_sse("error", {"message": "Stream interrupted"})
|
yield _format_sse("error", {"message": "Stream interrupted"})
|
||||||
|
finally:
|
||||||
|
topo_task.cancel()
|
||||||
|
decky_task.cancel()
|
||||||
|
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
generator(),
|
generator(),
|
||||||
@@ -148,10 +175,20 @@ async def api_topology_events(
|
|||||||
def _sse_name_for(topic: str) -> str:
|
def _sse_name_for(topic: str) -> str:
|
||||||
"""Derive an SSE ``event:`` name from a bus topic.
|
"""Derive an SSE ``event:`` name from a bus topic.
|
||||||
|
|
||||||
``topology.<id>.mutation.applied`` → ``mutation.applied``
|
``topology.<id>.mutation.applied`` → ``mutation.applied``
|
||||||
``topology.<id>.status`` → ``status``
|
``topology.<id>.status`` → ``status``
|
||||||
|
``decky.<name>.service.added`` → ``decky.service.added``
|
||||||
|
``decky.<name>.service.removed`` → ``decky.service.removed``
|
||||||
Anything else is passed through unchanged so future topic families
|
Anything else is passed through unchanged so future topic families
|
||||||
don't silently collapse onto a generic bucket.
|
don't silently collapse onto a generic bucket.
|
||||||
"""
|
"""
|
||||||
parts = topic.split(".", 2)
|
parts = topic.split(".", 2)
|
||||||
return parts[2] if len(parts) >= 3 else topic
|
if len(parts) < 3:
|
||||||
|
return topic
|
||||||
|
head, _ident, tail = parts
|
||||||
|
# Decky events: keep the ``decky.`` prefix so the frontend
|
||||||
|
# discriminates them from topology-lifecycle events that happen to
|
||||||
|
# share an event name (e.g. ``status``).
|
||||||
|
if head == _topics.DECKY:
|
||||||
|
return f"{_topics.DECKY}.{tail}"
|
||||||
|
return tail
|
||||||
|
|||||||
@@ -650,6 +650,22 @@ const MazeNET: React.FC = () => {
|
|||||||
|| event.name === 'status') {
|
|| event.name === 'status') {
|
||||||
refetch();
|
refetch();
|
||||||
}
|
}
|
||||||
|
// Live service mutations from another tab / admin: optimistically
|
||||||
|
// patch local state so the chip set reflects shape without a full
|
||||||
|
// re-hydrate. The post-mutation services list lives on the
|
||||||
|
// payload; same shape the actor's POST/DELETE response carries.
|
||||||
|
if (event.name === 'decky.service.added'
|
||||||
|
|| event.name === 'decky.service.removed') {
|
||||||
|
const p = event.payload ?? {};
|
||||||
|
const deckyName = typeof p.decky_name === 'string' ? p.decky_name : null;
|
||||||
|
const services = Array.isArray(p.services) ? p.services as string[] : null;
|
||||||
|
if (deckyName && services) {
|
||||||
|
setNodes((prev) => prev.map((n) => n.kind === 'decky' && n.name === deckyName
|
||||||
|
? { ...n, services } : n));
|
||||||
|
setStreamLive(true);
|
||||||
|
setLastEventAt(new Date());
|
||||||
|
}
|
||||||
|
}
|
||||||
}, [refetch]);
|
}, [refetch]);
|
||||||
const onStreamError = useCallback(() => { setStreamLive(false); }, []);
|
const onStreamError = useCallback(() => { setStreamLive(false); }, []);
|
||||||
useTopologyStream({
|
useTopologyStream({
|
||||||
|
|||||||
@@ -16,7 +16,13 @@ export type TopologyStreamEventName =
|
|||||||
| 'mutation.applying'
|
| 'mutation.applying'
|
||||||
| 'mutation.applied'
|
| 'mutation.applied'
|
||||||
| 'mutation.failed'
|
| 'mutation.failed'
|
||||||
| 'status';
|
| 'status'
|
||||||
|
// Live per-decky service mutations forwarded by the SSE proxy on the
|
||||||
|
// server. The payload carries decky_name + service_name + the
|
||||||
|
// post-mutation services list, so a second tab can reconcile shape
|
||||||
|
// without a refetch.
|
||||||
|
| 'decky.service.added'
|
||||||
|
| 'decky.service.removed';
|
||||||
|
|
||||||
export interface TopologyStreamEvent {
|
export interface TopologyStreamEvent {
|
||||||
name: TopologyStreamEventName | string;
|
name: TopologyStreamEventName | string;
|
||||||
@@ -40,6 +46,8 @@ const NAMED_EVENTS: TopologyStreamEventName[] = [
|
|||||||
'mutation.applied',
|
'mutation.applied',
|
||||||
'mutation.failed',
|
'mutation.failed',
|
||||||
'status',
|
'status',
|
||||||
|
'decky.service.added',
|
||||||
|
'decky.service.removed',
|
||||||
];
|
];
|
||||||
|
|
||||||
export function useTopologyStream({
|
export function useTopologyStream({
|
||||||
|
|||||||
Reference in New Issue
Block a user