diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 54660b92..f61789d3 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -25,6 +25,10 @@ from .identities.api_list_identities import router as identities_list_router from .identities.api_get_identity_detail import router as identity_detail_router from .identities.api_list_identity_observations import router as identity_observations_router from .identities.api_events import router as identity_events_router +from .campaigns.api_list_campaigns import router as campaigns_list_router +from .campaigns.api_get_campaign_detail import router as campaign_detail_router +from .campaigns.api_list_campaign_identities import router as campaign_identities_router +from .campaigns.api_events import router as campaign_events_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -96,6 +100,10 @@ api_router.include_router(identities_list_router) api_router.include_router(identity_detail_router) api_router.include_router(identity_observations_router) api_router.include_router(identity_events_router) +api_router.include_router(campaigns_list_router) +api_router.include_router(campaign_detail_router) +api_router.include_router(campaign_identities_router) +api_router.include_router(campaign_events_router) # Observability api_router.include_router(stats_router) diff --git a/decnet/web/router/campaigns/__init__.py b/decnet/web/router/campaigns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/decnet/web/router/campaigns/api_events.py b/decnet/web/router/campaigns/api_events.py new file mode 100644 index 00000000..13b138be --- /dev/null +++ b/decnet/web/router/campaigns/api_events.py @@ -0,0 +1,123 @@ +"""SSE stream of campaign events — one connection per viewer. + +Subscribes to ``campaign.>`` on the bus for the duration of the +request and forwards each matching event as a Server-Sent Event. +Emits a one-shot snapshot on connect (current paginated campaign +list). + +Mirror of :mod:`decnet.web.router.identities.api_events`. Auth: JWT +via ``?token=`` query param + ``require_stream_viewer`` role. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +import orjson +from fastapi import APIRouter, Depends, Request +from fastapi.responses import StreamingResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_stream_viewer +from decnet.web.sse_limits import sse_connection_slot + +log = get_logger("api.campaigns.events") + +router = APIRouter() + +_KEEPALIVE_SECS = 15.0 +_SNAPSHOT_LIMIT = 50 + + +def _format_sse(event_name: str, data: dict) -> str: + return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n" + + +@router.get( + "/campaigns/events", + tags=["Campaign Clustering"], + responses={ + 200: { + "content": {"text/event-stream": {}}, + "description": "SSE stream of campaign-clustering events", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 429: {"description": "Per-user SSE connection cap reached"}, + }, +) +@_traced("api.campaigns.events") +async def api_campaigns_events( + request: Request, + user: dict = Depends(require_stream_viewer), +) -> StreamingResponse: + # Event types: snapshot, formed, identity.assigned, merged, unmerged. + snapshot = await repo.list_campaigns(limit=_SNAPSHOT_LIMIT, offset=0) + + async def generator() -> AsyncGenerator[str, None]: + async with sse_connection_slot(user["uuid"]): + yield ": keepalive\n\n" + yield _format_sse("snapshot", {"campaigns": snapshot}) + + bus = await get_app_bus() + if bus is None: + while not await request.is_disconnected(): + try: + await asyncio.sleep(_KEEPALIVE_SECS) + except asyncio.CancelledError: + break + yield ": keepalive\n\n" + return + + sub = bus.subscribe(f"{_topics.CAMPAIGN}.>") + try: + async with sub: + sub_iter = sub.__aiter__() + while True: + if await request.is_disconnected(): + break + next_task = asyncio.ensure_future(sub_iter.__anext__()) + try: + event = await asyncio.wait_for( + next_task, timeout=_KEEPALIVE_SECS, + ) + except asyncio.TimeoutError: + next_task.cancel() + yield ": keepalive\n\n" + continue + except StopAsyncIteration: + break + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) + except asyncio.CancelledError: + pass + except Exception: + log.exception("campaign events stream crashed") + yield _format_sse("error", {"message": "Stream interrupted"}) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +def _sse_name_for(topic: str) -> str: + """``campaign.formed`` → ``formed``; + ``campaign.identity.assigned`` → ``identity.assigned``.""" + if topic.startswith(f"{_topics.CAMPAIGN}."): + return topic[len(_topics.CAMPAIGN) + 1:] + return topic diff --git a/decnet/web/router/campaigns/api_get_campaign_detail.py b/decnet/web/router/campaigns/api_get_campaign_detail.py new file mode 100644 index 00000000..ac2389a7 --- /dev/null +++ b/decnet/web/router/campaigns/api_get_campaign_detail.py @@ -0,0 +1,40 @@ +"""GET /api/v1/campaigns/{uuid} — single campaign row. + +Soft-merge handling: if the requested UUID has merged_into_uuid set, +the repository follows the chain and returns the winner. Mirror of +:mod:`decnet.web.router.identities.api_get_identity_detail`. +""" +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/campaigns/{uuid}", + tags=["Campaign Clustering"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Campaign not found"}, + }, +) +@_traced("api.get_campaign_detail") +async def get_campaign_detail( + uuid: str, + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + campaign = await repo.get_campaign_by_uuid(uuid) + if not campaign: + raise HTTPException(status_code=404, detail="Campaign not found") + # Cheap aggregate the CampaignDetail page surfaces — counted off + # the FK rather than the denormalized identity_count so the answer + # is always live. + campaign["identity_count_live"] = await repo.count_identities_for_campaign( + campaign["uuid"] + ) + return campaign diff --git a/decnet/web/router/campaigns/api_list_campaign_identities.py b/decnet/web/router/campaigns/api_list_campaign_identities.py new file mode 100644 index 00000000..df33598c --- /dev/null +++ b/decnet/web/router/campaigns/api_list_campaign_identities.py @@ -0,0 +1,41 @@ +"""GET /api/v1/campaigns/{uuid}/identities — identities for a campaign. + +Returns the ``AttackerIdentity`` rows whose ``campaign_id`` FK points +at this campaign. Mirror of +:mod:`decnet.web.router.identities.api_list_identity_observations`. +""" +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/campaigns/{uuid}/identities", + tags=["Campaign Clustering"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Campaign not found"}, + }, +) +@_traced("api.list_campaign_identities") +async def list_campaign_identities( + uuid: str, + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + campaign = await repo.get_campaign_by_uuid(uuid) + if not campaign: + raise HTTPException(status_code=404, detail="Campaign not found") + canonical_uuid = campaign["uuid"] + data = await repo.list_identities_for_campaign( + canonical_uuid, limit=limit, offset=offset + ) + total = await repo.count_identities_for_campaign(canonical_uuid) + return {"total": total, "limit": limit, "offset": offset, "data": data} diff --git a/decnet/web/router/campaigns/api_list_campaigns.py b/decnet/web/router/campaigns/api_list_campaigns.py new file mode 100644 index 00000000..c7ecab84 --- /dev/null +++ b/decnet/web/router/campaigns/api_list_campaigns.py @@ -0,0 +1,35 @@ +"""GET /api/v1/campaigns — paginated list of campaigns. + +Mirror of :mod:`decnet.web.router.identities.api_list_identities` for +the campaign layer. Returns an empty list while the campaign clusterer +hasn't run yet (the campaigns table ships empty). +""" +from typing import Any + +from fastapi import APIRouter, Depends, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/campaigns", + tags=["Campaign Clustering"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 422: {"description": "Validation error"}, + }, +) +@_traced("api.list_campaigns") +async def list_campaigns( + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Paginated campaign list, newest-updated first.""" + data = await repo.list_campaigns(limit=limit, offset=offset) + total = await repo.count_campaigns() + return {"total": total, "limit": limit, "offset": offset, "data": data} diff --git a/decnet_web/src/App.tsx b/decnet_web/src/App.tsx index a56ad61e..cbf63554 100644 --- a/decnet_web/src/App.tsx +++ b/decnet_web/src/App.tsx @@ -20,6 +20,7 @@ const Webhooks = lazy(() => import('./components/Webhooks')); const Attackers = lazy(() => import('./components/Attackers')); const AttackerDetail = lazy(() => import('./components/AttackerDetail')); const IdentityDetail = lazy(() => import('./components/IdentityDetail')); +const CampaignDetail = lazy(() => import('./components/CampaignDetail')); const Config = lazy(() => import('./components/Config')); const Bounty = lazy(() => import('./components/Bounty')); const Credentials = lazy(() => import('./components/Credentials')); @@ -115,6 +116,7 @@ const AuthedShell: React.FC = ({ onLogout, onSearch, searchQue } /> } /> } /> + } /> } /> } /> } /> diff --git a/decnet_web/src/components/CampaignDetail.tsx b/decnet_web/src/components/CampaignDetail.tsx new file mode 100644 index 00000000..19c211a1 --- /dev/null +++ b/decnet_web/src/components/CampaignDetail.tsx @@ -0,0 +1,315 @@ +import React, { useEffect, useState } from 'react'; +import { useParams, useNavigate } from 'react-router-dom'; +import { ArrowLeft, Crosshair, Fingerprint, Globe, Radio } from '../icons'; +import api from '../utils/api'; +import { useCampaignStream } from './useCampaignStream'; +import './Dashboard.css'; + +/* + * CampaignDetail — read-only view of a campaign-clustered operation. + * + * The layer above identity resolution. Member identities are visible + * here as rows that link back to IdentityDetail. Same visual vocabulary + * as IdentityDetail by design — the substrate (soft merges, schema + * version, JSON fingerprint summaries, live SSE updates) is identical + * one layer up. + */ + +interface CampaignData { + uuid: string; + schema_version: number; + first_seen_at: string | null; + last_seen_at: string | null; + created_at: string; + updated_at: string; + confidence: number | null; + identity_count: number; + identity_count_live: number; + ja3_hashes: string | null; + hassh_hashes: string | null; + payload_simhashes: string | null; + c2_endpoints: string | null; + merged_into_uuid: string | null; + notes: string | null; +} + +interface IdentityRow { + uuid: string; + first_seen_at: string | null; + last_seen_at: string | null; + observation_count: number; + campaign_id: string | null; + merged_into_uuid: string | null; +} + +const safeParseJsonList = (raw: string | null): string[] => { + if (!raw) return []; + try { + const parsed = JSON.parse(raw); + return Array.isArray(parsed) ? parsed : []; + } catch { + return []; + } +}; + +const CampaignDetail: React.FC = () => { + const { id } = useParams<{ id: string }>(); + const navigate = useNavigate(); + const [campaign, setCampaign] = useState(null); + const [identities, setIdentities] = useState([]); + const [identityTotal, setIdentityTotal] = useState(0); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + useEffect(() => { + if (!id) return; + const fetchCampaign = async () => { + setLoading(true); + try { + const res = await api.get(`/campaigns/${id}`); + setCampaign(res.data); + setError(null); + } catch (err: any) { + if (err.response?.status === 404) { + setError('CAMPAIGN NOT FOUND'); + } else { + setError('FAILED TO LOAD CAMPAIGN'); + } + } finally { + setLoading(false); + } + }; + fetchCampaign(); + }, [id]); + + useEffect(() => { + if (!id) return; + const fetchIdentities = async () => { + try { + const res = await api.get(`/campaigns/${id}/identities?limit=50&offset=0`); + setIdentities(res.data.data ?? []); + setIdentityTotal(res.data.total ?? 0); + } catch { + setIdentities([]); + setIdentityTotal(0); + } + }; + fetchIdentities(); + }, [id]); + + // Live updates: refetch when a campaign event references this uuid. + useCampaignStream({ + enabled: !!id, + onEvent: (ev) => { + if (!id) return; + const payload = ev.payload || {}; + const refs = new Set(); + const addUuid = (v: unknown) => { + if (typeof v === 'string') refs.add(v); + }; + addUuid(payload.campaign_uuid); + addUuid(payload.winner_uuid); + addUuid(payload.loser_uuid); + addUuid(payload.resurrected_uuid); + addUuid(payload.former_winner_uuid); + + if (refs.has(id)) { + api.get(`/campaigns/${id}`) + .then((res) => setCampaign(res.data)) + .catch(() => {}); + api.get(`/campaigns/${id}/identities?limit=50&offset=0`) + .then((res) => { + setIdentities(res.data.data ?? []); + setIdentityTotal(res.data.total ?? 0); + }) + .catch(() => {}); + } + }, + }); + + if (loading) { + return ( +
+
+ LOADING CAMPAIGN… +
+
+ ); + } + + if (error || !campaign) { + return ( +
+ +
+ {error || 'CAMPAIGN NOT FOUND'} +
+
+ ); + } + + const ja3List = safeParseJsonList(campaign.ja3_hashes); + const hasshList = safeParseJsonList(campaign.hassh_hashes); + const payloadList = safeParseJsonList(campaign.payload_simhashes); + const c2List = safeParseJsonList(campaign.c2_endpoints); + + return ( +
+ + +
+ +

+ CAMPAIGN · {campaign.uuid} +

+ {campaign.merged_into_uuid && ( + navigate(`/campaigns/${campaign.merged_into_uuid}`)} + > + MERGED INTO {campaign.merged_into_uuid.slice(0, 8)} + + )} +
+ +
+
+
{campaign.identity_count_live}
+
IDENTITIES
+
+
+
{ja3List.length}
+
JA3
+
+
+
{hasshList.length}
+
HASSH
+
+
+
{payloadList.length}
+
PAYLOADS
+
+
+
{c2List.length}
+
C2 ENDPOINTS
+
+
+ + {(campaign.confidence !== null || campaign.schema_version > 1) && ( +
+ {campaign.confidence !== null && ( + + CONFIDENCE · {campaign.confidence.toFixed(3)} + + )} + + SCHEMA · v{campaign.schema_version} + +
+ )} + + {ja3List.length > 0 && ( + } label="JA3" items={ja3List} /> + )} + {hasshList.length > 0 && ( + } label="HASSH" items={hasshList} /> + )} + {c2List.length > 0 && ( + } label="C2 ENDPOINTS" items={c2List} /> + )} + +
+
+ +

