Real-world plant() crashed with OSError [Errno 7] Argument list too
long when an artifact (honeydoc HTML / DOCX / PDF) base64-encoded
into the sh -c script body exceeded the kernel's argv limit (typically
128KB-2MB depending on the host).
Fix: keep the script trivial ('mkdir -p ... && base64 -d > path && ...')
and stream the encoded bytes through 'docker exec -i ... sh -c'
stdin instead. _run() grew an optional stdin_bytes parameter that's
piped into proc.communicate(input=...). The stdin path covers
arbitrarily large artifacts.
Tests updated:
- test_plant_argv_and_base64_round_trip now asserts the docker -i
flag is present and the base64 payload reaches stdin (and notably
is NOT in the script body).
- _FakeProc.communicate accepts input=None across the board so the
patched fast path no longer trips on the new kwarg.
248 lines
9.3 KiB
Python
248 lines
9.3 KiB
Python
"""Coverage for the canary planter (docker exec wrapper).
|
|
|
|
We don't actually invoke docker — :func:`asyncio.create_subprocess_exec`
|
|
is patched to record argv and return canned ``(rc, stdout, stderr)``
|
|
triples. That lets us assert:
|
|
|
|
* the docker argv has the right shape (container = ``<decky>-ssh``,
|
|
``sh -c <script>``);
|
|
* the script base64-decodes the artifact bytes losslessly;
|
|
* mtime is backdated by the right offset;
|
|
* state transitions hit the repo on success/failure;
|
|
* the bus event publishes on success.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import re
|
|
from typing import AsyncIterator
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from decnet.bus import topics
|
|
from decnet.bus.fake import FakeBus
|
|
from decnet.canary import CanaryArtifact
|
|
from decnet.canary import planter
|
|
from decnet.web.db.sqlite.repository import SQLiteRepository
|
|
import decnet.web.db.models # noqa: F401
|
|
|
|
|
|
class _FakeProc:
|
|
def __init__(self, rc: int, stdout: bytes = b"", stderr: bytes = b"") -> None:
|
|
self.returncode = rc
|
|
self._stdout = stdout
|
|
self._stderr = stderr
|
|
|
|
async def communicate(self) -> tuple[bytes, bytes]:
|
|
return self._stdout, self._stderr
|
|
|
|
def kill(self) -> None: # pragma: no cover — never reached in non-timeout tests
|
|
pass
|
|
|
|
|
|
def _patch_subprocess(rc: int = 0, stderr: bytes = b""):
|
|
captured: list[list[str]] = []
|
|
stdin_seen: list[bytes | None] = []
|
|
|
|
async def _fake(*argv, **kw):
|
|
captured.append(list(argv))
|
|
# Capture whatever bytes the planter would stream over stdin —
|
|
# the new contract pipes the base64 payload here instead of
|
|
# interpolating it into the sh script.
|
|
proc = _FakeProc(rc, b"", stderr)
|
|
orig = proc.communicate
|
|
|
|
async def communicate(input=None):
|
|
stdin_seen.append(input)
|
|
return await orig()
|
|
proc.communicate = communicate # type: ignore[assignment]
|
|
return proc
|
|
|
|
return patch.object(asyncio, "create_subprocess_exec", _fake), captured, stdin_seen
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def repo(tmp_path) -> AsyncIterator[SQLiteRepository]:
|
|
r = SQLiteRepository(str(tmp_path / "p.db"))
|
|
await r.initialize()
|
|
yield r
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def fake_bus() -> AsyncIterator[FakeBus]:
|
|
bus = FakeBus()
|
|
await bus.connect()
|
|
yield bus
|
|
await bus.close()
|
|
|
|
|
|
# ---------------- argv shape + base64 round-trip --------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus: FakeBus, tmp_path) -> None:
|
|
art = CanaryArtifact(
|
|
path="/home/admin/.aws/credentials",
|
|
content=b"\x00binary\xffpayload",
|
|
mode=0o600,
|
|
mtime_offset=-86400,
|
|
generator="aws_creds",
|
|
)
|
|
# Persist a token row so the state-update path has something to flip.
|
|
await repo.create_canary_token({
|
|
"uuid": "tok-1", "kind": "http", "decky_name": "web1",
|
|
"generator": "aws_creds", "placement_path": art.path,
|
|
"callback_token": "slug", "secret_seed": "s", "created_by": "u1",
|
|
})
|
|
patcher, captured, stdin_seen = _patch_subprocess(rc=0)
|
|
with patcher:
|
|
ok, err = await planter.plant(
|
|
"web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus,
|
|
)
|
|
assert ok is True and err is None
|
|
assert len(captured) == 1
|
|
argv = captured[0]
|
|
# docker exec -i <container> sh -c <script>
|
|
assert argv[:4] == ["docker", "exec", "-i", "web1-ssh"]
|
|
assert argv[4:6] == ["sh", "-c"]
|
|
script = argv[6]
|
|
# The base64 payload is streamed via stdin, NOT interpolated into
|
|
# the script (would blow past ARG_MAX for any non-trivial blob).
|
|
assert stdin_seen[0] == base64.b64encode(art.content)
|
|
assert "base64 -d > /home/admin/.aws/credentials" in script
|
|
assert base64.b64encode(art.content).decode() not in script
|
|
# touch -d @<mtime> with negative offset → an int strictly less than now.
|
|
m = re.search(r"touch -d @(\d+) ", script)
|
|
assert m and int(m.group(1)) > 0
|
|
# State transitioned to planted.
|
|
row = await repo.get_canary_token("tok-1")
|
|
assert row["state"] == "planted" and row["last_error"] is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plant_records_failure_when_docker_returns_nonzero(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
|
|
await repo.create_canary_token({
|
|
"uuid": "tok-2", "kind": "http", "decky_name": "web1",
|
|
"generator": "env_file", "placement_path": "/x",
|
|
"callback_token": "slug2", "secret_seed": "s", "created_by": "u1",
|
|
})
|
|
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
|
patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container not running")
|
|
with patcher:
|
|
ok, err = await planter.plant(
|
|
"web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus,
|
|
)
|
|
assert ok is False
|
|
assert err and "not running" in err
|
|
row = await repo.get_canary_token("tok-2")
|
|
assert row["state"] == "failed" and row["last_error"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plant_rejects_empty_path(repo: SQLiteRepository) -> None:
|
|
await repo.create_canary_token({
|
|
"uuid": "tok-3", "kind": "http", "decky_name": "web1",
|
|
"generator": "env_file", "placement_path": "/x",
|
|
"callback_token": "slug3", "secret_seed": "s", "created_by": "u1",
|
|
})
|
|
art = CanaryArtifact(path="", content=b"y")
|
|
ok, err = await planter.plant("web1", art, token_uuid="tok-3", repo=repo)
|
|
assert ok is False and err is not None
|
|
row = await repo.get_canary_token("tok-3")
|
|
assert row["state"] == "failed"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plant_publishes_placed_event(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
|
|
await repo.create_canary_token({
|
|
"uuid": "tok-4", "kind": "http", "decky_name": "web1",
|
|
"generator": "env_file", "placement_path": "/x",
|
|
"callback_token": "slug4", "secret_seed": "s", "created_by": "u1",
|
|
})
|
|
sub = fake_bus.subscribe("canary.>")
|
|
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
|
|
patcher, _argvs, _stdin = _patch_subprocess(rc=0)
|
|
with patcher:
|
|
await planter.plant(
|
|
"web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus,
|
|
)
|
|
event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
|
|
assert event.topic == topics.canary("tok-4", topics.CANARY_PLACED)
|
|
assert event.payload["decky_name"] == "web1"
|
|
assert event.payload["generator"] == "env_file"
|
|
|
|
|
|
# ---------------- revoke --------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_revoke_unlinks_and_publishes(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
|
|
await repo.create_canary_token({
|
|
"uuid": "tok-r", "kind": "http", "decky_name": "web1",
|
|
"generator": "env_file", "placement_path": "/etc/x.env",
|
|
"callback_token": "slugR", "secret_seed": "s", "created_by": "u1",
|
|
})
|
|
sub = fake_bus.subscribe("canary.>")
|
|
patcher, captured, _stdin = _patch_subprocess(rc=0)
|
|
with patcher:
|
|
ok, err = await planter.revoke(
|
|
"web1", "/etc/x.env",
|
|
token_uuid="tok-r", repo=repo, bus=fake_bus,
|
|
)
|
|
assert ok and not err
|
|
assert "rm -f /etc/x.env" in captured[0][5]
|
|
row = await repo.get_canary_token("tok-r")
|
|
assert row["state"] == "revoked"
|
|
event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
|
|
assert event.topic == topics.canary("tok-r", topics.CANARY_REVOKED)
|
|
|
|
|
|
# ---------------- seed_baseline ------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_seed_baseline_creates_one_token_per_generator(
|
|
repo: SQLiteRepository, fake_bus: FakeBus, monkeypatch
|
|
) -> None:
|
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds")
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test")
|
|
patcher, captured, _stdin = _patch_subprocess(rc=0)
|
|
with patcher:
|
|
rows = await planter.seed_baseline("web1", repo, bus=fake_bus)
|
|
assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"}
|
|
# One docker exec per generator.
|
|
assert len(captured) == 3
|
|
# aws_creds ends up as kind=aws_passive; the other two are http.
|
|
by_gen = {r["generator"]: r for r in rows}
|
|
assert by_gen["aws_creds"]["kind"] == "aws_passive"
|
|
assert by_gen["env_file"]["kind"] == "http"
|
|
persisted = await repo.list_canary_tokens(decky_name="web1")
|
|
assert len(persisted) == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None:
|
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus")
|
|
patcher, _argvs, _stdin = _patch_subprocess(rc=0)
|
|
with patcher:
|
|
rows = await planter.seed_baseline("web1", repo)
|
|
assert {r["generator"] for r in rows} == {"env_file"}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_seed_baseline_marks_failed_when_docker_errors(
|
|
repo: SQLiteRepository, monkeypatch
|
|
) -> None:
|
|
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file")
|
|
patcher, _argvs, _stdin = _patch_subprocess(rc=125, stderr=b"container down")
|
|
with patcher:
|
|
rows = await planter.seed_baseline("web1", repo)
|
|
assert len(rows) == 1
|
|
persisted = await repo.list_canary_tokens(decky_name="web1")
|
|
assert persisted[0]["state"] == "failed"
|
|
assert persisted[0]["last_error"]
|