Files
DECNET/tests/web/test_api_identities.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

315 lines
12 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Tests for the identity-resolution read API.
The clusterer that populates identities is a separate downstream effort
(see development/IDENTITY_RESOLUTION.md); these tests cover the
read-only API that ships first. The identities table is empty at
deployment time, so the headline cases are:
* GET /identities returns {total: 0, data: []} cleanly
* GET /identities/{uuid} returns 404 cleanly
* GET /identities/{uuid}/observations returns 404 if identity missing
* with seeded data, the routes return what the repository returns
* a soft-merged identity (merged_into_uuid set) resolves to the winner
"""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import HTTPException
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _identity_row(
uuid: str = "id-uuid-1",
merged_into_uuid: str | None = None,
observation_count: int = 0,
) -> dict:
now = datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat()
return {
"uuid": uuid,
"schema_version": 1,
"campaign_id": None,
"first_seen_at": None,
"last_seen_at": None,
"created_at": now,
"updated_at": now,
"confidence": None,
"observation_count": observation_count,
"ja3_hashes": None,
"hassh_hashes": None,
"payload_simhashes": None,
"c2_endpoints": None,
"kd_digraph_simhash": None,
"merged_into_uuid": merged_into_uuid,
"notes": None,
}
def _observation_row(uuid: str, identity_id: str | None) -> dict:
return {
"uuid": uuid,
"ip": "203.0.113.7",
"identity_id": identity_id,
"first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(),
"last_seen": datetime(2026, 4, 26, tzinfo=timezone.utc).isoformat(),
"event_count": 5,
}
# ─── GET /identities ─────────────────────────────────────────────────────────
class TestListIdentities:
    """Route-level checks for ``list_identities`` with the repo patched out."""

    @pytest.mark.asyncio
    async def test_empty_table_returns_zero_total(self):
        """With no rows and a zero count, the route returns an empty envelope."""
        from decnet.web.router.identities.api_list_identities import list_identities

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identities.repo",
            list_identities=AsyncMock(return_value=[]),
            count_identities=AsyncMock(return_value=0),
        ):
            payload = await list_identities(limit=50, offset=0, user=viewer)

        assert payload == {"total": 0, "limit": 50, "offset": 0, "data": []}

    @pytest.mark.asyncio
    async def test_returns_seeded_data(self):
        """Rows handed back by the repo appear, in order, in the response."""
        from decnet.web.router.identities.api_list_identities import list_identities

        seeded = [_identity_row(f"id-{n}") for n in range(3)]
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identities.repo",
            list_identities=AsyncMock(return_value=seeded),
            count_identities=AsyncMock(return_value=3),
        ):
            payload = await list_identities(limit=50, offset=0, user=viewer)

        assert payload["total"] == 3
        returned_uuids = [item["uuid"] for item in payload["data"]]
        assert returned_uuids == ["id-0", "id-1", "id-2"]

    @pytest.mark.asyncio
    async def test_pagination_args_forwarded(self):
        """``limit`` and ``offset`` reach the repo's ``list_identities`` as keywords."""
        from decnet.web.router.identities.api_list_identities import list_identities

        # Keep a handle on the stub so the awaited arguments can be inspected.
        list_spy = AsyncMock(return_value=[])
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identities.repo",
            list_identities=list_spy,
            count_identities=AsyncMock(return_value=0),
        ):
            await list_identities(limit=10, offset=20, user=viewer)

        list_spy.assert_awaited_once_with(limit=10, offset=20)
# ─── GET /identities/{uuid} ──────────────────────────────────────────────────
class TestGetIdentityDetail:
    """Route-level checks for ``get_identity_detail`` with the repo patched out."""

    @pytest.mark.asyncio
    async def test_404_on_missing_uuid(self):
        """A repo lookup returning ``None`` surfaces as an HTTP 404."""
        from decnet.web.router.identities.api_get_identity_detail import (
            get_identity_detail,
        )

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_get_identity_detail.repo",
            get_identity_by_uuid=AsyncMock(return_value=None),
        ), pytest.raises(HTTPException) as excinfo:
            await get_identity_detail(uuid="ghost", user=viewer)

        assert excinfo.value.status_code == 404

    @pytest.mark.asyncio
    async def test_returns_identity_with_live_observation_count(self):
        """The live count is reported next to the row's stored count."""
        from decnet.web.router.identities.api_get_identity_detail import (
            get_identity_detail,
        )

        stored = _identity_row("id-real", observation_count=2)
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_get_identity_detail.repo",
            get_identity_by_uuid=AsyncMock(return_value=stored),
            # The live count (5) deliberately disagrees with the row's
            # denormalized observation_count (2), which may be stale.
            count_observations_for_identity=AsyncMock(return_value=5),
        ):
            detail = await get_identity_detail(uuid="id-real", user=viewer)

        assert detail["uuid"] == "id-real"
        assert detail["observation_count_live"] == 5
        # The denormalized count on the row itself must be left untouched.
        assert detail["observation_count"] == 2