+ IDENTITIES · {identityTotal} +

+
+ {identities.length === 0 ? ( +
+ No identities linked yet. The campaign clusterer assigns + identities asynchronously; they should appear shortly after + the next clusterer pass. +
+ ) : ( + + + + + + + + + + + {identities.map((ident) => ( + navigate(`/identities/${ident.uuid}`)} + > + + + + + + ))} + +
IDENTITYFIRST SEENLAST SEENOBSERVATIONS
{ident.uuid.slice(0, 12)}…{ident.first_seen_at ?? '—'}{ident.last_seen_at ?? '—'}{ident.observation_count}
+ )} +
+ + {campaign.notes && ( +
+
+ ANALYST NOTES +
+
+ {campaign.notes} +
+
+ )} +
+ ); +}; + +const FingerprintList: React.FC<{ + icon: React.ReactNode; + label: string; + items: string[]; +}> = ({ icon, label, items }) => ( +
+
+ {icon} + + {label} + +
+
+ {items.map((v) => ( + + {v} + + ))} +
+
+); + +export default CampaignDetail; diff --git a/decnet_web/src/components/IdentityDetail.tsx b/decnet_web/src/components/IdentityDetail.tsx index e156261a..f3bd6a88 100644 --- a/decnet_web/src/components/IdentityDetail.tsx +++ b/decnet_web/src/components/IdentityDetail.tsx @@ -184,8 +184,9 @@ const IdentityDetail: React.FC = () => { {identity.campaign_id && ( navigate(`/campaigns/${identity.campaign_id}`)} > CAMPAIGN · {identity.campaign_id.slice(0, 8)} diff --git a/decnet_web/src/components/useCampaignStream.ts b/decnet_web/src/components/useCampaignStream.ts new file mode 100644 index 00000000..c1f01185 --- /dev/null +++ b/decnet_web/src/components/useCampaignStream.ts @@ -0,0 +1,100 @@ +/** + * Campaign-clustering event stream — opens an SSE connection to + * `/campaigns/events` and dispatches typed events to the caller. + * + * Mirror of `useIdentityStream` for the layer above. CampaignDetail + * subscribes to refresh its own row + linked-identity list when + * `campaign.identity.assigned` / `campaign.merged` / `campaign.unmerged` + * fires. + */ +import { useEffect, useRef } from 'react'; + +export type CampaignStreamEventName = + | 'snapshot' + | 'formed' + | 'identity.assigned' + | 'merged' + | 'unmerged'; + +export interface CampaignStreamEvent { + name: CampaignStreamEventName | string; + topic?: string; + type?: string; + ts?: string; + payload: Record; +} + +export interface UseCampaignStreamOptions { + enabled: boolean; + onEvent: (event: CampaignStreamEvent) => void; + onError?: () => void; +} + +const NAMED_EVENTS: CampaignStreamEventName[] = [ + 'snapshot', + 'formed', + 'identity.assigned', + 'merged', + 'unmerged', +]; + +export function useCampaignStream({ + enabled, + onEvent, + onError, +}: UseCampaignStreamOptions): void { + const esRef = useRef(null); + const reconnectRef = useRef | null>(null); + const onEventRef = useRef(onEvent); + const onErrorRef = useRef(onError); + useEffect(() => { onEventRef.current = onEvent; }, [onEvent]); + useEffect(() => { onErrorRef.current = onError; }, [onError]); + + useEffect(() => { + if (!enabled) return; + + const connect = () => { + if (esRef.current) esRef.current.close(); + const token = localStorage.getItem('token') ?? ''; + const baseUrl = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1'; + const url = `${baseUrl}/campaigns/events?token=${encodeURIComponent(token)}`; + + const es = new EventSource(url); + esRef.current = es; + + const dispatch = (name: string) => (event: MessageEvent) => { + try { + const parsed = JSON.parse(event.data) as Partial; + onEventRef.current({ + name, + topic: parsed.topic, + type: parsed.type, + ts: parsed.ts, + payload: (parsed.payload ?? {}) as Record, + }); + } catch (err) { + console.error('useCampaignStream: parse failed', err); + } + }; + + for (const name of NAMED_EVENTS) { + es.addEventListener(name, dispatch(name) as EventListener); + } + + es.onerror = () => { + es.close(); + esRef.current = null; + onErrorRef.current?.(); + reconnectRef.current = setTimeout(connect, 3000); + }; + }; + + connect(); + + return () => { + if (reconnectRef.current) clearTimeout(reconnectRef.current); + if (esRef.current) esRef.current.close(); + esRef.current = null; + }; + }, [enabled]); +} diff --git a/decnet_web/src/components/useIdentityStream.ts b/decnet_web/src/components/useIdentityStream.ts index d424df26..7f9a9d44 100644 --- a/decnet_web/src/components/useIdentityStream.ts +++ b/decnet_web/src/components/useIdentityStream.ts @@ -25,7 +25,8 @@ export type IdentityStreamEventName = | 'formed' | 'observation.linked' | 'merged' - | 'unmerged'; + | 'unmerged' + | 'campaign.assigned'; export interface IdentityStreamEvent { name: IdentityStreamEventName | string; @@ -47,6 +48,7 @@ const NAMED_EVENTS: IdentityStreamEventName[] = [ 'observation.linked', 'merged', 'unmerged', + 'campaign.assigned', ]; export function useIdentityStream({ diff --git a/tests/api/campaigns/__init__.py b/tests/api/campaigns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/campaigns/test_events_stream.py b/tests/api/campaigns/test_events_stream.py new file mode 100644 index 00000000..eaf76dbf --- /dev/null +++ b/tests/api/campaigns/test_events_stream.py @@ -0,0 +1,111 @@ +"""SSE events stream — GET /api/v1/campaigns/events. + +Mirror of :mod:`tests.api.identities.test_events_stream`. Drives the +generator directly to dodge the full httpx streaming roundtrip. +""" +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.web.api import app + +_V1 = "/api/v1/campaigns" + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + from decnet.web.router.campaigns import api_events as _ev + monkeypatch.setattr(_ev, "get_app_bus", _get) + return bus + + +@pytest.mark.anyio +async def test_campaign_events_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/events") + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_campaign_events_emits_snapshot_and_live_event(_fake_app_bus): + """Snapshot on connect + live forwarding under ``campaign.>``.""" + from decnet.web.router.campaigns import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + response = await _ev.api_campaigns_events( + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + gen = response.body_iterator + + def _as_text(frame) -> str: + return frame if isinstance(frame, str) else frame.decode() + + async def _publish_after_snapshot() -> None: + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.campaign(_topics.CAMPAIGN_FORMED), + {"campaign_uuid": "c-1", "identity_uuids": ["i-1"]}, + event_type=_topics.CAMPAIGN_FORMED, + ) + await asyncio.sleep(0.05) + await _fake_app_bus.publish( + _topics.campaign(_topics.CAMPAIGN_IDENTITY_ASSIGNED), + {"campaign_uuid": "c-1", "identity_uuid": "i-2"}, + event_type=_topics.CAMPAIGN_IDENTITY_ASSIGNED, + ) + + pub_task = asyncio.create_task(_publish_after_snapshot()) + + async def _drive(): + saw = {"snapshot": False, "formed": False, "identity.assigned": False} + for _ in range(8): + frame = _as_text(await gen.__anext__()) + for key in saw: + if f"event: {key}" in frame: + saw[key] = True + if all(saw.values()): + break + return saw + + try: + seen = await asyncio.wait_for(_drive(), timeout=5.0) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + + assert seen["snapshot"] + assert seen["formed"] + assert seen["identity.assigned"] + + +def test_sse_name_maps_dotted_leaves(): + from decnet.web.router.campaigns.api_events import _sse_name_for + assert _sse_name_for("campaign.formed") == "formed" + assert _sse_name_for("campaign.identity.assigned") == "identity.assigned" + assert _sse_name_for("campaign.merged") == "merged" + assert _sse_name_for("campaign.unmerged") == "unmerged" + assert _sse_name_for("system.bus.health") == "system.bus.health" diff --git a/tests/web/test_api_campaigns.py b/tests/web/test_api_campaigns.py new file mode 100644 index 00000000..31dbacb7 --- /dev/null +++ b/tests/web/test_api_campaigns.py @@ -0,0 +1,254 @@ +"""Tests for the campaign-clustering read API. + +Mirrors :mod:`tests.web.test_api_identities` for the layer above. +The campaign clusterer is a separate worker; these tests cover the +read-only API which ships in the same wave. Empty-table behaviour, +soft-merge resolution, and pagination forwarding are the headline +cases. +""" +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi import HTTPException + + +def _campaign_row( + uuid: str = "c-uuid-1", + merged_into_uuid: str | None = None, + identity_count: int = 0, +) -> dict: + now = datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat() + return { + "uuid": uuid, + "schema_version": 1, + "first_seen_at": None, + "last_seen_at": None, + "created_at": now, + "updated_at": now, + "confidence": None, + "identity_count": identity_count, + "ja3_hashes": None, + "hassh_hashes": None, + "payload_simhashes": None, + "c2_endpoints": None, + "merged_into_uuid": merged_into_uuid, + "notes": None, + } + + +def _identity_row(uuid: str, campaign_id: str | None) -> dict: + return { + "uuid": uuid, + "schema_version": 1, + "campaign_id": campaign_id, + "merged_into_uuid": None, + } + + +# ─── GET /campaigns ────────────────────────────────────────────────────────── + + +class TestListCampaigns: + @pytest.mark.asyncio + async def test_empty_table_returns_zero_total(self): + from decnet.web.router.campaigns.api_list_campaigns import list_campaigns + + with patch( + "decnet.web.router.campaigns.api_list_campaigns.repo" + ) as mock_repo: + mock_repo.list_campaigns = AsyncMock(return_value=[]) + mock_repo.count_campaigns = AsyncMock(return_value=0) + + result = await list_campaigns( + limit=50, offset=0, user={"uuid": "u", "role": "viewer"} + ) + + assert result == {"total": 0, "limit": 50, "offset": 0, "data": []} + + @pytest.mark.asyncio + async def test_returns_seeded_data(self): + from decnet.web.router.campaigns.api_list_campaigns import list_campaigns + + rows = [_campaign_row(f"c-{n}") for n in range(3)] + with patch( + "decnet.web.router.campaigns.api_list_campaigns.repo" + ) as mock_repo: + mock_repo.list_campaigns = AsyncMock(return_value=rows) + mock_repo.count_campaigns = AsyncMock(return_value=3) + + result = await list_campaigns( + limit=50, offset=0, user={"uuid": "u", "role": "viewer"} + ) + + assert result["total"] == 3 + assert [r["uuid"] for r in result["data"]] == ["c-0", "c-1", "c-2"] + + @pytest.mark.asyncio + async def test_pagination_args_forwarded(self): + from decnet.web.router.campaigns.api_list_campaigns import list_campaigns + + with patch( + "decnet.web.router.campaigns.api_list_campaigns.repo" + ) as mock_repo: + mock_repo.list_campaigns = AsyncMock(return_value=[]) + mock_repo.count_campaigns = AsyncMock(return_value=0) + + await list_campaigns( + limit=10, offset=20, user={"uuid": "u", "role": "viewer"} + ) + + mock_repo.list_campaigns.assert_awaited_once_with(limit=10, offset=20) + + +# ─── GET /campaigns/{uuid} ─────────────────────────────────────────────────── + + +class TestGetCampaignDetail: + @pytest.mark.asyncio + async def test_404_on_missing_uuid(self): + from decnet.web.router.campaigns.api_get_campaign_detail import ( + get_campaign_detail, + ) + + with patch( + "decnet.web.router.campaigns.api_get_campaign_detail.repo" + ) as mock_repo: + mock_repo.get_campaign_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc: + await get_campaign_detail( + uuid="ghost", user={"uuid": "u", "role": "viewer"} + ) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_returns_campaign_with_live_identity_count(self): + from decnet.web.router.campaigns.api_get_campaign_detail import ( + get_campaign_detail, + ) + + campaign = _campaign_row("c-real", identity_count=2) + with patch( + "decnet.web.router.campaigns.api_get_campaign_detail.repo" + ) as mock_repo: + mock_repo.get_campaign_by_uuid = AsyncMock(return_value=campaign) + mock_repo.count_identities_for_campaign = AsyncMock(return_value=5) + + result = await get_campaign_detail( + uuid="c-real", user={"uuid": "u", "role": "viewer"} + ) + + assert result["uuid"] == "c-real" + assert result["identity_count_live"] == 5 + assert result["identity_count"] == 2 + + +# ─── GET /campaigns/{uuid}/identities ──────────────────────────────────────── + + +class TestListCampaignIdentities: + @pytest.mark.asyncio + async def test_404_when_campaign_missing(self): + from decnet.web.router.campaigns.api_list_campaign_identities import ( + list_campaign_identities, + ) + + with patch( + "decnet.web.router.campaigns.api_list_campaign_identities.repo" + ) as mock_repo: + mock_repo.get_campaign_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc: + await list_campaign_identities( + uuid="ghost", limit=50, offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_returns_identities_for_existing_campaign(self): + from decnet.web.router.campaigns.api_list_campaign_identities import ( + list_campaign_identities, + ) + + campaign = _campaign_row("c-real") + idents = [ + _identity_row("i-1", "c-real"), + _identity_row("i-2", "c-real"), + ] + with patch( + "decnet.web.router.campaigns.api_list_campaign_identities.repo" + ) as mock_repo: + mock_repo.get_campaign_by_uuid = AsyncMock(return_value=campaign) + mock_repo.list_identities_for_campaign = AsyncMock(return_value=idents) + mock_repo.count_identities_for_campaign = AsyncMock(return_value=2) + + result = await list_campaign_identities( + uuid="c-real", limit=50, offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + + assert result["total"] == 2 + assert [r["uuid"] for r in result["data"]] == ["i-1", "i-2"] + + @pytest.mark.asyncio + async def test_merged_uuid_resolves_to_winners_identities(self): + """Soft-merged campaigns: identities are listed under the winner.""" + from decnet.web.router.campaigns.api_list_campaign_identities import ( + list_campaign_identities, + ) + + winner = _campaign_row("c-winner") + with patch( + "decnet.web.router.campaigns.api_list_campaign_identities.repo" + ) as mock_repo: + mock_repo.get_campaign_by_uuid = AsyncMock(return_value=winner) + mock_repo.list_identities_for_campaign = AsyncMock(return_value=[]) + mock_repo.count_identities_for_campaign = AsyncMock(return_value=0) + + await list_campaign_identities( + uuid="c-loser", limit=50, offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + + mock_repo.list_identities_for_campaign.assert_awaited_once_with( + "c-winner", limit=50, offset=0, + ) + + +# ─── Repo-level integration ────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_repo_methods_against_empty_schema(tmp_path): + from decnet.web.db.factory import get_repository + + repo = get_repository(db_path=str(tmp_path / "campaigns.db")) + await repo.initialize() + + assert await repo.list_campaigns(limit=50, offset=0) == [] + assert await repo.count_campaigns() == 0 + assert await repo.get_campaign_by_uuid("anything") is None + assert await repo.list_identities_for_campaign("anything") == [] + assert await repo.count_identities_for_campaign("anything") == 0 + + +@pytest.mark.asyncio +async def test_repo_follows_campaign_merge_chain(tmp_path): + from decnet.web.db.factory import get_repository + + repo = get_repository(db_path=str(tmp_path / "merge.db")) + await repo.initialize() + await repo.create_campaign({"uuid": "winner-uuid"}) + await repo.create_campaign( + {"uuid": "loser-uuid", "merged_into_uuid": "winner-uuid"} + ) + + resolved = await repo.get_campaign_by_uuid("loser-uuid") + assert resolved is not None + assert resolved["uuid"] == "winner-uuid" + + direct = await repo.get_campaign_by_uuid("winner-uuid") + assert direct["uuid"] == "winner-uuid" + assert direct["merged_into_uuid"] is None