feat(canary): planter (docker exec injector) + tests

Plant / revoke / seed_baseline using the same docker-exec-with-sh-c
pattern proven by decnet/orchestrator/drivers/ssh.py:_run_file.

Each plant call composes a single sh script:
  mkdir -p <dirname> && printf %s <base64> | base64 -d > <path> &&
  chmod <mode> <path> && touch -d @<mtime> <path>

Base64-on-the-host / decode-in-the-container keeps binary artifacts
(DOCX/PDF/PNG) safe across the argv boundary; the placement_path,
mode, and mtime are shlex-quoted.

State transitions hit the repo: planted -> failed on docker error
with stderr captured into last_error. Bus events fire on success
(canary.<id>.placed) and on revoke (canary.<id>.revoked) — wrapped
in try/except so a downed bus never blocks a placement.

seed_baseline(decky_name, repo) is the deploy-hook entry point —
reads DECNET_CANARY_BASELINE (default git_config,env_file,honeydoc,
aws_creds), persists one row per generator, plants each. Failed
placements are logged but do NOT abort; the deployer hook treats
the return list as informational.
This commit is contained in:
2026-04-27 13:08:18 -04:00
parent 19ceff4417
commit 8fb9bc5545
2 changed files with 526 additions and 0 deletions

293
decnet/canary/planter.py Normal file
View File

