feat(web): read-only campaigns API + SSE + frontend

API: /api/v1/campaigns (paginated list), /api/v1/campaigns/{uuid}
(soft-merge chain follow), /api/v1/campaigns/{uuid}/identities
(member identities), and /api/v1/campaigns/events (SSE under
campaign.> + JWT-via-?token=, snapshot-on-connect). Mirror of the
identity router; same auth, same shape, same OpenAPI tags pattern.

Frontend: CampaignDetail.tsx page (same visual vocabulary as
IdentityDetail), useCampaignStream hook (mirror of
useIdentityStream), /campaigns/:id route, IdentityDetail's
CAMPAIGN badge becomes clickable and navigates to the campaign.
useIdentityStream now listens for identity.campaign.assigned so
the badge appears live without a manual refresh.
This commit is contained in:
2026-04-26 09:20:17 -04:00
parent 75af00c9c8
commit d531cea536
14 changed files with 1035 additions and 3 deletions

View File

@@ -25,6 +25,10 @@ from .identities.api_list_identities import router as identities_list_router
from .identities.api_get_identity_detail import router as identity_detail_router
from .identities.api_list_identity_observations import router as identity_observations_router
from .identities.api_events import router as identity_events_router
from .campaigns.api_list_campaigns import router as campaigns_list_router
from .campaigns.api_get_campaign_detail import router as campaign_detail_router
from .campaigns.api_list_campaign_identities import router as campaign_identities_router
from .campaigns.api_events import router as campaign_events_router
from .transcripts import transcripts_router
from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router
@@ -96,6 +100,10 @@ api_router.include_router(identities_list_router)
api_router.include_router(identity_detail_router)
api_router.include_router(identity_observations_router)
api_router.include_router(identity_events_router)
api_router.include_router(campaigns_list_router)
api_router.include_router(campaign_detail_router)
api_router.include_router(campaign_identities_router)
api_router.include_router(campaign_events_router)
# Observability
api_router.include_router(stats_router)

View File

View File

@@ -0,0 +1,123 @@
"""SSE stream of campaign events — one connection per viewer.
Subscribes to ``campaign.>`` on the bus for the duration of the
request and forwards each matching event as a Server-Sent Event.
Emits a one-shot snapshot on connect (current paginated campaign
list).
Mirror of :mod:`decnet.web.router.identities.api_events`. Auth: JWT
via ``?token=`` query param + ``require_stream_viewer`` role.
"""
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
import orjson
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_stream_viewer
from decnet.web.sse_limits import sse_connection_slot
log = get_logger("api.campaigns.events")
router = APIRouter()
_KEEPALIVE_SECS = 15.0
_SNAPSHOT_LIMIT = 50
def _format_sse(event_name: str, data: dict) -> str:
return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n"
@router.get(
"/campaigns/events",
tags=["Campaign Clustering"],
responses={
200: {
"content": {"text/event-stream": {}},
"description": "SSE stream of campaign-clustering events",
},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
429: {"description": "Per-user SSE connection cap reached"},
},
)
@_traced("api.campaigns.events")
async def api_campaigns_events(
request: Request,
user: dict = Depends(require_stream_viewer),
) -> StreamingResponse:
# Event types: snapshot, formed, identity.assigned, merged, unmerged.
snapshot = await repo.list_campaigns(limit=_SNAPSHOT_LIMIT, offset=0)
async def generator() -> AsyncGenerator[str, None]:
async with sse_connection_slot(user["uuid"]):
yield ": keepalive\n\n"
yield _format_sse("snapshot", {"campaigns": snapshot})
bus = await get_app_bus()
if bus is None:
while not await request.is_disconnected():
try:
await asyncio.sleep(_KEEPALIVE_SECS)
except asyncio.CancelledError:
break
yield ": keepalive\n\n"
return
sub = bus.subscribe(f"{_topics.CAMPAIGN}.>")
try:
async with sub:
sub_iter = sub.__aiter__()
while True:
if await request.is_disconnected():
break
next_task = asyncio.ensure_future(sub_iter.__anext__())
try:
event = await asyncio.wait_for(
next_task, timeout=_KEEPALIVE_SECS,
)
except asyncio.TimeoutError:
next_task.cancel()
yield ": keepalive\n\n"
continue
except StopAsyncIteration:
break
yield _format_sse(
_sse_name_for(event.topic),
{
"topic": event.topic,
"type": event.type,
"ts": event.ts,
"payload": event.payload,
},
)
except asyncio.CancelledError:
pass
except Exception:
log.exception("campaign events stream crashed")
yield _format_sse("error", {"message": "Stream interrupted"})
return StreamingResponse(
generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
def _sse_name_for(topic: str) -> str:
"""``campaign.formed`` → ``formed``;
``campaign.identity.assigned`` → ``identity.assigned``."""
if topic.startswith(f"{_topics.CAMPAIGN}."):
return topic[len(_topics.CAMPAIGN) + 1:]
return topic

View File

@@ -0,0 +1,40 @@
"""GET /api/v1/campaigns/{uuid} — single campaign row.
Soft-merge handling: if the requested UUID has merged_into_uuid set,
the repository follows the chain and returns the winner. Mirror of
:mod:`decnet.web.router.identities.api_get_identity_detail`.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/campaigns/{uuid}",
tags=["Campaign Clustering"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Campaign not found"},
},
)
@_traced("api.get_campaign_detail")
async def get_campaign_detail(
uuid: str,
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
campaign = await repo.get_campaign_by_uuid(uuid)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
# Cheap aggregate the CampaignDetail page surfaces — counted off
# the FK rather than the denormalized identity_count so the answer
# is always live.
campaign["identity_count_live"] = await repo.count_identities_for_campaign(
campaign["uuid"]
)
return campaign

View File

@@ -0,0 +1,41 @@
"""GET /api/v1/campaigns/{uuid}/identities — identities for a campaign.
Returns the ``AttackerIdentity`` rows whose ``campaign_id`` FK points
at this campaign. Mirror of
:mod:`decnet.web.router.identities.api_list_identity_observations`.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/campaigns/{uuid}/identities",
tags=["Campaign Clustering"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Campaign not found"},
},
)
@_traced("api.list_campaign_identities")
async def list_campaign_identities(
uuid: str,
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0, le=2147483647),
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
campaign = await repo.get_campaign_by_uuid(uuid)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
canonical_uuid = campaign["uuid"]
data = await repo.list_identities_for_campaign(
canonical_uuid, limit=limit, offset=offset
)
total = await repo.count_identities_for_campaign(canonical_uuid)
return {"total": total, "limit": limit, "offset": offset, "data": data}

View File

@@ -0,0 +1,35 @@
"""GET /api/v1/campaigns — paginated list of campaigns.
Mirror of :mod:`decnet.web.router.identities.api_list_identities` for
the campaign layer. Returns an empty list while the campaign clusterer
hasn't run yet (the campaigns table ships empty).
"""
from typing import Any
from fastapi import APIRouter, Depends, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/campaigns",
tags=["Campaign Clustering"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
422: {"description": "Validation error"},
},
)
@_traced("api.list_campaigns")
async def list_campaigns(
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0, le=2147483647),
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
"""Paginated campaign list, newest-updated first."""
data = await repo.list_campaigns(limit=limit, offset=offset)
total = await repo.count_campaigns()
return {"total": total, "limit": limit, "offset": offset, "data": data}