""" Paged asciinema v2 transcript endpoint. Transcripts are stored as one JSONL day-shard per (decky, UTC day) under /var/lib/decnet/artifacts/{decky}/{service}/transcripts/sessions-YYYY-MM-DD.jsonl Each line carries a ``sid`` tag; multiple concurrent sessions interleave into the same shard (O_APPEND + sub-PIPE_BUF writes keep lines atomic — see decnet/templates/_shared/sessrec/sessrec.c for the guarantee). Rather than scanning the whole shard on every request, the first hit for a given (shard path, mtime) builds an in-memory index of ``sid → [byte offsets]`` by one pass. Subsequent paged reads pread() exact line slices in O(limit). Index is bounded by the disk-free precheck (< 200 MB free → no recording) and the 10 MB per-session cap. """ from __future__ import annotations import json import os import re from collections import OrderedDict from pathlib import Path from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_admin, repo router = APIRouter() ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")) _DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") _SID_RE = re.compile(r"^[a-f0-9-]{36}$") _SERVICE_RE = re.compile(r"^(ssh|telnet)$") # Shard filename is built by sessrec from UTC date — keep the charset tight # so a forged shard_path in the Log row can't traverse. 
_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$")

# Finds the ``"sid":"<36 chars>"`` tag in a raw shard line. Compiled once at
# import time so the per-line indexing loop does not go through the ``re``
# module's pattern cache on every line.
_SID_TAG_RE = re.compile(rb'"sid"\s*:\s*"([a-f0-9-]{36})"')

# (path, mtime_ns, size) → {sid: [(offset, length), ...]}
_INDEX_CACHE: "OrderedDict[tuple[str, int, int], dict[str, list[tuple[int, int]]]]" = OrderedDict()
_CACHE_MAX = 32


def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]:
    """Return the ``sid → [(offset, length), ...]`` index of a shard file.

    The index is built with a single pass over the file and memoised in
    ``_INDEX_CACHE`` (LRU, at most ``_CACHE_MAX`` entries).

    Args:
        path: Shard file to index.

    Returns:
        ``(index, size)`` where ``size`` is the file size reported by the
        ``stat()`` call made at the start of this function.

    Raises:
        OSError: if the file cannot be stat'ed or opened.
    """
    st = path.stat()
    path_str = str(path)
    # Size is part of the key as well as mtime: shards are append-only, and
    # two appends can share one mtime tick, which would otherwise leave a
    # stale index being served for the grown file.
    key = (path_str, st.st_mtime_ns, st.st_size)

    cached = _INDEX_CACHE.get(key)
    if cached is not None:
        _INDEX_CACHE.move_to_end(key)
        return cached, st.st_size

    index: dict[str, list[tuple[int, int]]] = {}
    with path.open("rb") as f:
        offset = 0
        for line in f:
            length = len(line)
            # Fast sid extract: look for the `"sid":"<36 chars>"` tag without
            # JSON-decoding the whole line.
            m = _SID_TAG_RE.search(line)
            if m:
                sid = m.group(1).decode("ascii")
                index.setdefault(sid, []).append((offset, length))
            offset += length

    # Drop indexes built for earlier versions of this same shard. A shard that
    # is being appended to gets a new key on every change; without this the
    # superseded entries would occupy LRU slots and push out live indexes.
    for stale_key in [k for k in _INDEX_CACHE if k[0] == path_str]:
        del _INDEX_CACHE[stale_key]

    _INDEX_CACHE[key] = index  # a fresh key lands at the most-recent end
    while len(_INDEX_CACHE) > _CACHE_MAX:
        _INDEX_CACHE.popitem(last=False)
    return index, st.st_size


def _validate_names(decky: str, service: str) -> None:
    """Reject decky/service names that are not on the allow-list.

    Raises:
        HTTPException: 400 when either name fails its regex.
    """
    if not _DECKY_RE.fullmatch(decky):
        raise HTTPException(status_code=400, detail="invalid decky name")
    if not _SERVICE_RE.fullmatch(service):
        raise HTTPException(status_code=400, detail="invalid service")


