From 33f7d5a9ff572682d2a2798ca2ba348b9881ddbc Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 02:21:59 -0400 Subject: [PATCH] feat(web): expose attribution state on AttackerDetail backend (Phase 6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET /api/v1/attackers/{uuid}/attribution Returns the merger output for an attacker's identity: { "identity_uuid": "abc..." | null, "primitives": [ {primitive, current_value, state, confidence, observation_count, last_change_ts, last_observation_ts}, ... ] } Pre-attribution-worker: identity_uuid=null, primitives=[]. Surfacing identity_uuid keeps the cross-attacker rollup story visible to the frontend ahead of v1's clusterer landing. api_events SSE relay also subscribes to attribution.> and forwards to the AttackerDetail page filtered on payload.identity_uuid (the identity is resolved at stream open from the URL's attacker_uuid; attribution payloads are identity-keyed, not attacker-keyed). New SSE event names: attribution.state_changed, attribution.multi_actor_suspected. Frontend (AttackerDetail.tsx badge rendering, useAttackerStream consumer) deferred — there's already WIP on AttackerDetail.tsx in the working tree; merging the badge logic is a separate commit once that lands. Tests: 4 endpoint scenarios — 401 unauth, 404 unknown attacker, 200 empty (no stub), 200 with primitive-ordered rows. 
--- decnet/web/router/__init__.py | 2 + decnet/web/router/attackers/api_events.py | 51 ++++++-- .../attackers/api_get_attacker_attribution.py | 92 ++++++++++++++ .../attackers/test_attribution_endpoint.py | 113 ++++++++++++++++++ 4 files changed, 248 insertions(+), 10 deletions(-) create mode 100644 decnet/web/router/attackers/api_get_attacker_attribution.py create mode 100644 tests/api/attackers/test_attribution_endpoint.py diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 38c9b8fb..8aadbe8b 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -23,6 +23,7 @@ from .attackers.api_get_attacker_transcripts import router as attacker_transcrip from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router from .attackers.api_get_attacker_mail import router as attacker_mail_router from .attackers.api_get_attacker_intel import router as attacker_intel_router +from .attackers.api_get_attacker_attribution import router as attacker_attribution_router from .identities.api_list_identities import router as identities_list_router from .identities.api_get_identity_detail import router as identity_detail_router from .identities.api_list_identity_observations import router as identity_observations_router @@ -111,6 +112,7 @@ api_router.include_router(attacker_transcripts_router) api_router.include_router(attacker_smtp_targets_router) api_router.include_router(attacker_mail_router) api_router.include_router(attacker_intel_router) +api_router.include_router(attacker_attribution_router) # Identity Resolution (read-only; populated by the clusterer worker — # see development/IDENTITY_RESOLUTION.md). 
Empty until the clusterer diff --git a/decnet/web/router/attackers/api_events.py b/decnet/web/router/attackers/api_events.py index 5d7fe4d8..25f7b168 100644 --- a/decnet/web/router/attackers/api_events.py +++ b/decnet/web/router/attackers/api_events.py @@ -59,8 +59,10 @@ def _sse_name_for(topic: str) -> str: register 37+ listeners or know the registry. Single event name keeps the EventSource handler shape uniform.) - ``attacker.fingerprint_rotated`` → ``fingerprint.rotated`` - ``attacker.scored`` → ``attacker.scored`` + ``attacker.fingerprint_rotated`` → ``fingerprint.rotated`` + ``attacker.scored`` → ``attacker.scored`` + ``attribution.profile.state_changed`` → ``attribution.state_changed`` + ``attribution.profile.multi_actor_suspected`` → ``attribution.multi_actor_suspected`` Anything else passes through unchanged so a future ``attacker.*`` family doesn't silently collapse onto a generic bucket. @@ -71,6 +73,12 @@ def _sse_name_for(topic: str) -> str: return "fingerprint.rotated" if topic == f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}": return "attacker.scored" + if topic == _topics.attribution(_topics.ATTRIBUTION_PROFILE_STATE_CHANGED): + return "attribution.state_changed" + if topic == _topics.attribution( + _topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED, + ): + return "attribution.multi_actor_suspected" return topic @@ -95,7 +103,12 @@ async def api_attacker_events( user: dict = Depends(require_stream_viewer), ) -> StreamingResponse: # 404-after-auth so an existence probe can't enumerate attacker UUIDs. - await get_attacker_or_404(attacker_uuid) + attacker = await get_attacker_or_404(attacker_uuid) + # Pre-resolve the identity_uuid so attribution.profile.* events + # (keyed on identity_uuid, not attacker_uuid) can be filtered + # without a per-event repo lookup. None until the attribution + # worker stamps a stub on first observation. 
+ identity_uuid = attacker.get("identity_id") if isinstance(attacker, dict) else None snapshot_per_primitive = await repo.latest_observation_per_primitive( attacker_uuid, @@ -129,9 +142,10 @@ async def api_attacker_events( yield ": keepalive\n\n" return - # Three subscriptions, merged through one queue. Per-attacker - # filter on payload["attacker_uuid"] — the profiler worker - # stamps it on every published payload (Phase 5 amendment). + # Four subscriptions, merged through one queue. Filter on + # payload["attacker_uuid"] for attacker.* events; on + # payload["identity_uuid"] (resolved at stream open) for + # attribution.profile.* events. obs_sub = bus.subscribe(f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>") fp_sub = bus.subscribe( f"{_topics.ATTACKER}.{_topics.ATTACKER_FINGERPRINT_ROTATED}", @@ -139,9 +153,10 @@ async def api_attacker_events( score_sub = bus.subscribe( f"{_topics.ATTACKER}.{_topics.ATTACKER_SCORED}", ) + attribution_sub = bus.subscribe(f"{_topics.ATTRIBUTION}.>") queue: asyncio.Queue = asyncio.Queue(maxsize=_QUEUE_MAX) - async def _pump(sub) -> None: + async def _pump_by_attacker(sub) -> None: async with sub: async for ev in sub: payload = ev.payload or {} @@ -155,10 +170,26 @@ async def api_attacker_events( # cover any gap a slow consumer creates. pass + async def _pump_by_identity(sub) -> None: + async with sub: + async for ev in sub: + payload = ev.payload or {} + # If the attacker has no stub identity yet, + # there's nothing to filter on — skip silently. 
+ if identity_uuid is None: + continue + if payload.get("identity_uuid") != identity_uuid: + continue + try: + queue.put_nowait(ev) + except asyncio.QueueFull: + pass + tasks = [ - asyncio.create_task(_pump(obs_sub)), - asyncio.create_task(_pump(fp_sub)), - asyncio.create_task(_pump(score_sub)), + asyncio.create_task(_pump_by_attacker(obs_sub)), + asyncio.create_task(_pump_by_attacker(fp_sub)), + asyncio.create_task(_pump_by_attacker(score_sub)), + asyncio.create_task(_pump_by_identity(attribution_sub)), ] try: while True: diff --git a/decnet/web/router/attackers/api_get_attacker_attribution.py b/decnet/web/router/attackers/api_get_attacker_attribution.py new file mode 100644 index 00000000..5775a5f8 --- /dev/null +++ b/decnet/web/router/attackers/api_get_attacker_attribution.py @@ -0,0 +1,92 @@ +"""GET /api/v1/attackers/{uuid}/attribution — per-primitive +attribution state for one attacker. + +Returns the merger output produced by +:mod:`decnet.correlation.attribution_worker` over the observations +linked to this attacker's identity. Pre-clusterer (v0), every +attacker has a 1:1 stub identity, so the returned set is the merger +output for the single attacker; v1's clusterer makes the rollup +cross-attacker. + +Empty ``primitives`` is the honest answer when: + +- The attribution worker has not yet processed an observation for + this attacker (race with first-sight + behave-shell ingest), OR +- The attacker has fewer than ``MIN_OBSERVATIONS_FOR_STATE`` + observations of any primitive — every state row would be ``unknown``, + and the worker writes those, so the empty case is genuinely "engine + hasn't run yet". + +The response includes ``identity_uuid`` so AttackerDetail can render +a "rolls up to identity X" hint ahead of the v1 IdentityDetail wire- +up — we don't pretend the keying is per-attacker. 
+""" +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, Depends + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +from ._guards import get_attacker_or_404 + +router = APIRouter() + + +@router.get( + "/attackers/{uuid}/attribution", + tags=["Attacker Profiles"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Attacker not found"}, + }, +) +@_traced("api.get_attacker_attribution") +async def get_attacker_attribution( + uuid: str, + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Return per-primitive attribution state for an attacker. + + Shape:: + + { + "identity_uuid": "abc123..." | null, + "primitives": [ + { + "primitive": "motor.input_modality", + "current_value": "pasted", + "state": "stable", + "confidence": 0.91, + "observation_count": 7, + "last_change_ts": 1714521660.456, + "last_observation_ts": 1714521660.456 + }, + ... + ] + } + """ + attacker = await get_attacker_or_404(uuid) + identity_uuid = attacker.get("identity_id") if isinstance(attacker, dict) else None + if not identity_uuid: + # Attacker exists but the attribution worker has not yet + # stamped a stub identity on first observation. 
+ return {"identity_uuid": None, "primitives": []} + + rows = await repo.get_attribution_state_for_identity(identity_uuid) + primitives = [ + { + "primitive": row["primitive"], + "current_value": row["current_value"], + "state": row["state"], + "confidence": row["confidence"], + "observation_count": row["observation_count"], + "last_change_ts": row["last_change_ts"], + "last_observation_ts": row["last_observation_ts"], + } + for row in rows + ] + return {"identity_uuid": identity_uuid, "primitives": primitives} diff --git a/tests/api/attackers/test_attribution_endpoint.py b/tests/api/attackers/test_attribution_endpoint.py new file mode 100644 index 00000000..b7d087e6 --- /dev/null +++ b/tests/api/attackers/test_attribution_endpoint.py @@ -0,0 +1,113 @@ +"""Phase 6 — GET /api/v1/attackers/{uuid}/attribution. + +Pins the contract: 401 unauth, 404 unknown attacker, 200 with empty +``primitives`` for an attacker with no stub identity yet, 200 with +populated ``primitives`` after the attribution worker has run. 
+""" +from __future__ import annotations + +from datetime import datetime, timezone + +import httpx +import pytest + +from decnet.web.dependencies import repo as _repo + +_V1 = "/api/v1/attackers" +_OTHER_UUID = "00000000-0000-0000-0000-000000000099" + + +async def _seed_attacker(ip: str = "10.0.0.5") -> str: + return await _repo.upsert_attacker({ + "ip": ip, + "first_seen": datetime.now(timezone.utc), + "last_seen": datetime.now(timezone.utc), + }) + + +@pytest.mark.asyncio +async def test_attribution_unauthenticated( + client: httpx.AsyncClient, +) -> None: + """No Bearer token → 401, full stop.""" + auid = await _seed_attacker() + resp = await client.get(f"{_V1}/{auid}/attribution") + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_attribution_unknown_attacker_returns_404( + client: httpx.AsyncClient, auth_token: str, +) -> None: + resp = await client.get( + f"{_V1}/{_OTHER_UUID}/attribution", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_attribution_no_stub_yet( + client: httpx.AsyncClient, auth_token: str, +) -> None: + """Attacker exists but the attribution worker hasn't seen any + observations yet → 200 with identity_uuid=None and empty list.""" + auid = await _seed_attacker(ip="10.0.0.10") + resp = await client.get( + f"{_V1}/{auid}/attribution", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["identity_uuid"] is None + assert body["primitives"] == [] + + +@pytest.mark.asyncio +async def test_attribution_returns_state_rows( + client: httpx.AsyncClient, auth_token: str, +) -> None: + """After stub identity + state writes, the endpoint surfaces + every per-primitive row, primitive-ordered.""" + auid = await _seed_attacker(ip="10.0.0.11") + iuid = await _repo.ensure_stub_identity_for_attacker(auid) + assert iuid is not None + for primitive, state in [ + 
("motor.input_modality", "stable"), + ("cognitive.feedback_loop_engagement", "drifting"), + ]: + await _repo.upsert_attribution_state({ + "identity_uuid": iuid, + "primitive": primitive, + "current_value": "x", + "state": state, + "confidence": 0.85, + "observation_count": 5, + "last_change_ts": 1714000000.0, + "last_observation_ts": 1714000000.0, + }) + + resp = await client.get( + f"{_V1}/{auid}/attribution", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["identity_uuid"] == iuid + primitives = body["primitives"] + assert len(primitives) == 2 + # Primitive-ordered. + assert [p["primitive"] for p in primitives] == [ + "cognitive.feedback_loop_engagement", + "motor.input_modality", + ] + # Schema sanity. + expected_keys = { + "primitive", "current_value", "state", "confidence", + "observation_count", "last_change_ts", "last_observation_ts", + } + for p in primitives: + assert set(p.keys()) == expected_keys + states = {p["primitive"]: p["state"] for p in primitives} + assert states["motor.input_modality"] == "stable" + assert states["cognitive.feedback_loop_engagement"] == "drifting"