Move `_find_shard_with_sid`, `_resolve_shard`, `_validate_names`, `_get_index`, and the index cache from `decnet/web/router/transcripts/api_get_transcript.py` into `decnet/artifacts/shards.py`. The shared module speaks `ValueError`; the router keeps thin wrappers that translate to `HTTPException(400)` so the route's error UX is unchanged. This unblocks the BEHAVE-INTEGRATION Phase 4 worker wiring — the profiler worker (and the collector's session aggregator) need to disk-reach asciinema shards but must not import from a FastAPI router. 11 new unit tests for the shared helper. Existing transcript router tests pass (the shard fixture's monkeypatch points at the shared module's ARTIFACTS_ROOT now).
130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
"""Shared asciinema shard helpers.
|
|
|
|
Extracted from ``decnet/web/router/transcripts/api_get_transcript.py``
|
|
so non-router callers (the BEHAVE-SHELL session-ended handler in
|
|
``decnet/profiler/worker.py``, the collector's session aggregator)
|
|
can resolve shard paths without crossing the layer boundary into the
|
|
FastAPI router.
|
|
|
|
Functions here speak in :class:`ValueError` — callers that want HTTP
|
|
semantics translate at the boundary. The router wrappers keep their
|
|
existing ``HTTPException`` behaviour for backwards compatibility.
|
|
|
|
PII boundary unchanged: shards live on disk; this module returns
|
|
:class:`pathlib.Path` pointers, never byte content. The ``_get_index``
|
|
cache stores byte offsets only.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
|
|
ARTIFACTS_ROOT = Path(
|
|
os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"),
|
|
)
|
|
|
|
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
|
|
_SERVICE_RE = re.compile(r"^(ssh|telnet)$")
|
|
_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$")
|
|
_SID_LINE_RE = re.compile(rb'"sid"\s*:\s*"([a-f0-9-]{36})"')
|
|
|
|
# (path, mtime_ns) → {sid: [(offset, length), ...]}
|
|
_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = (
|
|
OrderedDict()
|
|
)
|
|
_CACHE_MAX = 32
|
|
|
|
|
|
def validate_names(decky: str, service: str) -> None:
|
|
"""Raise :class:`ValueError` if ``decky`` / ``service`` look forged."""
|
|
if not _DECKY_RE.fullmatch(decky):
|
|
raise ValueError(f"invalid decky name: {decky!r}")
|
|
if not _SERVICE_RE.fullmatch(service):
|
|
raise ValueError(f"invalid service: {service!r}")
|
|
|
|
|
|
def resolve_shard(decky: str, service: str, shard_name: str) -> Path:
|
|
"""Resolve ``ARTIFACTS_ROOT/{decky}/{service}/transcripts/{shard_name}``
|
|
with escape-attempt detection. Raises :class:`ValueError` on
|
|
invalid inputs.
|
|
"""
|
|
validate_names(decky, service)
|
|
if not _SHARD_BASENAME_RE.fullmatch(shard_name):
|
|
raise ValueError(f"invalid shard name: {shard_name!r}")
|
|
root = ARTIFACTS_ROOT.resolve()
|
|
candidate = (root / decky / service / "transcripts" / shard_name).resolve()
|
|
if root not in candidate.parents and candidate != root:
|
|
raise ValueError(f"path escapes artifacts root: {candidate}")
|
|
return candidate
|
|
|
|
|
|
def _build_index(path: Path) -> dict[str, list[tuple[int, int]]]:
|
|
index: dict[str, list[tuple[int, int]]] = {}
|
|
with path.open("rb") as f:
|
|
offset = 0
|
|
for line in f:
|
|
length = len(line)
|
|
m = _SID_LINE_RE.search(line)
|
|
if m:
|
|
sid = m.group(1).decode("ascii")
|
|
index.setdefault(sid, []).append((offset, length))
|
|
offset += length
|
|
return index
|
|
|
|
|
|
def get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]:
|
|
"""Return ``(sid → [(offset, length), …], file_size)``.
|
|
|
|
Cached by ``(path, mtime_ns)``; rebuilt when the shard changes.
|
|
"""
|
|
st = path.stat()
|
|
key = (str(path), st.st_mtime_ns)
|
|
if key in _INDEX_CACHE:
|
|
_INDEX_CACHE.move_to_end(key)
|
|
return _INDEX_CACHE[key], st.st_size
|
|
index = _build_index(path)
|
|
_INDEX_CACHE[key] = index
|
|
_INDEX_CACHE.move_to_end(key)
|
|
while len(_INDEX_CACHE) > _CACHE_MAX:
|
|
_INDEX_CACHE.popitem(last=False)
|
|
return index, st.st_size
|
|
|
|
|
|
def find_shard_with_sid(decky: str, service: str, sid: str) -> Path | None:
|
|
"""Scan every ``sessions-YYYY-MM-DD.jsonl`` under the decky's
|
|
transcripts dir until one claims this ``sid``.
|
|
|
|
Newest shards first — most lookups are for recent sessions. Caches
|
|
the per-shard sid index, so repeated calls are ~free until the
|
|
shard's mtime changes.
|
|
|
|
Returns ``None`` when nothing claims the sid OR when the
|
|
transcripts dir is missing / unreadable. Never raises on
|
|
filesystem-level errors — callers treat ``None`` as "skip".
|
|
"""
|
|
validate_names(decky, service)
|
|
root = ARTIFACTS_ROOT.resolve()
|
|
transcripts_dir = (root / decky / service / "transcripts").resolve()
|
|
if root not in transcripts_dir.parents:
|
|
return None
|
|
try:
|
|
if not transcripts_dir.is_dir():
|
|
return None
|
|
entries = list(transcripts_dir.iterdir())
|
|
except (OSError, PermissionError):
|
|
return None
|
|
shards = sorted(
|
|
(p for p in entries if _SHARD_BASENAME_RE.fullmatch(p.name)),
|
|
reverse=True,
|
|
)
|
|
for shard in shards:
|
|
try:
|
|
index, _size = get_index(shard)
|
|
except (OSError, PermissionError):
|
|
continue
|
|
if sid in index:
|
|
return shard
|
|
return None
|