refactor(artifacts): extract resolve_artifact_path to shared module

Move artifact path validation + symlink-escape check out of the
admin-gated download endpoint into decnet/artifacts/paths.py so the
TTP EmailLifter can disk-reach .eml files at tag-time without
duplicating regex/root logic (DEBT-047).

The router now catches ArtifactPathError and re-raises HTTPException(400);
behavior is unchanged.
This commit is contained in:
2026-05-02 20:02:47 -04:00
parent cdbb3d3571
commit 7036a86e76
6 changed files with 186 additions and 49 deletions

View File

@@ -0,0 +1 @@
"""Artifact storage helpers shared between the web router and TTP workers."""

86
decnet/artifacts/paths.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Shared on-disk artifact path resolution.
Honeypot decoys (SSH, SMTP) farm captured payloads into a host-mounted
quarantine tree:
/var/lib/decnet/artifacts/{decky}/{service}/{stored_as}
Two callers need to translate ``(decky, stored_as, service)`` into a
concrete ``Path`` rooted under that tree:
* The web router endpoint ``GET /api/v1/artifacts/{decky}/{stored_as}``
(``decnet.web.router.artifacts.api_get_artifact``) — admin-gated
download for the dashboard.
* The TTP ``EmailLifter`` (``decnet.ttp.impl.email_lifter``), which
reads the stored ``.eml`` at tag-time so body-aware predicates
(R0047 BEC, R0048 macro) don't need raw body text on the bus.
Both callers share the same validation rules and the same
defence-in-depth symlink-escape check; this module is the single
implementation. It is auth-agnostic — wrappers layer authentication
where appropriate (the router does ``require_admin``, the lifter does
not).
"""
from __future__ import annotations
import os
import re
from pathlib import Path
# decky names come from the deployer — lowercase alnum plus hyphens.
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
# Services that own an artifacts subdir. Kept explicit so a caller
# can't pivot into arbitrary subpaths via a query string or bus payload.
_ALLOWED_SERVICES = frozenset({"ssh", "smtp"})
# stored_as is assembled by the capturing template as:
# ${ts}_${sha:0:12}_${base}
# where ts is ISO-8601 UTC (e.g. 2026-04-18T02:22:56Z), sha is 12 hex chars,
# and base is the original filename's basename. Keep the filename charset
# tight but allow common punctuation dropped files actually use.
_STORED_AS_RE = re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z_[a-f0-9]{12}_[A-Za-z0-9._-]{1,255}$"
)
# Module-level so tests can monkeypatch. Override via env in production
# (the systemd unit sets this) — the prod path matches the bind mount
# declared in decnet/services/{ssh,smtp}.py.
ARTIFACTS_ROOT = Path(
os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")
)
class ArtifactPathError(ValueError):
    """Signal that an artifact lookup was rejected.

    Raised when ``(decky, stored_as, service)`` fails validation or the
    resolved path escapes the artifacts root.

    The router catches this and re-raises HTTPException(400). The lifter
    catches it and treats the event as having no body available (no-tag).
    """
def resolve_artifact_path(decky: str, stored_as: str, service: str) -> Path:
    """Validate inputs, resolve the on-disk path, and confirm it stays
    inside the artifacts root.

    The three inputs are checked against the module's allow-list and
    regexes first; only then is the filesystem consulted.

    Returns the fully resolved ``ARTIFACTS_ROOT/decky/service/stored_as``.

    Raises :class:`ArtifactPathError` on any violation, including a path
    that cannot be resolved at all. Does NOT check that the file exists —
    callers handle that distinctly (404 for the router, no-tag for the
    lifter).
    """
    if service not in _ALLOWED_SERVICES:
        raise ArtifactPathError("invalid service")
    if not _DECKY_RE.fullmatch(decky):
        raise ArtifactPathError("invalid decky name")
    if not _STORED_AS_RE.fullmatch(stored_as):
        raise ArtifactPathError("invalid stored_as")

    try:
        root = ARTIFACTS_ROOT.resolve()
        candidate = (root / decky / service / stored_as).resolve()
    except (OSError, RuntimeError) as exc:
        # Before Python 3.13, Path.resolve() raises RuntimeError on a
        # symlink loop even in non-strict mode. Surface that through the
        # documented exception type rather than leaking a different one
        # to callers that only catch ArtifactPathError.
        raise ArtifactPathError("unresolvable artifact path") from exc

    # defence-in-depth: even though the regexes reject `..`, make sure a
    # symlink or weird filesystem state can't escape the root.
    if root not in candidate.parents and candidate != root:
        raise ArtifactPathError("path escapes artifacts root")
    return candidate

View File

@@ -8,61 +8,23 @@ The capture event already flows through the normal log pipeline (one
RFC 5424 line per capture, see templates/ssh/emit_capture.py), so metadata RFC 5424 line per capture, see templates/ssh/emit_capture.py), so metadata
is served via /logs. This endpoint exists only to retrieve the raw bytes — is served via /logs. This endpoint exists only to retrieve the raw bytes —
admin-gated because the payloads are attacker-controlled content. admin-gated because the payloads are attacker-controlled content.
Path resolution lives in :mod:`decnet.artifacts.paths` so the TTP
EmailLifter can share the exact same validation when it disk-reaches
``.eml`` files at tag-time (DEBT-047).
""" """
from __future__ import annotations from __future__ import annotations
import os
import re
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from decnet.artifacts.paths import ArtifactPathError, resolve_artifact_path
from decnet.telemetry import traced as _traced from decnet.telemetry import traced as _traced
from decnet.web.dependencies import require_admin from decnet.web.dependencies import require_admin
router = APIRouter() router = APIRouter()
# Override via env for tests; the prod path matches the bind mount declared in
# decnet/services/ssh.py and decnet/services/smtp.py.
ARTIFACTS_ROOT = Path(os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts"))
# decky names come from the deployer — lowercase alnum plus hyphens.
_DECKY_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
# Services that own an artifacts subdir. Kept explicit so a caller can't
# pivot into arbitrary subpaths via the query string.
_ALLOWED_SERVICES = {"ssh", "smtp"}
# stored_as is assembled by the capturing template as:
# ${ts}_${sha:0:12}_${base}
# where ts is ISO-8601 UTC (e.g. 2026-04-18T02:22:56Z), sha is 12 hex chars,
# and base is the original filename's basename. Keep the filename charset
# tight but allow common punctuation dropped files actually use.
_STORED_AS_RE = re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z_[a-f0-9]{12}_[A-Za-z0-9._-]{1,255}$"
)
def _resolve_artifact_path(decky: str, stored_as: str, service: str) -> Path:
    """Validate inputs, resolve the on-disk path, and confirm it stays inside
    the artifacts root.

    Returns the resolved ``ARTIFACTS_ROOT/decky/service/stored_as``; file
    existence is not checked here.

    Raises HTTPException(400) on any violation, including a path that
    cannot be resolved at all.
    """
    if service not in _ALLOWED_SERVICES:
        raise HTTPException(status_code=400, detail="invalid service")
    if not _DECKY_RE.fullmatch(decky):
        raise HTTPException(status_code=400, detail="invalid decky name")
    if not _STORED_AS_RE.fullmatch(stored_as):
        raise HTTPException(status_code=400, detail="invalid stored_as")

    try:
        root = ARTIFACTS_ROOT.resolve()
        candidate = (root / decky / service / stored_as).resolve()
    except (OSError, RuntimeError) as exc:
        # Before Python 3.13, Path.resolve() raises RuntimeError on a
        # symlink loop even in non-strict mode; report it as a bad
        # request instead of letting it escape as an unhandled error.
        raise HTTPException(
            status_code=400, detail="unresolvable artifact path"
        ) from exc

    # defence-in-depth: even though the regexes reject `..`, make sure a
    # symlink or weird filesystem state can't escape the root.
    if root not in candidate.parents and candidate != root:
        raise HTTPException(status_code=400, detail="path escapes artifacts root")
    return candidate
@router.get( @router.get(
"/artifacts/{decky}/{stored_as}", "/artifacts/{decky}/{stored_as}",
@@ -81,7 +43,10 @@ async def get_artifact(
service: str = Query("ssh", pattern=r"^[a-z]{1,16}$"), service: str = Query("ssh", pattern=r"^[a-z]{1,16}$"),
admin: dict = Depends(require_admin), admin: dict = Depends(require_admin),
) -> FileResponse: ) -> FileResponse:
path = _resolve_artifact_path(decky, stored_as, service) try:
path = resolve_artifact_path(decky, stored_as, service)
except ArtifactPathError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
if not path.is_file(): if not path.is_file():
raise HTTPException(status_code=404, detail="artifact not found") raise HTTPException(status_code=404, detail="artifact not found")
return FileResponse( return FileResponse(

View File

@@ -23,9 +23,10 @@ def artifacts_root(tmp_path, monkeypatch):
(root / _DECKY / "ssh").mkdir(parents=True) (root / _DECKY / "ssh").mkdir(parents=True)
(root / _DECKY / "ssh" / _VALID_STORED_AS).write_bytes(_PAYLOAD) (root / _DECKY / "ssh" / _VALID_STORED_AS).write_bytes(_PAYLOAD)
# Patch the module-level constant (captured at import time). # Patch the canonical module-level constant. Both the router and
from decnet.web.router.artifacts import api_get_artifact # the EmailLifter resolve through decnet.artifacts.paths.
monkeypatch.setattr(api_get_artifact, "ARTIFACTS_ROOT", root) from decnet.artifacts import paths as artifact_paths
monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", root)
return root return root
@@ -137,8 +138,8 @@ async def test_smtp_service_serves_from_smtp_subdir(
(root / _DECKY / "smtp").mkdir(parents=True) (root / _DECKY / "smtp").mkdir(parents=True)
eml = "2026-04-18T02:22:56Z_abc123def456_msg.eml" eml = "2026-04-18T02:22:56Z_abc123def456_msg.eml"
(root / _DECKY / "smtp" / eml).write_bytes(b"From: a\r\n\r\nhi") (root / _DECKY / "smtp" / eml).write_bytes(b"From: a\r\n\r\nhi")
from decnet.web.router.artifacts import api_get_artifact from decnet.artifacts import paths as artifact_paths
monkeypatch.setattr(api_get_artifact, "ARTIFACTS_ROOT", root) monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", root)
res = await client.get( res = await client.get(
f"/api/v1/artifacts/{_DECKY}/{eml}?service=smtp", f"/api/v1/artifacts/{_DECKY}/{eml}?service=smtp",
headers={"Authorization": f"Bearer {auth_token}"}, headers={"Authorization": f"Bearer {auth_token}"},

View File

View File

@@ -0,0 +1,84 @@
"""Unit tests for decnet.artifacts.paths.resolve_artifact_path."""
from __future__ import annotations
import os
import pytest
from decnet.artifacts import paths as artifact_paths
from decnet.artifacts.paths import ArtifactPathError, resolve_artifact_path
# Decky name used by every test below.
_DECKY = "test-decky-01"
# stored_as value in the <timestamp>_<12 hex chars>_<basename> shape that
# the tests below expect the resolver to accept.
_VALID_STORED_AS = "2026-04-18T02:22:56Z_abc123def456_payload.bin"
@pytest.fixture
def root(tmp_path, monkeypatch):
    """Point ``ARTIFACTS_ROOT`` at the per-test temp dir and hand it back."""
    artifacts_dir = tmp_path
    monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", artifacts_dir)
    return artifacts_dir
def test_valid_ssh_path(root):
    """A well-formed ssh triple resolves to ``root/decky/ssh/stored_as``."""
    expected = (root / _DECKY / "ssh" / _VALID_STORED_AS).resolve()
    resolved = resolve_artifact_path(_DECKY, _VALID_STORED_AS, "ssh")
    assert resolved == expected
def test_valid_smtp_path(root):
    """A well-formed smtp triple resolves under the ``smtp`` subdir."""
    eml_name = "2026-04-18T02:22:56Z_abc123def456_msg.eml"
    expected = (root / _DECKY / "smtp" / eml_name).resolve()
    resolved = resolve_artifact_path(_DECKY, eml_name, "smtp")
    assert resolved == expected
@pytest.mark.parametrize(
    "service",
    [
        "rdp",
        "telnet",
        "",
        "../etc",
        "ssh/../smtp",
    ],
)
def test_invalid_service(root, service):
    """Services outside the allow-list are rejected with 'invalid service'."""
    with pytest.raises(ArtifactPathError) as excinfo:
        resolve_artifact_path(_DECKY, _VALID_STORED_AS, service)
    excinfo.match("invalid service")
@pytest.mark.parametrize(
    "decky",
    [
        "UPPERCASE",
        "has_underscore",
        "has.dot",
        "-leading-hyphen",
        "",
        "a/b",
        "..",
    ],
)
def test_invalid_decky(root, decky):
    """Malformed decky names are rejected with 'invalid decky name'."""
    with pytest.raises(ArtifactPathError) as excinfo:
        resolve_artifact_path(decky, _VALID_STORED_AS, "ssh")
    excinfo.match("invalid decky name")
@pytest.mark.parametrize(
    "stored_as",
    [
        "not-a-timestamp_abc123def456_payload.bin",
        "2026-04-18T02:22:56Z_SHORT_payload.bin",
        "2026-04-18T02:22:56Z_abc123def456_",
        "random-string",
        "",
        "../../etc/passwd",
    ],
)
def test_invalid_stored_as(root, stored_as):
    """Malformed stored_as values are rejected with 'invalid stored_as'."""
    with pytest.raises(ArtifactPathError) as excinfo:
        resolve_artifact_path(_DECKY, stored_as, "ssh")
    excinfo.match("invalid stored_as")
def test_symlink_escape_blocked(tmp_path, monkeypatch):
    """A symlinked service dir that points outside the artifacts root must
    make resolve_artifact_path raise instead of returning the outside path."""
    # Layout: <tmp>/real is the artifacts root, <tmp>/outside holds the
    # file an attacker would like to reach.
    artifacts_dir = tmp_path / "real"
    outside_dir = tmp_path / "outside"
    artifacts_dir.mkdir()
    outside_dir.mkdir()
    (outside_dir / _VALID_STORED_AS).write_bytes(b"secret")

    decky_home = artifacts_dir / _DECKY
    decky_home.mkdir()
    # symlink the entire ssh subdir to the outside location
    os.symlink(outside_dir, decky_home / "ssh")

    monkeypatch.setattr(artifact_paths, "ARTIFACTS_ROOT", artifacts_dir)
    with pytest.raises(ArtifactPathError) as excinfo:
        resolve_artifact_path(_DECKY, _VALID_STORED_AS, "ssh")
    excinfo.match("escapes")
def test_does_not_check_existence(root):
    """The helper only validates and resolves; a missing file is not an error."""
    resolved = resolve_artifact_path(_DECKY, _VALID_STORED_AS, "ssh")
    assert resolved.exists() is False