diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 17cdc336..6c13b051 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -364,6 +364,48 @@ class BaseRepository(ABC): """Retrieve the total count of attacker profile records, optionally filtered.""" pass + # ─── Identity resolution (Observation → Identity → Campaign) ─────────── + # The clusterer that populates these rows is a separate downstream + # effort. The read-only API ships first; until the clusterer runs, + # every method below returns empty/None against an empty table. + # See development/IDENTITY_RESOLUTION.md. + + @abstractmethod + async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + """ + Return one ``AttackerIdentity`` row by UUID, or ``None`` if absent. + + If the row has ``merged_into_uuid`` set (i.e. the clusterer + soft-merged it into another identity), implementations MUST + follow the chain and return the winner — callers should never + see a merged-out row as the answer to a fresh query. + """ + pass + + @abstractmethod + async def list_identities( + self, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + """Paginated list of identity rows, newest-updated first.""" + pass + + @abstractmethod + async def count_identities(self) -> int: + """Total identity rows. Excludes merged-out rows.""" + pass + + @abstractmethod + async def list_observations_for_identity( + self, identity_uuid: str, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + """``Attacker`` observation rows linked to the given identity, newest first.""" + pass + + @abstractmethod + async def count_observations_for_identity(self, identity_uuid: str) -> int: + """Total ``Attacker`` rows FK'd to this identity.""" + pass + @abstractmethod async def get_attacker_commands( self, diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 1a2f1721..676ab804 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -36,6 +36,7 @@ from decnet.web.db.models import ( State, Attacker, AttackerBehavior, + AttackerIdentity, AttackerIntel, SessionProfile, SmtpTarget, @@ -1390,6 +1391,83 @@ class SQLModelRepository(BaseRepository): result = await session.execute(statement) return result.scalar() or 0 + # ─── Identity resolution reads ──────────────────────────────────────── + + async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + # Follow merged_into_uuid up to the winner. Loop bounded by + # _MAX_MERGE_HOPS so a (hypothetically) corrupted ring can't + # spin the worker. Clusterer is responsible for never producing + # a cycle; this guard is belt-and-braces. + _MAX_MERGE_HOPS = 8 + async with self._session() as session: + current_uuid = uuid + for _ in range(_MAX_MERGE_HOPS): + result = await session.execute( + select(AttackerIdentity).where(AttackerIdentity.uuid == current_uuid) + ) + identity = result.scalar_one_or_none() + if identity is None: + return None + if identity.merged_into_uuid is None: + return identity.model_dump(mode="json") + current_uuid = identity.merged_into_uuid + # Hit the hop cap — surface what we have rather than recurse. + return identity.model_dump(mode="json") + + async def list_identities( + self, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + # Exclude merged-out rows so the list view is the de-duped truth. + # The history is still queryable per-uuid via get_identity_by_uuid + # and a future "merged into" endpoint when we need it. + statement = ( + select(AttackerIdentity) + .where(AttackerIdentity.merged_into_uuid.is_(None)) + .order_by(desc(AttackerIdentity.updated_at)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await session.execute(statement) + return [i.model_dump(mode="json") for i in result.scalars().all()] + + async def count_identities(self) -> int: + statement = ( + select(func.count()) + .select_from(AttackerIdentity) + .where(AttackerIdentity.merged_into_uuid.is_(None)) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + + async def list_observations_for_identity( + self, identity_uuid: str, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + statement = ( + select(Attacker) + .where(Attacker.identity_id == identity_uuid) + .order_by(desc(Attacker.last_seen)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await session.execute(statement) + return [ + self._deserialize_attacker(a.model_dump(mode="json")) + for a in result.scalars().all() + ] + + async def count_observations_for_identity(self, identity_uuid: str) -> int: + statement = ( + select(func.count()) + .select_from(Attacker) + .where(Attacker.identity_id == identity_uuid) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + async def get_attacker_commands( self, uuid: str, diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 71fc4402..1e232aa1 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -21,6 +21,9 @@ from .attackers.api_get_attacker_transcripts import router as attacker_transcrip from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router from .attackers.api_get_attacker_mail import router as attacker_mail_router from .attackers.api_get_attacker_intel import router as attacker_intel_router +from .identities.api_list_identities import router as identities_list_router +from .identities.api_get_identity_detail import router as identity_detail_router +from .identities.api_list_identity_observations import router as identity_observations_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -84,6 +87,14 @@ api_router.include_router(attacker_smtp_targets_router) api_router.include_router(attacker_mail_router) api_router.include_router(attacker_intel_router) +# Identity Resolution (read-only; populated by the clusterer worker — +# see development/IDENTITY_RESOLUTION.md). Empty until the clusterer +# ships; the API surface lands first so frontend + downstream work +# can target a stable shape. +api_router.include_router(identities_list_router) +api_router.include_router(identity_detail_router) +api_router.include_router(identity_observations_router) + # Observability api_router.include_router(stats_router) api_router.include_router(stream_router) diff --git a/decnet/web/router/identities/__init__.py b/decnet/web/router/identities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/decnet/web/router/identities/api_get_identity_detail.py b/decnet/web/router/identities/api_get_identity_detail.py new file mode 100644 index 00000000..f11e9ab5 --- /dev/null +++ b/decnet/web/router/identities/api_get_identity_detail.py @@ -0,0 +1,44 @@ +"""GET /api/v1/identities/{uuid} — single identity row. + +Soft-merge handling: if the requested UUID has merged_into_uuid set, +the repository follows the chain and returns the winner. Callers always +receive the canonical identity for any UUID that has ever been part of +the merge tree. + +Returns 404 against an empty/unknown UUID — expected response while the +clusterer hasn't run yet. +""" +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/identities/{uuid}", + tags=["Identity Resolution"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Identity not found"}, + }, +) +@_traced("api.get_identity_detail") +async def get_identity_detail( + uuid: str, + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + identity = await repo.get_identity_by_uuid(uuid) + if not identity: + raise HTTPException(status_code=404, detail="Identity not found") + # Cheap aggregates the IdentityDetail page surfaces. Counted off the + # FK rather than maintained in observation_count so the answer is + # always live (the denormalized field can lag the clusterer briefly). + identity["observation_count_live"] = await repo.count_observations_for_identity( + identity["uuid"] + ) + return identity diff --git a/decnet/web/router/identities/api_list_identities.py b/decnet/web/router/identities/api_list_identities.py new file mode 100644 index 00000000..749df9e2 --- /dev/null +++ b/decnet/web/router/identities/api_list_identities.py @@ -0,0 +1,35 @@ +"""GET /api/v1/identities — paginated list of resolved identities. + +Returns an empty list while the clusterer hasn't run yet (the +identities table ships empty in the schema-only PR). See +development/IDENTITY_RESOLUTION.md. +""" +from typing import Any + +from fastapi import APIRouter, Depends, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/identities", + tags=["Identity Resolution"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 422: {"description": "Validation error"}, + }, +) +@_traced("api.list_identities") +async def list_identities( + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Paginated identity list, newest-updated first.""" + data = await repo.list_identities(limit=limit, offset=offset) + total = await repo.count_identities() + return {"total": total, "limit": limit, "offset": offset, "data": data} diff --git a/decnet/web/router/identities/api_list_identity_observations.py b/decnet/web/router/identities/api_list_identity_observations.py new file mode 100644 index 00000000..a4f36b09 --- /dev/null +++ b/decnet/web/router/identities/api_list_identity_observations.py @@ -0,0 +1,48 @@ +"""GET /api/v1/identities/{uuid}/observations — observations for an identity. + +Returns the per-IP ``Attacker`` rows whose ``identity_id`` FK points at +this identity. The shape mirrors ``AttackersResponse`` so the frontend +can reuse the same row component as the main attackers list. + +Empty result while the clusterer hasn't linked any observations yet. +""" +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/identities/{uuid}/observations", + tags=["Identity Resolution"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Identity not found"}, + }, +) +@_traced("api.list_identity_observations") +async def list_identity_observations( + uuid: str, + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + # 404 if the identity itself doesn't exist. Otherwise return the + # observations linked to it (which may be empty — a freshly-formed + # identity briefly has no observations yet from the FK side). + identity = await repo.get_identity_by_uuid(uuid) + if not identity: + raise HTTPException(status_code=404, detail="Identity not found") + # If the requested uuid was merged, return observations under the + # winner's uuid (which is what get_identity_by_uuid resolves to). + canonical_uuid = identity["uuid"] + data = await repo.list_observations_for_identity( + canonical_uuid, limit=limit, offset=offset + ) + total = await repo.count_observations_for_identity(canonical_uuid) + return {"total": total, "limit": limit, "offset": offset, "data": data} diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index c4c74c71..6f3bdc56 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -55,6 +55,17 @@ class DummyRepo(BaseRepository): async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid) async def get_attacker_transcripts(self, uuid): await super().get_attacker_transcripts(uuid) async def get_session_log(self, sid): await super().get_session_log(sid) + # DEBT-041 / 3eb67c9 — attacker_intel re-key + async def find_credential_reuse_candidates(self, min_targets=2): await super().find_credential_reuse_candidates(min_targets); return [] + async def get_attacker_intel_by_uuid(self, u): await super().get_attacker_intel_by_uuid(u) + async def get_unenriched_attackers(self, limit=100): await super().get_unenriched_attackers(limit) + async def upsert_attacker_intel(self, d): await super().upsert_attacker_intel(d); return "" + # Identity resolution (this PR) + async def get_identity_by_uuid(self, u): await super().get_identity_by_uuid(u) + async def list_identities(self, limit=50, offset=0): await super().list_identities(limit, offset); return [] + async def count_identities(self): await super().count_identities(); return 0 + async def list_observations_for_identity(self, u, limit=50, offset=0): await super().list_observations_for_identity(u, limit, offset); return [] + async def count_observations_for_identity(self, u): await super().count_observations_for_identity(u); return 0 @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -113,6 +124,15 @@ async def test_base_repo_coverage(): await dr.get_attacker_artifacts("a") await dr.get_attacker_transcripts("a") await dr.get_session_log("a") + await dr.find_credential_reuse_candidates() + await dr.get_attacker_intel_by_uuid("a") + await dr.get_unenriched_attackers() + await dr.upsert_attacker_intel({"attacker_uuid": "a", "attacker_ip": "1.1.1.1"}) + await dr.get_identity_by_uuid("a") + await dr.list_identities() + await dr.count_identities() + await dr.list_observations_for_identity("a") + await dr.count_observations_for_identity("a") # Swarm methods: default NotImplementedError on BaseRepository. Covering # them here keeps the coverage contract honest for the swarm CRUD surface. diff --git a/tests/web/test_api_identities.py b/tests/web/test_api_identities.py new file mode 100644 index 00000000..459cbfa6 --- /dev/null +++ b/tests/web/test_api_identities.py @@ -0,0 +1,313 @@ +""" +Tests for the identity-resolution read API. + +The clusterer that populates identities is a separate downstream effort +(see development/IDENTITY_RESOLUTION.md); these tests cover the +read-only API that ships first. The identities table is empty at +deployment time, so the headline cases are: + +* GET /identities returns {total: 0, data: []} cleanly +* GET /identities/{uuid} returns 404 cleanly +* GET /identities/{uuid}/observations returns 404 if identity missing +* with seeded data, the routes return what the repository returns +* a soft-merged identity (merged_into_uuid set) resolves to the winner +""" +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi import HTTPException + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + + +def _identity_row( + uuid: str = "id-uuid-1", + merged_into_uuid: str | None = None, + observation_count: int = 0, +) -> dict: + now = datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat() + return { + "uuid": uuid, + "schema_version": 1, + "campaign_id": None, + "first_seen_at": None, + "last_seen_at": None, + "created_at": now, + "updated_at": now, + "confidence": None, + "observation_count": observation_count, + "ja3_hashes": None, + "hassh_hashes": None, + "payload_simhashes": None, + "c2_endpoints": None, + "kd_digraph_simhash": None, + "merged_into_uuid": merged_into_uuid, + "notes": None, + } + + +def _observation_row(uuid: str, identity_id: str | None) -> dict: + return { + "uuid": uuid, + "ip": "203.0.113.7", + "identity_id": identity_id, + "first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(), + "last_seen": datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat(), + "event_count": 5, + } + + +# ─── GET /identities ───────────────────────────────────────────────────────── + + +class TestListIdentities: + @pytest.mark.asyncio + async def test_empty_table_returns_zero_total(self): + from decnet.web.router.identities.api_list_identities import list_identities + + with patch( + "decnet.web.router.identities.api_list_identities.repo" + ) as mock_repo: + mock_repo.list_identities = AsyncMock(return_value=[]) + mock_repo.count_identities = AsyncMock(return_value=0) + + result = await list_identities( + limit=50, offset=0, user={"uuid": "u", "role": "viewer"} + ) + + assert result == {"total": 0, "limit": 50, "offset": 0, "data": []} + + @pytest.mark.asyncio + async def test_returns_seeded_data(self): + from decnet.web.router.identities.api_list_identities import list_identities + + rows = [_identity_row(f"id-{n}") for n in range(3)] + with patch( + "decnet.web.router.identities.api_list_identities.repo" + ) as mock_repo: + mock_repo.list_identities = AsyncMock(return_value=rows) + mock_repo.count_identities = AsyncMock(return_value=3) + + result = await list_identities( + limit=50, offset=0, user={"uuid": "u", "role": "viewer"} + ) + + assert result["total"] == 3 + assert [r["uuid"] for r in result["data"]] == ["id-0", "id-1", "id-2"] + + @pytest.mark.asyncio + async def test_pagination_args_forwarded(self): + from decnet.web.router.identities.api_list_identities import list_identities + + with patch( + "decnet.web.router.identities.api_list_identities.repo" + ) as mock_repo: + mock_repo.list_identities = AsyncMock(return_value=[]) + mock_repo.count_identities = AsyncMock(return_value=0) + + await list_identities( + limit=10, offset=20, user={"uuid": "u", "role": "viewer"} + ) + + mock_repo.list_identities.assert_awaited_once_with(limit=10, offset=20) + + +# ─── GET /identities/{uuid} ────────────────────────────────────────────────── + + +class TestGetIdentityDetail: + @pytest.mark.asyncio + async def test_404_on_missing_uuid(self): + from decnet.web.router.identities.api_get_identity_detail import ( + get_identity_detail, + ) + + with patch( + "decnet.web.router.identities.api_get_identity_detail.repo" + ) as mock_repo: + mock_repo.get_identity_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc: + await get_identity_detail( + uuid="ghost", user={"uuid": "u", "role": "viewer"} + ) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_returns_identity_with_live_observation_count(self): + from decnet.web.router.identities.api_get_identity_detail import ( + get_identity_detail, + ) + + identity = _identity_row("id-real", observation_count=2) + with patch( + "decnet.web.router.identities.api_get_identity_detail.repo" + ) as mock_repo: + mock_repo.get_identity_by_uuid = AsyncMock(return_value=identity) + # Live count overrides the (potentially stale) denormalized + # observation_count on the row. + mock_repo.count_observations_for_identity = AsyncMock(return_value=5) + + result = await get_identity_detail( + uuid="id-real", user={"uuid": "u", "role": "viewer"} + ) + + assert result["uuid"] == "id-real" + assert result["observation_count_live"] == 5 + # Original denormalized count is preserved on the row. + assert result["observation_count"] == 2 + + +# ─── GET /identities/{uuid}/observations ───────────────────────────────────── + + +class TestListIdentityObservations: + @pytest.mark.asyncio + async def test_404_when_identity_missing(self): + from decnet.web.router.identities.api_list_identity_observations import ( + list_identity_observations, + ) + + with patch( + "decnet.web.router.identities.api_list_identity_observations.repo" + ) as mock_repo: + mock_repo.get_identity_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc: + await list_identity_observations( + uuid="ghost", + limit=50, + offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_returns_observations_for_existing_identity(self): + from decnet.web.router.identities.api_list_identity_observations import ( + list_identity_observations, + ) + + identity = _identity_row("id-real") + observations = [ + _observation_row("att-1", identity_id="id-real"), + _observation_row("att-2", identity_id="id-real"), + ] + with patch( + "decnet.web.router.identities.api_list_identity_observations.repo" + ) as mock_repo: + mock_repo.get_identity_by_uuid = AsyncMock(return_value=identity) + mock_repo.list_observations_for_identity = AsyncMock( + return_value=observations + ) + mock_repo.count_observations_for_identity = AsyncMock(return_value=2) + + result = await list_identity_observations( + uuid="id-real", + limit=50, + offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + + assert result["total"] == 2 + assert [r["uuid"] for r in result["data"]] == ["att-1", "att-2"] + + @pytest.mark.asyncio + async def test_merged_uuid_resolves_to_winners_observations(self): + """ + When the user requests observations for a soft-merged identity, + get_identity_by_uuid follows the merged_into chain and returns + the winner. The endpoint MUST list observations under the + winner's UUID, not the loser's. Otherwise an operator linking + through cached merge events sees an empty page. + """ + from decnet.web.router.identities.api_list_identity_observations import ( + list_identity_observations, + ) + + # Repo returns the WINNER row even though we asked for the loser's uuid. + winner = _identity_row("id-winner") + with patch( + "decnet.web.router.identities.api_list_identity_observations.repo" + ) as mock_repo: + mock_repo.get_identity_by_uuid = AsyncMock(return_value=winner) + mock_repo.list_observations_for_identity = AsyncMock(return_value=[]) + mock_repo.count_observations_for_identity = AsyncMock(return_value=0) + + await list_identity_observations( + uuid="id-loser", + limit=50, + offset=0, + user={"uuid": "u", "role": "viewer"}, + ) + + # Critical assertion: list_observations_for_identity is called + # with the winner's UUID, not the requested (loser's) one. + mock_repo.list_observations_for_identity.assert_awaited_once_with( + "id-winner", limit=50, offset=0 + ) + + +# ─── Repo-level integration: empty schema returns expected shapes ──────────── + + +@pytest.mark.asyncio +async def test_repo_methods_against_empty_schema(tmp_path): + """ + With a freshly initialized SQLite database (no rows), every read + method returns the expected empty/None response. Smoke-tests the + repository layer without going through the FastAPI route layer. + """ + from decnet.web.db.sqlite.repository import SQLiteRepository + from decnet.web.db.sqlite.database import init_db + + db_path = str(tmp_path / "ids.db") + init_db(db_path) + repo = SQLiteRepository(db_path=db_path) + + assert await repo.list_identities(limit=50, offset=0) == [] + assert await repo.count_identities() == 0 + assert await repo.get_identity_by_uuid("anything") is None + assert await repo.list_observations_for_identity("anything") == [] + assert await repo.count_observations_for_identity("anything") == 0 + + +@pytest.mark.asyncio +async def test_repo_follows_merged_into_chain(tmp_path): + """ + get_identity_by_uuid must transparently follow merged_into_uuid to + surface the canonical winner. This is the contract the endpoint + relies on for soft-merged identity resolution. + """ + from decnet.web.db.models import AttackerIdentity + from decnet.web.db.sqlite.database import init_db + from decnet.web.db.sqlite.repository import SQLiteRepository + from sqlmodel import Session + from decnet.web.db.sqlite.database import get_sync_engine + + db_path = str(tmp_path / "merge.db") + init_db(db_path) + + # Insert two identities via direct SQL: a winner and a loser whose + # merged_into_uuid points at the winner. + engine = get_sync_engine(db_path) + with Session(engine) as session: + winner = AttackerIdentity(uuid="winner-uuid") + loser = AttackerIdentity(uuid="loser-uuid", merged_into_uuid="winner-uuid") + session.add(winner) + session.add(loser) + session.commit() + + repo = SQLiteRepository(db_path=db_path) + resolved = await repo.get_identity_by_uuid("loser-uuid") + assert resolved is not None + assert resolved["uuid"] == "winner-uuid", ( + "get_identity_by_uuid must follow merged_into_uuid to the winner" + ) + + # And the winner queried directly resolves to itself. + direct = await repo.get_identity_by_uuid("winner-uuid") + assert direct["uuid"] == "winner-uuid" + assert direct["merged_into_uuid"] is None