feat(api/attackers): per-attacker SSE events stream
GET /api/v1/attackers/{uuid}/events streams behavioural events for
one attacker. Mirrors decnet/web/router/topology/api_events.py
end-to-end: ?token= auth, require_stream_viewer gate,
sse_connection_slot per-user cap, snapshot-on-connect, three bus
subscriptions (attacker.observation.>, attacker.fingerprint_rotated,
attacker.scored) merged through asyncio.Queue, 15s keepalive,
request.is_disconnected() exit, finally task cancellation.
Per-attacker filter keys on payload['attacker_uuid'] which the
profiler worker stamps onto every published payload (Phase 5 P5.0
amendment) — O(1) drop without a repo round-trip per event.
_sse_name_for derives SSE event names:
attacker.observation.<primitive> → observation.<primitive>
attacker.fingerprint_rotated → fingerprint.rotated
attacker.scored → attacker.scored
10 tests cover snapshot, live forward, per-attacker filter (drops
other attackers' events), fingerprint.rotated forward, 404, 401, and
the sse-name derivation across all four cases. Topology events
regression green.
This commit is contained in:
@@ -15,6 +15,7 @@ from .fleet.api_deploy_deckies import router as deploy_deckies_router
|
||||
from .stream.api_stream_events import router as stream_router
|
||||
from .attackers.api_get_attackers import router as attackers_router
|
||||
from .attackers.api_export_attackers import router as attackers_export_router
|
||||
from .attackers.api_events import router as attacker_events_router
|
||||
from .attackers.api_get_attacker_detail import router as attacker_detail_router
|
||||
from .attackers.api_get_attacker_commands import router as attacker_commands_router
|
||||
from .attackers.api_get_attacker_artifacts import router as attacker_artifacts_router
|
||||
@@ -103,6 +104,7 @@ api_router.include_router(deploy_deckies_router)
|
||||
api_router.include_router(attackers_router)
|
||||
api_router.include_router(attackers_export_router)
|
||||
api_router.include_router(attacker_detail_router)
|
||||
api_router.include_router(attacker_events_router)
|
||||
api_router.include_router(attacker_commands_router)
|
||||
api_router.include_router(attacker_artifacts_router)
|
||||
api_router.include_router(attacker_transcripts_router)
|
||||
|
||||
27
decnet/web/router/attackers/_guards.py
Normal file
27
decnet/web/router/attackers/_guards.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""Shared helpers for the per-attacker routes.
|
||||
|
||||
Currently houses the 404 guard used by the SSE events stream
|
||||
(:mod:`api_events`). Mirrors the topology router's
|
||||
``_guards.get_topology_or_404`` shape so a future grep for "guard"
|
||||
finds both.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
|
||||
async def get_attacker_or_404(attacker_uuid: str) -> dict[str, Any]:
|
||||
"""Fetch an Attacker row by UUID or raise 404.
|
||||
|
||||
The 404 fires *after* auth (the route's role gate runs first), so
|
||||
an existence probe can't leak a UUID's presence to an
|
||||
unauthenticated caller.
|
||||
"""
|
||||
attacker = await repo.get_attacker_by_uuid(attacker_uuid)
|
||||
if not attacker:
|
||||
raise HTTPException(status_code=404, detail="Attacker not found")
|
||||
return attacker
|
||||
202
decnet/web/router/attackers/api_events.py
Normal file
202
decnet/web/router/attackers/api_events.py
Normal file
@@ -0,0 +1,202 @@
|
||||
"""SSE stream of per-attacker behavioural events — one connection per
|
||||
AttackerDetail page.
|
||||
|
||||
Subscribes to ``attacker.observation.>``,
|
||||
``attacker.fingerprint_rotated`` and ``attacker.scored`` on the
|
||||
:class:`~decnet.bus.base.BaseBus` for the duration of the request and
|
||||
forwards each event whose payload's ``attacker_uuid`` matches this
|
||||
stream's attacker. Emits a one-shot snapshot on connect (latest
|
||||
observation per primitive) so the panel hydrates immediately.
|
||||
|
||||
Authorization mirrors :mod:`decnet.web.router.topology.api_events` —
|
||||
JWT via the ``?token=`` query parameter (EventSource can't set
|
||||
arbitrary headers) + ``require_stream_viewer`` role gate. The 404
|
||||
fires after auth so an existence probe can't leak an attacker UUID
|
||||
to an unauthenticated caller.
|
||||
|
||||
Per-attacker filter is keyed on the DECNET-side ``attacker_uuid``
|
||||
denorm the profiler worker stamps onto every published payload (see
|
||||
``BEHAVE-INTEGRATION.md`` §339-366 deviation note + Phase 5
|
||||
amendment in ``decnet/profiler/behave_shell/_handler.py``).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import orjson
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from decnet.bus import topics as _topics
|
||||
from decnet.bus.app import get_app_bus
|
||||
from decnet.logging import get_logger
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.web.dependencies import repo, require_stream_viewer
|
||||
from decnet.web.sse_limits import sse_connection_slot
|
||||
|
||||
from ._guards import get_attacker_or_404
|
||||
|
||||
log = get_logger("api.attackers.events")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
_KEEPALIVE_SECS = 15.0
|
||||
_QUEUE_MAX = 256
|
||||
|
||||
|
||||
def _format_sse(event_name: str, data: dict) -> str:
|
||||
"""Build one SSE frame: ``event: <name>\\ndata: <json>\\n\\n``."""
|
||||
return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n"
|
||||
|
||||
|
||||
def _sse_name_for(topic: str) -> str:
|
||||
"""Derive an SSE ``event:`` name from a bus topic.
|
||||
|
||||
``attacker.observation.<primitive>`` → ``observation``
|
||||
(the primitive ride-along is in the payload, not the event name —
|
||||
a per-primitive event name would force the frontend hook to
|
||||
register 37+ listeners or know the registry. Single event name
|
||||
keeps the EventSource handler shape uniform.)
|
||||
|
||||
``attacker.fingerprint_rotated`` → ``fingerprint.rotated``
|
||||
``attacker.scored`` → ``attacker.scored``
|
||||
|
||||
Anything else passes through unchanged so a future ``attacker.*``
|
||||
family doesn't silently collapse onto a generic bucket.
|
||||
"""
|
||||
if topic.startswith("attacker.observation."):
|
||||
return "observation"
|
||||
if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}":
|
||||
return "fingerprint.rotated"
|
||||
if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}":
|
||||
return "attacker.scored"
|
||||
return topic
|
||||
|
||||
|
||||
@router.get(
|
||||
"/attackers/{attacker_uuid}/events",
|
||||
tags=["Attacker Profiles"],
|
||||
responses={
|
||||
200: {
|
||||
"content": {"text/event-stream": {}},
|
||||
"description": "SSE stream of behavioural events for one attacker",
|
||||
},
|
||||
401: {"description": "Could not validate credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
404: {"description": "Attacker not found"},
|
||||
429: {"description": "Per-user SSE connection cap reached"},
|
||||
},
|
||||
)
|
||||
@_traced("api.attackers.events")
|
||||
async def api_attacker_events(
|
||||
attacker_uuid: str,
|
||||
request: Request,
|
||||
user: dict = Depends(require_stream_viewer),
|
||||
) -> StreamingResponse:
|
||||
# 404-after-auth so an existence probe can't enumerate attacker UUIDs.
|
||||
await get_attacker_or_404(attacker_uuid)
|
||||
|
||||
snapshot_per_primitive = await repo.latest_observation_per_primitive(
|
||||
attacker_uuid,
|
||||
)
|
||||
snapshot_observations = [
|
||||
{"primitive": primitive, **payload}
|
||||
for primitive, payload in sorted(snapshot_per_primitive.items())
|
||||
]
|
||||
|
||||
async def generator() -> AsyncGenerator[str, None]:
|
||||
async with sse_connection_slot(user["uuid"]):
|
||||
# Flush headers immediately so the browser's EventSource
|
||||
# sees a live connection before the first real event.
|
||||
yield ": keepalive\n\n"
|
||||
|
||||
yield _format_sse("snapshot", {
|
||||
"attacker_uuid": attacker_uuid,
|
||||
"observations": snapshot_observations,
|
||||
})
|
||||
|
||||
bus = await get_app_bus()
|
||||
if bus is None:
|
||||
# Bus disabled (NullBus) or unreachable. The snapshot
|
||||
# is still useful; idle on keepalives so the client
|
||||
# stays connected and re-polls on its own timers.
|
||||
while not await request.is_disconnected():
|
||||
try:
|
||||
await asyncio.sleep(_KEEPALIVE_SECS)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
yield ": keepalive\n\n"
|
||||
return
|
||||
|
||||
# Three subscriptions, merged through one queue. Per-attacker
|
||||
# filter on payload["attacker_uuid"] — the profiler worker
|
||||
# stamps it on every published payload (Phase 5 amendment).
|
||||
obs_sub = bus.subscribe(f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>")
|
||||
fp_sub = bus.subscribe(
|
||||
f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}",
|
||||
)
|
||||
score_sub = bus.subscribe(
|
||||
f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}",
|
||||
)
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=_QUEUE_MAX)
|
||||
|
||||
async def _pump(sub) -> None:
|
||||
async with sub:
|
||||
async for ev in sub:
|
||||
payload = ev.payload or {}
|
||||
if payload.get("attacker_uuid") != attacker_uuid:
|
||||
continue
|
||||
try:
|
||||
queue.put_nowait(ev)
|
||||
except asyncio.QueueFull:
|
||||
# Drop on overflow rather than backpressuring
|
||||
# the bus; the snapshot + reconnect path will
|
||||
# cover any gap a slow consumer creates.
|
||||
pass
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(_pump(obs_sub)),
|
||||
asyncio.create_task(_pump(fp_sub)),
|
||||
asyncio.create_task(_pump(score_sub)),
|
||||
]
|
||||
try:
|
||||
while True:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
try:
|
||||
event = await asyncio.wait_for(
|
||||
queue.get(), timeout=_KEEPALIVE_SECS,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
yield ": keepalive\n\n"
|
||||
continue
|
||||
yield _format_sse(
|
||||
_sse_name_for(event.topic),
|
||||
{
|
||||
"topic": event.topic,
|
||||
"type": event.type,
|
||||
"ts": event.ts,
|
||||
"payload": event.payload,
|
||||
},
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
log.exception(
|
||||
"attacker events stream crashed attacker_uuid=%s",
|
||||
attacker_uuid,
|
||||
)
|
||||
yield _format_sse("error", {"message": "Stream interrupted"})
|
||||
finally:
|
||||
for t in tasks:
|
||||
t.cancel()
|
||||
|
||||
return StreamingResponse(
|
||||
generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
0
tests/api/attackers/__init__.py
Normal file
0
tests/api/attackers/__init__.py
Normal file
352
tests/api/attackers/test_events_stream.py
Normal file
352
tests/api/attackers/test_events_stream.py
Normal file
@@ -0,0 +1,352 @@
|
||||
"""SSE events stream — GET /attackers/{uuid}/events (Phase 5).
|
||||
|
||||
Mirrors the topology events test pattern at
|
||||
``tests/api/topology/test_events_stream.py`` — drives the generator
|
||||
directly to avoid the full httpx streaming roundtrip, which is
|
||||
painful under ASGITransport + an infinite SSE loop.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from decnet.bus import app as _bus_app
|
||||
from decnet.bus import topics as _topics
|
||||
from decnet.bus.fake import FakeBus
|
||||
from decnet.web.api import app
|
||||
from decnet.web.dependencies import repo as _repo
|
||||
|
||||
_V1 = "/api/v1/attackers"
|
||||
_OTHER_UUID = "ffffffff-eeee-dddd-cccc-bbbbbbbbbbbb"
|
||||
|
||||
|
||||
# ── Fixtures ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _fake_app_bus(monkeypatch):
|
||||
bus = FakeBus()
|
||||
|
||||
async def _get() -> FakeBus:
|
||||
if not bus._connected:
|
||||
await bus.connect()
|
||||
return bus
|
||||
|
||||
monkeypatch.setattr(_bus_app, "get_app_bus", _get)
|
||||
from decnet.web.router.attackers import api_events as _ev
|
||||
monkeypatch.setattr(_ev, "get_app_bus", _get)
|
||||
return bus
|
||||
|
||||
|
||||
async def _seed_attacker(ip: str = "10.0.0.5") -> str:
|
||||
"""Persist a minimal Attacker row, return its uuid."""
|
||||
return await _repo.upsert_attacker({
|
||||
"ip": ip,
|
||||
"first_seen": datetime.now(timezone.utc),
|
||||
"last_seen": datetime.now(timezone.utc),
|
||||
"event_count": 1,
|
||||
"service_count": 1,
|
||||
"decky_count": 1,
|
||||
"services": "[\"ssh\"]",
|
||||
"deckies": "[\"d1\"]",
|
||||
"traversal_path": None,
|
||||
"is_traversal": False,
|
||||
"bounty_count": 0,
|
||||
"credential_count": 0,
|
||||
"fingerprints": "[]",
|
||||
"commands": "[]",
|
||||
"country_code": None,
|
||||
"country_source": None,
|
||||
"asn": None,
|
||||
"as_name": None,
|
||||
"asn_source": None,
|
||||
"updated_at": datetime.now(timezone.utc),
|
||||
})
|
||||
|
||||
|
||||
async def _seed_observation(
|
||||
attacker_uuid: str,
|
||||
primitive: str,
|
||||
value: str,
|
||||
confidence: float = 0.85,
|
||||
) -> None:
|
||||
await _repo.upsert_observation({
|
||||
"primitive": primitive,
|
||||
"value": value,
|
||||
"confidence": confidence,
|
||||
"window_start_ts": 0.0,
|
||||
"window_end_ts": 1.0,
|
||||
"source": "test",
|
||||
"evidence_ref": f"shard:test#{primitive}",
|
||||
"envelope_v": 1,
|
||||
"ts": 1714521660.456,
|
||||
"attacker_uuid": attacker_uuid,
|
||||
})
|
||||
|
||||
|
||||
# ── Auth / 404 paths ────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_events_unauthenticated_401(_fake_app_bus):
|
||||
async with httpx.AsyncClient(
|
||||
transport=httpx.ASGITransport(app=app), base_url="http://test",
|
||||
) as ac:
|
||||
r = await ac.get(f"{_V1}/any-uuid/events")
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_events_missing_attacker_404(auth_token, _fake_app_bus):
|
||||
async with httpx.AsyncClient(
|
||||
transport=httpx.ASGITransport(app=app), base_url="http://test",
|
||||
) as ac:
|
||||
r = await ac.get(
|
||||
f"{_V1}/{_OTHER_UUID}/events",
|
||||
params={"token": auth_token},
|
||||
)
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
# ── Generator-driven tests ──────────────────────────────────────────
|
||||
|
||||
|
||||
def _as_text(frame) -> str:
|
||||
return frame if isinstance(frame, str) else frame.decode()
|
||||
|
||||
|
||||
async def _drive_until(gen, predicate, *, max_frames: int = 8) -> tuple[bool, list[str]]:
|
||||
"""Pump frames out of the generator until ``predicate(frame)`` is
|
||||
True or ``max_frames`` is exhausted. Returns ``(matched, frames_seen)``."""
|
||||
seen: list[str] = []
|
||||
for _ in range(max_frames):
|
||||
frame = _as_text(await gen.__anext__())
|
||||
seen.append(frame)
|
||||
if predicate(frame):
|
||||
return True, seen
|
||||
return False, seen
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_emits_snapshot_on_connect(auth_token, _fake_app_bus):
|
||||
"""Snapshot frame fires immediately and contains seeded observations."""
|
||||
attacker_uuid = await _seed_attacker(ip="10.0.0.5")
|
||||
await _seed_observation(attacker_uuid, "motor.input_modality", "typed")
|
||||
|
||||
from decnet.web.router.attackers import api_events as _ev
|
||||
|
||||
class _FakeRequest:
|
||||
async def is_disconnected(self) -> bool:
|
||||
return False
|
||||
|
||||
response = await _ev.api_attacker_events(
|
||||
attacker_uuid=attacker_uuid,
|
||||
request=_FakeRequest(), # type: ignore[arg-type]
|
||||
user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"},
|
||||
)
|
||||
gen = response.body_iterator
|
||||
try:
|
||||
matched, seen = await asyncio.wait_for(
|
||||
_drive_until(
|
||||
gen,
|
||||
lambda f: "event: snapshot" in f and "motor.input_modality" in f,
|
||||
),
|
||||
timeout=5.0,
|
||||
)
|
||||
finally:
|
||||
await gen.aclose()
|
||||
assert matched, f"snapshot not found in frames: {seen}"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwards_observation_for_this_attacker(auth_token, _fake_app_bus):
|
||||
"""A live attacker.observation event reaches the SSE stream."""
|
||||
attacker_uuid = await _seed_attacker(ip="10.0.0.6")
|
||||
|
||||
from decnet.web.router.attackers import api_events as _ev
|
||||
|
||||
class _FakeRequest:
|
||||
async def is_disconnected(self) -> bool:
|
||||
return False
|
||||
|
||||
response = await _ev.api_attacker_events(
|
||||
attacker_uuid=attacker_uuid,
|
||||
request=_FakeRequest(), # type: ignore[arg-type]
|
||||
user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"},
|
||||
)
|
||||
gen = response.body_iterator
|
||||
|
||||
async def _publish_after_snapshot() -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
await _fake_app_bus.publish(
|
||||
_topics.attacker_observation("motor.input_modality"),
|
||||
{"attacker_uuid": attacker_uuid, "primitive": "motor.input_modality",
|
||||
"value": "pasted", "confidence": 0.9},
|
||||
event_type="motor.input_modality",
|
||||
)
|
||||
|
||||
pub_task = asyncio.create_task(_publish_after_snapshot())
|
||||
try:
|
||||
matched, seen = await asyncio.wait_for(
|
||||
_drive_until(
|
||||
gen,
|
||||
# Event name is "observation"; primitive rides in payload.
|
||||
lambda f: "event: observation" in f
|
||||
and "motor.input_modality" in f,
|
||||
),
|
||||
timeout=5.0,
|
||||
)
|
||||
finally:
|
||||
pub_task.cancel()
|
||||
try:
|
||||
await pub_task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await gen.aclose()
|
||||
assert matched, f"live frame not found: {seen}"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_drops_observation_for_other_attackers(auth_token, _fake_app_bus):
|
||||
"""An event with a different attacker_uuid must NOT be forwarded.
|
||||
|
||||
We can't wait forever for a nothing — so we publish ONE matching
|
||||
event first, drive past it, then publish a non-matching event,
|
||||
then publish another matching event, and assert the
|
||||
middle-non-matching frame never appeared between the two matches.
|
||||
"""
|
||||
attacker_uuid = await _seed_attacker(ip="10.0.0.7")
|
||||
|
||||
from decnet.web.router.attackers import api_events as _ev
|
||||
|
||||
class _FakeRequest:
|
||||
async def is_disconnected(self) -> bool:
|
||||
return False
|
||||
|
||||
response = await _ev.api_attacker_events(
|
||||
attacker_uuid=attacker_uuid,
|
||||
request=_FakeRequest(), # type: ignore[arg-type]
|
||||
user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"},
|
||||
)
|
||||
gen = response.body_iterator
|
||||
|
||||
async def _publish_sequence() -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
# Non-matching event — must be dropped by the per-attacker filter.
|
||||
await _fake_app_bus.publish(
|
||||
_topics.attacker_observation("motor.input_modality"),
|
||||
{"attacker_uuid": _OTHER_UUID, "primitive": "motor.input_modality",
|
||||
"value": "should-not-appear"},
|
||||
event_type="motor.input_modality",
|
||||
)
|
||||
await asyncio.sleep(0.05)
|
||||
# Matching event — drives the loop forward, so we know the
|
||||
# non-matching one had its chance.
|
||||
await _fake_app_bus.publish(
|
||||
_topics.attacker_observation("cognitive.cognitive_load"),
|
||||
{"attacker_uuid": attacker_uuid, "primitive": "cognitive.cognitive_load",
|
||||
"value": "high"},
|
||||
event_type="cognitive.cognitive_load",
|
||||
)
|
||||
|
||||
pub_task = asyncio.create_task(_publish_sequence())
|
||||
try:
|
||||
matched, seen = await asyncio.wait_for(
|
||||
_drive_until(
|
||||
gen,
|
||||
lambda f: "event: observation" in f
|
||||
and "cognitive.cognitive_load" in f,
|
||||
),
|
||||
timeout=5.0,
|
||||
)
|
||||
finally:
|
||||
pub_task.cancel()
|
||||
try:
|
||||
await pub_task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await gen.aclose()
|
||||
assert matched, f"matching frame missing: {seen}"
|
||||
# The dropped event's distinguishing string must never appear.
|
||||
assert not any("should-not-appear" in f for f in seen), (
|
||||
f"per-attacker filter leaked: {seen}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_includes_fingerprint_rotated_for_this_attacker(
|
||||
auth_token, _fake_app_bus,
|
||||
):
|
||||
attacker_uuid = await _seed_attacker(ip="10.0.0.8")
|
||||
|
||||
from decnet.web.router.attackers import api_events as _ev
|
||||
|
||||
class _FakeRequest:
|
||||
async def is_disconnected(self) -> bool:
|
||||
return False
|
||||
|
||||
response = await _ev.api_attacker_events(
|
||||
attacker_uuid=attacker_uuid,
|
||||
request=_FakeRequest(), # type: ignore[arg-type]
|
||||
user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"},
|
||||
)
|
||||
gen = response.body_iterator
|
||||
|
||||
async def _publish() -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
await _fake_app_bus.publish(
|
||||
_topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED),
|
||||
{"attacker_uuid": attacker_uuid, "old_fp": "a", "new_fp": "b"},
|
||||
event_type=_topics.ATTACKER_FINGERPRINT_ROTATED,
|
||||
)
|
||||
|
||||
pub_task = asyncio.create_task(_publish())
|
||||
try:
|
||||
matched, seen = await asyncio.wait_for(
|
||||
_drive_until(gen, lambda f: "event: fingerprint.rotated" in f),
|
||||
timeout=5.0,
|
||||
)
|
||||
finally:
|
||||
pub_task.cancel()
|
||||
try:
|
||||
await pub_task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await gen.aclose()
|
||||
assert matched
|
||||
|
||||
|
||||
# ── _sse_name_for unit ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_sse_name_for_observation_collapses_to_single_event_name():
|
||||
"""Per-primitive events all share the SSE event name 'observation';
|
||||
the primitive rides in payload."""
|
||||
from decnet.web.router.attackers.api_events import _sse_name_for
|
||||
assert (
|
||||
_sse_name_for("attacker.observation.motor.input_modality")
|
||||
== "observation"
|
||||
)
|
||||
assert (
|
||||
_sse_name_for("attacker.observation.motor.shell_mastery.tab_completion")
|
||||
== "observation"
|
||||
)
|
||||
|
||||
|
||||
def test_sse_name_for_fingerprint_rotated():
|
||||
from decnet.web.router.attackers.api_events import _sse_name_for
|
||||
assert _sse_name_for("attacker.fingerprint_rotated") == "fingerprint.rotated"
|
||||
|
||||
|
||||
def test_sse_name_for_scored():
|
||||
from decnet.web.router.attackers.api_events import _sse_name_for
|
||||
assert _sse_name_for("attacker.scored") == "attacker.scored"
|
||||
|
||||
|
||||
def test_sse_name_for_unknown_passes_through():
|
||||
from decnet.web.router.attackers.api_events import _sse_name_for
|
||||
assert _sse_name_for("attacker.something_new") == "attacker.something_new"
|
||||
Reference in New Issue
Block a user