feat(web): read-only /api/v1/identities/* endpoints + repo methods

Second of the five-step identity-resolution substrate. Ships the API
surface against the empty AttackerIdentity table from commit 1 — every
endpoint returns empty/404 cleanly until the clusterer populates rows.

Routes (auth-gated, viewer role):
* GET /api/v1/identities — paginated list, excludes merged-out rows
* GET /api/v1/identities/{uuid} — detail; transparently follows
  merged_into_uuid to surface the canonical winner
* GET /api/v1/identities/{uuid}/observations — Attacker rows FK'd
  to the (resolved) identity uuid

Repository (BaseRepository abstract + SQLModelRepository concrete):
* get_identity_by_uuid (with merge-chain following, hop-bounded)
* list_identities / count_identities (excluding merged-out)
* list_observations_for_identity / count_observations_for_identity

Tests: 12 new (empty-table behavior, seeded data, merge-chain
resolution, repo-level smoke against real SQLite). Also fixes the
pre-existing test_base_repo_coverage failure (DEBT-041 added abstract
methods without updating the DummyRepo stub) — included here because
this PR adds 5 more abstract methods, fixing it as a bonus.

474 db/web/profiler/correlation tests green.
This commit is contained in:
2026-04-26 07:08:55 -04:00
parent 84c1ca9c9b
commit dc3d08dd41
9 changed files with 591 additions and 0 deletions

View File

@@ -364,6 +364,48 @@ class BaseRepository(ABC):
"""Retrieve the total count of attacker profile records, optionally filtered."""
pass
# ─── Identity resolution (Observation → Identity → Campaign) ───────────
# The clusterer that populates these rows is a separate downstream
# effort. The read-only API ships first; until the clusterer runs,
# every method below returns empty/None against an empty table.
# See development/IDENTITY_RESOLUTION.md.
@abstractmethod
async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
"""
Return one ``AttackerIdentity`` row by UUID, or ``None`` if absent.
If the row has ``merged_into_uuid`` set (i.e. the clusterer
soft-merged it into another identity), implementations MUST
follow the chain and return the winner — callers should never
see a merged-out row as the answer to a fresh query.
"""
pass
@abstractmethod
async def list_identities(
self, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
"""Paginated list of identity rows, newest-updated first."""
pass
@abstractmethod
async def count_identities(self) -> int:
"""Total identity rows. Excludes merged-out rows."""
pass
@abstractmethod
async def list_observations_for_identity(
self, identity_uuid: str, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
"""``Attacker`` observation rows linked to the given identity, newest first."""
pass
@abstractmethod
async def count_observations_for_identity(self, identity_uuid: str) -> int:
"""Total ``Attacker`` rows FK'd to this identity."""
pass
@abstractmethod
async def get_attacker_commands(
self,

View File

@@ -36,6 +36,7 @@ from decnet.web.db.models import (
State,
Attacker,
AttackerBehavior,
AttackerIdentity,
AttackerIntel,
SessionProfile,
SmtpTarget,
@@ -1390,6 +1391,83 @@ class SQLModelRepository(BaseRepository):
result = await session.execute(statement)
return result.scalar() or 0
# ─── Identity resolution reads ────────────────────────────────────────
async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
# Follow merged_into_uuid up to the winner. Loop bounded by
# _MAX_MERGE_HOPS so a (hypothetically) corrupted ring can't
# spin the worker. Clusterer is responsible for never producing
# a cycle; this guard is belt-and-braces.
_MAX_MERGE_HOPS = 8
async with self._session() as session:
current_uuid = uuid
for _ in range(_MAX_MERGE_HOPS):
result = await session.execute(
select(AttackerIdentity).where(AttackerIdentity.uuid == current_uuid)
)
identity = result.scalar_one_or_none()
if identity is None:
return None
if identity.merged_into_uuid is None:
return identity.model_dump(mode="json")
current_uuid = identity.merged_into_uuid
# Hit the hop cap — surface what we have rather than recurse.
return identity.model_dump(mode="json")
async def list_identities(
self, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
# Exclude merged-out rows so the list view is the de-duped truth.
# The history is still queryable per-uuid via get_identity_by_uuid
# and a future "merged into" endpoint when we need it.
statement = (
select(AttackerIdentity)
.where(AttackerIdentity.merged_into_uuid.is_(None))
.order_by(desc(AttackerIdentity.updated_at))
.offset(offset)
.limit(limit)
)
async with self._session() as session:
result = await session.execute(statement)
return [i.model_dump(mode="json") for i in result.scalars().all()]
async def count_identities(self) -> int:
statement = (
select(func.count())
.select_from(AttackerIdentity)
.where(AttackerIdentity.merged_into_uuid.is_(None))
)
async with self._session() as session:
result = await session.execute(statement)
return result.scalar() or 0
async def list_observations_for_identity(
self, identity_uuid: str, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
statement = (
select(Attacker)
.where(Attacker.identity_id == identity_uuid)
.order_by(desc(Attacker.last_seen))
.offset(offset)
.limit(limit)
)
async with self._session() as session:
result = await session.execute(statement)
return [
self._deserialize_attacker(a.model_dump(mode="json"))
for a in result.scalars().all()
]
async def count_observations_for_identity(self, identity_uuid: str) -> int:
statement = (
select(func.count())
.select_from(Attacker)
.where(Attacker.identity_id == identity_uuid)
)
async with self._session() as session:
result = await session.execute(statement)
return result.scalar() or 0
async def get_attacker_commands(
self,
uuid: str,

View File

@@ -21,6 +21,9 @@ from .attackers.api_get_attacker_transcripts import router as attacker_transcrip
from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router
from .attackers.api_get_attacker_mail import router as attacker_mail_router
from .attackers.api_get_attacker_intel import router as attacker_intel_router
from .identities.api_list_identities import router as identities_list_router
from .identities.api_get_identity_detail import router as identity_detail_router
from .identities.api_list_identity_observations import router as identity_observations_router
from .transcripts import transcripts_router
from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router
@@ -84,6 +87,14 @@ api_router.include_router(attacker_smtp_targets_router)
api_router.include_router(attacker_mail_router)
api_router.include_router(attacker_intel_router)
# Identity Resolution (read-only; populated by the clusterer worker —
# see development/IDENTITY_RESOLUTION.md). Empty until the clusterer
# ships; the API surface lands first so frontend + downstream work
# can target a stable shape.
api_router.include_router(identities_list_router)
api_router.include_router(identity_detail_router)
api_router.include_router(identity_observations_router)
# Observability
api_router.include_router(stats_router)
api_router.include_router(stream_router)

View File

View File

@@ -0,0 +1,44 @@
"""GET /api/v1/identities/{uuid} — single identity row.
Soft-merge handling: if the requested UUID has merged_into_uuid set,
the repository follows the chain and returns the winner. Callers always
receive the canonical identity for any UUID that has ever been part of
the merge tree.
Returns 404 against an empty/unknown UUID — expected response while the
clusterer hasn't run yet.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/identities/{uuid}",
tags=["Identity Resolution"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Identity not found"},
},
)
@_traced("api.get_identity_detail")
async def get_identity_detail(
uuid: str,
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
identity = await repo.get_identity_by_uuid(uuid)
if not identity:
raise HTTPException(status_code=404, detail="Identity not found")
# Cheap aggregates the IdentityDetail page surfaces. Counted off the
# FK rather than maintained in observation_count so the answer is
# always live (the denormalized field can lag the clusterer briefly).
identity["observation_count_live"] = await repo.count_observations_for_identity(
identity["uuid"]
)
return identity

View File

@@ -0,0 +1,35 @@
"""GET /api/v1/identities — paginated list of resolved identities.
Returns an empty list while the clusterer hasn't run yet (the
identities table ships empty in the schema-only PR). See
development/IDENTITY_RESOLUTION.md.
"""
from typing import Any
from fastapi import APIRouter, Depends, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/identities",
tags=["Identity Resolution"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
422: {"description": "Validation error"},
},
)
@_traced("api.list_identities")
async def list_identities(
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0, le=2147483647),
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
"""Paginated identity list, newest-updated first."""
data = await repo.list_identities(limit=limit, offset=offset)
total = await repo.count_identities()
return {"total": total, "limit": limit, "offset": offset, "data": data}

View File

@@ -0,0 +1,48 @@
"""GET /api/v1/identities/{uuid}/observations — observations for an identity.
Returns the per-IP ``Attacker`` rows whose ``identity_id`` FK points at
this identity. The shape mirrors ``AttackersResponse`` so the frontend
can reuse the same row component as the main attackers list.
Empty result while the clusterer hasn't linked any observations yet.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
@router.get(
"/identities/{uuid}/observations",
tags=["Identity Resolution"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Identity not found"},
},
)
@_traced("api.list_identity_observations")
async def list_identity_observations(
uuid: str,
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0, le=2147483647),
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
# 404 if the identity itself doesn't exist. Otherwise return the
# observations linked to it (which may be empty — a freshly-formed
# identity briefly has no observations yet from the FK side).
identity = await repo.get_identity_by_uuid(uuid)
if not identity:
raise HTTPException(status_code=404, detail="Identity not found")
# If the requested uuid was merged, return observations under the
# winner's uuid (which is what get_identity_by_uuid resolves to).
canonical_uuid = identity["uuid"]
data = await repo.list_observations_for_identity(
canonical_uuid, limit=limit, offset=offset
)
total = await repo.count_observations_for_identity(canonical_uuid)
return {"total": total, "limit": limit, "offset": offset, "data": data}