diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index fdd5576f..38c9b8fb 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -15,6 +15,7 @@ from .fleet.api_deploy_deckies import router as deploy_deckies_router from .stream.api_stream_events import router as stream_router from .attackers.api_get_attackers import router as attackers_router from .attackers.api_export_attackers import router as attackers_export_router +from .attackers.api_events import router as attacker_events_router from .attackers.api_get_attacker_detail import router as attacker_detail_router from .attackers.api_get_attacker_commands import router as attacker_commands_router from .attackers.api_get_attacker_artifacts import router as attacker_artifacts_router @@ -103,6 +104,7 @@ api_router.include_router(deploy_deckies_router) api_router.include_router(attackers_router) api_router.include_router(attackers_export_router) api_router.include_router(attacker_detail_router) +api_router.include_router(attacker_events_router) api_router.include_router(attacker_commands_router) api_router.include_router(attacker_artifacts_router) api_router.include_router(attacker_transcripts_router) diff --git a/decnet/web/router/attackers/_guards.py b/decnet/web/router/attackers/_guards.py new file mode 100644 index 00000000..3f23d342 --- /dev/null +++ b/decnet/web/router/attackers/_guards.py @@ -0,0 +1,27 @@ +"""Shared helpers for the per-attacker routes. + +Currently houses the 404 guard used by the SSE events stream +(:mod:`api_events`). Mirrors the topology router's +``_guards.get_topology_or_404`` shape so a future grep for "guard" +finds both. +""" +from __future__ import annotations + +from typing import Any + +from fastapi import HTTPException + +from decnet.web.dependencies import repo + + +async def get_attacker_or_404(attacker_uuid: str) -> dict[str, Any]: + """Fetch an Attacker row by UUID or raise 404. + + The 404 fires *after* auth (the route's role gate runs first), so + an existence probe can't leak a UUID's presence to an + unauthenticated caller. + """ + attacker = await repo.get_attacker_by_uuid(attacker_uuid) + if not attacker: + raise HTTPException(status_code=404, detail="Attacker not found") + return attacker diff --git a/decnet/web/router/attackers/api_events.py b/decnet/web/router/attackers/api_events.py new file mode 100644 index 00000000..5d7fe4d8 --- /dev/null +++ b/decnet/web/router/attackers/api_events.py @@ -0,0 +1,202 @@ +"""SSE stream of per-attacker behavioural events — one connection per +AttackerDetail page. + +Subscribes to ``attacker.observation.>``, +``attacker.fingerprint_rotated`` and ``attacker.scored`` on the +:class:`~decnet.bus.base.BaseBus` for the duration of the request and +forwards each event whose payload's ``attacker_uuid`` matches this +stream's attacker. Emits a one-shot snapshot on connect (latest +observation per primitive) so the panel hydrates immediately. + +Authorization mirrors :mod:`decnet.web.router.topology.api_events` — +JWT via the ``?token=`` query parameter (EventSource can't set +arbitrary headers) + ``require_stream_viewer`` role gate. The 404 +fires after auth so an existence probe can't leak an attacker UUID +to an unauthenticated caller. + +Per-attacker filter is keyed on the DECNET-side ``attacker_uuid`` +denorm the profiler worker stamps onto every published payload (see +``BEHAVE-INTEGRATION.md`` §339-366 deviation note + Phase 5 +amendment in ``decnet/profiler/behave_shell/_handler.py``). +""" +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +import orjson +from fastapi import APIRouter, Depends, Request +from fastapi.responses import StreamingResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_stream_viewer +from decnet.web.sse_limits import sse_connection_slot + +from ._guards import get_attacker_or_404 + +log = get_logger("api.attackers.events") + +router = APIRouter() + +_KEEPALIVE_SECS = 15.0 +_QUEUE_MAX = 256 + + +def _format_sse(event_name: str, data: dict) -> str: + """Build one SSE frame: ``event: \\ndata: \\n\\n``.""" + return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n" + + +def _sse_name_for(topic: str) -> str: + """Derive an SSE ``event:`` name from a bus topic. + + ``attacker.observation.`` → ``observation`` + (the primitive ride-along is in the payload, not the event name — + a per-primitive event name would force the frontend hook to + register 37+ listeners or know the registry. Single event name + keeps the EventSource handler shape uniform.) + + ``attacker.fingerprint_rotated`` → ``fingerprint.rotated`` + ``attacker.scored`` → ``attacker.scored`` + + Anything else passes through unchanged so a future ``attacker.*`` + family doesn't silently collapse onto a generic bucket. + """ + if topic.startswith("attacker.observation."): + return "observation" + if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}": + return "fingerprint.rotated" + if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}": + return "attacker.scored" + return topic + + +@router.get( + "/attackers/{attacker_uuid}/events", + tags=["Attacker Profiles"], + responses={ + 200: { + "content": {"text/event-stream": {}}, + "description": "SSE stream of behavioural events for one attacker", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Attacker not found"}, + 429: {"description": "Per-user SSE connection cap reached"}, + }, +) +@_traced("api.attackers.events") +async def api_attacker_events( + attacker_uuid: str, + request: Request, + user: dict = Depends(require_stream_viewer), +) -> StreamingResponse: + # 404-after-auth so an existence probe can't enumerate attacker UUIDs. + await get_attacker_or_404(attacker_uuid) + + snapshot_per_primitive = await repo.latest_observation_per_primitive( + attacker_uuid, + ) + snapshot_observations = [ + {"primitive": primitive, **payload} + for primitive, payload in sorted(snapshot_per_primitive.items()) + ] + + async def generator() -> AsyncGenerator[str, None]: + async with sse_connection_slot(user["uuid"]): + # Flush headers immediately so the browser's EventSource + # sees a live connection before the first real event. + yield ": keepalive\n\n" + + yield _format_sse("snapshot", { + "attacker_uuid": attacker_uuid, + "observations": snapshot_observations, + }) + + bus = await get_app_bus() + if bus is None: + # Bus disabled (NullBus) or unreachable. The snapshot + # is still useful; idle on keepalives so the client + # stays connected and re-polls on its own timers. + while not await request.is_disconnected(): + try: + await asyncio.sleep(_KEEPALIVE_SECS) + except asyncio.CancelledError: + break + yield ": keepalive\n\n" + return + + # Three subscriptions, merged through one queue. Per-attacker + # filter on payload["attacker_uuid"] — the profiler worker + # stamps it on every published payload (Phase 5 amendment). + obs_sub = bus.subscribe(f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>") + fp_sub = bus.subscribe( + f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}", + ) + score_sub = bus.subscribe( + f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}", + ) + queue: asyncio.Queue = asyncio.Queue(maxsize=_QUEUE_MAX) + + async def _pump(sub) -> None: + async with sub: + async for ev in sub: + payload = ev.payload or {} + if payload.get("attacker_uuid") != attacker_uuid: + continue + try: + queue.put_nowait(ev) + except asyncio.QueueFull: + # Drop on overflow rather than backpressuring + # the bus; the snapshot + reconnect path will + # cover any gap a slow consumer creates. + pass + + tasks = [ + asyncio.create_task(_pump(obs_sub)), + asyncio.create_task(_pump(fp_sub)), + asyncio.create_task(_pump(score_sub)), + ] + try: + while True: + if await request.is_disconnected(): + break + try: + event = await asyncio.wait_for( + queue.get(), timeout=_KEEPALIVE_SECS, + ) + except asyncio.TimeoutError: + yield ": keepalive\n\n" + continue + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) + except asyncio.CancelledError: + pass + except Exception: + log.exception( + "attacker events stream crashed attacker_uuid=%s", + attacker_uuid, + ) + yield _format_sse("error", {"message": "Stream interrupted"}) + finally: + for t in tasks: + t.cancel() + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) diff --git a/tests/api/attackers/__init__.py b/tests/api/attackers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/attackers/test_events_stream.py b/tests/api/attackers/test_events_stream.py new file mode 100644 index 00000000..9851d5c7 --- /dev/null +++ b/tests/api/attackers/test_events_stream.py @@ -0,0 +1,352 @@ +"""SSE events stream — GET /attackers/{uuid}/events (Phase 5). + +Mirrors the topology events test pattern at +``tests/api/topology/test_events_stream.py`` — drives the generator +directly to avoid the full httpx streaming roundtrip, which is +painful under ASGITransport + an infinite SSE loop. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from typing import Any + +import httpx +import pytest + +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.web.api import app +from decnet.web.dependencies import repo as _repo + +_V1 = "/api/v1/attackers" +_OTHER_UUID = "ffffffff-eeee-dddd-cccc-bbbbbbbbbbbb" + + +# ── Fixtures ──────────────────────────────────────────────────────── + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + from decnet.web.router.attackers import api_events as _ev + monkeypatch.setattr(_ev, "get_app_bus", _get) + return bus + + +async def _seed_attacker(ip: str = "10.0.0.5") -> str: + """Persist a minimal Attacker row, return its uuid.""" + return await _repo.upsert_attacker({ + "ip": ip, + "first_seen": datetime.now(timezone.utc), + "last_seen": datetime.now(timezone.utc), + "event_count": 1, + "service_count": 1, + "decky_count": 1, + "services": "[\"ssh\"]", + "deckies": "[\"d1\"]", + "traversal_path": None, + "is_traversal": False, + "bounty_count": 0, + "credential_count": 0, + "fingerprints": "[]", + "commands": "[]", + "country_code": None, + "country_source": None, + "asn": None, + "as_name": None, + "asn_source": None, + "updated_at": datetime.now(timezone.utc), + }) + + +async def _seed_observation( + attacker_uuid: str, + primitive: str, + value: str, + confidence: float = 0.85, +) -> None: + await _repo.upsert_observation({ + "primitive": primitive, + "value": value, + "confidence": confidence, + "window_start_ts": 0.0, + "window_end_ts": 1.0, + "source": "test", + "evidence_ref": f"shard:test#{primitive}", + "envelope_v": 1, + "ts": 1714521660.456, + "attacker_uuid": attacker_uuid, + }) + + +# ── Auth / 404 paths ──────────────────────────────────────────────── + + +@pytest.mark.anyio +async def test_events_unauthenticated_401(_fake_app_bus): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/any-uuid/events") + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_events_missing_attacker_404(auth_token, _fake_app_bus): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get( + f"{_V1}/{_OTHER_UUID}/events", + params={"token": auth_token}, + ) + assert r.status_code == 404 + + +# ── Generator-driven tests ────────────────────────────────────────── + + +def _as_text(frame) -> str: + return frame if isinstance(frame, str) else frame.decode() + + +async def _drive_until(gen, predicate, *, max_frames: int = 8) -> tuple[bool, list[str]]: + """Pump frames out of the generator until ``predicate(frame)`` is + True or ``max_frames`` is exhausted. Returns ``(matched, frames_seen)``.""" + seen: list[str] = [] + for _ in range(max_frames): + frame = _as_text(await gen.__anext__()) + seen.append(frame) + if predicate(frame): + return True, seen + return False, seen + + +@pytest.mark.anyio +async def test_emits_snapshot_on_connect(auth_token, _fake_app_bus): + """Snapshot frame fires immediately and contains seeded observations.""" + attacker_uuid = await _seed_attacker(ip="10.0.0.5") + await _seed_observation(attacker_uuid, "motor.input_modality", "typed") + + from decnet.web.router.attackers import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_attacker_events( + attacker_uuid=attacker_uuid, + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + try: + matched, seen = await asyncio.wait_for( + _drive_until( + gen, + lambda f: "event: snapshot" in f and "motor.input_modality" in f, + ), + timeout=5.0, + ) + finally: + await gen.aclose() + assert matched, f"snapshot not found in frames: {seen}" + + +@pytest.mark.anyio +async def test_forwards_observation_for_this_attacker(auth_token, _fake_app_bus): + """A live attacker.observation event reaches the SSE stream.""" + attacker_uuid = await _seed_attacker(ip="10.0.0.6") + + from decnet.web.router.attackers import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_attacker_events( + attacker_uuid=attacker_uuid, + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + + async def _publish_after_snapshot() -> None: + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.attacker_observation("motor.input_modality"), + {"attacker_uuid": attacker_uuid, "primitive": "motor.input_modality", + "value": "pasted", "confidence": 0.9}, + event_type="motor.input_modality", + ) + + pub_task = asyncio.create_task(_publish_after_snapshot()) + try: + matched, seen = await asyncio.wait_for( + _drive_until( + gen, + # Event name is "observation"; primitive rides in payload. + lambda f: "event: observation" in f + and "motor.input_modality" in f, + ), + timeout=5.0, + ) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + assert matched, f"live frame not found: {seen}" + + +@pytest.mark.anyio +async def test_drops_observation_for_other_attackers(auth_token, _fake_app_bus): + """An event with a different attacker_uuid must NOT be forwarded. + + We can't wait forever for a nothing — so we publish ONE matching + event first, drive past it, then publish a non-matching event, + then publish another matching event, and assert the + middle-non-matching frame never appeared between the two matches. + """ + attacker_uuid = await _seed_attacker(ip="10.0.0.7") + + from decnet.web.router.attackers import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_attacker_events( + attacker_uuid=attacker_uuid, + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + + async def _publish_sequence() -> None: + await asyncio.sleep(0.1) + # Non-matching event — must be dropped by the per-attacker filter. + await _fake_app_bus.publish( + _topics.attacker_observation("motor.input_modality"), + {"attacker_uuid": _OTHER_UUID, "primitive": "motor.input_modality", + "value": "should-not-appear"}, + event_type="motor.input_modality", + ) + await asyncio.sleep(0.05) + # Matching event — drives the loop forward, so we know the + # non-matching one had its chance. + await _fake_app_bus.publish( + _topics.attacker_observation("cognitive.cognitive_load"), + {"attacker_uuid": attacker_uuid, "primitive": "cognitive.cognitive_load", + "value": "high"}, + event_type="cognitive.cognitive_load", + ) + + pub_task = asyncio.create_task(_publish_sequence()) + try: + matched, seen = await asyncio.wait_for( + _drive_until( + gen, + lambda f: "event: observation" in f + and "cognitive.cognitive_load" in f, + ), + timeout=5.0, + ) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + assert matched, f"matching frame missing: {seen}" + # The dropped event's distinguishing string must never appear. + assert not any("should-not-appear" in f for f in seen), ( + f"per-attacker filter leaked: {seen}" + ) + + +@pytest.mark.anyio +async def test_includes_fingerprint_rotated_for_this_attacker( + auth_token, _fake_app_bus, +): + attacker_uuid = await _seed_attacker(ip="10.0.0.8") + + from decnet.web.router.attackers import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_attacker_events( + attacker_uuid=attacker_uuid, + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + + async def _publish() -> None: + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED), + {"attacker_uuid": attacker_uuid, "old_fp": "a", "new_fp": "b"}, + event_type=_topics.ATTACKER_FINGERPRINT_ROTATED, + ) + + pub_task = asyncio.create_task(_publish()) + try: + matched, seen = await asyncio.wait_for( + _drive_until(gen, lambda f: "event: fingerprint.rotated" in f), + timeout=5.0, + ) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + assert matched + + +# ── _sse_name_for unit ────────────────────────────────────────────── + + +def test_sse_name_for_observation_collapses_to_single_event_name(): + """Per-primitive events all share the SSE event name 'observation'; + the primitive rides in payload.""" + from decnet.web.router.attackers.api_events import _sse_name_for + assert ( + _sse_name_for("attacker.observation.motor.input_modality") + == "observation" + ) + assert ( + _sse_name_for("attacker.observation.motor.shell_mastery.tab_completion") + == "observation" + ) + + +def test_sse_name_for_fingerprint_rotated(): + from decnet.web.router.attackers.api_events import _sse_name_for + assert _sse_name_for("attacker.fingerprint_rotated") == "fingerprint.rotated" + + +def test_sse_name_for_scored(): + from decnet.web.router.attackers.api_events import _sse_name_for + assert _sse_name_for("attacker.scored") == "attacker.scored" + + +def test_sse_name_for_unknown_passes_through(): + from decnet.web.router.attackers.api_events import _sse_name_for + assert _sse_name_for("attacker.something_new") == "attacker.something_new"