/**
 * Campaign-clustering event stream — opens an SSE connection to
 * `/campaigns/events` and dispatches typed events to the caller.
 *
 * Mirror of `useIdentityStream` for the layer above. CampaignDetail
 * subscribes to refresh its own row + linked-identity list when
 * `campaign.identity.assigned` / `campaign.merged` / `campaign.unmerged`
 * fires.
 */
import { useEffect, useRef } from 'react';

/** SSE event names this hook registers listeners for. */
export type CampaignStreamEventName =
  | 'snapshot'
  | 'formed'
  | 'identity.assigned'
  | 'merged'
  | 'unmerged';

/**
 * One decoded SSE frame handed to `onEvent`.
 *
 * `name` is the SSE event name the frame arrived under; `topic`, `type`,
 * `ts` and `payload` are copied from the frame's JSON body.
 */
export interface CampaignStreamEvent {
  name: CampaignStreamEventName | string;
  topic?: string;
  type?: string;
  ts?: string;
  payload: Record<string, unknown>;
}

/** Options accepted by {@link useCampaignStream}. */
export interface UseCampaignStreamOptions {
  /** When false, no connection is opened (and any open one is closed). */
  enabled: boolean;
  /** Called once per successfully decoded frame. */
  onEvent: (event: CampaignStreamEvent) => void;
  /** Called each time the underlying EventSource reports an error. */
  onError?: () => void;
}

const NAMED_EVENTS: readonly CampaignStreamEventName[] = [
  'snapshot',
  'formed',
  'identity.assigned',
  'merged',
  'unmerged',
];

/** Delay between a connection error and the next connection attempt. */
const RECONNECT_DELAY_MS = 3000;

/** True for non-null, non-array objects (i.e. a decoded JSON object). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/** Returns `value` when it is a string, otherwise `undefined`. */
function optionalString(value: unknown): string | undefined {
  return typeof value === 'string' ? value : undefined;
}

/**
 * Decodes the raw `data` of one SSE frame into a {@link CampaignStreamEvent}.
 *
 * @throws SyntaxError when `raw` is not valid JSON.
 * @throws TypeError when the JSON value is not an object.
 */
function decodeFrame(name: string, raw: string): CampaignStreamEvent {
  const parsed: unknown = JSON.parse(raw);
  if (!isRecord(parsed)) {
    throw new TypeError('frame is not a JSON object');
  }
  return {
    name,
    topic: optionalString(parsed.topic),
    type: optionalString(parsed.type),
    ts: optionalString(parsed.ts),
    // A missing or non-object payload is normalised to an empty record so
    // consumers can always index into it.
    payload: isRecord(parsed.payload) ? parsed.payload : {},
  };
}

/**
 * Subscribes to the campaign SSE stream while `enabled` is true.
 *
 * Opens an `EventSource` on `<base>/campaigns/events`, forwards every frame
 * received under one of the `NAMED_EVENTS` names to `onEvent`, and on a
 * connection error closes the source, calls `onError`, and retries after
 * `RECONNECT_DELAY_MS`. The connection and any pending retry are torn down
 * when `enabled` changes or the component unmounts.
 *
 * `onEvent` / `onError` are read through refs, so passing new callback
 * identities on each render does not reopen the connection.
 */
export function useCampaignStream({
  enabled,
  onEvent,
  onError,
}: UseCampaignStreamOptions): void {
  const esRef = useRef<EventSource | null>(null);
  const reconnectRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const onEventRef = useRef(onEvent);
  const onErrorRef = useRef(onError);

  useEffect(() => {
    onEventRef.current = onEvent;
  }, [onEvent]);

  useEffect(() => {
    onErrorRef.current = onError;
  }, [onError]);

  useEffect(() => {
    if (!enabled) return undefined;

    // Set by the cleanup below; stops a late timer or error callback from
    // reopening a connection after the effect has been torn down.
    let disposed = false;

    const clearReconnect = () => {
      if (reconnectRef.current !== null) {
        clearTimeout(reconnectRef.current);
        reconnectRef.current = null;
      }
    };

    const connect = () => {
      if (disposed) return;
      clearReconnect();
      esRef.current?.close();

      // The EventSource constructor has no option for custom request
      // headers, so the token travels in the query string.
      const token = localStorage.getItem('token') ?? '';
      const baseUrl =
        import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
      const url = `${baseUrl}/campaigns/events?token=${encodeURIComponent(token)}`;

      const es = new EventSource(url);
      esRef.current = es;

      const dispatch =
        (name: CampaignStreamEventName) => (event: MessageEvent) => {
          let decoded: CampaignStreamEvent;
          try {
            decoded = decodeFrame(name, event.data);
          } catch (err) {
            console.error('useCampaignStream: parse failed', err);
            return;
          }
          // Kept separate from decoding so a consumer bug is not reported
          // as a parse failure.
          try {
            onEventRef.current(decoded);
          } catch (err) {
            console.error('useCampaignStream: onEvent handler failed', err);
          }
        };

      for (const name of NAMED_EVENTS) {
        es.addEventListener(name, dispatch(name) as EventListener);
      }

      es.onerror = () => {
        es.close();
        // Only clear the ref if it still points at this source.
        if (esRef.current === es) esRef.current = null;
        if (!disposed) {
          // Schedule before notifying: if onError throws the retry still
          // happens, and if it triggers cleanup the timer is cleared there.
          clearReconnect();
          reconnectRef.current = setTimeout(connect, RECONNECT_DELAY_MS);
        }
        onErrorRef.current?.();
      };
    };

    connect();

    return () => {
      disposed = true;
      clearReconnect();
      esRef.current?.close();
      esRef.current = null;
    };
  }, [enabled]);
}