fix(web): restore SSE streams via single-use ticket flow
The V3.1.1 backend change moved SSE auth off ?token=<JWT> onto a single-use
?ticket=, but the dashboard was never updated, so every live stream 401'd
('Could not validate credentials'). Add mintSseTicket() (POST /auth/sse-ticket
with the Bearer JWT, returns an opaque 60s single-use ticket) and refactor all
stream consumers to mint a fresh ticket at the top of each connect() — initial
and every reconnect — then open EventSource with ?ticket=. A reused single-use
ticket would 401-loop, so re-mint-per-connect is required.
Covers Dashboard /stream, LiveLogs, and the attacker/identity/campaign/
orchestrator/topology hooks. connect() is now async with an unmount guard
(cancelled flag checked after the await, before opening the stream); on a mint
401 the connect is skipped and the axios logout interceptor takes over.
This commit is contained in:
@@ -4,6 +4,7 @@ import { useNavigate } from 'react-router-dom';
|
||||
import './Dashboard.css';
|
||||
import { Shield, Users, Activity, Clock, Paperclip, Crosshair, Flame, Archive, ShieldOff, Server, LayoutDashboard } from '../icons';
|
||||
import { parseEventBody } from '../utils/parseEventBody';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
import ArtifactDrawer from './ArtifactDrawer';
|
||||
import EmptyState from './EmptyState/EmptyState';
|
||||
|
||||
@@ -93,12 +94,24 @@ const Dashboard: React.FC<DashboardProps> = ({ searchQuery }) => {
|
||||
const logsContainerRef = useRef<HTMLDivElement | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (eventSourceRef.current) eventSourceRef.current.close();
|
||||
|
||||
const token = localStorage.getItem('token');
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
if (!cancelled) {
|
||||
reconnectTimerRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
let url = `${baseUrl}/stream?token=${token}`;
|
||||
let url = `${baseUrl}/stream?ticket=${encodeURIComponent(ticket)}`;
|
||||
if (searchQuery) url += `&search=${encodeURIComponent(searchQuery)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
@@ -125,13 +138,16 @@ const Dashboard: React.FC<DashboardProps> = ({ searchQuery }) => {
|
||||
es.onerror = () => {
|
||||
es.close();
|
||||
eventSourceRef.current = null;
|
||||
reconnectTimerRef.current = setTimeout(connect, 3000);
|
||||
if (!cancelled) {
|
||||
reconnectTimerRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current);
|
||||
if (eventSourceRef.current) eventSourceRef.current.close();
|
||||
};
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
} from '../icons';
|
||||
import api from '../utils/api';
|
||||
import { parseEventBody } from '../utils/parseEventBody';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
import ArtifactDrawer from './ArtifactDrawer';
|
||||
import EmptyState from './EmptyState/EmptyState';
|
||||
import './Dashboard.css';
|
||||
@@ -75,34 +76,6 @@ const LiveLogs: React.FC = () => {
|
||||
}
|
||||
};
|
||||
|
||||
const setupSSE = () => {
|
||||
if (eventSourceRef.current) eventSourceRef.current.close();
|
||||
|
||||
const token = localStorage.getItem('token');
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
let url = `${baseUrl}/stream?token=${token}&search=${encodeURIComponent(query)}`;
|
||||
const startTime = startTimeParam();
|
||||
if (startTime) url += `&start_time=${startTime}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
eventSourceRef.current = es;
|
||||
|
||||
es.onmessage = (event) => {
|
||||
try {
|
||||
const payload = JSON.parse(event.data);
|
||||
if (payload.type === 'logs') {
|
||||
setLogs(prev => [...payload.data, ...prev].slice(0, 500));
|
||||
} else if (payload.type === 'stats') {
|
||||
setTotalLogs(payload.data.total_logs);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to parse SSE payload', err);
|
||||
}
|
||||
};
|
||||
|
||||
es.onerror = () => console.error('SSE connection lost, reconnecting...');
|
||||
};
|
||||
|
||||
// Always seed with REST backlog on mount / filter changes.
|
||||
useEffect(() => {
|
||||
fetchData();
|
||||
@@ -110,13 +83,56 @@ const LiveLogs: React.FC = () => {
|
||||
|
||||
// SSE follows the streaming toggle independently.
|
||||
useEffect(() => {
|
||||
if (streaming) {
|
||||
setupSSE();
|
||||
} else if (eventSourceRef.current) {
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
if (!streaming) {
|
||||
if (eventSourceRef.current) {
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let cancelled = false;
|
||||
|
||||
const setupSSE = async () => {
|
||||
if (eventSourceRef.current) eventSourceRef.current.close();
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
console.error('SSE ticket mint failed, will not connect stream');
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
let url = `${baseUrl}/stream?ticket=${encodeURIComponent(ticket)}&search=${encodeURIComponent(query)}`;
|
||||
const startTime = startTimeParam();
|
||||
if (startTime) url += `&start_time=${startTime}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
eventSourceRef.current = es;
|
||||
|
||||
es.onmessage = (event) => {
|
||||
try {
|
||||
const payload = JSON.parse(event.data);
|
||||
if (payload.type === 'logs') {
|
||||
setLogs(prev => [...payload.data, ...prev].slice(0, 500));
|
||||
} else if (payload.type === 'stats') {
|
||||
setTotalLogs(payload.data.total_logs);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to parse SSE payload', err);
|
||||
}
|
||||
};
|
||||
|
||||
es.onerror = () => console.error('SSE connection lost, reconnecting...');
|
||||
};
|
||||
|
||||
setupSSE();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (eventSourceRef.current) {
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
* pending topologies don't open a useless channel.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { mintSseTicket } from '../../utils/sseTicket';
|
||||
|
||||
export type TopologyStreamEventName =
|
||||
| 'snapshot'
|
||||
@@ -69,11 +70,25 @@ export function useTopologyStream({
|
||||
useEffect(() => {
|
||||
if (!enabled || !topologyId) return;
|
||||
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
onErrorRef.current?.();
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/topologies/${topologyId}/events?token=${encodeURIComponent(token)}`;
|
||||
const url = `${baseUrl}/topologies/${topologyId}/events?ticket=${encodeURIComponent(ticket)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
@@ -101,13 +116,16 @@ export function useTopologyStream({
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onErrorRef.current?.();
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
* multi_actor on the same identity.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
|
||||
export interface ObservationFrame {
|
||||
primitive: string;
|
||||
@@ -142,11 +143,25 @@ export function useAttackerStream({
|
||||
useEffect(() => {
|
||||
if (!enabled || !attackerUuid) return;
|
||||
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
onErrorRef.current?.();
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, RECONNECT_MS);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/attackers/${encodeURIComponent(attackerUuid)}/events?token=${encodeURIComponent(token)}`;
|
||||
const url = `${baseUrl}/attackers/${encodeURIComponent(attackerUuid)}/events?ticket=${encodeURIComponent(ticket)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
@@ -194,13 +209,16 @@ export function useAttackerStream({
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onErrorRef.current?.();
|
||||
reconnectRef.current = setTimeout(connect, RECONNECT_MS);
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, RECONNECT_MS);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
* fires.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
|
||||
export type CampaignStreamEventName =
|
||||
| 'snapshot'
|
||||
@@ -54,11 +55,25 @@ export function useCampaignStream({
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
onErrorRef.current?.();
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/campaigns/events?token=${encodeURIComponent(token)}`;
|
||||
const url = `${baseUrl}/campaigns/events?ticket=${encodeURIComponent(ticket)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
@@ -86,13 +101,16 @@ export function useCampaignStream({
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onErrorRef.current?.();
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
* itself is dumb glue.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
|
||||
export type IdentityStreamEventName =
|
||||
| 'snapshot'
|
||||
@@ -67,11 +68,25 @@ export function useIdentityStream({
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
onErrorRef.current?.();
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/identities/events?token=${encodeURIComponent(token)}`;
|
||||
const url = `${baseUrl}/identities/events?ticket=${encodeURIComponent(ticket)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
@@ -99,13 +114,16 @@ export function useIdentityStream({
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onErrorRef.current?.();
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
* caller. Mirror of `useCampaignStream`.
|
||||
*/
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { mintSseTicket } from '../utils/sseTicket';
|
||||
|
||||
export type OrchestratorStreamEventName =
|
||||
| 'snapshot'
|
||||
@@ -49,12 +50,26 @@ export function useOrchestratorStream({
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
const connect = () => {
|
||||
let cancelled = false;
|
||||
|
||||
const connect = async () => {
|
||||
if (esRef.current) esRef.current.close();
|
||||
onStatusRef.current?.('connecting');
|
||||
const token = localStorage.getItem('token') ?? '';
|
||||
|
||||
let ticket: string;
|
||||
try {
|
||||
ticket = await mintSseTicket();
|
||||
} catch {
|
||||
onStatusRef.current?.('error');
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (cancelled) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
|
||||
const url = `${baseUrl}/orchestrator/events/stream?token=${encodeURIComponent(token)}`;
|
||||
const url = `${baseUrl}/orchestrator/events/stream?ticket=${encodeURIComponent(ticket)}`;
|
||||
|
||||
const es = new EventSource(url);
|
||||
esRef.current = es;
|
||||
@@ -84,13 +99,16 @@ export function useOrchestratorStream({
|
||||
es.close();
|
||||
esRef.current = null;
|
||||
onStatusRef.current?.('error');
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
if (!cancelled) {
|
||||
reconnectRef.current = setTimeout(connect, 3000);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectRef.current) clearTimeout(reconnectRef.current);
|
||||
if (esRef.current) esRef.current.close();
|
||||
esRef.current = null;
|
||||
|
||||
52
decnet_web/src/utils/sseTicket.test.ts
Normal file
52
decnet_web/src/utils/sseTicket.test.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
|
||||
// Mock the api module BEFORE importing the unit under test so the module
|
||||
// factory runs first and replaces the real axios instance.
|
||||
vi.mock('./api', () => ({
|
||||
default: {
|
||||
post: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
import api from './api';
|
||||
import { mintSseTicket } from './sseTicket';
|
||||
|
||||
const mockPost = api.post as ReturnType<typeof vi.fn>;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('mintSseTicket', () => {
|
||||
it('POSTs to /auth/sse-ticket and returns the ticket string', async () => {
|
||||
mockPost.mockResolvedValueOnce({
|
||||
data: { ticket: 'opaque-abc-123', expires_in: 60 },
|
||||
});
|
||||
|
||||
const ticket = await mintSseTicket();
|
||||
|
||||
expect(mockPost).toHaveBeenCalledTimes(1);
|
||||
expect(mockPost).toHaveBeenCalledWith('/auth/sse-ticket');
|
||||
expect(ticket).toBe('opaque-abc-123');
|
||||
});
|
||||
|
||||
it('propagates API errors to the caller', async () => {
|
||||
const err = Object.assign(new Error('Unauthorized'), {
|
||||
response: { status: 401, data: { detail: 'Could not validate credentials' } },
|
||||
});
|
||||
mockPost.mockRejectedValueOnce(err);
|
||||
|
||||
await expect(mintSseTicket()).rejects.toThrow('Unauthorized');
|
||||
expect(mockPost).toHaveBeenCalledWith('/auth/sse-ticket');
|
||||
});
|
||||
|
||||
it('propagates network errors (no response object) to the caller', async () => {
|
||||
mockPost.mockRejectedValueOnce(new Error('Network Error'));
|
||||
|
||||
await expect(mintSseTicket()).rejects.toThrow('Network Error');
|
||||
});
|
||||
});
|
||||
26
decnet_web/src/utils/sseTicket.ts
Normal file
26
decnet_web/src/utils/sseTicket.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
/**
|
||||
* SSE ticket helper — mints a single-use opaque ticket for authenticating
|
||||
* an EventSource connection. Native EventSource cannot set an Authorization
|
||||
* header, so the backend issues a short-lived (?60 s) ticket via a normal
|
||||
* Bearer-authenticated REST call and the ticket is passed as ?ticket= on
|
||||
* the stream URL.
|
||||
*
|
||||
* IMPORTANT: the ticket is SINGLE-USE. Mint a fresh ticket for every
|
||||
* connection attempt — initial connect AND every reconnect.
|
||||
*/
|
||||
import api from './api';
|
||||
|
||||
/**
|
||||
* POST /auth/sse-ticket with the normal Bearer JWT (attached automatically
|
||||
* by the axios `api` instance) and return the opaque ticket string.
|
||||
*
|
||||
* Throws if the API call fails (e.g. 401 when the JWT has expired).
|
||||
* Callers are responsible for handling the error — typically by invoking
|
||||
* their existing onError handler and scheduling a reconnect, which will
|
||||
* cause the axios 401 interceptor to fire `auth:logout`.
|
||||
*/
|
||||
export async function mintSseTicket(): Promise<string> {
|
||||
const res = await api.post<{ ticket: string; expires_in: number }>('/auth/sse-ticket');
|
||||
return res.data.ticket;
|
||||
}
|
||||
Reference in New Issue
Block a user