@@ -0,0 +1,293 @@
"""Plant / revoke canary artifacts inside running decky containers.
Single entry point per operation:
* :func:`plant` writes a :class:`CanaryArtifact` into one decky's
filesystem via ``docker exec`` (mirroring the SSH driver's
``_run_file`` pattern), backdates the mtime, sets the requested
mode, and publishes ``canary.{token_id}.placed`` on the bus.
* :func:`revoke` unlinks the file (best-effort) and publishes
``canary.{token_id}.revoked``.
* :func:`seed_baseline` is the deploy-hook helper: synthesises the
configured baseline set for one decky, persists rows, plants each.
Failures are logged but do **not** abort the deploy (the deployer
hook calls this best-effort).
We don't reuse :class:`SSHDriver` directly because the orchestrator
driver is tied to its action types (``FileAction`` carries str
content; canary content is bytes). The planter takes the same
shape but speaks bytes-via-base64 over the wire.
"""
from __future__ import annotations
import asyncio
import base64
import os
import shlex
import time
from secrets import token_urlsafe
from typing import Any, Iterable, Optional
from decnet.bus import topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.canary.base import CanaryArtifact, CanaryContext
from decnet.canary.factory import get_generator
from decnet.canary.paths import default_path_for
from decnet.logging import get_logger
from decnet.web.db.repository import BaseRepository
log = get_logger("canary.planter")
_DOCKER = "docker"
_TIMEOUT = 8.0
# Container suffix — matches the orchestrator SSH driver's convention
# (``<decky_name>-ssh``). Canary placement always happens through the
# ssh container because every decky has one and it carries the most
# realistic filesystem layout.
_SSH_CONTAINER_SUFFIX = "-ssh"
def _container_for(decky_name: str) -> str:
return f"{decky_name}{_SSH_CONTAINER_SUFFIX}"
def _dirname(path: str) -> str:
idx = path.rfind("/")
if idx <= 0:
return "/"
return path[:idx]
async def _run(argv: list[str]) -> tuple[int, str, str]:
try:
proc = await asyncio.create_subprocess_exec(
*argv,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as exc:
return 127, "", f"argv[0] not found: {exc}"
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT)
except asyncio.TimeoutError:
try:
proc.kill()
except ProcessLookupError:
pass
return 124, "", "timeout"
return (
proc.returncode if proc.returncode is not None else -1,
stdout.decode("utf-8", "replace"),
stderr.decode("utf-8", "replace"),
)
def _build_plant_command(artifact: CanaryArtifact) -> str:
"""Compose the ``sh -c`` script that writes one artifact.
Binary safety: we base64-encode on the host side and ``base64 -d``
inside the container, so the bytes never touch a shell argv
interpolation point. Both ``base64`` (coreutils) and ``touch -d
@<unix_ts>`` are present on every Linux base image we ship, so
there's no per-distro branching.
"""
encoded = base64.b64encode(artifact.content).decode("ascii")
mtime = int(time.time() + artifact.mtime_offset)
mode_str = oct(artifact.mode)[2:]
parts = [
f"mkdir -p {shlex.quote(_dirname(artifact.path))}",
f"printf %s {shlex.quote(encoded)} | base64 -d > {shlex.quote(artifact.path)}",
f"chmod {mode_str} {shlex.quote(artifact.path)}",
f"touch -d @{mtime} {shlex.quote(artifact.path)}",
]
return " && ".join(parts)
async def _publish(
bus: Optional[BaseBus], topic: str, payload: dict[str, Any],
) -> None:
"""Best-effort publish — never raises.
When ``bus`` is None we resolve via :func:`get_bus`; either way
bus-side failures are logged and swallowed (delivery is at-most-once
by contract; the DB row is source of truth).
"""
try:
owns_bus = bus is None
target = bus if bus is not None else get_bus()
if owns_bus:
await target.connect()
await target.publish(topic, payload)
if owns_bus:
await target.close()
except Exception as e: # noqa: BLE001
log.warning("canary bus publish failed topic=%s err=%s", topic, e)
async def plant(
decky_name: str,
artifact: CanaryArtifact,
*,
token_uuid: str,
repo: Optional[BaseRepository] = None,
publish: bool = True,
bus: Optional[BaseBus] = None,
) -> tuple[bool, Optional[str]]:
"""Write *artifact* into the decky's ssh container.
Returns ``(success, error_or_none)``. When ``repo`` is provided
the token row's state is updated to ``planted`` / ``failed``
accordingly. When ``publish`` is True a ``canary.<id>.placed``
event is published on the bus on success.
The function never raises on docker errors — callers (the API,
the deploy hook) treat the result as data.
"""
if not artifact.path:
err = "planter requires a non-empty artifact.path"
log.warning("canary.plant skipped: %s decky=%s token=%s", err, decky_name, token_uuid)
if repo is not None:
await repo.update_canary_token_state(token_uuid, "failed", err)
return False, err
sh_cmd = _build_plant_command(artifact)
argv = [_DOCKER, "exec", _container_for(decky_name), "sh", "-c", sh_cmd]
rc, _stdout, stderr = await _run(argv)
success = rc == 0
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
if repo is not None:
if success:
await repo.update_canary_token_state(token_uuid, "planted", None)
else:
await repo.update_canary_token_state(token_uuid, "failed", error)
if success and publish:
await _publish(bus, topics.canary(token_uuid, topics.CANARY_PLACED), {
"token_id": token_uuid,
"decky_name": decky_name,
"placement_path": artifact.path,
"instrumenter": artifact.instrumenter,
"generator": artifact.generator,
})
if not success:
log.warning(
"canary.plant failed decky=%s token=%s rc=%d stderr=%r",
decky_name, token_uuid, rc, stderr[:120],
)
return success, error
async def revoke(
decky_name: str,
placement_path: str,
*,
token_uuid: str,
repo: Optional[BaseRepository] = None,
publish: bool = True,
bus: Optional[BaseBus] = None,
) -> tuple[bool, Optional[str]]:
"""Best-effort unlink + state transition + bus publish.
Returns ``(success, error_or_none)``. ``success`` is True when
the file is gone after the call (whether we deleted it or it was
already missing); only docker / container-down errors return False.
"""
sh_cmd = f"rm -f {shlex.quote(placement_path)}"
argv = [_DOCKER, "exec", _container_for(decky_name), "sh", "-c", sh_cmd]
rc, _stdout, stderr = await _run(argv)
success = rc == 0
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
if repo is not None:
await repo.update_canary_token_state(token_uuid, "revoked", error if not success else None)
if publish:
await _publish(bus, topics.canary(token_uuid, topics.CANARY_REVOKED), {
"token_id": token_uuid,
"decky_name": decky_name,
"placement_path": placement_path,
})
return success, error
def _baseline_set() -> Iterable[str]:
"""Return the configured baseline generator names.
Honors ``DECNET_CANARY_BASELINE`` (comma-separated). Default is
a sensible mix that exercises every callback-bearing generator
plus a passive aws_creds drop for realism.
"""
raw = os.environ.get(
"DECNET_CANARY_BASELINE",
"git_config,env_file,honeydoc,aws_creds",
)
return [n.strip() for n in raw.split(",") if n.strip()]
def _ctx_for(slug: str) -> CanaryContext:
"""Build a :class:`CanaryContext` from the canary worker config."""
base = os.environ.get("DECNET_CANARY_HTTP_BASE", "http://localhost:8088")
zone = os.environ.get("DECNET_CANARY_DNS_ZONE", "")
return CanaryContext(callback_token=slug, http_base=base, dns_zone=zone)
async def seed_baseline(
decky_name: str,
repo: BaseRepository,
*,
persona: str = "linux",
created_by: str = "system",
bus: Optional[BaseBus] = None,
) -> list[dict[str, Any]]:
"""Plant the configured baseline canary set on one decky.
Best-effort: any individual placement that fails is logged and
the row is left in ``state=failed``; the deployer hook treats the
return value as informational, not authoritative.
Returns the list of token rows created (whether their planting
ultimately succeeded or not), so the caller can surface them in
the deploy report.
"""
out: list[dict[str, Any]] = []
for gen_name in _baseline_set():
try:
generator = get_generator(gen_name)
except ValueError:
log.warning("canary.seed_baseline: unknown generator %r — skipping", gen_name)
continue
slug = token_urlsafe(16)
ctx = _ctx_for(slug)
artifact = generator.generate(ctx)
artifact.path = default_path_for(gen_name, persona)
kind = "aws_passive" if gen_name == "aws_creds" else "http"
# Persist first so the planter has a row to update; that way a
# crash mid-plant leaves a recoverable failed-state row.
from uuid import uuid4
token_uuid = str(uuid4())
await repo.create_canary_token({
"uuid": token_uuid,
"kind": kind,
"decky_name": decky_name,
"blob_uuid": None,
"instrumenter": None,
"generator": gen_name,
"placement_path": artifact.path,
"callback_token": slug,
"secret_seed": slug,
"created_by": created_by,
"state": "planted", # optimistic — plant() flips to failed on error
})
await plant(
decky_name, artifact,
token_uuid=token_uuid, repo=repo, publish=True, bus=bus,
)
out.append({
"token_uuid": token_uuid, "generator": gen_name, "kind": kind,
"callback_token": slug, "placement_path": artifact.path,
})
return out

