feat(web): transcripts API + repository lookups

Adds get_attacker_transcripts (mirror of artifacts for session_recorded
logs) and get_session_log for sid→shard resolution. New
/api/v1/transcripts/{decky}/{sid}?offset=&limit= pages asciinema events
out of the shared JSONL day-shard via an mtime-keyed byte-offset index
— never scans the whole shard per request. New
/api/v1/attackers/{uuid}/transcripts lists sessions for drilldown. Both
endpoints admin-gated.
This commit is contained in:
2026-04-21 23:06:39 -04:00
parent a58d42e492
commit 6e522c5a55
7 changed files with 291 additions and 0 deletions

View File

@@ -198,6 +198,16 @@ class BaseRepository(ABC):
"""Return `file_captured` log rows for this attacker, newest first."""
pass
@abstractmethod
async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]:
"""Return `session_recorded` log rows for this attacker, newest first."""
pass
@abstractmethod
async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]:
"""Look up the `session_recorded` Log row for a given session UUID."""
pass
# ------------------------------------------------------------- swarm
# Swarm methods have default no-op / empty implementations so existing
# subclasses and non-swarm deployments continue to work without change.

View File

@@ -769,6 +769,48 @@ class SQLModelRepository(BaseRepository):
)
return [r.model_dump(mode="json") for r in rows.scalars().all()]
async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]:
"""Look up the `session_recorded` Log row that owns a given sid.
sid is a v4 UUID embedded in the row's ``fields`` JSON blob. Matched
with LIKE on the textual sid substring — cheap given the bounded
cardinality of session_recorded rows vs. the full logs table.
"""
needle = f'"sid":"{sid}"'
async with self._session() as session:
rows = await session.execute(
select(Log)
.where(Log.event_type == "session_recorded")
.where(Log.fields.contains(needle))
.limit(1)
)
row = rows.scalars().first()
return row.model_dump(mode="json") if row else None
async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]:
"""Return `session_recorded` logs for the attacker identified by UUID.
Mirror of :meth:`get_attacker_artifacts` — sessions ride in the same
Log table with event_type=session_recorded; the ingester decodes the
RFC 5424 SD fields (sid, service, decky, src_ip, duration_s, bytes,
truncated, shard_path) into the returned ``fields`` blob.
"""
async with self._session() as session:
ip_res = await session.execute(
select(Attacker.ip).where(Attacker.uuid == uuid)
)
ip = ip_res.scalar_one_or_none()
if not ip:
return []
rows = await session.execute(
select(Log)
.where(Log.attacker_ip == ip)
.where(Log.event_type == "session_recorded")
.order_by(desc(Log.timestamp))
.limit(200)
)
return [r.model_dump(mode="json") for r in rows.scalars().all()]
# ------------------------------------------------------------- swarm
async def add_swarm_host(self, data: dict[str, Any]) -> None:

View File

@@ -15,6 +15,8 @@ from .attackers.api_get_attackers import router as attackers_router
from .attackers.api_get_attacker_detail import router as attacker_detail_router
from .attackers.api_get_attacker_commands import router as attacker_commands_router
from .attackers.api_get_attacker_artifacts import router as attacker_artifacts_router
from .attackers.api_get_attacker_transcripts import router as attacker_transcripts_router
from .transcripts import transcripts_router
from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router
from .config.api_manage_users import router as config_users_router
@@ -61,6 +63,7 @@ api_router.include_router(attackers_router)
api_router.include_router(attacker_detail_router)
api_router.include_router(attacker_commands_router)
api_router.include_router(attacker_artifacts_router)
api_router.include_router(attacker_transcripts_router)
# Observability
api_router.include_router(stats_router)
@@ -76,6 +79,9 @@ api_router.include_router(config_reinit_router)
# Artifacts (captured attacker file drops)
api_router.include_router(artifacts_router)
# Transcripts (PTY session recordings, paged asciinema events)
api_router.include_router(transcripts_router)
# Remote Updates (dashboard → worker updater daemons)
api_router.include_router(swarm_updates_router)

