diff --git a/decnet/artifacts/shards.py b/decnet/artifacts/shards.py new file mode 100644 index 00000000..b8d06957 --- /dev/null +++ b/decnet/artifacts/shards.py @@ -0,0 +1,129 @@ +"""Shared asciinema shard helpers. + +Extracted from ``decnet/web/router/transcripts/api_get_transcript.py`` +so non-router callers (the BEHAVE-SHELL session-ended handler in +``decnet/profiler/worker.py``, the collector's session aggregator) +can resolve shard paths without crossing the layer boundary into the +FastAPI router. + +Functions here speak in :class:`ValueError` — callers that want HTTP +semantics translate at the boundary. The router wrappers keep their +existing ``HTTPException`` behaviour for backwards compatibility. + +PII boundary unchanged: shards live on disk; this module returns +:class:`pathlib.Path` pointers, never byte content. The ``_get_index`` +cache stores byte offsets only. +""" +from __future__ import annotations + +import os +import re +from collections import OrderedDict +from pathlib import Path + +ARTIFACTS_ROOT = Path( + os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"), +) + +_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") +_SERVICE_RE = re.compile(r"^(ssh|telnet)$") +_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$") +_SID_LINE_RE = re.compile(rb'"sid"\s*:\s*"([a-f0-9-]{36})"') + +# (path, mtime_ns) → {sid: [(offset, length), ...]} +_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = ( + OrderedDict() +) +_CACHE_MAX = 32 + + +def validate_names(decky: str, service: str) -> None: + """Raise :class:`ValueError` if ``decky`` / ``service`` look forged.""" + if not _DECKY_RE.fullmatch(decky): + raise ValueError(f"invalid decky name: {decky!r}") + if not _SERVICE_RE.fullmatch(service): + raise ValueError(f"invalid service: {service!r}") + + +def resolve_shard(decky: str, service: str, shard_name: str) -> Path: + """Resolve ``ARTIFACTS_ROOT/{decky}/{service}/transcripts/{shard_name}`` + with escape-attempt detection. Raises :class:`ValueError` on + invalid inputs. + """ + validate_names(decky, service) + if not _SHARD_BASENAME_RE.fullmatch(shard_name): + raise ValueError(f"invalid shard name: {shard_name!r}") + root = ARTIFACTS_ROOT.resolve() + candidate = (root / decky / service / "transcripts" / shard_name).resolve() + if root not in candidate.parents and candidate != root: + raise ValueError(f"path escapes artifacts root: {candidate}") + return candidate + + +def _build_index(path: Path) -> dict[str, list[tuple[int, int]]]: + index: dict[str, list[tuple[int, int]]] = {} + with path.open("rb") as f: + offset = 0 + for line in f: + length = len(line) + m = _SID_LINE_RE.search(line) + if m: + sid = m.group(1).decode("ascii") + index.setdefault(sid, []).append((offset, length)) + offset += length + return index + + +def get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]: + """Return ``(sid → [(offset, length), …], file_size)``. + + Cached by ``(path, mtime_ns)``; rebuilt when the shard changes. + """ + st = path.stat() + key = (str(path), st.st_mtime_ns) + if key in _INDEX_CACHE: + _INDEX_CACHE.move_to_end(key) + return _INDEX_CACHE[key], st.st_size + index = _build_index(path) + _INDEX_CACHE[key] = index + _INDEX_CACHE.move_to_end(key) + while len(_INDEX_CACHE) > _CACHE_MAX: + _INDEX_CACHE.popitem(last=False) + return index, st.st_size + + +def find_shard_with_sid(decky: str, service: str, sid: str) -> Path | None: + """Scan every ``sessions-YYYY-MM-DD.jsonl`` under the decky's + transcripts dir until one claims this ``sid``. + + Newest shards first — most lookups are for recent sessions. Caches + the per-shard sid index, so repeated calls are ~free until the + shard's mtime changes. + + Returns ``None`` when nothing claims the sid OR when the + transcripts dir is missing / unreadable. Never raises on + filesystem-level errors — callers treat ``None`` as "skip". + """ + validate_names(decky, service) + root = ARTIFACTS_ROOT.resolve() + transcripts_dir = (root / decky / service / "transcripts").resolve() + if root not in transcripts_dir.parents: + return None + try: + if not transcripts_dir.is_dir(): + return None + entries = list(transcripts_dir.iterdir()) + except (OSError, PermissionError): + return None + shards = sorted( + (p for p in entries if _SHARD_BASENAME_RE.fullmatch(p.name)), + reverse=True, + ) + for shard in shards: + try: + index, _size = get_index(shard) + except (OSError, PermissionError): + continue + if sid in index: + return shard + return None diff --git a/decnet/web/router/transcripts/api_get_transcript.py b/decnet/web/router/transcripts/api_get_transcript.py index d534c8fa..b3774da1 100644 --- a/decnet/web/router/transcripts/api_get_transcript.py +++ b/decnet/web/router/transcripts/api_get_transcript.py @@ -17,117 +17,54 @@ and the 10 MB per-session cap. from __future__ import annotations import json -import os import re -from collections import OrderedDict from pathlib import Path from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query +from decnet.artifacts.shards import ( + ARTIFACTS_ROOT as ARTIFACTS_ROOT, # re-export for monkeypatching tests + _SHARD_BASENAME_RE, + find_shard_with_sid as _shared_find_shard_with_sid, + get_index as _get_index, + resolve_shard as _shared_resolve_shard, + validate_names as _shared_validate_names, +) from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_admin, repo router = APIRouter() -ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")) - _DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") _SID_RE = re.compile(r"^[a-f0-9-]{36}$") -_SERVICE_RE = re.compile(r"^(ssh|telnet)$") -# Shard filename is built by sessrec from UTC date — keep the charset tight -# so a forged shard_path in the Log row can't traverse. -_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$") - -# (path, mtime_ns) → {sid: [(offset, length), ...]} -_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = OrderedDict() -_CACHE_MAX = 32 - - -def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]: - st = path.stat() - key = (str(path), st.st_mtime_ns) - if key in _INDEX_CACHE: - _INDEX_CACHE.move_to_end(key) - return _INDEX_CACHE[key], st.st_size - index: dict[str, list[tuple[int, int]]] = {} - with path.open("rb") as f: - offset = 0 - for line in f: - length = len(line) - # Fast sid extract: look for `"sid":"<36 chars>"` prefix — every - # sessrec line starts with that field (see emit_*). - try: - m = re.search(rb'"sid"\s*:\s*"([a-f0-9-]{36})"', line) - except re.error: - m = None - if m: - sid = m.group(1).decode("ascii") - index.setdefault(sid, []).append((offset, length)) - offset += length - _INDEX_CACHE[key] = index - _INDEX_CACHE.move_to_end(key) - while len(_INDEX_CACHE) > _CACHE_MAX: - _INDEX_CACHE.popitem(last=False) - return index, st.st_size def _validate_names(decky: str, service: str) -> None: - if not _DECKY_RE.fullmatch(decky): - raise HTTPException(status_code=400, detail="invalid decky name") - if not _SERVICE_RE.fullmatch(service): - raise HTTPException(status_code=400, detail="invalid service") + """Router-level wrapper: translate ValueError → HTTPException(400).""" + try: + _shared_validate_names(decky, service) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) def _resolve_shard(decky: str, service: str, shard_name: str) -> Path: - _validate_names(decky, service) - if not _SHARD_BASENAME_RE.fullmatch(shard_name): - raise HTTPException(status_code=400, detail="invalid shard name") - root = ARTIFACTS_ROOT.resolve() - candidate = (root / decky / service / "transcripts" / shard_name).resolve() - if root not in candidate.parents and candidate != root: - raise HTTPException(status_code=400, detail="path escapes artifacts root") - return candidate + try: + return _shared_resolve_shard(decky, service, shard_name) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) def _find_shard_with_sid(decky: str, service: str, sid: str) -> Path | None: - """Scan every ``sessions-YYYY-MM-DD.jsonl`` under the decky's transcripts - dir until one claims this sid. + """Router-level wrapper around the shared helper. - Fallback for rows where ``fields.shard_path`` is missing (current - sessrec.c does not emit it) or for sessions that span UTC midnight - (events land in two shards; the emitted SD could only name one). - Newest shards first — most transcript lookups are for recent - sessions. Result is cached by ``_get_index`` keyed on - (path, mtime), so repeated calls are ~free. + Translates the ValueError on bad names into HTTPException(400) so + the route handler's existing error UX is preserved. """ - _validate_names(decky, service) - root = ARTIFACTS_ROOT.resolve() - transcripts_dir = (root / decky / service / "transcripts").resolve() - if root not in transcripts_dir.parents: - return None - # Absent dir, or dir the API process can't stat/read — treat as - # "no transcript", not as a 500 traceback. Most commonly the decky - # container wrote this tree as a container-side uid that the API - # (running under --user / --group) can't cross. try: - if not transcripts_dir.is_dir(): - return None - entries = list(transcripts_dir.iterdir()) - except (OSError, PermissionError): - return None - shards = sorted( - (p for p in entries if _SHARD_BASENAME_RE.fullmatch(p.name)), - reverse=True, # newest day first - ) - for shard in shards: - try: - index, _size = _get_index(shard) - except (OSError, PermissionError): - continue - if sid in index: - return shard - return None + return _shared_find_shard_with_sid(decky, service, sid) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) @router.get( diff --git a/tests/api/transcripts/test_get_transcript.py b/tests/api/transcripts/test_get_transcript.py index 62ef13b0..fae17793 100644 --- a/tests/api/transcripts/test_get_transcript.py +++ b/tests/api/transcripts/test_get_transcript.py @@ -66,9 +66,11 @@ def shard(tmp_path, monkeypatch): shard_path = _write_shard(root, _DECKY, "ssh", _SHARD_NAME, [lines_a[0], lines_b[0], lines_a[1], lines_b[1], lines_b[2], lines_a[2]]) + from decnet.artifacts import shards as _shards from decnet.web.router.transcripts import api_get_transcript + monkeypatch.setattr(_shards, "ARTIFACTS_ROOT", root) monkeypatch.setattr(api_get_transcript, "ARTIFACTS_ROOT", root) - api_get_transcript._INDEX_CACHE.clear() + _shards._INDEX_CACHE.clear() return shard_path diff --git a/tests/artifacts/test_shards.py b/tests/artifacts/test_shards.py new file mode 100644 index 00000000..a9188976 --- /dev/null +++ b/tests/artifacts/test_shards.py @@ -0,0 +1,121 @@ +"""Unit tests for ``decnet.artifacts.shards``. + +The router-side wrapper is exercised by +``tests/api/transcripts/test_get_transcript.py``; this module pins +the pure-Python helpers directly so non-router callers (the +profiler worker, the collector) have a tested surface to lean on. +""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from decnet.artifacts import shards + + +_SID_A = "11111111-2222-3333-4444-555555555555" +_SID_B = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" +_DECKY = "test-decky" +_SERVICE = "ssh" +_SHARD_NAME = "sessions-2026-05-08.jsonl" + + +def _write_shard(root: Path, decky: str, service: str, name: str, lines: list[dict]) -> Path: + shard_dir = root / decky / service / "transcripts" + shard_dir.mkdir(parents=True, exist_ok=True) + shard = shard_dir / name + with shard.open("w") as f: + for line in lines: + f.write(json.dumps(line) + "\n") + return shard + + +@pytest.fixture +def shard_root(tmp_path, monkeypatch) -> Path: + monkeypatch.setattr(shards, "ARTIFACTS_ROOT", tmp_path) + shards._INDEX_CACHE.clear() + return tmp_path + + +def test_validate_names_accepts_valid() -> None: + shards.validate_names("test-decky", "ssh") + shards.validate_names("d", "telnet") + + +def test_validate_names_rejects_bad_decky() -> None: + with pytest.raises(ValueError, match="invalid decky"): + shards.validate_names("Bad-Decky", "ssh") + + +def test_validate_names_rejects_bad_service() -> None: + with pytest.raises(ValueError, match="invalid service"): + shards.validate_names("d", "http") + + +def test_resolve_shard_happy_path(shard_root) -> None: + p = shards.resolve_shard(_DECKY, _SERVICE, _SHARD_NAME) + expected = (shard_root / _DECKY / _SERVICE / "transcripts" / _SHARD_NAME).resolve() + assert p == expected + + +def test_resolve_shard_rejects_bad_shard_name(shard_root) -> None: + with pytest.raises(ValueError, match="invalid shard name"): + shards.resolve_shard(_DECKY, _SERVICE, "../etc/passwd") + + +def test_find_shard_with_sid_happy_path(shard_root) -> None: + shard = _write_shard( + shard_root, _DECKY, _SERVICE, _SHARD_NAME, + [ + {"sid": _SID_A, "hdr": {}}, + {"sid": _SID_A, "t": 0.0, "ch": "i", "d": "x"}, + {"sid": _SID_B, "t": 0.0, "ch": "o", "d": "y"}, + ], + ) + assert shards.find_shard_with_sid(_DECKY, _SERVICE, _SID_A) == shard + assert shards.find_shard_with_sid(_DECKY, _SERVICE, _SID_B) == shard + + +def test_find_shard_with_sid_returns_none_when_sid_missing(shard_root) -> None: + _write_shard( + shard_root, _DECKY, _SERVICE, _SHARD_NAME, + [{"sid": _SID_A, "hdr": {}}], + ) + other = "ffffffff-eeee-dddd-cccc-bbbbbbbbbbbb" + assert shards.find_shard_with_sid(_DECKY, _SERVICE, other) is None + + +def test_find_shard_with_sid_returns_none_when_dir_missing(shard_root) -> None: + assert shards.find_shard_with_sid(_DECKY, _SERVICE, _SID_A) is None + + +def test_find_shard_with_sid_picks_newest_first(shard_root) -> None: + """Two shards both contain the same sid (mid-night spans). Newest wins.""" + older = _write_shard( + shard_root, _DECKY, _SERVICE, "sessions-2026-05-07.jsonl", + [{"sid": _SID_A, "hdr": {}}], + ) + newer = _write_shard( + shard_root, _DECKY, _SERVICE, "sessions-2026-05-09.jsonl", + [{"sid": _SID_A, "hdr": {}}], + ) + found = shards.find_shard_with_sid(_DECKY, _SERVICE, _SID_A) + assert found == newer + assert found != older + + +def test_find_shard_with_sid_rejects_bad_decky(shard_root) -> None: + with pytest.raises(ValueError): + shards.find_shard_with_sid("Bad-Decky", _SERVICE, _SID_A) + + +def test_get_index_cache_hit_after_first_build(shard_root) -> None: + shard = _write_shard( + shard_root, _DECKY, _SERVICE, _SHARD_NAME, + [{"sid": _SID_A, "hdr": {}}], + ) + idx1, _ = shards.get_index(shard) + idx2, _ = shards.get_index(shard) + assert idx1 is idx2 # same cached object