GET /api/v1/attackers/{uuid}/attribution
Returns the merger output for an attacker's identity:
{
"identity_uuid": "abc..." | null,
"primitives": [
{primitive, current_value, state, confidence,
observation_count, last_change_ts, last_observation_ts},
...
]
}
Pre-attribution-worker: identity_uuid=null, primitives=[]. Surfacing
identity_uuid keeps the cross-attacker rollup story visible to the
frontend ahead of v1's clusterer landing.
api_events SSE relay also subscribes to attribution.> and forwards
to the AttackerDetail page filtered on payload.identity_uuid (the
identity is resolved at stream open from the URL's attacker_uuid;
attribution payloads are identity-keyed, not attacker-keyed). New
SSE event names: attribution.state_changed,
attribution.multi_actor_suspected.
Frontend (AttackerDetail.tsx badge rendering, useAttackerStream
consumer) deferred — there's already WIP on AttackerDetail.tsx in
the working tree; merging the badge logic is a separate commit
once that lands.
Tests: 4 endpoint scenarios — 401 unauth, 404 unknown attacker,
200 empty (no stub), 200 with primitive-ordered rows.
234 lines
9.3 KiB
Python
234 lines
9.3 KiB
Python
"""SSE stream of per-attacker behavioural events — one connection per
|
|
AttackerDetail page.
|
|
|
|
Subscribes to ``attacker.observation.>``,
|
|
``attacker.fingerprint_rotated`` and ``attacker.scored`` on the
|
|
:class:`~decnet.bus.base.BaseBus` for the duration of the request and
|
|
forwards each event whose payload's ``attacker_uuid`` matches this
|
|
stream's attacker. Emits a one-shot snapshot on connect (latest
|
|
observation per primitive) so the panel hydrates immediately.
|
|
|
|
Authorization mirrors :mod:`decnet.web.router.topology.api_events` —
|
|
JWT via the ``?token=`` query parameter (EventSource can't set
|
|
arbitrary headers) + ``require_stream_viewer`` role gate. The 404
|
|
fires after auth so an existence probe can't leak an attacker UUID
|
|
to an unauthenticated caller.
|
|
|
|
Per-attacker filter is keyed on the DECNET-side ``attacker_uuid``
|
|
denorm the profiler worker stamps onto every published payload (see
|
|
``BEHAVE-INTEGRATION.md`` §339-366 deviation note + Phase 5
|
|
amendment in ``decnet/profiler/behave_shell/_handler.py``).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import AsyncGenerator
|
|
|
|
import orjson
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.responses import StreamingResponse
|
|
|
|
from decnet.bus import topics as _topics
|
|
from decnet.bus.app import get_app_bus
|
|
from decnet.logging import get_logger
|
|
from decnet.telemetry import traced as _traced
|
|
from decnet.web.dependencies import repo, require_stream_viewer
|
|
from decnet.web.sse_limits import sse_connection_slot
|
|
|
|
from ._guards import get_attacker_or_404
|
|
|
|
log = get_logger("api.attackers.events")
|
|
|
|
router = APIRouter()
|
|
|
|
_KEEPALIVE_SECS = 15.0
|
|
_QUEUE_MAX = 256
|
|
|
|
|
|
def _format_sse(event_name: str, data: dict) -> str:
|
|
"""Build one SSE frame: ``event: <name>\\ndata: <json>\\n\\n``."""
|
|
return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n"
|
|
|
|
|
|
def _sse_name_for(topic: str) -> str:
|
|
"""Derive an SSE ``event:`` name from a bus topic.
|
|
|
|
``attacker.observation.<primitive>`` → ``observation``
|
|
(the primitive ride-along is in the payload, not the event name —
|
|
a per-primitive event name would force the frontend hook to
|
|
register 37+ listeners or know the registry. Single event name
|
|
keeps the EventSource handler shape uniform.)
|
|
|
|
``attacker.fingerprint_rotated`` → ``fingerprint.rotated``
|
|
``attacker.scored`` → ``attacker.scored``
|
|
``attribution.profile.state_changed`` → ``attribution.state_changed``
|
|
``attribution.profile.multi_actor_suspected`` → ``attribution.multi_actor_suspected``
|
|
|
|
Anything else passes through unchanged so a future ``attacker.*``
|
|
family doesn't silently collapse onto a generic bucket.
|
|
"""
|
|
if topic.startswith("attacker.observation."):
|
|
return "observation"
|
|
if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}":
|
|
return "fingerprint.rotated"
|
|
if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}":
|
|
return "attacker.scored"
|
|
if topic == _topics.attribution(_topics.ATTRIBUTION_PROFILE_STATE_CHANGED):
|
|
return "attribution.state_changed"
|
|
if topic == _topics.attribution(
|
|
_topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED,
|
|
):
|
|
return "attribution.multi_actor_suspected"
|
|
return topic
|
|
|
|
|
|
@router.get(
|
|
"/attackers/{attacker_uuid}/events",
|
|
tags=["Attacker Profiles"],
|
|
responses={
|
|
200: {
|
|
"content": {"text/event-stream": {}},
|
|
"description": "SSE stream of behavioural events for one attacker",
|
|
},
|
|
401: {"description": "Could not validate credentials"},
|
|
403: {"description": "Insufficient permissions"},
|
|
404: {"description": "Attacker not found"},
|
|
429: {"description": "Per-user SSE connection cap reached"},
|
|
},
|
|
)
|
|
@_traced("api.attackers.events")
|
|
async def api_attacker_events(
|
|
attacker_uuid: str,
|
|
request: Request,
|
|
user: dict = Depends(require_stream_viewer),
|
|
) -> StreamingResponse:
|
|
# 404-after-auth so an existence probe can't enumerate attacker UUIDs.
|
|
attacker = await get_attacker_or_404(attacker_uuid)
|
|
# Pre-resolve the identity_uuid so attribution.profile.* events
|
|
# (keyed on identity_uuid, not attacker_uuid) can be filtered
|
|
# without a per-event repo lookup. None until the attribution
|
|
# worker stamps a stub on first observation.
|
|
identity_uuid = attacker.get("identity_id") if isinstance(attacker, dict) else None
|
|
|
|
snapshot_per_primitive = await repo.latest_observation_per_primitive(
|
|
attacker_uuid,
|
|
)
|
|
snapshot_observations = [
|
|
{"primitive": primitive, **payload}
|
|
for primitive, payload in sorted(snapshot_per_primitive.items())
|
|
]
|
|
|
|
async def generator() -> AsyncGenerator[str, None]:
|
|
async with sse_connection_slot(user["uuid"]):
|
|
# Flush headers immediately so the browser's EventSource
|
|
# sees a live connection before the first real event.
|
|
yield ": keepalive\n\n"
|
|
|
|
yield _format_sse("snapshot", {
|
|
"attacker_uuid": attacker_uuid,
|
|
"observations": snapshot_observations,
|
|
})
|
|
|
|
bus = await get_app_bus()
|
|
if bus is None:
|
|
# Bus disabled (NullBus) or unreachable. The snapshot
|
|
# is still useful; idle on keepalives so the client
|
|
# stays connected and re-polls on its own timers.
|
|
while not await request.is_disconnected():
|
|
try:
|
|
await asyncio.sleep(_KEEPALIVE_SECS)
|
|
except asyncio.CancelledError:
|
|
break
|
|
yield ": keepalive\n\n"
|
|
return
|
|
|
|
# Five subscriptions, merged through one queue. Filter on
|
|
# payload["attacker_uuid"] for attacker.* events; on
|
|
# payload["identity_uuid"] (resolved at stream open) for
|
|
# attribution.profile.* events.
|
|
obs_sub = bus.subscribe(f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>")
|
|
fp_sub = bus.subscribe(
|
|
f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}",
|
|
)
|
|
score_sub = bus.subscribe(
|
|
f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}",
|
|
)
|
|
attribution_sub = bus.subscribe(f"{_topics.ATTRIBUTION}.>")
|
|
queue: asyncio.Queue = asyncio.Queue(maxsize=_QUEUE_MAX)
|
|
|
|
async def _pump_by_attacker(sub) -> None:
|
|
async with sub:
|
|
async for ev in sub:
|
|
payload = ev.payload or {}
|
|
if payload.get("attacker_uuid") != attacker_uuid:
|
|
continue
|
|
try:
|
|
queue.put_nowait(ev)
|
|
except asyncio.QueueFull:
|
|
# Drop on overflow rather than backpressuring
|
|
# the bus; the snapshot + reconnect path will
|
|
# cover any gap a slow consumer creates.
|
|
pass
|
|
|
|
async def _pump_by_identity(sub) -> None:
|
|
async with sub:
|
|
async for ev in sub:
|
|
payload = ev.payload or {}
|
|
# If the attacker has no stub identity yet,
|
|
# there's nothing to filter on — skip silently.
|
|
if identity_uuid is None:
|
|
continue
|
|
if payload.get("identity_uuid") != identity_uuid:
|
|
continue
|
|
try:
|
|
queue.put_nowait(ev)
|
|
except asyncio.QueueFull:
|
|
pass
|
|
|
|
tasks = [
|
|
asyncio.create_task(_pump_by_attacker(obs_sub)),
|
|
asyncio.create_task(_pump_by_attacker(fp_sub)),
|
|
asyncio.create_task(_pump_by_attacker(score_sub)),
|
|
asyncio.create_task(_pump_by_identity(attribution_sub)),
|
|
]
|
|
try:
|
|
while True:
|
|
if await request.is_disconnected():
|
|
break
|
|
try:
|
|
event = await asyncio.wait_for(
|
|
queue.get(), timeout=_KEEPALIVE_SECS,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
yield ": keepalive\n\n"
|
|
continue
|
|
yield _format_sse(
|
|
_sse_name_for(event.topic),
|
|
{
|
|
"topic": event.topic,
|
|
"type": event.type,
|
|
"ts": event.ts,
|
|
"payload": event.payload,
|
|
},
|
|
)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception:
|
|
log.exception(
|
|
"attacker events stream crashed attacker_uuid=%s",
|
|
attacker_uuid,
|
|
)
|
|
yield _format_sse("error", {"message": "Stream interrupted"})
|
|
finally:
|
|
for t in tasks:
|
|
t.cancel()
|
|
|
|
return StreamingResponse(
|
|
generator(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no",
|
|
},
|
|
)
|