def _resolve_shard(decky: str, service: str, shard_name: str) -> Path:
    """Build the absolute path of a named shard and confine it to the root.

    Args:
        decky: Decky name (validated here).
        service: Service name (validated here).
        shard_name: Bare shard filename, ``sessions-YYYY-MM-DD.jsonl``.

    Returns:
        The resolved path. The file is not required to exist.

    Raises:
        HTTPException: 400 for an invalid name or a path that resolves
            outside ``ARTIFACTS_ROOT``.
    """
    _validate_names(decky, service)
    if not _SHARD_BASENAME_RE.fullmatch(shard_name):
        raise HTTPException(status_code=400, detail="invalid shard name")
    root = ARTIFACTS_ROOT.resolve()
    candidate = (root / decky / service / "transcripts" / shard_name).resolve()
    # A shard must sit strictly below the root. The root itself is never a
    # valid shard, so it is rejected as well (same rule as the directory
    # check in _find_shard_with_sid).
    if root not in candidate.parents:
        raise HTTPException(status_code=400, detail="path escapes artifacts root")
    return candidate


def _find_shard_with_sid(decky: str, service: str, sid: str) -> Path | None:
    """Scan every ``sessions-YYYY-MM-DD.jsonl`` under the decky's transcripts
    dir until one claims this sid.

    Fallback for rows where ``fields.shard_path`` is missing (current
    sessrec.c does not emit it) or for sessions that span UTC midnight
    (events land in two shards; the emitted SD could only name one).

    Newest shards first — most transcript lookups are for recent sessions.
    Each shard's index is cached by ``_get_index``, so repeated calls against
    unchanged shards are ~free.

    Returns:
        The newest shard whose index contains ``sid``, or ``None`` when the
        directory is absent/unreadable or no shard contains it.

    Raises:
        HTTPException: 400 when ``decky`` or ``service`` is invalid.
    """
    _validate_names(decky, service)
    root = ARTIFACTS_ROOT.resolve()
    transcripts_dir = (root / decky / service / "transcripts").resolve()
    if root not in transcripts_dir.parents:
        return None
    # Absent dir, or dir the API process can't stat/read — treat as
    # "no transcript", not as a 500 traceback. Most commonly the decky
    # container wrote this tree as a container-side uid that the API
    # (running under --user / --group) can't cross.
    try:
        if not transcripts_dir.is_dir():
            return None
        entries = list(transcripts_dir.iterdir())
    except OSError:  # PermissionError is a subclass of OSError
        return None
    shards = sorted(
        (p for p in entries if _SHARD_BASENAME_RE.fullmatch(p.name)),
        reverse=True,  # newest day first
    )
    for shard in shards:
        try:
            index, _size = _get_index(shard)
        except OSError:
            continue
        if sid in index:
            return shard
    return None


