Files
DECNET/decnet/web/router/transcripts/api_get_transcript.py

244 lines
8.7 KiB
Python

"""
Paged asciinema v2 transcript endpoint.
Transcripts are stored as one JSONL day-shard per (decky, UTC day) under
/var/lib/decnet/artifacts/{decky}/{service}/transcripts/sessions-YYYY-MM-DD.jsonl
Each line carries a ``sid`` tag; multiple concurrent sessions interleave into
the same shard (O_APPEND + sub-PIPE_BUF writes keep lines atomic — see
decnet/templates/_shared/sessrec/sessrec.c for the guarantee).
Rather than scanning the whole shard on every request, the first hit for a
given (shard path, mtime) builds an in-memory index of ``sid → [byte offsets]``
by one pass. Subsequent paged reads pread() exact line slices in O(limit).
Index is bounded by the disk-free precheck (< 200 MB free → no recording)
and the 10 MB per-session cap.
"""
from __future__ import annotations
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import require_admin, repo
router = APIRouter()
ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"))
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
_SID_RE = re.compile(r"^[a-f0-9-]{36}$")
_SERVICE_RE = re.compile(r"^(ssh|telnet)$")
# Shard filename is built by sessrec from UTC date — keep the charset tight
# so a forged shard_path in the Log row can't traverse.
_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$")
# (path, mtime_ns) → {sid: [(offset, length), ...]}
_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = OrderedDict()
_CACHE_MAX = 32
def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]:
st = path.stat()
key = (str(path), st.st_mtime_ns)
if key in _INDEX_CACHE:
_INDEX_CACHE.move_to_end(key)
return _INDEX_CACHE[key], st.st_size
index: dict[str, list[tuple[int, int]]] = {}
with path.open("rb") as f:
offset = 0
for line in f:
length = len(line)
# Fast sid extract: look for `"sid":"<36 chars>"` prefix — every
# sessrec line starts with that field (see emit_*).
try:
m = re.search(rb'"sid"\s*:\s*"([a-f0-9-]{36})"', line)
except re.error:
m = None
if m:
sid = m.group(1).decode("ascii")
index.setdefault(sid, []).append((offset, length))
offset += length
_INDEX_CACHE[key] = index
_INDEX_CACHE.move_to_end(key)
while len(_INDEX_CACHE) > _CACHE_MAX:
_INDEX_CACHE.popitem(last=False)
return index, st.st_size
def _validate_names(decky: str, service: str) -> None:
if not _DECKY_RE.fullmatch(decky):
raise HTTPException(status_code=400, detail="invalid decky name")
if not _SERVICE_RE.fullmatch(service):
raise HTTPException(status_code=400, detail="invalid service")
def _resolve_shard(decky: str, service: str, shard_name: str) -> Path:
_validate_names(decky, service)
if not _SHARD_BASENAME_RE.fullmatch(shard_name):
raise HTTPException(status_code=400, detail="invalid shard name")
root = ARTIFACTS_ROOT.resolve()
candidate = (root / decky / service / "transcripts" / shard_name).resolve()
if root not in candidate.parents and candidate != root:
raise HTTPException(status_code=400, detail="path escapes artifacts root")
return candidate
def _find_shard_with_sid(decky: str, service: str, sid: str) -> Path | None:
"""Scan every ``sessions-YYYY-MM-DD.jsonl`` under the decky's transcripts
dir until one claims this sid.
Fallback for rows where ``fields.shard_path`` is missing (current
sessrec.c does not emit it) or for sessions that span UTC midnight
(events land in two shards; the emitted SD could only name one).
Newest shards first — most transcript lookups are for recent
sessions. Result is cached by ``_get_index`` keyed on
(path, mtime), so repeated calls are ~free.
"""
_validate_names(decky, service)
root = ARTIFACTS_ROOT.resolve()
transcripts_dir = (root / decky / service / "transcripts").resolve()
if root not in transcripts_dir.parents:
return None
# Absent dir, or dir the API process can't stat/read — treat as
# "no transcript", not as a 500 traceback. Most commonly the decky
# container wrote this tree as a container-side uid that the API
# (running under --user / --group) can't cross.
try:
if not transcripts_dir.is_dir():
return None
entries = list(transcripts_dir.iterdir())
except (OSError, PermissionError):
return None
shards = sorted(
(p for p in entries if _SHARD_BASENAME_RE.fullmatch(p.name)),
reverse=True, # newest day first
)
for shard in shards:
try:
index, _size = _get_index(shard)
except (OSError, PermissionError):
continue
if sid in index:
return shard
return None
@router.get(
"/transcripts/{decky}/{sid}",
tags=["Transcripts"],
responses={
400: {"description": "Invalid decky or sid parameter"},
401: {"description": "Could not validate credentials"},
403: {"description": "Admin access required"},
404: {"description": "Transcript not found"},
},
)
@_traced("api.get_transcript")
async def get_transcript(
decky: str,
sid: str,
offset: int = Query(0, ge=0, le=2147483647),
limit: int = Query(500, ge=1, le=5000),
admin: dict = Depends(require_admin),
) -> dict[str, Any]:
if not _DECKY_RE.fullmatch(decky):
raise HTTPException(status_code=400, detail="invalid decky name")
if not _SID_RE.fullmatch(sid):
raise HTTPException(status_code=400, detail="invalid sid")
log = await repo.get_session_log(sid)
if not log:
raise HTTPException(status_code=404, detail="session not found")
try:
fields = json.loads(log.get("fields") or "{}")
except (ValueError, TypeError):
fields = {}
service = fields.get("service") or log.get("service")
shard_path_field = fields.get("shard_path") or ""
shard_name = Path(shard_path_field).name
log_decky = log.get("decky") or fields.get("decky")
if log_decky and log_decky != decky:
raise HTTPException(status_code=404, detail="session not found")
# Fast path: the Log row carries a fields.shard_path we can validate
# and hit directly. Falls back to scanning all shards when the SD
# didn't include one (current sessrec.c doesn't emit shard_path) or
# when the named shard isn't on disk anymore.
path: Path | None = None
if _SHARD_BASENAME_RE.fullmatch(shard_name or ""):
candidate = _resolve_shard(decky, service or "", shard_name)
if candidate.is_file():
path = candidate
if path is None:
path = _find_shard_with_sid(decky, service or "", sid)
if path is None:
raise HTTPException(status_code=404, detail="transcript not found")
index, _size = _get_index(path)
lines_meta = index.get(sid)
if not lines_meta:
raise HTTPException(status_code=404, detail="sid not present in shard")
header: dict[str, Any] = {}
events: list[list[Any]] = []
truncated = False
# First pass: find the header line (has "hdr" key) and count events.
# Keep it O(n lines for this sid), not O(shard).
total_events = 0
event_positions: list[tuple[int, int]] = []
with path.open("rb") as f:
for off, ln in lines_meta:
f.seek(off)
raw = f.read(ln)
try:
obj = json.loads(raw)
except ValueError:
continue
if "hdr" in obj:
header = obj["hdr"]
continue
if obj.get("trunc"):
truncated = True
continue
event_positions.append((off, ln))
total_events += 1
# Page the events window.
window = event_positions[offset:offset + limit]
for off, ln in window:
f.seek(off)
raw = f.read(ln)
try:
obj = json.loads(raw)
except ValueError:
continue
t = obj.get("t")
ch = obj.get("ch")
d = obj.get("d")
if t is None or ch is None or d is None:
continue
events.append([t, ch, d])
return {
"sid": sid,
"service": service,
"header": header,
"events": events,
"offset": offset,
"limit": limit,
"total": total_events,
"has_more": (offset + limit) < total_events,
"truncated": truncated,
}