test(web): transcripts API + attacker-transcripts router coverage
Paging, truncation surfacing, admin gate, path traversal, sid-regex and
decky-mismatch rejection for /transcripts; mirror coverage for
/attackers/{uuid}/transcripts. Flips the Session Recording box in the
roadmap (sessrec pty relay now shipping end-to-end).
This commit is contained in:
@@ -335,6 +335,64 @@ class TestGetAttackerArtifacts:
|
||||
assert exc_info.value.status_code == 404
|
||||
|
||||
|
||||
# ─── GET /attackers/{uuid}/transcripts ───────────────────────────────────────
|
||||
|
||||
class TestGetAttackerTranscripts:
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_transcripts(self):
|
||||
from decnet.web.router.attackers.api_get_attacker_transcripts import get_attacker_transcripts
|
||||
|
||||
sample = _sample_attacker()
|
||||
rows = [
|
||||
{
|
||||
"id": 1,
|
||||
"timestamp": "2026-04-18T02:22:56+00:00",
|
||||
"decky": "decky-01",
|
||||
"service": "ssh",
|
||||
"event_type": "session_recorded",
|
||||
"attacker_ip": "1.2.3.4",
|
||||
"raw_line": "",
|
||||
"msg": "",
|
||||
"fields": json.dumps({
|
||||
"sid": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
|
||||
"service": "ssh",
|
||||
"src_ip": "1.2.3.4",
|
||||
"duration_s": "42",
|
||||
"bytes": "1024",
|
||||
"truncated": "false",
|
||||
"shard_path": "/var/lib/systemd/coredump/transcripts/sessions-2026-04-18.jsonl",
|
||||
}),
|
||||
},
|
||||
]
|
||||
with patch("decnet.web.router.attackers.api_get_attacker_transcripts.repo") as mock_repo:
|
||||
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
|
||||
mock_repo.get_attacker_transcripts = AsyncMock(return_value=rows)
|
||||
|
||||
result = await get_attacker_transcripts(
|
||||
uuid="att-uuid-1",
|
||||
user={"uuid": "test-user", "role": "viewer"},
|
||||
)
|
||||
|
||||
assert result["total"] == 1
|
||||
assert result["data"][0]["service"] == "ssh"
|
||||
mock_repo.get_attacker_transcripts.assert_awaited_once_with("att-uuid-1")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_404_on_unknown_uuid(self):
|
||||
from decnet.web.router.attackers.api_get_attacker_transcripts import get_attacker_transcripts
|
||||
|
||||
with patch("decnet.web.router.attackers.api_get_attacker_transcripts.repo") as mock_repo:
|
||||
mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await get_attacker_transcripts(
|
||||
uuid="nonexistent",
|
||||
user={"uuid": "test-user", "role": "viewer"},
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
|
||||
|
||||
# ─── Auth enforcement ────────────────────────────────────────────────────────
|
||||
|
||||
class TestAttackersAuth:
|
||||
|
||||
Reference in New Issue
Block a user