View File

@@ -0,0 +1,233 @@
"""Coverage for the canary planter (docker exec wrapper).
We don't actually invoke docker — :func:`asyncio.create_subprocess_exec`
is patched to record argv and return canned ``(rc, stdout, stderr)``
triples. That lets us assert:
* the docker argv has the right shape (container = ``<decky>-ssh``,
``sh -c <script>``);
* the script base64-decodes the artifact bytes losslessly;
* mtime is backdated by the right offset;
* state transitions hit the repo on success/failure;
* the bus event publishes on success.
"""
from __future__ import annotations
import asyncio
import base64
import os
import re
from typing import AsyncIterator
from unittest.mock import patch
import pytest
import pytest_asyncio
from decnet.bus import topics
from decnet.bus.fake import FakeBus
from decnet.canary import CanaryArtifact
from decnet.canary import planter
from decnet.web.db.sqlite.repository import SQLiteRepository
import decnet.web.db.models # noqa: F401
class _FakeProc:
def __init__(self, rc: int, stdout: bytes = b"", stderr: bytes = b"") -> None:
self.returncode = rc
self._stdout = stdout
self._stderr = stderr
async def communicate(self) -> tuple[bytes, bytes]:
return self._stdout, self._stderr
def kill(self) -> None: # pragma: no cover — never reached in non-timeout tests
pass
def _patch_subprocess(rc: int = 0, stderr: bytes = b""):
captured: list[list[str]] = []
async def _fake(*argv, **kw):
captured.append(list(argv))
return _FakeProc(rc, b"", stderr)
return patch.object(asyncio, "create_subprocess_exec", _fake), captured
@pytest_asyncio.fixture
async def repo(tmp_path) -> AsyncIterator[SQLiteRepository]:
r = SQLiteRepository(str(tmp_path / "p.db"))
await r.initialize()
yield r
@pytest_asyncio.fixture
async def fake_bus() -> AsyncIterator[FakeBus]:
bus = FakeBus()
await bus.connect()
yield bus
await bus.close()
# ---------------- argv shape + base64 round-trip --------------------------
@pytest.mark.asyncio
async def test_plant_argv_and_base64_round_trip(repo: SQLiteRepository, fake_bus: FakeBus, tmp_path) -> None:
art = CanaryArtifact(
path="/home/admin/.aws/credentials",
content=b"\x00binary\xffpayload",
mode=0o600,
mtime_offset=-86400,
generator="aws_creds",
)
# Persist a token row so the state-update path has something to flip.
await repo.create_canary_token({
"uuid": "tok-1", "kind": "http", "decky_name": "web1",
"generator": "aws_creds", "placement_path": art.path,
"callback_token": "slug", "secret_seed": "s", "created_by": "u1",
})
patcher, captured = _patch_subprocess(rc=0)
with patcher:
ok, err = await planter.plant(
"web1", art, token_uuid="tok-1", repo=repo, bus=fake_bus,
)
assert ok is True and err is None
assert len(captured) == 1
argv = captured[0]
assert argv[:3] == ["docker", "exec", "web1-ssh"]
assert argv[3:5] == ["sh", "-c"]
script = argv[5]
# base64-decoded payload appears in the script verbatim.
encoded = base64.b64encode(art.content).decode()
assert encoded in script
# touch -d @<mtime> with negative offset → an int strictly less than now.
m = re.search(r"touch -d @(\d+) ", script)
assert m and int(m.group(1)) > 0
# State transitioned to planted.
row = await repo.get_canary_token("tok-1")
assert row["state"] == "planted" and row["last_error"] is None
@pytest.mark.asyncio
async def test_plant_records_failure_when_docker_returns_nonzero(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
await repo.create_canary_token({
"uuid": "tok-2", "kind": "http", "decky_name": "web1",
"generator": "env_file", "placement_path": "/x",
"callback_token": "slug2", "secret_seed": "s", "created_by": "u1",
})
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
patcher, _ = _patch_subprocess(rc=125, stderr=b"container not running")
with patcher:
ok, err = await planter.plant(
"web1", art, token_uuid="tok-2", repo=repo, bus=fake_bus,
)
assert ok is False
assert err and "not running" in err
row = await repo.get_canary_token("tok-2")
assert row["state"] == "failed" and row["last_error"]
@pytest.mark.asyncio
async def test_plant_rejects_empty_path(repo: SQLiteRepository) -> None:
await repo.create_canary_token({
"uuid": "tok-3", "kind": "http", "decky_name": "web1",
"generator": "env_file", "placement_path": "/x",
"callback_token": "slug3", "secret_seed": "s", "created_by": "u1",
})
art = CanaryArtifact(path="", content=b"y")
ok, err = await planter.plant("web1", art, token_uuid="tok-3", repo=repo)
assert ok is False and err is not None
row = await repo.get_canary_token("tok-3")
assert row["state"] == "failed"
@pytest.mark.asyncio
async def test_plant_publishes_placed_event(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
await repo.create_canary_token({
"uuid": "tok-4", "kind": "http", "decky_name": "web1",
"generator": "env_file", "placement_path": "/x",
"callback_token": "slug4", "secret_seed": "s", "created_by": "u1",
})
sub = fake_bus.subscribe("canary.>")
art = CanaryArtifact(path="/x", content=b"y", generator="env_file")
patcher, _ = _patch_subprocess(rc=0)
with patcher:
await planter.plant(
"web1", art, token_uuid="tok-4", repo=repo, bus=fake_bus,
)
event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
assert event.topic == topics.canary("tok-4", topics.CANARY_PLACED)
assert event.payload["decky_name"] == "web1"
assert event.payload["generator"] == "env_file"
# ---------------- revoke --------------------------------------------------
@pytest.mark.asyncio
async def test_revoke_unlinks_and_publishes(repo: SQLiteRepository, fake_bus: FakeBus) -> None:
await repo.create_canary_token({
"uuid": "tok-r", "kind": "http", "decky_name": "web1",
"generator": "env_file", "placement_path": "/etc/x.env",
"callback_token": "slugR", "secret_seed": "s", "created_by": "u1",
})
sub = fake_bus.subscribe("canary.>")
patcher, captured = _patch_subprocess(rc=0)
with patcher:
ok, err = await planter.revoke(
"web1", "/etc/x.env",
token_uuid="tok-r", repo=repo, bus=fake_bus,
)
assert ok and not err
assert "rm -f /etc/x.env" in captured[0][5]
row = await repo.get_canary_token("tok-r")
assert row["state"] == "revoked"
event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
assert event.topic == topics.canary("tok-r", topics.CANARY_REVOKED)
# ---------------- seed_baseline ------------------------------------------
@pytest.mark.asyncio
async def test_seed_baseline_creates_one_token_per_generator(
repo: SQLiteRepository, fake_bus: FakeBus, monkeypatch
) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "git_config,env_file,aws_creds")
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.test")
patcher, captured = _patch_subprocess(rc=0)
with patcher:
rows = await planter.seed_baseline("web1", repo, bus=fake_bus)
assert {r["generator"] for r in rows} == {"git_config", "env_file", "aws_creds"}
# One docker exec per generator.
assert len(captured) == 3
# aws_creds ends up as kind=aws_passive; the other two are http.
by_gen = {r["generator"]: r for r in rows}
assert by_gen["aws_creds"]["kind"] == "aws_passive"
assert by_gen["env_file"]["kind"] == "http"
persisted = await repo.list_canary_tokens(decky_name="web1")
assert len(persisted) == 3
@pytest.mark.asyncio
async def test_seed_baseline_skips_unknown_generator(repo: SQLiteRepository, monkeypatch) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file,bogus")
patcher, _ = _patch_subprocess(rc=0)
with patcher:
rows = await planter.seed_baseline("web1", repo)
assert {r["generator"] for r in rows} == {"env_file"}
@pytest.mark.asyncio
async def test_seed_baseline_marks_failed_when_docker_errors(
repo: SQLiteRepository, monkeypatch
) -> None:
monkeypatch.setenv("DECNET_CANARY_BASELINE", "env_file")
patcher, _ = _patch_subprocess(rc=125, stderr=b"container down")
with patcher:
rows = await planter.seed_baseline("web1", repo)
assert len(rows) == 1
persisted = await repo.list_canary_tokens(decky_name="web1")
assert persisted[0]["state"] == "failed"
assert persisted[0]["last_error"]