New GET /attackers/{uuid}/commands?limit=&offset=&service= endpoint
serves commands with server-side pagination and optional service filter.
AttackerDetail frontend fetches commands from this endpoint with
page controls. Service badge filter now drives both the API query
and the local fingerprint filter.
292 lines
12 KiB
Python
292 lines
12 KiB
Python
"""
|
|
Tests for the attacker profile API routes.
|
|
|
|
Covers:
|
|
- GET /attackers: paginated list, search, sort_by
|
|
- GET /attackers/{uuid}: single profile detail, 404 on missing UUID
|
|
- Auth enforcement on both endpoints
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
from decnet.web.auth import create_access_token
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _auth_request(uuid: str = "test-user-uuid") -> MagicMock:
|
|
token = create_access_token({"uuid": uuid})
|
|
req = MagicMock()
|
|
req.headers = {"Authorization": f"Bearer {token}"}
|
|
return req
|
|
|
|
|
|
def _sample_attacker(uuid: str = "att-uuid-1", ip: str = "1.2.3.4") -> dict:
|
|
return {
|
|
"uuid": uuid,
|
|
"ip": ip,
|
|
"first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(),
|
|
"last_seen": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(),
|
|
"event_count": 42,
|
|
"service_count": 3,
|
|
"decky_count": 2,
|
|
"services": ["ssh", "http", "ftp"],
|
|
"deckies": ["decky-01", "decky-02"],
|
|
"traversal_path": "decky-01 → decky-02",
|
|
"is_traversal": True,
|
|
"bounty_count": 5,
|
|
"credential_count": 2,
|
|
"fingerprints": [{"type": "ja3", "hash": "abc"}],
|
|
"commands": [{"service": "ssh", "decky": "decky-01", "command": "id", "timestamp": "2026-04-01T10:00:00"}],
|
|
"updated_at": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(),
|
|
}
|
|
|
|
|
|
# ─── GET /attackers ──────────────────────────────────────────────────────────
|
|
|
|
class TestGetAttackers:
|
|
@pytest.mark.asyncio
|
|
async def test_returns_paginated_response(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
sample = _sample_attacker()
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[sample])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=1)
|
|
|
|
result = await get_attackers(
|
|
limit=50, offset=0, search=None, sort_by="recent",
|
|
current_user="test-user",
|
|
)
|
|
|
|
assert result["total"] == 1
|
|
assert result["limit"] == 50
|
|
assert result["offset"] == 0
|
|
assert len(result["data"]) == 1
|
|
assert result["data"][0]["uuid"] == "att-uuid-1"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_parameter_forwarded(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=0)
|
|
|
|
await get_attackers(
|
|
limit=50, offset=0, search="192.168", sort_by="recent",
|
|
current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attackers.assert_awaited_once_with(
|
|
limit=50, offset=0, search="192.168", sort_by="recent", service=None,
|
|
)
|
|
mock_repo.get_total_attackers.assert_awaited_once_with(search="192.168", service=None)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_null_search_normalized(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=0)
|
|
|
|
await get_attackers(
|
|
limit=50, offset=0, search="null", sort_by="recent",
|
|
current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attackers.assert_awaited_once_with(
|
|
limit=50, offset=0, search=None, sort_by="recent", service=None,
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sort_by_active(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=0)
|
|
|
|
await get_attackers(
|
|
limit=50, offset=0, search=None, sort_by="active",
|
|
current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attackers.assert_awaited_once_with(
|
|
limit=50, offset=0, search=None, sort_by="active", service=None,
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_search_normalized_to_none(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=0)
|
|
|
|
await get_attackers(
|
|
limit=50, offset=0, search="", sort_by="recent",
|
|
current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attackers.assert_awaited_once_with(
|
|
limit=50, offset=0, search=None, sort_by="recent", service=None,
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_service_filter_forwarded(self):
|
|
from decnet.web.router.attackers.api_get_attackers import get_attackers
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
|
|
mock_repo.get_attackers = AsyncMock(return_value=[])
|
|
mock_repo.get_total_attackers = AsyncMock(return_value=0)
|
|
|
|
await get_attackers(
|
|
limit=50, offset=0, search=None, sort_by="recent",
|
|
service="https", current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attackers.assert_awaited_once_with(
|
|
limit=50, offset=0, search=None, sort_by="recent", service="https",
|
|
)
|
|
mock_repo.get_total_attackers.assert_awaited_once_with(search=None, service="https")
|
|
|
|
|
|
# ─── GET /attackers/{uuid} ───────────────────────────────────────────────────
|
|
|
|
class TestGetAttackerDetail:
|
|
@pytest.mark.asyncio
|
|
async def test_returns_attacker_by_uuid(self):
|
|
from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
|
|
|
|
sample = _sample_attacker()
|
|
with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
|
|
|
|
result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user")
|
|
|
|
assert result["uuid"] == "att-uuid-1"
|
|
assert result["ip"] == "1.2.3.4"
|
|
assert result["is_traversal"] is True
|
|
assert isinstance(result["commands"], list)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_404_on_unknown_uuid(self):
|
|
from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None)
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await get_attacker_detail(uuid="nonexistent", current_user="test-user")
|
|
|
|
assert exc_info.value.status_code == 404
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deserialized_json_fields(self):
|
|
from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
|
|
|
|
sample = _sample_attacker()
|
|
with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
|
|
|
|
result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user")
|
|
|
|
assert isinstance(result["services"], list)
|
|
assert isinstance(result["deckies"], list)
|
|
assert isinstance(result["fingerprints"], list)
|
|
assert isinstance(result["commands"], list)
|
|
|
|
|
|
# ─── GET /attackers/{uuid}/commands ──────────────────────────────────────────
|
|
|
|
class TestGetAttackerCommands:
|
|
@pytest.mark.asyncio
|
|
async def test_returns_paginated_commands(self):
|
|
from decnet.web.router.attackers.api_get_attacker_commands import get_attacker_commands
|
|
|
|
sample = _sample_attacker()
|
|
cmds = [
|
|
{"service": "ssh", "decky": "decky-01", "command": "id", "timestamp": "2026-04-01T10:00:00"},
|
|
{"service": "ssh", "decky": "decky-01", "command": "whoami", "timestamp": "2026-04-01T10:01:00"},
|
|
]
|
|
with patch("decnet.web.router.attackers.api_get_attacker_commands.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
|
|
mock_repo.get_attacker_commands = AsyncMock(return_value={"total": 2, "data": cmds})
|
|
|
|
result = await get_attacker_commands(
|
|
uuid="att-uuid-1", limit=50, offset=0, service=None,
|
|
current_user="test-user",
|
|
)
|
|
|
|
assert result["total"] == 2
|
|
assert len(result["data"]) == 2
|
|
assert result["limit"] == 50
|
|
assert result["offset"] == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_service_filter_forwarded(self):
|
|
from decnet.web.router.attackers.api_get_attacker_commands import get_attacker_commands
|
|
|
|
sample = _sample_attacker()
|
|
with patch("decnet.web.router.attackers.api_get_attacker_commands.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
|
|
mock_repo.get_attacker_commands = AsyncMock(return_value={"total": 0, "data": []})
|
|
|
|
await get_attacker_commands(
|
|
uuid="att-uuid-1", limit=50, offset=0, service="ssh",
|
|
current_user="test-user",
|
|
)
|
|
|
|
mock_repo.get_attacker_commands.assert_awaited_once_with(
|
|
uuid="att-uuid-1", limit=50, offset=0, service="ssh",
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_404_on_unknown_uuid(self):
|
|
from decnet.web.router.attackers.api_get_attacker_commands import get_attacker_commands
|
|
|
|
with patch("decnet.web.router.attackers.api_get_attacker_commands.repo") as mock_repo:
|
|
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None)
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await get_attacker_commands(
|
|
uuid="nonexistent", limit=50, offset=0, service=None,
|
|
current_user="test-user",
|
|
)
|
|
|
|
assert exc_info.value.status_code == 404
|
|
|
|
|
|
# ─── Auth enforcement ────────────────────────────────────────────────────────
|
|
|
|
class TestAttackersAuth:
|
|
@pytest.mark.asyncio
|
|
async def test_list_requires_auth(self):
|
|
"""get_current_user dependency raises 401 when called without valid token."""
|
|
from decnet.web.dependencies import get_current_user
|
|
|
|
req = MagicMock()
|
|
req.headers = {}
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await get_current_user(req)
|
|
assert exc_info.value.status_code == 401
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_detail_requires_auth(self):
|
|
from decnet.web.dependencies import get_current_user
|
|
|
|
req = MagicMock()
|
|
req.headers = {"Authorization": "Bearer bad-token"}
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await get_current_user(req)
|
|
assert exc_info.value.status_code == 401
|