View File

@@ -0,0 +1,34 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import require_viewer, repo
router = APIRouter()
@router.get(
"/attackers/{uuid}/transcripts",
tags=["Attacker Profiles"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Attacker not found"},
},
)
@_traced("api.get_attacker_transcripts")
async def get_attacker_transcripts(
uuid: str,
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
"""List PTY session recordings for an attacker (newest first).
Each entry is a `session_recorded` log row — the frontend lists them
in the AttackerDetail Sessions tab and opens SessionDrawer on click.
"""
attacker = await repo.get_attacker_by_uuid(uuid)
if not attacker:
raise HTTPException(status_code=404, detail="Attacker not found")
rows = await repo.get_attacker_transcripts(uuid)
return {"total": len(rows), "data": rows}

View File

@@ -0,0 +1,6 @@
from fastapi import APIRouter
from .api_get_transcript import router as transcript_router
transcripts_router = APIRouter()
transcripts_router.include_router(transcript_router)

View File

@@ -0,0 +1,189 @@
"""
Paged asciinema v2 transcript endpoint.
Transcripts are stored as one JSONL day-shard per (decky, UTC day) under
/var/lib/decnet/artifacts/{decky}/{service}/transcripts/sessions-YYYY-MM-DD.jsonl
Each line carries a ``sid`` tag; multiple concurrent sessions interleave into
the same shard (O_APPEND + sub-PIPE_BUF writes keep lines atomic — see
decnet/templates/_shared/sessrec/sessrec.c for the guarantee).
Rather than scanning the whole shard on every request, the first hit for a
given (shard path, mtime) builds an in-memory index of ``sid → [byte offsets]``
by one pass. Subsequent paged reads pread() exact line slices in O(limit).
Index is bounded by the disk-free precheck (< 200 MB free → no recording)
and the 10 MB per-session cap.
"""
from __future__ import annotations
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import require_admin, repo
router = APIRouter()
ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"))
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
_SID_RE = re.compile(r"^[a-f0-9-]{36}$")
_SERVICE_RE = re.compile(r"^(ssh|telnet)$")
# Shard filename is built by sessrec from UTC date — keep the charset tight
# so a forged shard_path in the Log row can't traverse.
_SHARD_BASENAME_RE = re.compile(r"^sessions-\d{4}-\d{2}-\d{2}\.jsonl$")
# (path, mtime_ns) → {sid: [(offset, length), ...]}
_INDEX_CACHE: "OrderedDict[tuple[str, int], dict[str, list[tuple[int, int]]]]" = OrderedDict()
_CACHE_MAX = 32
def _get_index(path: Path) -> tuple[dict[str, list[tuple[int, int]]], int]:
st = path.stat()
key = (str(path), st.st_mtime_ns)
if key in _INDEX_CACHE:
_INDEX_CACHE.move_to_end(key)
return _INDEX_CACHE[key], st.st_size
index: dict[str, list[tuple[int, int]]] = {}
with path.open("rb") as f:
offset = 0
for line in f:
length = len(line)
# Fast sid extract: look for `"sid":"<36 chars>"` prefix — every
# sessrec line starts with that field (see emit_*).
try:
m = re.search(rb'"sid":"([a-f0-9-]{36})"', line)
except re.error:
m = None
if m:
sid = m.group(1).decode("ascii")
index.setdefault(sid, []).append((offset, length))
offset += length
_INDEX_CACHE[key] = index
_INDEX_CACHE.move_to_end(key)
while len(_INDEX_CACHE) > _CACHE_MAX:
_INDEX_CACHE.popitem(last=False)
return index, st.st_size
def _resolve_shard(decky: str, service: str, shard_name: str) -> Path:
if not _DECKY_RE.fullmatch(decky):
raise HTTPException(status_code=400, detail="invalid decky name")
if not _SERVICE_RE.fullmatch(service):
raise HTTPException(status_code=400, detail="invalid service")
if not _SHARD_BASENAME_RE.fullmatch(shard_name):
raise HTTPException(status_code=400, detail="invalid shard name")
root = ARTIFACTS_ROOT.resolve()
candidate = (root / decky / service / "transcripts" / shard_name).resolve()
if root not in candidate.parents and candidate != root:
raise HTTPException(status_code=400, detail="path escapes artifacts root")
return candidate
@router.get(
"/transcripts/{decky}/{sid}",
tags=["Transcripts"],
responses={
400: {"description": "Invalid decky or sid parameter"},
401: {"description": "Could not validate credentials"},
403: {"description": "Admin access required"},
404: {"description": "Transcript not found"},
},
)
@_traced("api.get_transcript")
async def get_transcript(
decky: str,
sid: str,
offset: int = Query(0, ge=0),
limit: int = Query(500, ge=1, le=5000),
admin: dict = Depends(require_admin),
) -> dict[str, Any]:
if not _DECKY_RE.fullmatch(decky):
raise HTTPException(status_code=400, detail="invalid decky name")
if not _SID_RE.fullmatch(sid):
raise HTTPException(status_code=400, detail="invalid sid")
log = await repo.get_session_log(sid)
if not log:
raise HTTPException(status_code=404, detail="session not found")
try:
fields = json.loads(log.get("fields") or "{}")
except (ValueError, TypeError):
fields = {}
service = fields.get("service") or log.get("service")
shard_path_field = fields.get("shard_path") or ""
shard_name = Path(shard_path_field).name
log_decky = log.get("decky") or fields.get("decky")
if log_decky and log_decky != decky:
raise HTTPException(status_code=404, detail="session not found")
path = _resolve_shard(decky, service or "", shard_name or "")
if not path.is_file():
raise HTTPException(status_code=404, detail="transcript not found")
index, _size = _get_index(path)
lines_meta = index.get(sid)
if not lines_meta:
raise HTTPException(status_code=404, detail="sid not present in shard")
header: dict[str, Any] = {}
events: list[list[Any]] = []
truncated = False
# First pass: find the header line (has "hdr" key) and count events.
# Keep it O(n lines for this sid), not O(shard).
total_events = 0
event_positions: list[tuple[int, int]] = []
with path.open("rb") as f:
for off, ln in lines_meta:
f.seek(off)
raw = f.read(ln)
try:
obj = json.loads(raw)
except ValueError:
continue
if "hdr" in obj:
header = obj["hdr"]
continue
if obj.get("trunc"):
truncated = True
continue
event_positions.append((off, ln))
total_events += 1
# Page the events window.
window = event_positions[offset:offset + limit]
for off, ln in window:
f.seek(off)
raw = f.read(ln)
try:
obj = json.loads(raw)
except ValueError:
continue
t = obj.get("t")
ch = obj.get("ch")
d = obj.get("d")
if t is None or ch is None or d is None:
continue
events.append([t, ch, d])
return {
"sid": sid,
"service": service,
"header": header,
"events": events,
"offset": offset,
"limit": limit,
"total": total_events,
"has_more": (offset + limit) < total_events,
"truncated": truncated,
}

View File

@@ -38,6 +38,8 @@ class DummyRepo(BaseRepository):
async def update_user_role(self, u, r): await super().update_user_role(u, r)
async def purge_logs_and_bounties(self): await super().purge_logs_and_bounties()
async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid)
async def get_attacker_transcripts(self, uuid): await super().get_attacker_transcripts(uuid)
async def get_session_log(self, sid): await super().get_session_log(sid)
@pytest.mark.asyncio
async def test_base_repo_coverage():
@@ -75,6 +77,8 @@ async def test_base_repo_coverage():
await dr.update_user_role("a", "admin")
await dr.purge_logs_and_bounties()
await dr.get_attacker_artifacts("a")
await dr.get_attacker_transcripts("a")
await dr.get_session_log("a")
# Swarm methods: default NotImplementedError on BaseRepository. Covering
# them here keeps the coverage contract honest for the swarm CRUD surface.