fix(canary): stream base64 payload via stdin to avoid ARG_MAX

Real-world plant() crashed with OSError [Errno 7] Argument list too
long when an artifact (honeydoc HTML / DOCX / PDF) base64-encoded
into the sh -c script body exceeded the kernel's argv limit (typically
128KB-2MB depending on the host).

Fix: keep the script trivial ('mkdir -p ... && base64 -d > path && ...')
and stream the encoded bytes through 'docker exec -i ... sh -c'
stdin instead. _run() grew an optional stdin_bytes parameter that's
piped into proc.communicate(input=...). The stdin path covers
arbitrarily large artifacts.

Tests updated:
- test_plant_argv_and_base64_round_trip now asserts the docker -i
  flag is present and the base64 payload reaches stdin (and notably
  is NOT in the script body).
- _FakeProc.communicate accepts input=None across the board so the
  patched fast path no longer trips on the new kwarg.
This commit is contained in:
2026-04-27 13:37:19 -04:00
parent af15e68a3d
commit c17b9e01c8
4 changed files with 54 additions and 32 deletions

View File

@@ -59,17 +59,22 @@ def _dirname(path: str) -> str:
return path[:idx] return path[:idx]
async def _run(argv: list[str]) -> tuple[int, str, str]: async def _run(
argv: list[str], *, stdin_bytes: Optional[bytes] = None,
) -> tuple[int, str, str]:
try: try:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*argv, *argv,
stdin=asyncio.subprocess.PIPE if stdin_bytes is not None else None,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
) )
except FileNotFoundError as exc: except FileNotFoundError as exc:
return 127, "", f"argv[0] not found: {exc}" return 127, "", f"argv[0] not found: {exc}"
try: try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT) stdout, stderr = await asyncio.wait_for(
proc.communicate(input=stdin_bytes), timeout=_TIMEOUT,
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
try: try:
proc.kill() proc.kill()
@@ -83,25 +88,26 @@ async def _run(argv: list[str]) -> tuple[int, str, str]:
) )
def _build_plant_command(artifact: CanaryArtifact) -> str: def _build_plant_command(artifact: CanaryArtifact) -> tuple[str, bytes]:
"""Compose the ``sh -c`` script that writes one artifact. """Compose the ``sh -c`` script + stdin payload for one artifact.
Binary safety: we base64-encode on the host side and ``base64 -d`` Binary safety: we base64-encode on the host and stream the result
inside the container, so the bytes never touch a shell argv over stdin to ``base64 -d`` inside the container, so the bytes
interpolation point. Both ``base64`` (coreutils) and ``touch -d never touch the argv (kernel ARG_MAX would reject anything larger
@<unix_ts>`` are present on every Linux base image we ship, so than ~128KB-2MB depending on the host). Both ``base64`` (coreutils)
there's no per-distro branching. and ``touch -d @<unix_ts>`` are present on every Linux base image
we ship, so there's no per-distro branching.
""" """
encoded = base64.b64encode(artifact.content).decode("ascii") encoded = base64.b64encode(artifact.content)
mtime = int(time.time() + artifact.mtime_offset) mtime = int(time.time() + artifact.mtime_offset)
mode_str = oct(artifact.mode)[2:] mode_str = oct(artifact.mode)[2:]
parts = [ parts = [
f"mkdir -p {shlex.quote(_dirname(artifact.path))}", f"mkdir -p {shlex.quote(_dirname(artifact.path))}",
f"printf %s {shlex.quote(encoded)} | base64 -d > {shlex.quote(artifact.path)}", f"base64 -d > {shlex.quote(artifact.path)}",
f"chmod {mode_str} {shlex.quote(artifact.path)}", f"chmod {mode_str} {shlex.quote(artifact.path)}",
f"touch -d @{mtime} {shlex.quote(artifact.path)}", f"touch -d @{mtime} {shlex.quote(artifact.path)}",
] ]
return " && ".join(parts) return " && ".join(parts), encoded
async def _publish( async def _publish(
@@ -151,9 +157,11 @@ async def plant(
await repo.update_canary_token_state(token_uuid, "failed", err) await repo.update_canary_token_state(token_uuid, "failed", err)
return False, err return False, err
sh_cmd = _build_plant_command(artifact) sh_cmd, stdin_payload = _build_plant_command(artifact)
argv = [_DOCKER, "exec", _container_for(decky_name), "sh", "-c", sh_cmd] # ``-i`` keeps stdin attached so base64 -d inside the container can
rc, _stdout, stderr = await _run(argv) # consume the encoded payload streamed from the host.
argv = [_DOCKER, "exec", "-i", _container_for(decky_name), "sh", "-c", sh_cmd]
rc, _stdout, stderr = await _run(argv, stdin_bytes=stdin_payload)
success = rc == 0 success = rc == 0
error = None if success else (stderr.strip()[:256] or f"rc={rc}") error = None if success else (stderr.strip()[:256] or f"rc={rc}")

View File

@@ -21,7 +21,7 @@ class _FakeProc:
self.returncode = rc self.returncode = rc
self._stderr = stderr self._stderr = stderr
async def communicate(self) -> tuple[bytes, bytes]: async def communicate(self, input: bytes | None = None) -> tuple[bytes, bytes]:
return b"", self._stderr return b"", self._stderr
def kill(self) -> None: # pragma: no cover def kill(self) -> None: # pragma: no cover

View File

@@ -32,7 +32,7 @@ class _FakeProc:
self.returncode = rc self.returncode = rc
self._stderr = stderr self._stderr = stderr
async def communicate(self) -> tuple[bytes, bytes]: async def communicate(self, input: bytes | None = None) -> tuple[bytes, bytes]:
return b"", self._stderr return b"", self._stderr

View File

@@ -46,12 +46,23 @@ class _FakeProc:
def _patch_subprocess(rc: int = 0, stderr: bytes = b""): def _patch_subprocess(rc: int = 0, stderr: bytes = b""):
captured: list[list[str]] = [] captured: list[list[str]] = []
stdin_seen: list[bytes | None] = []
async def _fake(*argv, **kw): async def _fake(*argv, **kw):
captured.append(list(argv)) captured.append(list(argv))
return _FakeProc(rc, b"", stderr) # Capture whatever bytes the planter would stream over stdin —
# the new contract pipes the base64 payload here instead of
# interpolating it into the sh script.
proc = _FakeProc(rc, b"", stderr)
orig = proc.communicate
return patch.object(asyncio, "create_subprocess_exec", _fake), captured async def communicate(input=None):
stdin_seen.append(input)
return await orig()
proc.communicate = communicate # type: ignore[assignment]
return proc
return patch.object(asyncio, "create_subprocess_exec", _fake), captured, stdin_seen
@pytest_asyncio.fixture @pytest_asyncio.fixture
@@ -87,7 +98,7 @@ async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus
"generator": "aws_creds", "placement_path": art.path, "generator": "aws_creds", "placement_path": art.path,
"callback_token": "slug", "secret_seed": "s", "created_by": "u1", "callback_token": "slug", "secret_seed": "s", "created_by": "u1",
}) })
patcher, captured = _patch_subprocess(rc=0) patcher, captured, stdin_seen = _patch_subprocess(rc=0)
with patcher: with patcher:
ok, err = await planter.plant( ok, err = await planter.plant(
"web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus, "web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus,
@@ -95,12 +106,15 @@ async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus
assert ok is True and err is None assert ok is True and err is None
assert len(captured) == 1 assert len(captured) == 1
argv = captured[0] argv = captured[0]
assert argv[:3] == ["docker", "exec", "web1-ssh"] # docker exec -i <container> sh -c <script>
assert argv[3:5] == ["sh", "-c"] assert argv[:4] == ["docker", "exec", "-i", "web1-ssh"]
script = argv[5] assert argv[4:6] == ["sh", "-c"]
# base64-decoded payload appears in the script verbatim. script = argv[6]
encoded = base64.b64encode(art.content).decode() # The base64 payload is streamed via stdin, NOT interpolated into
assert encoded in script # the script (would blow past ARG_MAX for any non-trivial blob).
assert stdin_seen[0] == base64.b64encode(art.content)
assert "base64 -d > /home/admin/.aws/credentials" in script
assert base64.b64encode(art.content).decode() not in script
# touch -d @<mtime> with negative offset → an int strictly less than now. # touch -d @<mtime> with negative offset → an int strictly less than now.
m = re.search(r"touch -d @(\d+) ", script) m = re.search(r"touch -d @(\d+) ", script)
assert m and int(m.group(1)) > 0 assert m and int(m.group(1)) > 0
@@ -117,7 +131,7 @@ async def test_plant_records_failure_when_docker_returns_nonzero(repo: SQLiteRep
"callback_token": "slug2", "secret_seed": "s", "created_by": "u1", "callback_token": "slug2", "secret_seed": "s", "created_by": "u1",
}) })
art = CanaryArtifact(path="/x", content=b"y", generator="env_file") art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
patcher, _ = _patch_subprocess(rc=125, stderr=b"container not running") patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container not running")
with patcher: with patcher:
ok, err = await planter.plant( ok, err = await planter.plant(
"web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus, "web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus,
@@ -151,7 +165,7 @@ async def test_plant_publishes_placed_event(repo: SQLiteRepository, fake_bus: Fa
}) })
sub = fake_bus.subscribe("canary.>") sub = fake_bus.subscribe("canary.>")
art = CanaryArtifact(path="/x", content=b"y", generator="env_file") art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
patcher, _ = _patch_subprocess(rc=0) patcher, _argvs, _stdin = _patch_subprocess(rc=0)
with patcher: with patcher:
await planter.plant( await planter.plant(
"web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus, "web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus,
@@ -173,7 +187,7 @@ async def test_revoke_unlinks_and_publishes(repo: SQLiteRepository, fake_bus: Fa
"callback_token": "slugR", "secret_seed": "s", "created_by": "u1", "callback_token": "slugR", "secret_seed": "s", "created_by": "u1",
}) })
sub = fake_bus.subscribe("canary.>") sub = fake_bus.subscribe("canary.>")
patcher, captured = _patch_subprocess(rc=0) patcher, captured, _stdin = _patch_subprocess(rc=0)
with patcher: with patcher:
ok, err = await planter.revoke( ok, err = await planter.revoke(
"web1", "/etc/x.env", "web1", "/etc/x.env",
@@ -196,7 +210,7 @@ async def test_seed_baseline_creates_one_token_per_generator(
) -> None: ) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds") monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds")
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test") monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test")
patcher, captured = _patch_subprocess(rc=0) patcher, captured, _stdin = _patch_subprocess(rc=0)
with patcher: with patcher:
rows = await planter.seed_baseline("web1", repo, bus=fake_bus) rows = await planter.seed_baseline("web1", repo, bus=fake_bus)
assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"} assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"}
@@ -213,7 +227,7 @@ async def test_seed_baseline_creates_one_token_per_generator(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None: async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus") monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus")
patcher, _ = _patch_subprocess(rc=0) patcher, _argvs, _stdin = _patch_subprocess(rc=0)
with patcher: with patcher:
rows = await planter.seed_baseline("web1", repo) rows = await planter.seed_baseline("web1", repo)
assert {r["generator"] for r in rows} == {"env_file"} assert {r["generator"] for r in rows} == {"env_file"}
@@ -224,7 +238,7 @@ async def test_seed_baseline_marks_failed_when_docker_errors(
repo: SQLiteRepository, monkeypatch repo: SQLiteRepository, monkeypatch
) -> None: ) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file") monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file")
patcher, _ = _patch_subprocess(rc=125, stderr=b"container down") patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container down")
with patcher: with patcher:
rows = await planter.seed_baseline("web1", repo) rows = await planter.seed_baseline("web1", repo)
assert len(rows) == 1 assert len(rows) == 1