feat(web): live identity-resolution updates via SSE
useIdentityStream hook mirrors useTopologyStream — opens an
EventSource against /api/v1/identities/events with the JWT in
?token=, dispatches the five named events (snapshot, formed,
observation.linked, merged, unmerged) to the consumer, reconnects
3s after any error.
AttackerDetail subscribes whenever it has an attacker id loaded.
On any event whose payload references this observation's uuid OR
the attacker's current identity_id, refetch /attackers/{id} so the
IDENTITY badge appears (or follows through merges / unmerges) live
without a tab refocus.
IdentityDetail subscribes whenever it has an identity id loaded.
On any event whose payload references this identity_id (formed for
it, merge winner / loser, unmerge resurrected / former-winner), it
refetches both the identity row and its observations list.
Both consumers filter inside onEvent — the hook itself is dumb glue
and stays unaware of which uuids any given component cares about.
This commit is contained in:
@@ -6,6 +6,7 @@ import ArtifactDrawer from './ArtifactDrawer';
|
||||
import MailDrawer from './MailDrawer';
|
||||
import SessionDrawer from './SessionDrawer';
|
||||
import EmptyState from './EmptyState/EmptyState';
|
||||
import { useIdentityStream } from './useIdentityStream';
|
||||
import './Dashboard.css';
|
||||
|
||||
interface AttackerBehavior {
|
||||
@@ -1279,6 +1280,39 @@ const AttackerDetail: React.FC = () => {
|
||||
fetchAttacker();
|
||||
}, [id]);
|
||||
|
||||
// Re-fetch this attacker row whenever an identity event references
|
||||
// its uuid. The IDENTITY badge appears once the clusterer binds the
|
||||
// row, and follows through merges / unmerges live.
|
||||
useIdentityStream({
|
||||
enabled: !!id,
|
||||
onEvent: (ev) => {
|
||||
if (!id) return;
|
||||
const payload = ev.payload || {};
|
||||
const refs = new Set<string>();
|
||||
const addUuid = (v: unknown) => {
|
||||
if (typeof v === 'string') refs.add(v);
|
||||
};
|
||||
addUuid(payload.observation_uuid);
|
||||
const obsList = payload.observation_uuids;
|
||||
if (Array.isArray(obsList)) obsList.forEach(addUuid);
|
||||
// merge / unmerge events carry identity uuids, not observation
|
||||
// uuids — but if the current attacker's identity_id matches any
|
||||
// of them, we still want to refresh so the badge link follows.
|
||||
addUuid(payload.identity_uuid);
|
||||
addUuid(payload.winner_uuid);
|
||||
addUuid(payload.loser_uuid);
|
||||
addUuid(payload.resurrected_uuid);
|
||||
addUuid(payload.former_winner_uuid);
|
||||
|
||||
const myIdentity = attacker?.identity_id;
|
||||
if (refs.has(id) || (myIdentity && refs.has(myIdentity))) {
|
||||
api.get(`/attackers/${id}`)
|
||||
.then((res) => setAttacker(res.data))
|
||||
.catch(() => {});
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (!id) return;
|
||||
const fetchCommands = async () => {
|
||||
|
||||
@@ -2,6 +2,7 @@ import React, { useEffect, useState } from 'react';
|
||||
import { useParams, useNavigate } from 'react-router-dom';
|
||||
import { ArrowLeft, Crosshair, Fingerprint, Globe, Radio } from '../icons';
|
||||
import api from '../utils/api';
|
||||
import { useIdentityStream } from './useIdentityStream';
|
||||
import './Dashboard.css';
|
||||
|
||||
/*
|
||||
@@ -105,6 +106,39 @@ const IdentityDetail: React.FC = () => {
|
||||
fetchObservations();
|
||||
}, [id]);
|
||||
|
||||
// Live updates: when the clusterer fires an identity event that
|
||||
// touches this identity (links a fresh observation, soft-merges,
|
||||
// resurrects on unmerge), refetch both the row and the observations
|
||||
// list so the page reflects current truth without a manual refresh.
|
||||
useIdentityStream({
|
||||
enabled: !!id,
|
||||
onEvent: (ev) => {
|
||||
if (!id) return;
|
||||
const payload = ev.payload || {};
|
||||
const refs = new Set<string>();
|
||||
const addUuid = (v: unknown) => {
|
||||
if (typeof v === 'string') refs.add(v);
|
||||
};
|
||||
addUuid(payload.identity_uuid);
|
||||
addUuid(payload.winner_uuid);
|
||||
addUuid(payload.loser_uuid);
|
||||
addUuid(payload.resurrected_uuid);
|
||||
addUuid(payload.former_winner_uuid);
|
||||
|
||||
if (refs.has(id)) {
|
||||
api.get(`/identities/${id}`)
|
||||
.then((res) => setIdentity(res.data))
|
||||
.catch(() => {});
|
||||
api.get(`/identities/${id}/observations?limit=50&offset=0`)
|
||||
.then((res) => {
|
||||
setObservations(res.data.data ?? []);
|
||||
setObservationTotal(res.data.total ?? 0);
|
||||
})
|
||||
.catch(() => {});
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
if (loading) {
|
||||
return (
|
||||
<div className="dashboard">
|
||||
|
||||
111
decnet_web/src/components/useIdentityStream.ts
Normal file
111
decnet_web/src/components/useIdentityStream.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
/**
|
||||
* Identity-resolution event stream — opens an SSE connection to
|
||||
* `/identities/events` and dispatches typed events to the caller.
|
||||
*
|
||||
* Mirrors `useTopologyStream` (reconnect on error after 3s, callbacks
|
||||
* stashed in refs so the connection isn't torn down on every consumer
|
||||
* rerender). The stream is broadly scoped — every identity event, not
|
||||
* per-uuid — because both AttackerDetail and IdentityDetail want the
|
||||
* same firehose:
|
||||
*
|
||||
* * AttackerDetail watches for `identity.formed` events whose payload
|
||||
* references its observation uuid (the badge appears once the
|
||||
* clusterer binds the row), plus `merged` / `unmerged` so the
|
||||
* badge link updates if the row's identity gets re-pointed.
|
||||
* * IdentityDetail watches for `observation.linked` / `merged` /
|
||||
* `unmerged` against the identity it's rendering.
|
||||
*
|
||||
* Each consumer applies its own filter inside `onEvent`; the hook
|
||||
* itself is dumb glue.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
|
||||
export type IdentityStreamEventName =
|
||||
| 'snapshot'
|
||||
| 'formed'
|
||||
| 'observation.linked'
|
||||
| 'merged'
|
||||
| 'unmerged';
|
||||
|
||||
export interface IdentityStreamEvent {
|
||||
name: IdentityStreamEventName | string;
|
||||
topic?: string;
|
||||
type?: string;
|
||||
ts?: string;
|
||||
payload: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface UseIdentityStreamOptions {
|
||||
enabled: boolean;
|
||||
onEvent: (event: IdentityStreamEvent) => void;
|
||||
onError?: () => void;
|
||||
}
|
||||
|
||||
const NAMED_EVENTS: IdentityStreamEventName[] = [
|
||||
'snapshot',
|
||||
'formed',
|
||||
'observation.linked',
|
||||
'merged',
|
||||
'unmerged',
|
||||
];
|
||||
|
||||
export function useIdentityStream({
|
||||
enabled,
|
||||
onEvent,
|
||||
onError,
|
||||
}: UseIdentityStreamOptions): void {
|
||||
const esRef = useRef<EventSource | null>(null);
|
||||
const reconnectRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const onEventRef = useRef(onEvent);
|
||||
const onErrorRef = useRef(onError);
|
||||
useEffect(() => { onEventRef.current = onEvent; }, [onEvent]);
|
||||
useEffect(() => { onErrorRef.current = onError; }, [onError]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
const connect = () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/identities/events?token=${encodeURIComponent(token)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
|
||||
const dispatch = (name: string) => (event: MessageEvent) => {
|
||||
try {
|
||||
const parsed = JSON.parse(event.data) as Partial<IdentityStreamEvent>;
|
||||
onEventRef.current({
|
||||
name,
|
||||
topic: parsed.topic,
|
||||
type: parsed.type,
|
||||
ts: parsed.ts,
|
||||
payload: (parsed.payload ?? {}) as Record<string, unknown>,
|
||||
});
|
||||
} catch (err) {
|
||||
console.error('useIdentityStream: parse failed', err);
|
||||
}
|
||||
};
|
||||
|
||||
for (const name of NAMED_EVENTS) {
|
||||
es.addEventListener(name, dispatch(name) as EventListener);
|
||||
}
|
||||
|
||||
es.onerror = () => {
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onErrorRef.current?.();
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
};
|
||||
}, [enabled]);
|
||||
}
|
||||
Reference in New Issue
Block a user