diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 4b52bda1..207dbc10 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -198,6 +198,16 @@ class BaseRepository(ABC): """Return `file_captured` log rows for this attacker, newest first.""" pass + @abstractmethod + async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]: + """Return `session_recorded` log rows for this attacker, newest first.""" + pass + + @abstractmethod + async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]: + """Look up the `session_recorded` Log row for a given session UUID.""" + pass + # ------------------------------------------------------------- swarm # Swarm methods have default no-op / empty implementations so existing # subclasses and non-swarm deployments continue to work without change. diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index ad88556f..059558cd 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -769,6 +769,48 @@ class SQLModelRepository(BaseRepository): ) return [r.model_dump(mode="json") for r in rows.scalars().all()] + async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]: + """Look up the `session_recorded` Log row that owns a given sid. + + sid is a v4 UUID embedded in the row's ``fields`` JSON blob. Matched + with LIKE on the textual sid substring — cheap given the bounded + cardinality of session_recorded rows vs. the full logs table. + """ + needle = f'"sid":"{sid}"' + async with self._session() as session: + rows = await session.execute( + select(Log) + .where(Log.event_type == "session_recorded") + .where(Log.fields.contains(needle)) + .limit(1) + ) + row = rows.scalars().first() + return row.model_dump(mode="json") if row else None + + async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]: + """Return `session_recorded` logs for the attacker identified by UUID. + + Mirror of :meth:`get_attacker_artifacts` — sessions ride in the same + Log table with event_type=session_recorded; the ingester decodes the + RFC 5424 SD fields (sid, service, decky, src_ip, duration_s, bytes, + truncated, shard_path) into the returned ``fields`` blob. + """ + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Log) + .where(Log.attacker_ip == ip) + .where(Log.event_type == "session_recorded") + .order_by(desc(Log.timestamp)) + .limit(200) + ) + return [r.model_dump(mode="json") for r in rows.scalars().all()] + # ------------------------------------------------------------- swarm async def add_swarm_host(self, data: dict[str, Any]) -> None: diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index dca36ce0..5fb6d993 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -15,6 +15,8 @@ from .attackers.api_get_attackers import router as attackers_router from .attackers.api_get_attacker_detail import router as attacker_detail_router from .attackers.api_get_attacker_commands import router as attacker_commands_router from .attackers.api_get_attacker_artifacts import router as attacker_artifacts_router +from .attackers.api_get_attacker_transcripts import router as attacker_transcripts_router +from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router from .config.api_manage_users import router as config_users_router @@ -61,6 +63,7 @@ api_router.include_router(attackers_router) api_router.include_router(attacker_detail_router) api_router.include_router(attacker_commands_router) api_router.include_router(attacker_artifacts_router) +api_router.include_router(attacker_transcripts_router) # Observability api_router.include_router(stats_router) @@ -76,6 +79,9 @@ api_router.include_router(config_reinit_router) # Artifacts (captured attacker file drops) api_router.include_router(artifacts_router) +# Transcripts (PTY session recordings, paged asciinema events) +api_router.include_router(transcripts_router) + # Remote Updates (dashboard → worker updater daemons) api_router.include_router(swarm_updates_router) diff --git a/decnet/web/router/attackers/api_get_attacker_transcripts.py b/decnet/web/router/attackers/api_get_attacker_transcripts.py new file mode 100644 index 00000000..359734e5 --- /dev/null +++ b/decnet/web/router/attackers/api_get_attacker_transcripts.py @@ -0,0 +1,34 @@ +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import require_viewer, repo + +router = APIRouter() + + +@router.get( + "/attackers/{uuid}/transcripts", + tags=["Attacker Profiles"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Attacker not found"}, + }, +) +@_traced("api.get_attacker_transcripts") +async def get_attacker_transcripts( + uuid: str, + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """List PTY session recordings for an attacker (newest first). + + Each entry is a `session_recorded` log row — the frontend lists them + in the AttackerDetail Sessions tab and opens SessionDrawer on click. + """ + attacker = await repo.get_attacker_by_uuid(uuid) + if not attacker: + raise HTTPException(status_code=404, detail="Attacker not found") + rows = await repo.get_attacker_transcripts(uuid) + return {"total": len(rows), "data": rows} diff --git a/decnet/web/router/transcripts/__init__.py b/decnet/web/router/transcripts/__init__.py new file mode 100644 index 00000000..260aff54 --- /dev/null +++ b/decnet/web/router/transcripts/__init__.py @@ -0,0 +1,6 @@ +from fastapi import APIRouter + +from .api_get_transcript import router as transcript_router + +transcripts_router = APIRouter() +transcripts_router.include_router(transcript_router) diff --git a/decnet/web/router/transcripts/api_get_transcript.py b/decnet/web/router/transcripts/api_get_transcript.py new file mode 100644 index 00000000..1a31bca1 --- /dev/null +++ b/decnet/web/router/transcripts/api_get_transcript.py @@ -0,0 +1,189 @@ +""" +Paged asciinema v2 transcript endpoint. + +Transcripts are stored as one JSONL day-shard per (decky, UTC day) under + /var/lib/decnet/artifacts/{decky}/{service}/transcripts/sessions-YYYY-MM-DD.jsonl +Each line carries a ``sid`` tag; multiple concurrent sessions interleave into +the same shard (O_APPEND + sub-PIPE_BUF writes keep lines atomic — see +decnet/templates/_shared/sessrec/sessrec.c for the guarantee). + +Rather than scanning the whole shard on every request, the first hit for a +given (shard path, mtime) builds an in-memory index of ``sid → [byte offsets]`` +by one pass. Subsequent paged reads pread() exact line slices in O(limit). +Index is bounded by the disk-free precheck (< 200 MB free → no recording) +and the 10 MB per-session cap. +""" + +from __future__ import annotations + +import json +import os +import re +from collections import OrderedDict +from pathlib import Path +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import require_admin, repo + +router = APIRouter() + +ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")) + +_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") +_SID_RE = re.compile(r"^[a-f0-9-]{36}$") +_SERVICE_RE = re.compile(r"^(ssh|telnet)$") +# Shard filename is built by sessrec from UTC date — keep the charset tight +# so a forged shard_path in the Log row can't traverse. +_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$") + +# (path, mtime_ns) → {sid: [(offset, length), ...]} +_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = OrderedDict() +_CACHE_MAX = 32 + + +def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]: + st = path.stat() + key = (str(path), st.st_mtime_ns) + if key in _INDEX_CACHE: + _INDEX_CACHE.move_to_end(key) + return _INDEX_CACHE[key], st.st_size + index: dict[str, list[tuple[int, int]]] = {} + with path.open("rb") as f: + offset = 0 + for line in f: + length = len(line) + # Fast sid extract: look for `"sid":"<36 chars>"` prefix — every + # sessrec line starts with that field (see emit_*). + try: + m = re.search(rb'"sid":"([a-f0-9-]{36})"', line) + except re.error: + m = None + if m: + sid = m.group(1).decode("ascii") + index.setdefault(sid, []).append((offset, length)) + offset += length + _INDEX_CACHE[key] = index + _INDEX_CACHE.move_to_end(key) + while len(_INDEX_CACHE) > _CACHE_MAX: + _INDEX_CACHE.popitem(last=False) + return index, st.st_size + + +def _resolve_shard(decky: str, service: str, shard_name: str) -> Path: + if not _DECKY_RE.fullmatch(decky): + raise HTTPException(status_code=400, detail="invalid decky name") + if not _SERVICE_RE.fullmatch(service): + raise HTTPException(status_code=400, detail="invalid service") + if not _SHARD_BASENAME_RE.fullmatch(shard_name): + raise HTTPException(status_code=400, detail="invalid shard name") + root = ARTIFACTS_ROOT.resolve() + candidate = (root / decky / service / "transcripts" / shard_name).resolve() + if root not in candidate.parents and candidate != root: + raise HTTPException(status_code=400, detail="path escapes artifacts root") + return candidate + + +@router.get( + "/transcripts/{decky}/{sid}", + tags=["Transcripts"], + responses={ + 400: {"description": "Invalid decky or sid parameter"}, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Admin access required"}, + 404: {"description": "Transcript not found"}, + }, +) +@_traced("api.get_transcript") +async def get_transcript( + decky: str, + sid: str, + offset: int = Query(0, ge=0), + limit: int = Query(500, ge=1, le=5000), + admin: dict = Depends(require_admin), +) -> dict[str, Any]: + if not _DECKY_RE.fullmatch(decky): + raise HTTPException(status_code=400, detail="invalid decky name") + if not _SID_RE.fullmatch(sid): + raise HTTPException(status_code=400, detail="invalid sid") + + log = await repo.get_session_log(sid) + if not log: + raise HTTPException(status_code=404, detail="session not found") + + try: + fields = json.loads(log.get("fields") or "{}") + except (ValueError, TypeError): + fields = {} + + service = fields.get("service") or log.get("service") + shard_path_field = fields.get("shard_path") or "" + shard_name = Path(shard_path_field).name + log_decky = log.get("decky") or fields.get("decky") + + if log_decky and log_decky != decky: + raise HTTPException(status_code=404, detail="session not found") + + path = _resolve_shard(decky, service or "", shard_name or "") + if not path.is_file(): + raise HTTPException(status_code=404, detail="transcript not found") + + index, _size = _get_index(path) + lines_meta = index.get(sid) + if not lines_meta: + raise HTTPException(status_code=404, detail="sid not present in shard") + + header: dict[str, Any] = {} + events: list[list[Any]] = [] + truncated = False + + # First pass: find the header line (has "hdr" key) and count events. + # Keep it O(n lines for this sid), not O(shard). + total_events = 0 + event_positions: list[tuple[int, int]] = [] + with path.open("rb") as f: + for off, ln in lines_meta: + f.seek(off) + raw = f.read(ln) + try: + obj = json.loads(raw) + except ValueError: + continue + if "hdr" in obj: + header = obj["hdr"] + continue + if obj.get("trunc"): + truncated = True + continue + event_positions.append((off, ln)) + total_events += 1 + + # Page the events window. + window = event_positions[offset:offset + limit] + for off, ln in window: + f.seek(off) + raw = f.read(ln) + try: + obj = json.loads(raw) + except ValueError: + continue + t = obj.get("t") + ch = obj.get("ch") + d = obj.get("d") + if t is None or ch is None or d is None: + continue + events.append([t, ch, d]) + + return { + "sid": sid, + "service": service, + "header": header, + "events": events, + "offset": offset, + "limit": limit, + "total": total_events, + "has_more": (offset + limit) < total_events, + "truncated": truncated, + } diff --git a/tests/test_base_repo.py b/tests/test_base_repo.py index ac23fea1..c84d6db4 100644 --- a/tests/test_base_repo.py +++ b/tests/test_base_repo.py @@ -38,6 +38,8 @@ class DummyRepo(BaseRepository): async def update_user_role(self, u, r): await super().update_user_role(u, r) async def purge_logs_and_bounties(self): await super().purge_logs_and_bounties() async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid) + async def get_attacker_transcripts(self, uuid): await super().get_attacker_transcripts(uuid) + async def get_session_log(self, sid): await super().get_session_log(sid) @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -75,6 +77,8 @@ async def test_base_repo_coverage(): await dr.update_user_role("a", "admin") await dr.purge_logs_and_bounties() await dr.get_attacker_artifacts("a") + await dr.get_attacker_transcripts("a") + await dr.get_session_log("a") # Swarm methods: default NotImplementedError on BaseRepository. Covering # them here keeps the coverage contract honest for the swarm CRUD surface.