From 059d1dba750284ab509fd14a94fd268543f6dc02 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 08:38:27 -0400 Subject: [PATCH] feat(web): live identity-resolution updates via SSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit useIdentityStream hook mirrors useTopologyStream — opens an EventSource against /api/v1/identities/events with the JWT in ?token=, dispatches the five named events (snapshot, formed, observation.linked, merged, unmerged) to the consumer, reconnects 3s after any error. AttackerDetail subscribes whenever it has an attacker id loaded. On any event whose payload references this observation's uuid OR the attacker's current identity_id, refetch /attackers/{id} so the IDENTITY badge appears (or follows through merges / unmerges) live without a tab refocus. IdentityDetail subscribes whenever it has an identity id loaded. On any event whose payload references this identity_id (formed for it, merge winner / loser, unmerge resurrected / former-winner), it refetches both the identity row and its observations list. Both consumers filter inside onEvent — the hook itself is dumb glue and stays unaware of which uuids any given component cares about. --- decnet_web/src/components/AttackerDetail.tsx | 34 ++++++ decnet_web/src/components/IdentityDetail.tsx | 34 ++++++ .../src/components/useIdentityStream.ts | 111 ++++++++++++++++++ 3 files changed, 179 insertions(+) create mode 100644 decnet_web/src/components/useIdentityStream.ts diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index ef972fc8..c10eee94 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -6,6 +6,7 @@ import ArtifactDrawer from './ArtifactDrawer'; import MailDrawer from './MailDrawer'; import SessionDrawer from './SessionDrawer'; import EmptyState from './EmptyState/EmptyState'; +import { useIdentityStream } from './useIdentityStream'; import './Dashboard.css'; interface AttackerBehavior { @@ -1279,6 +1280,39 @@ const AttackerDetail: React.FC = () => { fetchAttacker(); }, [id]); + // Re-fetch this attacker row whenever an identity event references + // its uuid. The IDENTITY badge appears once the clusterer binds the + // row, and follows through merges / unmerges live. + useIdentityStream({ + enabled: !!id, + onEvent: (ev) => { + if (!id) return; + const payload = ev.payload || {}; + const refs = new Set(); + const addUuid = (v: unknown) => { + if (typeof v === 'string') refs.add(v); + }; + addUuid(payload.observation_uuid); + const obsList = payload.observation_uuids; + if (Array.isArray(obsList)) obsList.forEach(addUuid); + // merge / unmerge events carry identity uuids, not observation + // uuids — but if the current attacker's identity_id matches any + // of them, we still want to refresh so the badge link follows. + addUuid(payload.identity_uuid); + addUuid(payload.winner_uuid); + addUuid(payload.loser_uuid); + addUuid(payload.resurrected_uuid); + addUuid(payload.former_winner_uuid); + + const myIdentity = attacker?.identity_id; + if (refs.has(id) || (myIdentity && refs.has(myIdentity))) { + api.get(`/attackers/${id}`) + .then((res) => setAttacker(res.data)) + .catch(() => {}); + } + }, + }); + useEffect(() => { if (!id) return; const fetchCommands = async () => { diff --git a/decnet_web/src/components/IdentityDetail.tsx b/decnet_web/src/components/IdentityDetail.tsx index cb05170a..e156261a 100644 --- a/decnet_web/src/components/IdentityDetail.tsx +++ b/decnet_web/src/components/IdentityDetail.tsx @@ -2,6 +2,7 @@ import React, { useEffect, useState } from 'react'; import { useParams, useNavigate } from 'react-router-dom'; import { ArrowLeft, Crosshair, Fingerprint, Globe, Radio } from '../icons'; import api from '../utils/api'; +import { useIdentityStream } from './useIdentityStream'; import './Dashboard.css'; /* @@ -105,6 +106,39 @@ const IdentityDetail: React.FC = () => { fetchObservations(); }, [id]); + // Live updates: when the clusterer fires an identity event that + // touches this identity (links a fresh observation, soft-merges, + // resurrects on unmerge), refetch both the row and the observations + // list so the page reflects current truth without a manual refresh. + useIdentityStream({ + enabled: !!id, + onEvent: (ev) => { + if (!id) return; + const payload = ev.payload || {}; + const refs = new Set(); + const addUuid = (v: unknown) => { + if (typeof v === 'string') refs.add(v); + }; + addUuid(payload.identity_uuid); + addUuid(payload.winner_uuid); + addUuid(payload.loser_uuid); + addUuid(payload.resurrected_uuid); + addUuid(payload.former_winner_uuid); + + if (refs.has(id)) { + api.get(`/identities/${id}`) + .then((res) => setIdentity(res.data)) + .catch(() => {}); + api.get(`/identities/${id}/observations?limit=50&offset=0`) + .then((res) => { + setObservations(res.data.data ?? []); + setObservationTotal(res.data.total ?? 0); + }) + .catch(() => {}); + } + }, + }); + if (loading) { return (
diff --git a/decnet_web/src/components/useIdentityStream.ts b/decnet_web/src/components/useIdentityStream.ts new file mode 100644 index 00000000..d424df26 --- /dev/null +++ b/decnet_web/src/components/useIdentityStream.ts @@ -0,0 +1,111 @@ +/** + * Identity-resolution event stream — opens an SSE connection to + * `/identities/events` and dispatches typed events to the caller. + * + * Mirrors `useTopologyStream` (reconnect on error after 3s, callbacks + * stashed in refs so the connection isn't torn down on every consumer + * rerender). The stream is broadly scoped — every identity event, not + * per-uuid — because both AttackerDetail and IdentityDetail want the + * same firehose: + * + * * AttackerDetail watches for `identity.formed` events whose payload + * references its observation uuid (the badge appears once the + * clusterer binds the row), plus `merged` / `unmerged` so the + * badge link updates if the row's identity gets re-pointed. + * * IdentityDetail watches for `observation.linked` / `merged` / + * `unmerged` against the identity it's rendering. + * + * Each consumer applies its own filter inside `onEvent`; the hook + * itself is dumb glue. + */ +import { useEffect, useRef } from 'react'; + +export type IdentityStreamEventName = + | 'snapshot' + | 'formed' + | 'observation.linked' + | 'merged' + | 'unmerged'; + +export interface IdentityStreamEvent { + name: IdentityStreamEventName | string; + topic?: string; + type?: string; + ts?: string; + payload: Record; +} + +export interface UseIdentityStreamOptions { + enabled: boolean; + onEvent: (event: IdentityStreamEvent) => void; + onError?: () => void; +} + +const NAMED_EVENTS: IdentityStreamEventName[] = [ + 'snapshot', + 'formed', + 'observation.linked', + 'merged', + 'unmerged', +]; + +export function useIdentityStream({ + enabled, + onEvent, + onError, +}: UseIdentityStreamOptions): void { + const esRef = useRef(null); + const reconnectRef = useRef | null>(null); + const onEventRef = useRef(onEvent); + const onErrorRef = useRef(onError); + useEffect(() => { onEventRef.current = onEvent; }, [onEvent]); + useEffect(() => { onErrorRef.current = onError; }, [onError]); + + useEffect(() => { + if (!enabled) return; + + const connect = () => { + if (esRef.current) esRef.current.close(); + const token = localStorage.getItem('token') ?? ''; + const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1'; + const url = `${baseUrl}/identities/events?token=${encodeURIComponent(token)}`; + + const es = new EventSource(url); + esRef.current = es; + + const dispatch = (name: string) => (event: MessageEvent) => { + try { + const parsed = JSON.parse(event.data) as Partial; + onEventRef.current({ + name, + topic: parsed.topic, + type: parsed.type, + ts: parsed.ts, + payload: (parsed.payload ?? {}) as Record, + }); + } catch (err) { + console.error('useIdentityStream: parse failed', err); + } + }; + + for (const name of NAMED_EVENTS) { + es.addEventListener(name, dispatch(name) as EventListener); + } + + es.onerror = () => { + es.close(); + esRef.current = null; + onErrorRef.current?.(); + reconnectRef.current = setTimeout(connect, 3000); + }; + }; + + connect(); + + return () => { + if (reconnectRef.current) clearTimeout(reconnectRef.current); + if (esRef.current) esRef.current.close(); + esRef.current = null; + }; + }, [enabled]); +}