# ─── GET /identities/{uuid}/observations ─────────────────────────────────────
class TestListIdentityObservations:
    """Route-level checks for ``list_identity_observations`` with the repo patched out."""

    @pytest.mark.asyncio
    async def test_404_when_identity_missing(self):
        """A repo lookup returning ``None`` surfaces as an HTTP 404."""
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identity_observations.repo",
            get_identity_by_uuid=AsyncMock(return_value=None),
        ), pytest.raises(HTTPException) as excinfo:
            await list_identity_observations(
                uuid="ghost", limit=50, offset=0, user=viewer
            )

        assert excinfo.value.status_code == 404

    @pytest.mark.asyncio
    async def test_returns_observations_for_existing_identity(self):
        """Observations handed back by the repo appear, in order, in the response."""
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        owner = _identity_row("id-real")
        seeded = [
            _observation_row(obs_uuid, identity_id="id-real")
            for obs_uuid in ("att-1", "att-2")
        ]
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identity_observations.repo",
            get_identity_by_uuid=AsyncMock(return_value=owner),
            list_observations_for_identity=AsyncMock(return_value=seeded),
            count_observations_for_identity=AsyncMock(return_value=2),
        ):
            payload = await list_identity_observations(
                uuid="id-real", limit=50, offset=0, user=viewer
            )

        assert payload["total"] == 2
        returned_uuids = [item["uuid"] for item in payload["data"]]
        assert returned_uuids == ["att-1", "att-2"]

    @pytest.mark.asyncio
    async def test_merged_uuid_resolves_to_winners_observations(self):
        """
        Asking for a soft-merged identity's observations: the repo's
        get_identity_by_uuid follows the merged_into chain and hands
        back the winner. The endpoint MUST then list observations under
        the winner's UUID rather than the loser's; otherwise an operator
        linking through cached merge events sees an empty page.
        """
        from decnet.web.router.identities.api_list_identity_observations import (
            list_identity_observations,
        )

        # The stubbed repo answers with the WINNER row although the
        # request below names the loser's uuid.
        canonical = _identity_row("id-winner")
        list_spy = AsyncMock(return_value=[])
        viewer = {"uuid": "u", "role": "viewer"}
        with patch(
            "decnet.web.router.identities.api_list_identity_observations.repo",
            get_identity_by_uuid=AsyncMock(return_value=canonical),
            list_observations_for_identity=list_spy,
            count_observations_for_identity=AsyncMock(return_value=0),
        ):
            await list_identity_observations(
                uuid="id-loser", limit=50, offset=0, user=viewer
            )

        # Critical assertion: the observation listing is keyed on the
        # winner's UUID, not on the requested (loser's) one.
        list_spy.assert_awaited_once_with("id-winner", limit=50, offset=0)
# ─── Repo-level integration: empty schema returns expected shapes ────────────
@pytest.mark.asyncio
async def test_repo_methods_against_empty_schema(tmp_path):
    """
    Smoke-test the repository layer directly, bypassing the FastAPI
    routes: against a freshly initialized SQLite database with no rows,
    each read method must return its empty value (``[]``, ``0`` or
    ``None``).
    """
    from decnet.web.db.sqlite.repository import SQLiteRepository
    from decnet.web.db.sqlite.database import init_db

    database_file = str(tmp_path / "ids.db")
    init_db(database_file)
    empty_repo = SQLiteRepository(db_path=database_file)

    # Identity-level reads.
    listed = await empty_repo.list_identities(limit=50, offset=0)
    assert listed == []
    total = await empty_repo.count_identities()
    assert total == 0
    looked_up = await empty_repo.get_identity_by_uuid("anything")
    assert looked_up is None

    # Observation-level reads for an identity that does not exist.
    observations = await empty_repo.list_observations_for_identity("anything")
    assert observations == []
    observation_total = await empty_repo.count_observations_for_identity("anything")
    assert observation_total == 0
@pytest.mark.asyncio
async def test_repo_follows_merged_into_chain(tmp_path):
    """
    get_identity_by_uuid must transparently follow merged_into_uuid to
    surface the canonical winner. This is the contract the endpoint
    relies on for soft-merged identity resolution.
    """
    from decnet.web.db.models import AttackerIdentity
    from decnet.web.db.sqlite.database import get_sync_engine, init_db
    from decnet.web.db.sqlite.repository import SQLiteRepository
    from sqlmodel import Session

    db_path = str(tmp_path / "merge.db")
    init_db(db_path)

    # Seed two identities through an ORM session on the engine returned
    # by get_sync_engine: a winner, and a loser whose merged_into_uuid
    # points at the winner.
    engine = get_sync_engine(db_path)
    with Session(engine) as session:
        winner = AttackerIdentity(uuid="winner-uuid")
        loser = AttackerIdentity(uuid="loser-uuid", merged_into_uuid="winner-uuid")
        session.add(winner)
        session.add(loser)
        session.commit()

    repo = SQLiteRepository(db_path=db_path)

    resolved = await repo.get_identity_by_uuid("loser-uuid")
    assert resolved is not None
    assert resolved["uuid"] == "winner-uuid", (
        "get_identity_by_uuid must follow merged_into_uuid to the winner"
    )

    # And the winner queried directly resolves to itself. Guard against
    # None first so a missing row fails as a clear assertion instead of
    # a TypeError on subscripting.
    direct = await repo.get_identity_by_uuid("winner-uuid")
    assert direct is not None, "winner must be retrievable by its own uuid"
    assert direct["uuid"] == "winner-uuid"
    assert direct["merged_into_uuid"] is None