fix(canary): stream base64 payload via stdin to avoid ARG_MAX
Real-world plant() crashed with OSError [Errno 7] Argument list too
long when an artifact (honeydoc HTML / DOCX / PDF) base64-encoded
into the sh -c script body exceeded the kernel's argv limit (typically
128KB-2MB depending on the host).
Fix: keep the script trivial ('mkdir -p ... && base64 -d > path && ...')
and stream the encoded bytes through 'docker exec -i ... sh -c'
stdin instead. _run() grew an optional stdin_bytes parameter that's
piped into proc.communicate(input=...). The stdin path covers
arbitrarily large artifacts.
Tests updated:
- test_plant_argv_and_base64_round_trip now asserts the docker -i
flag is present and the base64 payload reaches stdin (and notably
is NOT in the script body).
- _FakeProc.communicate accepts input=None across the board so the
patched fast path no longer trips on the new kwarg.
This commit is contained in:
@@ -59,17 +59,22 @@ def _dirname(path: str) -> str:
|
|||||||
return path[:idx]
|
return path[:idx]
|
||||||
|
|
||||||
|
|
||||||
async def _run(argv: list[str]) -> tuple[int, str, str]:
|
async def _run(
|
||||||
|
argv: list[str], *, stdin_bytes: Optional[bytes] = None,
|
||||||
|
) -> tuple[int, str, str]:
|
||||||
try:
|
try:
|
||||||
proc = await asyncio.create_subprocess_exec(
|
proc = await asyncio.create_subprocess_exec(
|
||||||
*argv,
|
*argv,
|
||||||
|
stdin=asyncio.subprocess.PIPE if stdin_bytes is not None else None,
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE,
|
stderr=asyncio.subprocess.PIPE,
|
||||||
)
|
)
|
||||||
except FileNotFoundError as exc:
|
except FileNotFoundError as exc:
|
||||||
return 127, "", f"argv[0] not found: {exc}"
|
return 127, "", f"argv[0] not found: {exc}"
|
||||||
try:
|
try:
|
||||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT)
|
stdout, stderr = await asyncio.wait_for(
|
||||||
|
proc.communicate(input=stdin_bytes), timeout=_TIMEOUT,
|
||||||
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
try:
|
try:
|
||||||
proc.kill()
|
proc.kill()
|
||||||
@@ -83,25 +88,26 @@ async def _run(argv: list[str]) -> tuple[int, str, str]:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _build_plant_command(artifact: CanaryArtifact) -> str:
|
def _build_plant_command(artifact: CanaryArtifact) -> tuple[str, bytes]:
|
||||||
"""Compose the ``sh -c`` script that writes one artifact.
|
"""Compose the ``sh -c`` script + stdin payload for one artifact.
|
||||||
|
|
||||||
Binary safety: we base64-encode on the host side and ``base64 -d``
|
Binary safety: we base64-encode on the host and stream the result
|
||||||
inside the container, so the bytes never touch a shell argv
|
over stdin to ``base64 -d`` inside the container, so the bytes
|
||||||
interpolation point. Both ``base64`` (coreutils) and ``touch -d
|
never touch the argv (kernel ARG_MAX would reject anything larger
|
||||||
@<unix_ts>`` are present on every Linux base image we ship, so
|
than ~128KB-2MB depending on the host). Both ``base64`` (coreutils)
|
||||||
there's no per-distro branching.
|
and ``touch -d @<unix_ts>`` are present on every Linux base image
|
||||||
|
we ship, so there's no per-distro branching.
|
||||||
"""
|
"""
|
||||||
encoded = base64.b64encode(artifact.content).decode("ascii")
|
encoded = base64.b64encode(artifact.content)
|
||||||
mtime = int(time.time() + artifact.mtime_offset)
|
mtime = int(time.time() + artifact.mtime_offset)
|
||||||
mode_str = oct(artifact.mode)[2:]
|
mode_str = oct(artifact.mode)[2:]
|
||||||
parts = [
|
parts = [
|
||||||
f"mkdir -p {shlex.quote(_dirname(artifact.path))}",
|
f"mkdir -p {shlex.quote(_dirname(artifact.path))}",
|
||||||
f"printf %s {shlex.quote(encoded)} | base64 -d > {shlex.quote(artifact.path)}",
|
f"base64 -d > {shlex.quote(artifact.path)}",
|
||||||
f"chmod {mode_str} {shlex.quote(artifact.path)}",
|
f"chmod {mode_str} {shlex.quote(artifact.path)}",
|
||||||
f"touch -d @{mtime} {shlex.quote(artifact.path)}",
|
f"touch -d @{mtime} {shlex.quote(artifact.path)}",
|
||||||
]
|
]
|
||||||
return " && ".join(parts)
|
return " && ".join(parts), encoded
|
||||||
|
|
||||||
|
|
||||||
async def _publish(
|
async def _publish(
|
||||||
@@ -151,9 +157,11 @@ async def plant(
|
|||||||
await repo.update_canary_token_state(token_uuid, "failed", err)
|
await repo.update_canary_token_state(token_uuid, "failed", err)
|
||||||
return False, err
|
return False, err
|
||||||
|
|
||||||
sh_cmd = _build_plant_command(artifact)
|
sh_cmd, stdin_payload = _build_plant_command(artifact)
|
||||||
argv = [_DOCKER, "exec", _container_for(decky_name), "sh", "-c", sh_cmd]
|
# ``-i`` keeps stdin attached so base64 -d inside the container can
|
||||||
rc, _stdout, stderr = await _run(argv)
|
# consume the encoded payload streamed from the host.
|
||||||
|
argv = [_DOCKER, "exec", "-i", _container_for(decky_name), "sh", "-c", sh_cmd]
|
||||||
|
rc, _stdout, stderr = await _run(argv, stdin_bytes=stdin_payload)
|
||||||
success = rc == 0
|
success = rc == 0
|
||||||
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
|
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ class _FakeProc:
|
|||||||
self.returncode = rc
|
self.returncode = rc
|
||||||
self._stderr = stderr
|
self._stderr = stderr
|
||||||
|
|
||||||
async def communicate(self) -> tuple[bytes, bytes]:
|
async def communicate(self, input: bytes | None = None) -> tuple[bytes, bytes]:
|
||||||
return b"", self._stderr
|
return b"", self._stderr
|
||||||
|
|
||||||
def kill(self) -> None: # pragma: no cover
|
def kill(self) -> None: # pragma: no cover
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ class _FakeProc:
|
|||||||
self.returncode = rc
|
self.returncode = rc
|
||||||
self._stderr = stderr
|
self._stderr = stderr
|
||||||
|
|
||||||
async def communicate(self) -> tuple[bytes, bytes]:
|
async def communicate(self, input: bytes | None = None) -> tuple[bytes, bytes]:
|
||||||
return b"", self._stderr
|
return b"", self._stderr
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -46,12 +46,23 @@ class _FakeProc:
|
|||||||
|
|
||||||
def _patch_subprocess(rc: int = 0, stderr: bytes = b""):
|
def _patch_subprocess(rc: int = 0, stderr: bytes = b""):
|
||||||
captured: list[list[str]] = []
|
captured: list[list[str]] = []
|
||||||
|
stdin_seen: list[bytes | None] = []
|
||||||
|
|
||||||
async def _fake(*argv, **kw):
|
async def _fake(*argv, **kw):
|
||||||
captured.append(list(argv))
|
captured.append(list(argv))
|
||||||
return _FakeProc(rc, b"", stderr)
|
# Capture whatever bytes the planter would stream over stdin —
|
||||||
|
# the new contract pipes the base64 payload here instead of
|
||||||
|
# interpolating it into the sh script.
|
||||||
|
proc = _FakeProc(rc, b"", stderr)
|
||||||
|
orig = proc.communicate
|
||||||
|
|
||||||
return patch.object(asyncio, "create_subprocess_exec", _fake), captured
|
async def communicate(input=None):
|
||||||
|
stdin_seen.append(input)
|
||||||
|
return await orig()
|
||||||
|
proc.communicate = communicate # type: ignore[assignment]
|
||||||
|
return proc
|
||||||
|
|
||||||
|
return patch.object(asyncio, "create_subprocess_exec", _fake), captured, stdin_seen
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
@pytest_asyncio.fixture
|
||||||
@@ -87,7 +98,7 @@ async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus
|
|||||||
"generator": "aws_creds", "placement_path": art.path,
|
"generator": "aws_creds", "placement_path": art.path,
|
||||||
"callback_token": "slug", "secret_seed": "s", "created_by": "u1",
|
"callback_token": "slug", "secret_seed": "s", "created_by": "u1",
|
||||||
})
|
})
|
||||||
patcher, captured = _patch_subprocess(rc=0)
|
patcher, captured, stdin_seen = _patch_subprocess(rc=0)
|
||||||
with patcher:
|
with patcher:
|
||||||
ok, err = await planter.plant(
|
ok, err = await planter.plant(
|
||||||
"web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus,
|
"web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus,
|
||||||
@@ -95,12 +106,15 @@ async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus
|
|||||||
assert ok is True and err is None
|
assert ok is True and err is None
|
||||||
assert len(captured) == 1
|
assert len(captured) == 1
|
||||||
argv = captured[0]
|
argv = captured[0]
|
||||||
assert argv[:3] == ["docker", "exec", "web1-ssh"]
|
# docker exec -i <container> sh -c <script>
|
||||||
assert argv[3:5] == ["sh", "-c"]
|
assert argv[:4] == ["docker", "exec", "-i", "web1-ssh"]
|
||||||
script = argv[5]
|
assert argv[4:6] == ["sh", "-c"]
|
||||||
# base64-decoded payload appears in the script verbatim.
|
script = argv[6]
|
||||||
encoded = base64.b64encode(art.content).decode()
|
# The base64 payload is streamed via stdin, NOT interpolated into
|
||||||
assert encoded in script
|
# the script (would blow past ARG_MAX for any non-trivial blob).
|
||||||
|
assert stdin_seen[0] == base64.b64encode(art.content)
|
||||||
|
assert "base64 -d > /home/admin/.aws/credentials" in script
|
||||||
|
assert base64.b64encode(art.content).decode() not in script
|
||||||
# touch -d @<mtime> with negative offset → an int strictly less than now.
|
# touch -d @<mtime> with negative offset → an int strictly less than now.
|
||||||
m = re.search(r"touch -d @(\d+) ", script)
|
m = re.search(r"touch -d @(\d+) ", script)
|
||||||
assert m and int(m.group(1)) > 0
|
assert m and int(m.group(1)) > 0
|
||||||
@@ -117,7 +131,7 @@ async def test_plant_records_failure_when_docker_returns_nonzero(repo: SQLiteRep
|
|||||||
"callback_token": "slug2", "secret_seed": "s", "created_by": "u1",
|
"callback_token": "slug2", "secret_seed": "s", "created_by": "u1",
|
||||||
})
|
})
|
||||||
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
||||||
patcher, _ = _patch_subprocess(rc=125, stderr=b"container not running")
|
patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container not running")
|
||||||
with patcher:
|
with patcher:
|
||||||
ok, err = await planter.plant(
|
ok, err = await planter.plant(
|
||||||
"web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus,
|
"web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus,
|
||||||
@@ -151,7 +165,7 @@ async def test_plant_publishes_placed_event(repo: SQLiteRepository, fake_bus: Fa
|
|||||||
})
|
})
|
||||||
sub = fake_bus.subscribe("canary.>")
|
sub = fake_bus.subscribe("canary.>")
|
||||||
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
||||||
patcher, _ = _patch_subprocess(rc=0)
|
patcher, _argvs, _stdin = _patch_subprocess(rc=0)
|
||||||
with patcher:
|
with patcher:
|
||||||
await planter.plant(
|
await planter.plant(
|
||||||
"web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus,
|
"web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus,
|
||||||
@@ -173,7 +187,7 @@ async def test_revoke_unlinks_and_publishes(repo: SQLiteRepository, fake_bus: Fa
|
|||||||
"callback_token": "slugR", "secret_seed": "s", "created_by": "u1",
|
"callback_token": "slugR", "secret_seed": "s", "created_by": "u1",
|
||||||
})
|
})
|
||||||
sub = fake_bus.subscribe("canary.>")
|
sub = fake_bus.subscribe("canary.>")
|
||||||
patcher, captured = _patch_subprocess(rc=0)
|
patcher, captured, _stdin = _patch_subprocess(rc=0)
|
||||||
with patcher:
|
with patcher:
|
||||||
ok, err = await planter.revoke(
|
ok, err = await planter.revoke(
|
||||||
"web1", "/etc/x.env",
|
"web1", "/etc/x.env",
|
||||||
@@ -196,7 +210,7 @@ async def test_seed_baseline_creates_one_token_per_generator(
|
|||||||
) -> None:
|
) -> None:
|
||||||
monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds")
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds")
|
||||||
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test")
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test")
|
||||||
patcher, captured = _patch_subprocess(rc=0)
|
patcher, captured, _stdin = _patch_subprocess(rc=0)
|
||||||
with patcher:
|
with patcher:
|
||||||
rows = await planter.seed_baseline("web1", repo, bus=fake_bus)
|
rows = await planter.seed_baseline("web1", repo, bus=fake_bus)
|
||||||
assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"}
|
assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"}
|
||||||
@@ -213,7 +227,7 @@ async def test_seed_baseline_creates_one_token_per_generator(
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None:
|
async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None:
|
||||||
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus")
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus")
|
||||||
patcher, _ = _patch_subprocess(rc=0)
|
patcher, _argvs, _stdin = _patch_subprocess(rc=0)
|
||||||
with patcher:
|
with patcher:
|
||||||
rows = await planter.seed_baseline("web1", repo)
|
rows = await planter.seed_baseline("web1", repo)
|
||||||
assert {r["generator"] for r in rows} == {"env_file"}
|
assert {r["generator"] for r in rows} == {"env_file"}
|
||||||
@@ -224,7 +238,7 @@ async def test_seed_baseline_marks_failed_when_docker_errors(
|
|||||||
repo: SQLiteRepository, monkeypatch
|
repo: SQLiteRepository, monkeypatch
|
||||||
) -> None:
|
) -> None:
|
||||||
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file")
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file")
|
||||||
patcher, _ = _patch_subprocess(rc=125, stderr=b"container down")
|
patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container down")
|
||||||
with patcher:
|
with patcher:
|
||||||
rows = await planter.seed_baseline("web1", repo)
|
rows = await planter.seed_baseline("web1", repo)
|
||||||
assert len(rows) == 1
|
assert len(rows) == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user