feat(web): read-only /api/v1/identities/* endpoints + repo methods

Second of the five-step identity-resolution substrate. Ships the API
surface against the empty AttackerIdentity table from commit 1 — every
endpoint returns empty/404 cleanly until the clusterer populates rows.

Routes (auth-gated, viewer role):
* GET /api/v1/identities — paginated list, excludes merged-out rows
* GET /api/v1/identities/{uuid} — detail; transparently follows
  merged_into_uuid to surface the canonical winner
* GET /api/v1/identities/{uuid}/observations — Attacker rows FK'd
  to the (resolved) identity uuid

Repository (BaseRepository abstract + SQLModelRepository concrete):
* get_identity_by_uuid (with merge-chain following, hop-bounded)
* list_identities / count_identities (excluding merged-out)
* list_observations_for_identity / count_observations_for_identity

Tests: 12 new (empty-table behavior, seeded data, merge-chain
resolution, repo-level smoke against real SQLite). Also fixes the
pre-existing test_base_repo_coverage failure (DEBT-041 added abstract
methods without updating the DummyRepo stub); the fix is included here
because this PR adds 5 more abstract methods to the same stub.

474 db/web/profiler/correlation tests green.
This commit is contained in:
2026-04-26 07:08:55 -04:00
parent 84c1ca9c9b
commit dc3d08dd41
9 changed files with 591 additions and 0 deletions

View File

@@ -364,6 +364,48 @@ class BaseRepository(ABC):
"""Retrieve the total count of attacker profile records, optionally filtered.""" """Retrieve the total count of attacker profile records, optionally filtered."""
pass pass
# ─── Identity resolution (Observation → Identity → Campaign) ───────────
# The clusterer that populates these rows is a separate downstream
# effort. The read-only API ships first; until the clusterer runs,
# every method below returns empty/None against an empty table.
# See development/IDENTITY_RESOLUTION.md.
@abstractmethod
async def get_identity_by_uuid(
    self,
    uuid: str,
) -> Optional[dict[str, Any]]:
    """
    Look up a single ``AttackerIdentity`` row by its UUID.

    Returns ``None`` when no row carries that UUID. A row whose
    ``merged_into_uuid`` is set has been soft-merged by the clusterer
    into another identity; implementations MUST walk that chain and
    hand back the winning row, so a fresh query never resolves to a
    merged-out identity.
    """
    pass
@abstractmethod
async def list_identities(
    self,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of identity rows, most recently updated first."""
    pass
@abstractmethod
async def count_identities(self) -> int:
    """Return the number of identity rows, leaving merged-out rows uncounted."""
    pass
@abstractmethod
async def list_observations_for_identity(
    self,
    identity_uuid: str,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of ``Attacker`` observation rows tied to the identity, newest first."""
    pass
@abstractmethod
async def count_observations_for_identity(
    self,
    identity_uuid: str,
) -> int:
    """Return how many ``Attacker`` rows reference this identity through their FK."""
    pass
@abstractmethod @abstractmethod
async def get_attacker_commands( async def get_attacker_commands(
self, self,

View File

@@ -36,6 +36,7 @@ from decnet.web.db.models import (
State, State,
Attacker, Attacker,
AttackerBehavior, AttackerBehavior,
AttackerIdentity,
AttackerIntel, AttackerIntel,
SessionProfile, SessionProfile,
SmtpTarget, SmtpTarget,
@@ -1390,6 +1391,83 @@ class SQLModelRepository(BaseRepository):
result = await session.execute(statement) result = await session.execute(statement)
return result.scalar() or 0 return result.scalar() or 0
# ─── Identity resolution reads ────────────────────────────────────────
async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
    """
    Resolve ``uuid`` to an identity row, following ``merged_into_uuid`` links.

    Returns the JSON-mode dump of the first row on the chain whose
    ``merged_into_uuid`` is ``None``. Returns ``None`` as soon as any
    UUID on the chain (including the requested one) has no row.
    """
    # The walk is capped so a (hypothetically) corrupted ring of merges
    # cannot keep the worker looping. The clusterer is expected never to
    # produce a cycle; the cap is a belt-and-braces guard.
    max_merge_hops = 8
    async with self._session() as session:
        lookup_uuid = uuid
        hops_left = max_merge_hops
        while hops_left > 0:
            hops_left -= 1
            query = select(AttackerIdentity).where(
                AttackerIdentity.uuid == lookup_uuid
            )
            row = (await session.execute(query)).scalar_one_or_none()
            if row is None:
                return None
            if row.merged_into_uuid is None:
                break
            lookup_uuid = row.merged_into_uuid
        # Reached either via the winner (break) or by exhausting the hop
        # budget; in the latter case the last row fetched is returned
        # as-is, even though it is still marked as merged.
        return row.model_dump(mode="json")
async def list_identities(
    self,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of non-merged identity rows, latest ``updated_at`` first."""
    # Merged-out rows are filtered so the list view is the de-duplicated
    # truth. Their history stays reachable per-uuid through
    # get_identity_by_uuid (and a future "merged into" endpoint).
    unmerged = select(AttackerIdentity).where(
        AttackerIdentity.merged_into_uuid.is_(None)
    )
    newest_first = unmerged.order_by(desc(AttackerIdentity.updated_at))
    page_query = newest_first.offset(offset).limit(limit)
    async with self._session() as session:
        rows = (await session.execute(page_query)).scalars().all()
        return [row.model_dump(mode="json") for row in rows]
async def count_identities(self) -> int:
    """Count identity rows whose ``merged_into_uuid`` is NULL; 0 when the scalar is empty."""
    not_merged = AttackerIdentity.merged_into_uuid.is_(None)
    count_query = select(func.count()).select_from(AttackerIdentity).where(not_merged)
    async with self._session() as session:
        total = (await session.execute(count_query)).scalar()
        return total or 0
async def list_observations_for_identity(
    self,
    identity_uuid: str,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of ``Attacker`` rows whose ``identity_id`` equals ``identity_uuid``.

    Rows are ordered by ``last_seen`` descending and each one is passed
    through ``_deserialize_attacker`` after being dumped in JSON mode.
    """
    linked = select(Attacker).where(Attacker.identity_id == identity_uuid)
    page_query = linked.order_by(desc(Attacker.last_seen)).offset(offset).limit(limit)
    async with self._session() as session:
        attackers = (await session.execute(page_query)).scalars().all()
        # Lazy generator keeps dump -> deserialize interleaved per row.
        dumped = (attacker.model_dump(mode="json") for attacker in attackers)
        return [self._deserialize_attacker(payload) for payload in dumped]
async def count_observations_for_identity(self, identity_uuid: str) -> int:
    """Count ``Attacker`` rows whose ``identity_id`` equals ``identity_uuid``; 0 when the scalar is empty."""
    belongs_to_identity = Attacker.identity_id == identity_uuid
    count_query = select(func.count()).select_from(Attacker).where(belongs_to_identity)
    async with self._session() as session:
        total = (await session.execute(count_query)).scalar()
        return total or 0
async def get_attacker_commands( async def get_attacker_commands(
self, self,
uuid: str, uuid: str,

View File

@@ -21,6 +21,9 @@ from .attackers.api_get_attacker_transcripts import router as attacker_transcrip
from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router
from .attackers.api_get_attacker_mail import router as attacker_mail_router from .attackers.api_get_attacker_mail import router as attacker_mail_router
from .attackers.api_get_attacker_intel import router as attacker_intel_router from .attackers.api_get_attacker_intel import router as attacker_intel_router
from .identities.api_list_identities import router as identities_list_router
from .identities.api_get_identity_detail import router as identity_detail_router
from .identities.api_list_identity_observations import router as identity_observations_router
from .transcripts import transcripts_router from .transcripts import transcripts_router
from .config.api_get_config import router as config_get_router from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router from .config.api_update_config import router as config_update_router
@@ -84,6 +87,14 @@ api_router.include_router(attacker_smtp_targets_router)
api_router.include_router(attacker_mail_router) api_router.include_router(attacker_mail_router)
api_router.include_router(attacker_intel_router) api_router.include_router(attacker_intel_router)
# Identity Resolution (read-only; populated by the clusterer worker —
# see development/IDENTITY_RESOLUTION.md). Empty until the clusterer
# ships; the API surface lands first so frontend + downstream work
# can target a stable shape.
api_router.include_router(identities_list_router)
api_router.include_router(identity_detail_router)
api_router.include_router(identity_observations_router)
# Observability # Observability
api_router.include_router(stats_router) api_router.include_router(stats_router)
api_router.include_router(stream_router) api_router.include_router(stream_router)

View File

View File

@@ -0,0 +1,44 @@
"""GET /api/v1/identities/{uuid} — single identity row.
Soft-merge handling: if the requested UUID has merged_into_uuid set,
the repository follows the chain and returns the winner. Callers always
receive the canonical identity for any UUID that has ever been part of
the merge tree.
Returns 404 for an unknown UUID — the expected response for every
request while the clusterer hasn't run yet and the table is empty.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
    "/identities/{uuid}",
    tags=["Identity Resolution"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Identity not found"},
    },
)
@_traced("api.get_identity_detail")
async def get_identity_detail(
    uuid: str,
    user: dict = Depends(require_viewer),
) -> dict[str, Any]:
    """Single identity by UUID; a merged UUID resolves to its canonical winner."""
    # ``user`` is not read in the body; the parameter exists so the
    # request goes through the ``require_viewer`` dependency.
    identity = await repo.get_identity_by_uuid(uuid)
    if not identity:
        raise HTTPException(status_code=404, detail="Identity not found")

    # Cheap aggregate the IdentityDetail page surfaces. Counted off the
    # FK rather than read from observation_count so the answer is always
    # live (the denormalized field can lag the clusterer briefly). The
    # count is keyed on the resolved row's uuid, not the requested one.
    identity["observation_count_live"] = await repo.count_observations_for_identity(
        identity["uuid"]
    )
    return identity

View File

@@ -0,0 +1,35 @@
"""GET /api/v1/identities — paginated list of resolved identities.
Returns an empty list while the clusterer hasn't run yet (the
identities table ships empty in the schema-only PR). See
development/IDENTITY_RESOLUTION.md.
"""
from typing import Any
from fastapi import APIRouter, Depends, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
    "/identities",
    tags=["Identity Resolution"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        422: {"description": "Validation error"},
    },
)
@_traced("api.list_identities")
async def list_identities(
    limit: int = Query(50, ge=1, le=1000),
    offset: int = Query(0, ge=0, le=2147483647),
    user: dict = Depends(require_viewer),
) -> dict[str, Any]:
    """Paginated identity list, newest-updated first."""
    # Page first, then the overall count; both are echoed back together
    # with the pagination arguments the caller supplied.
    page = await repo.list_identities(limit=limit, offset=offset)
    matching = await repo.count_identities()
    return dict(total=matching, limit=limit, offset=offset, data=page)

View File

@@ -0,0 +1,48 @@
"""GET /api/v1/identities/{uuid}/observations — observations for an identity.
Returns the per-IP ``Attacker`` rows whose ``identity_id`` FK points at
this identity. The shape mirrors ``AttackersResponse`` so the frontend
can reuse the same row component as the main attackers list.
Empty result while the clusterer hasn't linked any observations yet.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
    "/identities/{uuid}/observations",
    tags=["Identity Resolution"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Identity not found"},
    },
)
@_traced("api.list_identity_observations")
async def list_identity_observations(
    uuid: str,
    limit: int = Query(50, ge=1, le=1000),
    offset: int = Query(0, ge=0, le=2147483647),
    user: dict = Depends(require_viewer),
) -> dict[str, Any]:
    """Paginated observations for an identity; a merged UUID lists its winner's observations."""
    # 404 if the identity itself doesn't exist. Otherwise return the
    # observations linked to it (which may be empty — a freshly-formed
    # identity briefly has no observations yet from the FK side).
    identity = await repo.get_identity_by_uuid(uuid)
    if not identity:
        raise HTTPException(status_code=404, detail="Identity not found")

    # If the requested uuid was merged, query under the winner's uuid
    # (which is what get_identity_by_uuid resolves to); both the page
    # and the total use that same canonical uuid.
    canonical_uuid = identity["uuid"]
    data = await repo.list_observations_for_identity(
        canonical_uuid, limit=limit, offset=offset
    )
    total = await repo.count_observations_for_identity(canonical_uuid)
    return {"total": total, "limit": limit, "offset": offset, "data": data}

View File

@@ -55,6 +55,17 @@ class DummyRepo(BaseRepository):
async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid) async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid)
async def get_attacker_transcripts(self, uuid): await super().get_attacker_transcripts(uuid) async def get_attacker_transcripts(self, uuid): await super().get_attacker_transcripts(uuid)
async def get_session_log(self, sid): await super().get_session_log(sid) async def get_session_log(self, sid): await super().get_session_log(sid)
# DEBT-041 / 3eb67c9 — attacker_intel re-key
async def find_credential_reuse_candidates(self, min_targets=2):
    # Execute the abstract base body, then answer with an empty list.
    await super().find_credential_reuse_candidates(min_targets)
    return []
async def get_attacker_intel_by_uuid(self, u):
    # Execute the abstract base body; falls through and returns None.
    await super().get_attacker_intel_by_uuid(u)
async def get_unenriched_attackers(self, limit=100):
    # Execute the abstract base body; falls through and returns None.
    await super().get_unenriched_attackers(limit)
async def upsert_attacker_intel(self, d):
    # Execute the abstract base body, then answer with an empty string.
    await super().upsert_attacker_intel(d)
    return ""
# Identity resolution (this PR)
async def get_identity_by_uuid(self, u):
    # Execute the abstract base body; falls through and returns None.
    await super().get_identity_by_uuid(u)
async def list_identities(self, limit=50, offset=0):
    # Execute the abstract base body, then answer with an empty list.
    await super().list_identities(limit, offset)
    return []
async def count_identities(self):
    # Execute the abstract base body, then answer with zero.
    await super().count_identities()
    return 0
async def list_observations_for_identity(self, u, limit=50, offset=0):
    # Execute the abstract base body, then answer with an empty list.
    await super().list_observations_for_identity(u, limit, offset)
    return []
async def count_observations_for_identity(self, u):
    # Execute the abstract base body, then answer with zero.
    await super().count_observations_for_identity(u)
    return 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_repo_coverage(): async def test_base_repo_coverage():
@@ -113,6 +124,15 @@ async def test_base_repo_coverage():
await dr.get_attacker_artifacts("a") await dr.get_attacker_artifacts("a")
await dr.get_attacker_transcripts("a") await dr.get_attacker_transcripts("a")
await dr.get_session_log("a") await dr.get_session_log("a")
await dr.find_credential_reuse_candidates()
await dr.get_attacker_intel_by_uuid("a")
await dr.get_unenriched_attackers()
await dr.upsert_attacker_intel({"attacker_uuid": "a", "attacker_ip": "1.1.1.1"})
await dr.get_identity_by_uuid("a")
await dr.list_identities()
await dr.count_identities()
await dr.list_observations_for_identity("a")
await dr.count_observations_for_identity("a")
# Swarm methods: default NotImplementedError on BaseRepository. Covering # Swarm methods: default NotImplementedError on BaseRepository. Covering
# them here keeps the coverage contract honest for the swarm CRUD surface. # them here keeps the coverage contract honest for the swarm CRUD surface.

View File

@@ -0,0 +1,313 @@
"""
Tests for the identity-resolution read API.
The clusterer that populates identities is a separate downstream effort
(see development/IDENTITY_RESOLUTION.md); these tests cover the
read-only API that ships first. The identities table is empty at
deployment time, so the headline cases are:
* GET /identities returns {total: 0, data: []} cleanly
* GET /identities/{uuid} returns 404 cleanly
* GET /identities/{uuid}/observations returns 404 if identity missing
* with seeded data, the routes return what the repository returns
* a soft-merged identity (merged_into_uuid set) resolves to the winner
"""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import HTTPException
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _identity_row(
uuid: str = "id-uuid-1",
merged_into_uuid: str | None = None,
observation_count: int = 0,
) -> dict:
now = datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat()
return {
"uuid": uuid,
"schema_version": 1,
"campaign_id": None,
"first_seen_at": None,
"last_seen_at": None,
"created_at": now,
"updated_at": now,
"confidence": None,
"observation_count": observation_count,
"ja3_hashes": None,
"hassh_hashes": None,
"payload_simhashes": None,
"c2_endpoints": None,
"kd_digraph_simhash": None,
"merged_into_uuid": merged_into_uuid,
"notes": None,
}
def _observation_row(uuid: str, identity_id: str | None) -> dict:
return {
"uuid": uuid,
"ip": "203.0.113.7",
"identity_id": identity_id,
"first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(),
"last_seen": datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat(),
"event_count": 5,
}
# ─── GET /identities ─────────────────────────────────────────────────────────
class TestListIdentities:
    """Route-level checks for ``GET /identities`` with the repository patched out."""

    _REPO_TARGET = "decnet.web.router.identities.api_list_identities.repo"

    @pytest.mark.asyncio
    async def test_empty_table_returns_zero_total(self):
        """No rows in the repo -> zero total and an empty data list."""
        from decnet.web.router.identities.api_list_identities import list_identities

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.list_identities = AsyncMock(return_value=[])
            repo_stub.count_identities = AsyncMock(return_value=0)
            response = await list_identities(limit=50, offset=0, user=viewer)

        assert response == {"total": 0, "limit": 50, "offset": 0, "data": []}

    @pytest.mark.asyncio
    async def test_returns_seeded_data(self):
        """Rows handed back by the repo come through in the same order."""
        from decnet.web.router.identities.api_list_identities import list_identities

        seeded = [_identity_row(f"id-{n}") for n in range(3)]
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.list_identities = AsyncMock(return_value=seeded)
            repo_stub.count_identities = AsyncMock(return_value=3)
            response = await list_identities(limit=50, offset=0, user=viewer)

        returned_uuids = [row["uuid"] for row in response["data"]]
        assert response["total"] == 3
        assert returned_uuids == ["id-0", "id-1", "id-2"]

    @pytest.mark.asyncio
    async def test_pagination_args_forwarded(self):
        """``limit``/``offset`` reach the repository unchanged, as keywords."""
        from decnet.web.router.identities.api_list_identities import list_identities

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.list_identities = AsyncMock(return_value=[])
            repo_stub.count_identities = AsyncMock(return_value=0)
            await list_identities(limit=10, offset=20, user=viewer)

        repo_stub.list_identities.assert_awaited_once_with(limit=10, offset=20)
# ─── GET /identities/{uuid} ──────────────────────────────────────────────────
class TestGetIdentityDetail:
    """Route-level checks for ``GET /identities/{uuid}`` with the repository patched out."""

    _REPO_TARGET = "decnet.web.router.identities.api_get_identity_detail.repo"

    @pytest.mark.asyncio
    async def test_404_on_missing_uuid(self):
        """A ``None`` from the repo surfaces as an HTTP 404."""
        from decnet.web.router.identities.api_get_identity_detail import (
            get_identity_detail,
        )

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.get_identity_by_uuid = AsyncMock(return_value=None)
            with pytest.raises(HTTPException) as raised:
                await get_identity_detail(uuid="ghost", user=viewer)

        assert raised.value.status_code == 404

    @pytest.mark.asyncio
    async def test_returns_identity_with_live_observation_count(self):
        """The response carries both the stored and the live observation count."""
        from decnet.web.router.identities.api_get_identity_detail import (
            get_identity_detail,
        )

        stored_row = _identity_row("id-real", observation_count=2)
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.get_identity_by_uuid = AsyncMock(return_value=stored_row)
            # The live count (5) deliberately disagrees with the
            # denormalized observation_count (2) on the row.
            repo_stub.count_observations_for_identity = AsyncMock(return_value=5)
            response = await get_identity_detail(uuid="id-real", user=viewer)

        assert response["uuid"] == "id-real"
        assert response["observation_count_live"] == 5
        # The denormalized count stays on the row untouched.
        assert response["observation_count"] == 2
# ─── GET /identities/{uuid}/observations ─────────────────────────────────────
class TestListIdentityObservations:
    """Route-level checks for ``GET /identities/{uuid}/observations`` with the repository patched out."""

    _REPO_TARGET = "decnet.web.router.identities.api_list_identity_observations.repo"

    @pytest.mark.asyncio
    async def test_404_when_identity_missing(self):
        """A ``None`` identity lookup surfaces as an HTTP 404."""
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.get_identity_by_uuid = AsyncMock(return_value=None)
            with pytest.raises(HTTPException) as raised:
                await list_identity_observations(
                    uuid="ghost", limit=50, offset=0, user=viewer
                )

        assert raised.value.status_code == 404

    @pytest.mark.asyncio
    async def test_returns_observations_for_existing_identity(self):
        """Observations returned by the repo come through with the repo's total."""
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        owner = _identity_row("id-real")
        linked = [
            _observation_row(att_uuid, identity_id="id-real")
            for att_uuid in ("att-1", "att-2")
        ]
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.get_identity_by_uuid = AsyncMock(return_value=owner)
            repo_stub.list_observations_for_identity = AsyncMock(return_value=linked)
            repo_stub.count_observations_for_identity = AsyncMock(return_value=2)
            response = await list_identity_observations(
                uuid="id-real", limit=50, offset=0, user=viewer
            )

        returned_uuids = [row["uuid"] for row in response["data"]]
        assert response["total"] == 2
        assert returned_uuids == ["att-1", "att-2"]

    @pytest.mark.asyncio
    async def test_merged_uuid_resolves_to_winners_observations(self):
        """
        Asking for a soft-merged identity's observations must query the
        repository under the winner's UUID — the one
        get_identity_by_uuid resolves to — not the loser's. Otherwise an
        operator following a cached merge event lands on an empty page.
        """
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        # The repo hands back the WINNER row although the loser's uuid
        # is what gets requested.
        winning_row = _identity_row("id-winner")
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(self._REPO_TARGET) as repo_stub:
            repo_stub.get_identity_by_uuid = AsyncMock(return_value=winning_row)
            repo_stub.list_observations_for_identity = AsyncMock(return_value=[])
            repo_stub.count_observations_for_identity = AsyncMock(return_value=0)
            await list_identity_observations(
                uuid="id-loser", limit=50, offset=0, user=viewer
            )

        # Critical assertion: the listing call used the winner's UUID,
        # not the requested (loser's) one.
        repo_stub.list_observations_for_identity.assert_awaited_once_with(
            "id-winner", limit=50, offset=0
        )
# ─── Repo-level integration: empty schema returns expected shapes ────────────
@pytest.mark.asyncio
async def test_repo_methods_against_empty_schema(tmp_path):
    """
    Smoke-test the repository layer directly (no FastAPI routes): on a
    freshly initialized, row-less SQLite database every identity read
    method yields its empty/None answer.
    """
    from decnet.web.db.sqlite.repository import SQLiteRepository
    from decnet.web.db.sqlite.database import init_db

    database_file = str(tmp_path / "ids.db")
    init_db(database_file)
    empty_repo = SQLiteRepository(db_path=database_file)

    listed = await empty_repo.list_identities(limit=50, offset=0)
    assert listed == []

    identity_total = await empty_repo.count_identities()
    assert identity_total == 0

    looked_up = await empty_repo.get_identity_by_uuid("anything")
    assert looked_up is None

    observations = await empty_repo.list_observations_for_identity("anything")
    assert observations == []

    observation_total = await empty_repo.count_observations_for_identity("anything")
    assert observation_total == 0
@pytest.mark.asyncio
async def test_repo_follows_merged_into_chain(tmp_path):
    """
    get_identity_by_uuid must transparently walk merged_into_uuid and
    surface the canonical winner; the endpoints depend on this contract
    to resolve soft-merged identities.
    """
    from decnet.web.db.models import AttackerIdentity
    from decnet.web.db.sqlite.database import get_sync_engine, init_db
    from decnet.web.db.sqlite.repository import SQLiteRepository
    from sqlmodel import Session

    database_file = str(tmp_path / "merge.db")
    init_db(database_file)

    # Seed two identities through a synchronous ORM session: a winner,
    # and a loser whose merged_into_uuid points at the winner.
    sync_engine = get_sync_engine(database_file)
    with Session(sync_engine) as seed_session:
        seed_session.add(AttackerIdentity(uuid="winner-uuid"))
        seed_session.add(
            AttackerIdentity(uuid="loser-uuid", merged_into_uuid="winner-uuid")
        )
        seed_session.commit()

    sqlite_repo = SQLiteRepository(db_path=database_file)

    via_loser = await sqlite_repo.get_identity_by_uuid("loser-uuid")
    assert via_loser is not None
    assert via_loser["uuid"] == "winner-uuid", (
        "get_identity_by_uuid must follow merged_into_uuid to the winner"
    )

    # Querying the winner directly resolves to itself.
    via_winner = await sqlite_repo.get_identity_by_uuid("winner-uuid")
    assert via_winner["uuid"] == "winner-uuid"
    assert via_winner["merged_into_uuid"] is None