@router.get(
    "/transcripts/{decky}/{sid}",
    tags=["Transcripts"],
    responses={
        400: {"description": "Invalid decky or sid parameter"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Admin access required"},
        404: {"description": "Transcript not found"},
    },
)
@_traced("api.get_transcript")
async def get_transcript(
    decky: str,
    sid: str,
    offset: int = Query(0, ge=0),
    limit: int = Query(500, ge=1, le=5000),
    admin: dict = Depends(require_admin),
) -> dict[str, Any]:
    # Return one page of a recorded session: the header object, the events in
    # [offset, offset + limit) as [t, ch, d] triples, the total event count,
    # and the has_more / truncated flags.
    #
    # Deliberately documented with comments rather than a docstring: FastAPI
    # publishes a handler's docstring as the operation description in the
    # OpenAPI schema, and this change should not alter the schema.
    #
    # NOTE(review): everything below the repo lookup is synchronous file I/O
    # executed inside an ``async def`` handler; a cold index build on a large
    # shard occupies the event loop for its duration.
    if not _DECKY_RE.fullmatch(decky):
        raise HTTPException(status_code=400, detail="invalid decky name")
    if not _SID_RE.fullmatch(sid):
        raise HTTPException(status_code=400, detail="invalid sid")

    log = await repo.get_session_log(sid)
    if not log:
        raise HTTPException(status_code=404, detail="session not found")

    try:
        fields = json.loads(log.get("fields") or "{}")
    except (ValueError, TypeError):
        fields = {}
    if not isinstance(fields, dict):
        # Valid JSON that is not an object (list, string, number) carries
        # nothing usable here and must not crash the ``.get`` calls below.
        fields = {}

    service = fields.get("service") or log.get("service")
    # Only a string service can name a directory; anything else is turned
    # into "" so that validation answers with a 400 rather than a TypeError.
    service_name = service if isinstance(service, str) else ""

    shard_path_field = fields.get("shard_path") or ""
    if not isinstance(shard_path_field, str):
        shard_path_field = ""
    shard_name = Path(shard_path_field).name

    log_decky = log.get("decky") or fields.get("decky")
    if log_decky and log_decky != decky:
        raise HTTPException(status_code=404, detail="session not found")

    # Fast path: the Log row carries a fields.shard_path we can validate
    # and hit directly. Falls back to scanning all shards when the SD
    # didn't include one (current sessrec.c doesn't emit shard_path), when
    # the named shard isn't on disk or readable anymore, or when it does not
    # contain this sid.
    #
    # NOTE(review): exactly one shard is read per request, so a session whose
    # lines are split across two day-shards is only partially returned.
    path: Path | None = None
    lines_meta: list[tuple[int, int]] | None = None
    named_shard_lacks_sid = False
    if _SHARD_BASENAME_RE.fullmatch(shard_name):
        candidate = _resolve_shard(decky, service_name, shard_name)
        try:
            if candidate.is_file():
                index, _size = _get_index(candidate)
                lines_meta = index.get(sid)
                named_shard_lacks_sid = not lines_meta
        except OSError:
            # Unreadable shard: same policy as the scan — not a 500.
            lines_meta = None
        if lines_meta:
            path = candidate

    if path is None:
        path = _find_shard_with_sid(decky, service_name, sid)
        if path is None:
            detail = (
                "sid not present in shard"
                if named_shard_lacks_sid
                else "transcript not found"
            )
            raise HTTPException(status_code=404, detail=detail)
        try:
            index, _size = _get_index(path)
        except OSError:
            # Shard vanished or became unreadable since the scan.
            raise HTTPException(status_code=404, detail="transcript not found") from None
        lines_meta = index.get(sid)

    if not lines_meta:
        raise HTTPException(status_code=404, detail="sid not present in shard")

    header: dict[str, Any] = {}
    events: list[list[Any]] = []
    truncated = False
    total_events = 0
    window_end = offset + limit

    # Single pass over this sid's lines (O(lines for this sid), not O(shard)):
    # classify each line as header / truncation marker / event, count the
    # events, and keep the ones whose ordinal falls inside the page window.
    try:
        with path.open("rb") as f:
            for off, ln in lines_meta:
                f.seek(off)
                try:
                    obj = json.loads(f.read(ln))
                except ValueError:  # includes UnicodeDecodeError
                    continue
                if not isinstance(obj, dict):
                    continue
                if "hdr" in obj:
                    header = obj["hdr"]
                    continue
                if obj.get("trunc"):
                    truncated = True
                    continue

                position = total_events
                total_events += 1
                if not offset <= position < window_end:
                    continue
                t = obj.get("t")
                ch = obj.get("ch")
                d = obj.get("d")
                # An incomplete event still counts toward ``total`` but is
                # left out of the page.
                if t is None or ch is None or d is None:
                    continue
                events.append([t, ch, d])
    except OSError:
        raise HTTPException(status_code=404, detail="transcript not found") from None

    return {
        "sid": sid,
        "service": service,
        "header": header,
        "events": events,
        "offset": offset,
        "limit": limit,
        "total": total_events,
        "has_more": (offset + limit) < total_events,
        "truncated": truncated,
    }