From 97aa57faed2dac2cefca43ba63ba10caccd8a43e Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 08:36:17 -0400 Subject: [PATCH] feat(api): SSE stream for identity events at /api/v1/identities/events Mirrors GET /api/v1/topologies/{id}/events: subscribes to identity.> on the bus for the duration of the request and forwards each event as a named SSE frame (formed / observation.linked / merged / unmerged). The endpoint is broadly scoped (every identity event, not per-uuid) because both AttackerDetail and IdentityDetail need the same firehose: AttackerDetail watches for an identity.formed that finally binds its identity_id; IdentityDetail watches for observation.linked / merged / unmerged against its current row. A per-uuid filter would force the client to know its identity before subscribing, which it doesn't always. JWT via ?token= (EventSource can't set headers), require_stream_viewer gate, sse_connection_slot per-user cap, snapshot-on-connect with the first 50 identities so the client buffer renders without a separate REST call. Bus-disabled / unreachable path keeps the connection alive on keepalives so the client doesn't reconnect-storm; it can re-poll the REST API on its own timer. --- decnet/web/router/__init__.py | 2 + decnet/web/router/identities/api_events.py | 143 +++++++++++++++++++++ tests/api/identities/__init__.py | 0 tests/api/identities/test_events_stream.py | 126 ++++++++++++++++++ 4 files changed, 271 insertions(+) create mode 100644 decnet/web/router/identities/api_events.py create mode 100644 tests/api/identities/__init__.py create mode 100644 tests/api/identities/test_events_stream.py diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 1e232aa1..54660b92 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -24,6 +24,7 @@ from .attackers.api_get_attacker_intel import router as attacker_intel_router from .identities.api_list_identities import router as identities_list_router from .identities.api_get_identity_detail import router as identity_detail_router from .identities.api_list_identity_observations import router as identity_observations_router +from .identities.api_events import router as identity_events_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -94,6 +95,7 @@ api_router.include_router(attacker_intel_router) api_router.include_router(identities_list_router) api_router.include_router(identity_detail_router) api_router.include_router(identity_observations_router) +api_router.include_router(identity_events_router) # Observability api_router.include_router(stats_router) diff --git a/decnet/web/router/identities/api_events.py b/decnet/web/router/identities/api_events.py new file mode 100644 index 00000000..1e9b7fcc --- /dev/null +++ b/decnet/web/router/identities/api_events.py @@ -0,0 +1,143 @@ +"""SSE stream of identity-resolution events — one connection per viewer. + +Subscribes to ``identity.>`` on the :class:`~decnet.bus.base.BaseBus` for +the duration of the request and forwards each matching bus event as a +Server-Sent Event to the browser. Emits a one-shot snapshot on connect +(current paginated identity list) so the client doesn't need a separate +fetch to initialise. + +Authorization mirrors :mod:`decnet.web.router.topology.api_events` — a +JWT passed via the ``?token=`` query parameter (EventSource can't set +arbitrary headers) + ``require_stream_viewer`` role gate. + +The endpoint is broadly scoped (every identity event, not per-uuid) +because both ``AttackerDetail`` and ``IdentityDetail`` need the same +firehose: a bare ``AttackerDetail`` watches for ``identity.formed`` +events that finally bind its ``identity_id``, and ``IdentityDetail`` +watches for ``observation.linked`` / ``merged`` / ``unmerged`` against +the identity it's rendering. A per-uuid filter would force the client +to know its identity before subscribing, which it doesn't always. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +import orjson +from fastapi import APIRouter, Depends, Request +from fastapi.responses import StreamingResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_stream_viewer +from decnet.web.sse_limits import sse_connection_slot + +log = get_logger("api.identities.events") + +router = APIRouter() + +_KEEPALIVE_SECS = 15.0 +_SNAPSHOT_LIMIT = 50 + + +def _format_sse(event_name: str, data: dict) -> str: + return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n" + + +@router.get( + "/identities/events", + tags=["Identity Resolution"], + responses={ + 200: { + "content": {"text/event-stream": {}}, + "description": "SSE stream of identity-resolution events", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 429: {"description": "Per-user SSE connection cap reached"}, + }, +) +@_traced("api.identities.events") +async def api_identities_events( + request: Request, + user: dict = Depends(require_stream_viewer), +) -> StreamingResponse: + # Event types emitted: snapshot, formed, observation.linked, + # merged, unmerged. All wrap bus events whose payload is also + # reachable via viewer-gated REST (GET /identities/*). + snapshot = await repo.list_identities(limit=_SNAPSHOT_LIMIT, offset=0) + + async def generator() -> AsyncGenerator[str, None]: + async with sse_connection_slot(user["uuid"]): + yield ": keepalive\n\n" + yield _format_sse("snapshot", {"identities": snapshot}) + + bus = await get_app_bus() + if bus is None: + # Bus disabled / unreachable — keep the connection + # alive so the client doesn't reconnect-storm; it can + # re-poll the REST API on its own timer. + while not await request.is_disconnected(): + try: + await asyncio.sleep(_KEEPALIVE_SECS) + except asyncio.CancelledError: + break + yield ": keepalive\n\n" + return + + sub = bus.subscribe(f"{_topics.IDENTITY}.>") + try: + async with sub: + sub_iter = sub.__aiter__() + while True: + if await request.is_disconnected(): + break + next_task = asyncio.ensure_future(sub_iter.__anext__()) + try: + event = await asyncio.wait_for( + next_task, timeout=_KEEPALIVE_SECS, + ) + except asyncio.TimeoutError: + next_task.cancel() + yield ": keepalive\n\n" + continue + except StopAsyncIteration: + break + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) + except asyncio.CancelledError: + pass + except Exception: + log.exception("identity events stream crashed") + yield _format_sse("error", {"message": "Stream interrupted"}) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +def _sse_name_for(topic: str) -> str: + """Derive an SSE ``event:`` name from a bus topic. + + ``identity.formed`` → ``formed`` + ``identity.observation.linked`` → ``observation.linked`` + Pass-through preserves dotted leaves so the frontend can switch on + a stable name. + """ + if topic.startswith(f"{_topics.IDENTITY}."): + return topic[len(_topics.IDENTITY) + 1:] + return topic diff --git a/tests/api/identities/__init__.py b/tests/api/identities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/identities/test_events_stream.py b/tests/api/identities/test_events_stream.py new file mode 100644 index 00000000..f7fc2fcb --- /dev/null +++ b/tests/api/identities/test_events_stream.py @@ -0,0 +1,126 @@ +"""SSE events stream — GET /api/v1/identities/events. + +Mirrors :mod:`tests.api.topology.test_events_stream` — the route is +thin glue, so we drive the generator directly to dodge the full +httpx streaming roundtrip under ASGITransport. +""" +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.web.api import app + +_V1 = "/api/v1/identities" + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + from decnet.web.router.identities import api_events as _ev + monkeypatch.setattr(_ev, "get_app_bus", _get) + return bus + + +@pytest.mark.anyio +async def test_identity_events_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/events") + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_identity_events_emits_snapshot_and_live_event(_fake_app_bus): + """Generator yields a snapshot frame on connect, then forwards + bus events under ``identity.>`` as named SSE events.""" + from decnet.web.router.identities import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_identities_events( + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + + def _as_text(frame) -> str: + return frame if isinstance(frame, str) else frame.decode() + + async def _publish_after_snapshot() -> None: + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.identity(_topics.IDENTITY_FORMED), + { + "identity_uuid": "id-1", + "observation_uuids": ["obs-1", "obs-2"], + }, + event_type=_topics.IDENTITY_FORMED, + ) + await asyncio.sleep(0.05) + await _fake_app_bus.publish( + _topics.identity(_topics.IDENTITY_UNMERGED), + {"resurrected_uuid": "id-2", "former_winner_uuid": "id-1"}, + event_type=_topics.IDENTITY_UNMERGED, + ) + + pub_task = asyncio.create_task(_publish_after_snapshot()) + + async def _drive() -> tuple[bool, bool, bool]: + saw_snapshot = False + saw_formed = False + saw_unmerged = False + for _ in range(8): + frame = _as_text(await gen.__anext__()) + if "event: snapshot" in frame: + saw_snapshot = True + if "event: formed" in frame: + saw_formed = True + if "event: unmerged" in frame: + saw_unmerged = True + if saw_snapshot and saw_formed and saw_unmerged: + break + return saw_snapshot, saw_formed, saw_unmerged + + try: + saw_snapshot, saw_formed, saw_unmerged = await asyncio.wait_for( + _drive(), timeout=5.0, + ) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + + assert saw_snapshot + assert saw_formed + assert saw_unmerged + + +def test_sse_name_maps_dotted_leaves(): + """``observation.linked`` survives the topic-to-event-name mapping + intact so the frontend can switch on the full dotted leaf.""" + from decnet.web.router.identities.api_events import _sse_name_for + assert _sse_name_for("identity.formed") == "formed" + assert _sse_name_for("identity.observation.linked") == "observation.linked" + assert _sse_name_for("identity.merged") == "merged" + assert _sse_name_for("identity.unmerged") == "unmerged" + # Non-identity topics pass through unchanged. + assert _sse_name_for("system.bus.health") == "system.bus.health"