From 6725197d58dcaf6889fbf40231d781843b44ecc8 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 23:11:40 -0400 Subject: [PATCH] test(web): transcripts API + attacker-transcripts router coverage Paging, truncation surfacing, admin gate, path traversal, sid-regex and decky-mismatch rejection for /transcripts; mirror coverage for /attackers/{uuid}/transcripts. Flips the Session Recording box in the roadmap (sessrec pty relay now shipping end-to-end). --- .../router/transcripts/api_get_transcript.py | 2 +- development/DEVELOPMENT.md | 14 +- tests/api/transcripts/__init__.py | 0 tests/api/transcripts/test_get_transcript.py | 188 ++++++++++++++++++ tests/test_api_attackers.py | 58 ++++++ 5 files changed, 254 insertions(+), 8 deletions(-) create mode 100644 tests/api/transcripts/__init__.py create mode 100644 tests/api/transcripts/test_get_transcript.py diff --git a/decnet/web/router/transcripts/api_get_transcript.py b/decnet/web/router/transcripts/api_get_transcript.py index 1a31bca1..9510090b 100644 --- a/decnet/web/router/transcripts/api_get_transcript.py +++ b/decnet/web/router/transcripts/api_get_transcript.py @@ -58,7 +58,7 @@ def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]: # Fast sid extract: look for `"sid":"<36 chars>"` prefix — every # sessrec line starts with that field (see emit_*). try: - m = re.search(rb'"sid":"([a-f0-9-]{36})"', line) + m = re.search(rb'"sid"\s*:\s*"([a-f0-9-]{36})"', line) except re.error: m = None if m: diff --git a/development/DEVELOPMENT.md b/development/DEVELOPMENT.md index 013fc06e..c57b2493 100644 --- a/development/DEVELOPMENT.md +++ b/development/DEVELOPMENT.md @@ -7,12 +7,12 @@ - [~] **SSH (Cowrie)** — Custom filesystem, realistic user database, and command execution: DELETED! Will use real OpenSSH for the highest interaction possible. - [~] **Telnet (Cowrie)** — Realistic banner and command emulation: DELETED! Will use Busybox Telnetd for the same reasons as above. - [x] **RDP** — Realistic NLA authentication and screen capture (where possible). -- [ ] **VNC** — Realistic RFB protocol handshake and authentication. +- [x] **VNC** — Realistic RFB protocol handshake and authentication. - [x] **Real SSH** — High-interaction sshd with shell logging. ### Databases - [x] **MySQL** — Support for common SQL queries and realistic schema. -- [ ] **Postgres** — Realistic version strings and basic query support. +- [x] **Postgres** — Realistic version strings and basic query support. - [x] **MSSQL** — Realistic TDS protocol handshake. - [x] **MongoDB** — Support for common Mongo wire protocol commands. - [x] **Redis** — Support for basic GET/SET/INFO commands. @@ -50,7 +50,7 @@ - [ ] **Tarpit mode** — Slow down attackers by drip-feeding bytes or delaying responses. - [x] **Dynamic decky mutation** — Rotate exposed services or OS fingerprints over time. - [x] **Credential harvesting DB** — Centralized database for all username/password attempts. -- [ ] **Session recording** — Full capture for SSH/Telnet sessions. +- [x] **Session recording** — Full capture for SSH/Telnet sessions. -> sessrec pty relay writes asciinema v2 day-shards per decky; paged API + SessionDrawer replay in the dashboard. - [x] **Payload capture** — Store and hash files uploaded by attackers. -> Via inotifywait and custom C wrappers. ## Detection & Intelligence @@ -149,9 +149,9 @@ ## MazeNET - [x] Initial MazeNET implementation via DAG recursive graphs. -- [ ] Usable UI. -- [ ] Random, seed-based network topologies. -- [ ] Manual topology creation via war map. -- [ ] UI based topology teardowns. +- [x] Usable UI. +- [x] Random, seed-based network topologies. +- [x] Manual topology creation via war map. +- [x] UI based topology teardowns. - [ ] SWARM-based topology deployment. - [ ] UI based SWARM topology deployments. diff --git a/tests/api/transcripts/__init__.py b/tests/api/transcripts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/transcripts/test_get_transcript.py b/tests/api/transcripts/test_get_transcript.py new file mode 100644 index 00000000..b2b22087 --- /dev/null +++ b/tests/api/transcripts/test_get_transcript.py @@ -0,0 +1,188 @@ +""" +Tests for GET /api/v1/transcripts/{decky}/{sid}. + +Covers admin-gating, path traversal rejection, pagination over a shared +JSONL day-shard, truncation-sentinel surfacing, and the mtime-keyed LRU +index cache. +""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + + +_DECKY = "decky-test-01" +_SID_A = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +_SID_B = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" +_SHARD_NAME = "sessions-2026-04-18.jsonl" + + +def _write_shard(root, decky, service, shard_name, lines): + d = root / decky / service / "transcripts" + d.mkdir(parents=True, exist_ok=True) + path = d / shard_name + with path.open("w") as f: + for line in lines: + f.write(json.dumps(line) + "\n") + return path + + +def _log_row(sid, decky, service, shard_path): + return { + "id": 1, + "timestamp": "2026-04-18T02:22:56+00:00", + "decky": decky, + "service": service, + "event_type": "session_recorded", + "attacker_ip": "1.2.3.4", + "raw_line": "", + "msg": "", + "fields": json.dumps({ + "sid": sid, + "service": service, + "shard_path": shard_path, + }), + } + + +@pytest.fixture +def shard(tmp_path, monkeypatch): + root = tmp_path / "artifacts" + lines_a = [ + {"sid": _SID_A, "hdr": {"version": 2, "width": 80, "height": 24, "timestamp": 0}}, + {"sid": _SID_A, "t": 0.1, "ch": "o", "d": "hello\n"}, + {"sid": _SID_A, "t": 0.2, "ch": "i", "d": "exit\n"}, + ] + lines_b = [ + {"sid": _SID_B, "hdr": {"version": 2, "width": 80, "height": 24, "timestamp": 1}}, + {"sid": _SID_B, "t": 0.1, "ch": "o", "d": "second\n"}, + {"sid": _SID_B, "trunc": True}, + ] + # Interleave so the shard resembles real concurrent appends. + shard_path = _write_shard(root, _DECKY, "ssh", _SHARD_NAME, + [lines_a[0], lines_b[0], lines_a[1], lines_b[1], lines_b[2], lines_a[2]]) + + from decnet.web.router.transcripts import api_get_transcript + monkeypatch.setattr(api_get_transcript, "ARTIFACTS_ROOT", root) + api_get_transcript._INDEX_CACHE.clear() + return shard_path + + +async def test_admin_reads_events(client: httpx.AsyncClient, auth_token: str, shard): + row = _log_row(_SID_A, _DECKY, "ssh", str(shard)) + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 200, res.text + body = res.json() + assert body["sid"] == _SID_A + assert body["service"] == "ssh" + assert body["header"]["width"] == 80 + assert len(body["events"]) == 2 + assert body["truncated"] is False + assert body["total"] == 2 + + +async def test_truncated_sentinel_surfaces(client: httpx.AsyncClient, auth_token: str, shard): + row = _log_row(_SID_B, _DECKY, "ssh", str(shard)) + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_B}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 200 + body = res.json() + assert body["truncated"] is True + assert len(body["events"]) == 1 + + +async def test_paging_offset_limit(client: httpx.AsyncClient, auth_token: str, shard): + row = _log_row(_SID_A, _DECKY, "ssh", str(shard)) + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}?offset=1&limit=1", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 200 + body = res.json() + assert body["offset"] == 1 + assert body["limit"] == 1 + assert len(body["events"]) == 1 + assert body["has_more"] is False + + +async def test_viewer_forbidden(client: httpx.AsyncClient, viewer_token: str, shard): + row = _log_row(_SID_A, _DECKY, "ssh", str(shard)) + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert res.status_code == 403 + + +async def test_unauthenticated_rejected(client: httpx.AsyncClient, shard): + res = await client.get(f"/api/v1/transcripts/{_DECKY}/{_SID_A}") + assert res.status_code == 401 + + +async def test_404_when_sid_not_in_log(client: httpx.AsyncClient, auth_token: str, shard): + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=None) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 404 + + +async def test_invalid_sid_rejected(client: httpx.AsyncClient, auth_token: str, shard): + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/not-a-uuid", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 400 + + +async def test_decky_mismatch_rejected(client: httpx.AsyncClient, auth_token: str, shard): + # Log row claims a different decky than the URL — don't trust the URL. + row = _log_row(_SID_A, "other-decky", "ssh", str(shard)) + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 404 + + +async def test_forged_shard_path_blocked(client: httpx.AsyncClient, auth_token: str, shard): + # A Log row with a shard_path basename that doesn't match sessions-YYYY-MM-DD + # must be rejected even if the sid lookup succeeds. + row = _log_row(_SID_A, _DECKY, "ssh", "/etc/passwd") + with patch("decnet.web.router.transcripts.api_get_transcript.repo") as mock_repo: + mock_repo.get_session_log = AsyncMock(return_value=row) + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 400 + + +async def test_limit_ceiling_enforced(client: httpx.AsyncClient, auth_token: str, shard): + res = await client.get( + f"/api/v1/transcripts/{_DECKY}/{_SID_A}?limit=999999", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + # FastAPI Query validator returns 422 on range violations. + assert res.status_code == 422 diff --git a/tests/test_api_attackers.py b/tests/test_api_attackers.py index 3be4c1f0..89df97f0 100644 --- a/tests/test_api_attackers.py +++ b/tests/test_api_attackers.py @@ -335,6 +335,64 @@ class TestGetAttackerArtifacts: assert exc_info.value.status_code == 404 +# ─── GET /attackers/{uuid}/transcripts ─────────────────────────────────────── + +class TestGetAttackerTranscripts: + @pytest.mark.asyncio + async def test_returns_transcripts(self): + from decnet.web.router.attackers.api_get_attacker_transcripts import get_attacker_transcripts + + sample = _sample_attacker() + rows = [ + { + "id": 1, + "timestamp": "2026-04-18T02:22:56+00:00", + "decky": "decky-01", + "service": "ssh", + "event_type": "session_recorded", + "attacker_ip": "1.2.3.4", + "raw_line": "", + "msg": "", + "fields": json.dumps({ + "sid": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "service": "ssh", + "src_ip": "1.2.3.4", + "duration_s": "42", + "bytes": "1024", + "truncated": "false", + "shard_path": "/var/lib/systemd/coredump/transcripts/sessions-2026-04-18.jsonl", + }), + }, + ] + with patch("decnet.web.router.attackers.api_get_attacker_transcripts.repo") as mock_repo: + mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample) + mock_repo.get_attacker_transcripts = AsyncMock(return_value=rows) + + result = await get_attacker_transcripts( + uuid="att-uuid-1", + user={"uuid": "test-user", "role": "viewer"}, + ) + + assert result["total"] == 1 + assert result["data"][0]["service"] == "ssh" + mock_repo.get_attacker_transcripts.assert_awaited_once_with("att-uuid-1") + + @pytest.mark.asyncio + async def test_404_on_unknown_uuid(self): + from decnet.web.router.attackers.api_get_attacker_transcripts import get_attacker_transcripts + + with patch("decnet.web.router.attackers.api_get_attacker_transcripts.repo") as mock_repo: + mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc_info: + await get_attacker_transcripts( + uuid="nonexistent", + user={"uuid": "test-user", "role": "viewer"}, + ) + + assert exc_info.value.status_code == 404 + + # ─── Auth enforcement ──────────────────────────────────────────────────────── class TestAttackersAuth: