Topology deploys now plant the configured canary baseline set on every decky in the topology, mirroring the fleet-deploy hook. Containers are resolved via resolve_topology_container — <decky>-ssh when the decky exposes an ssh service, else the topology base container decnet_t_<id8>_<decky>. The planter's plant/revoke/seed_baseline grow an optional container= kwarg; default preserves the fleet <name>-ssh resolution.
372 lines
13 KiB
Python
372 lines
13 KiB
Python
"""Plant / revoke canary artifacts inside running decky containers.
|
|
|
|
Single entry point per operation:
|
|
|
|
* :func:`plant` writes a :class:`CanaryArtifact` into one decky's
|
|
filesystem via ``docker exec`` (mirroring the SSH driver's
|
|
``_run_file`` pattern), backdates the mtime, sets the requested
|
|
mode, and publishes ``canary.{token_id}.placed`` on the bus.
|
|
* :func:`revoke` unlinks the file (best-effort) and publishes
|
|
``canary.{token_id}.revoked``.
|
|
* :func:`seed_baseline` is the deploy-hook helper: synthesises the
|
|
configured baseline set for one decky, persists rows, plants each.
|
|
Failures are logged but do **not** abort the deploy (the deployer
|
|
hook calls this best-effort).
|
|
|
|
We don't reuse :class:`SSHDriver` directly because the orchestrator
|
|
driver is tied to its action types (``FileAction`` carries str
|
|
content; canary content is bytes). The planter takes the same
|
|
shape but speaks bytes-via-base64 over the wire.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import shlex
|
|
import time
|
|
from secrets import token_urlsafe
|
|
from typing import Any, Iterable, Optional
|
|
|
|
from decnet.bus import topics
|
|
from decnet.bus.base import BaseBus
|
|
from decnet.bus.factory import get_bus
|
|
from decnet.canary.base import CanaryArtifact, CanaryContext
|
|
from decnet.canary.factory import get_generator
|
|
from decnet.canary.paths import default_path_for
|
|
from decnet.logging import get_logger
|
|
from decnet.web.db.repository import BaseRepository
|
|
|
|
log = get_logger("canary.planter")
|
|
|
|
_DOCKER = "docker"
|
|
_TIMEOUT = 8.0
|
|
# Container suffix — matches the orchestrator SSH driver's convention
|
|
# (``<decky_name>-ssh``). Canary placement always happens through the
|
|
# ssh container because every decky has one and it carries the most
|
|
# realistic filesystem layout.
|
|
_SSH_CONTAINER_SUFFIX = "-ssh"
|
|
|
|
|
|
def _container_for(decky_name: str) -> str:
|
|
return f"{decky_name}{_SSH_CONTAINER_SUFFIX}"
|
|
|
|
|
|
def resolve_topology_container(
|
|
topology_id: str, decky_name: str, services: Iterable[str],
|
|
) -> str:
|
|
"""Container name to docker-exec into for a MazeNET decky.
|
|
|
|
The ssh service container (when present) wins because it carries the
|
|
most realistic filesystem layout — same rationale as the fleet path.
|
|
Otherwise we target the base container, whose name is set by
|
|
:func:`decnet.topology.compose._container_name`.
|
|
"""
|
|
if "ssh" in set(services):
|
|
return f"{decky_name}{_SSH_CONTAINER_SUFFIX}"
|
|
return f"decnet_t_{topology_id[:8]}_{decky_name}"
|
|
|
|
|
|
def _dirname(path: str) -> str:
|
|
idx = path.rfind("/")
|
|
if idx <= 0:
|
|
return "/"
|
|
return path[:idx]
|
|
|
|
|
|
async def _run(
|
|
argv: list[str], *, stdin_bytes: Optional[bytes] = None,
|
|
) -> tuple[int, str, str]:
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*argv,
|
|
stdin=asyncio.subprocess.PIPE if stdin_bytes is not None else None,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
except FileNotFoundError as exc:
|
|
return 127, "", f"argv[0] not found: {exc}"
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(
|
|
proc.communicate(input=stdin_bytes), timeout=_TIMEOUT,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
try:
|
|
proc.kill()
|
|
except ProcessLookupError:
|
|
pass
|
|
return 124, "", "timeout"
|
|
return (
|
|
proc.returncode if proc.returncode is not None else -1,
|
|
stdout.decode("utf-8", "replace"),
|
|
stderr.decode("utf-8", "replace"),
|
|
)
|
|
|
|
|
|
def _build_plant_command(artifact: CanaryArtifact) -> tuple[str, bytes]:
|
|
"""Compose the ``sh -c`` script + stdin payload for one artifact.
|
|
|
|
Binary safety: we base64-encode on the host and stream the result
|
|
over stdin to ``base64 -d`` inside the container, so the bytes
|
|
never touch the argv (kernel ARG_MAX would reject anything larger
|
|
than ~128KB-2MB depending on the host). Both ``base64`` (coreutils)
|
|
and ``touch -d @<unix_ts>`` are present on every Linux base image
|
|
we ship, so there's no per-distro branching.
|
|
"""
|
|
encoded = base64.b64encode(artifact.content)
|
|
mtime = int(time.time() + artifact.mtime_offset)
|
|
mode_str = oct(artifact.mode)[2:]
|
|
parts = [
|
|
f"mkdir -p {shlex.quote(_dirname(artifact.path))}",
|
|
f"base64 -d > {shlex.quote(artifact.path)}",
|
|
f"chmod {mode_str} {shlex.quote(artifact.path)}",
|
|
f"touch -d @{mtime} {shlex.quote(artifact.path)}",
|
|
]
|
|
return " && ".join(parts), encoded
|
|
|
|
|
|
async def _publish(
|
|
bus: Optional[BaseBus], topic: str, payload: dict[str, Any],
|
|
) -> None:
|
|
"""Best-effort publish — never raises.
|
|
|
|
When ``bus`` is None we resolve via :func:`get_bus`; either way
|
|
bus-side failures are logged and swallowed (delivery is at-most-once
|
|
by contract; the DB row is source of truth).
|
|
"""
|
|
try:
|
|
owns_bus = bus is None
|
|
target = bus if bus is not None else get_bus()
|
|
if owns_bus:
|
|
await target.connect()
|
|
await target.publish(topic, payload)
|
|
if owns_bus:
|
|
await target.close()
|
|
except Exception as e: # noqa: BLE001
|
|
log.warning("canary bus publish failed topic=%s err=%s", topic, e)
|
|
|
|
|
|
async def plant(
|
|
decky_name: str,
|
|
artifact: CanaryArtifact,
|
|
*,
|
|
token_uuid: str,
|
|
repo: Optional[BaseRepository] = None,
|
|
publish: bool = True,
|
|
bus: Optional[BaseBus] = None,
|
|
container: Optional[str] = None,
|
|
) -> tuple[bool, Optional[str]]:
|
|
"""Write *artifact* into the decky's ssh container.
|
|
|
|
Returns ``(success, error_or_none)``. When ``repo`` is provided
|
|
the token row's state is updated to ``planted`` / ``failed``
|
|
accordingly. When ``publish`` is True a ``canary.<id>.placed``
|
|
event is published on the bus on success.
|
|
|
|
The function never raises on docker errors — callers (the API,
|
|
the deploy hook) treat the result as data.
|
|
"""
|
|
if not artifact.path:
|
|
err = "planter requires a non-empty artifact.path"
|
|
log.warning("canary.plant skipped: %s decky=%s token=%s", err, decky_name, token_uuid)
|
|
if repo is not None:
|
|
await repo.update_canary_token_state(token_uuid, "failed", err)
|
|
return False, err
|
|
|
|
sh_cmd, stdin_payload = _build_plant_command(artifact)
|
|
target_container = container or _container_for(decky_name)
|
|
# ``-i`` keeps stdin attached so base64 -d inside the container can
|
|
# consume the encoded payload streamed from the host.
|
|
argv = [_DOCKER, "exec", "-i", target_container, "sh", "-c", sh_cmd]
|
|
rc, _stdout, stderr = await _run(argv, stdin_bytes=stdin_payload)
|
|
success = rc == 0
|
|
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
|
|
|
|
if repo is not None:
|
|
if success:
|
|
await repo.update_canary_token_state(token_uuid, "planted", None)
|
|
else:
|
|
await repo.update_canary_token_state(token_uuid, "failed", error)
|
|
|
|
if success and publish:
|
|
await _publish(bus, topics.canary(token_uuid, topics.CANARY_PLACED), {
|
|
"token_id": token_uuid,
|
|
"decky_name": decky_name,
|
|
"placement_path": artifact.path,
|
|
"instrumenter": artifact.instrumenter,
|
|
"generator": artifact.generator,
|
|
})
|
|
|
|
if not success:
|
|
log.warning(
|
|
"canary.plant failed decky=%s token=%s rc=%d stderr=%r",
|
|
decky_name, token_uuid, rc, stderr[:120],
|
|
)
|
|
return success, error
|
|
|
|
|
|
async def revoke(
|
|
decky_name: str,
|
|
placement_path: str,
|
|
*,
|
|
token_uuid: str,
|
|
repo: Optional[BaseRepository] = None,
|
|
publish: bool = True,
|
|
bus: Optional[BaseBus] = None,
|
|
container: Optional[str] = None,
|
|
) -> tuple[bool, Optional[str]]:
|
|
"""Best-effort unlink + state transition + bus publish.
|
|
|
|
Returns ``(success, error_or_none)``. ``success`` is True when
|
|
the file is gone after the call (whether we deleted it or it was
|
|
already missing); only docker / container-down errors return False.
|
|
"""
|
|
sh_cmd = f"rm -f {shlex.quote(placement_path)}"
|
|
target_container = container or _container_for(decky_name)
|
|
argv = [_DOCKER, "exec", target_container, "sh", "-c", sh_cmd]
|
|
rc, _stdout, stderr = await _run(argv)
|
|
success = rc == 0
|
|
error = None if success else (stderr.strip()[:256] or f"rc={rc}")
|
|
|
|
if repo is not None:
|
|
await repo.update_canary_token_state(token_uuid, "revoked", error if not success else None)
|
|
|
|
if publish:
|
|
await _publish(bus, topics.canary(token_uuid, topics.CANARY_REVOKED), {
|
|
"token_id": token_uuid,
|
|
"decky_name": decky_name,
|
|
"placement_path": placement_path,
|
|
})
|
|
|
|
return success, error
|
|
|
|
|
|
def _baseline_set() -> Iterable[str]:
|
|
"""Return the configured baseline generator names.
|
|
|
|
Honors ``DECNET_CANARY_BASELINE`` (comma-separated). Default is
|
|
a sensible mix that exercises every callback-bearing generator
|
|
plus a passive aws_creds drop for realism.
|
|
"""
|
|
raw = os.environ.get(
|
|
"DECNET_CANARY_BASELINE",
|
|
"git_config,env_file,honeydoc,aws_creds",
|
|
)
|
|
return [n.strip() for n in raw.split(",") if n.strip()]
|
|
|
|
|
|
def _ctx_for(slug: str) -> CanaryContext:
|
|
"""Build a :class:`CanaryContext` from the canary worker config."""
|
|
base = os.environ.get("DECNET_CANARY_HTTP_BASE", "http://localhost:8088")
|
|
zone = os.environ.get("DECNET_CANARY_DNS_ZONE", "")
|
|
return CanaryContext(callback_token=slug, http_base=base, dns_zone=zone)
|
|
|
|
|
|
async def seed_baseline(
|
|
decky_name: str,
|
|
repo: BaseRepository,
|
|
*,
|
|
persona: str = "linux",
|
|
created_by: str = "system",
|
|
bus: Optional[BaseBus] = None,
|
|
container: Optional[str] = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""Plant the configured baseline canary set on one decky.
|
|
|
|
Best-effort: any individual placement that fails is logged and
|
|
the row is left in ``state=failed``; the deployer hook treats the
|
|
return value as informational, not authoritative.
|
|
|
|
Returns the list of token rows created (whether their planting
|
|
ultimately succeeded or not), so the caller can surface them in
|
|
the deploy report.
|
|
"""
|
|
out: list[dict[str, Any]] = []
|
|
for gen_name in _baseline_set():
|
|
try:
|
|
generator = get_generator(gen_name)
|
|
except ValueError:
|
|
log.warning("canary.seed_baseline: unknown generator %r — skipping", gen_name)
|
|
continue
|
|
slug = token_urlsafe(16)
|
|
ctx = _ctx_for(slug)
|
|
artifact = generator.generate(ctx)
|
|
artifact.path = default_path_for(gen_name, persona)
|
|
kind = "aws_passive" if gen_name == "aws_creds" else "http"
|
|
# Persist first so the planter has a row to update; that way a
|
|
# crash mid-plant leaves a recoverable failed-state row.
|
|
from uuid import uuid4
|
|
token_uuid = str(uuid4())
|
|
await repo.create_canary_token({
|
|
"uuid": token_uuid,
|
|
"kind": kind,
|
|
"decky_name": decky_name,
|
|
"blob_uuid": None,
|
|
"instrumenter": None,
|
|
"generator": gen_name,
|
|
"placement_path": artifact.path,
|
|
"callback_token": slug,
|
|
"secret_seed": slug,
|
|
"created_by": created_by,
|
|
"state": "planted", # optimistic — plant() flips to failed on error
|
|
})
|
|
await plant(
|
|
decky_name, artifact,
|
|
token_uuid=token_uuid, repo=repo, publish=True, bus=bus,
|
|
container=container,
|
|
)
|
|
out.append({
|
|
"token_uuid": token_uuid, "generator": gen_name, "kind": kind,
|
|
"callback_token": slug, "placement_path": artifact.path,
|
|
})
|
|
return out
|
|
|
|
|
|
async def seed_baseline_topology(
|
|
repo: BaseRepository,
|
|
topology_id: str,
|
|
*,
|
|
created_by: str = "system",
|
|
bus: Optional[BaseBus] = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""Plant baseline canaries on every decky in a MazeNET topology.
|
|
|
|
Mirrors :func:`seed_baseline` for the topology path. Container name
|
|
resolution uses :func:`resolve_topology_container` since topology
|
|
deckies may not have an ssh service — in that case we target the
|
|
base container instead.
|
|
|
|
Best-effort: failures on any single decky are logged inside
|
|
:func:`plant`; the deploy hook treats the return value as
|
|
informational. Returns a flat list of per-token dicts (with an added
|
|
``decky_name`` key) across all deckies.
|
|
"""
|
|
from decnet.topology.persistence import hydrate
|
|
|
|
hydrated = await hydrate(repo, topology_id)
|
|
if hydrated is None:
|
|
log.warning(
|
|
"canary.seed_baseline_topology: topology %s not found", topology_id,
|
|
)
|
|
return []
|
|
|
|
out: list[dict[str, Any]] = []
|
|
for decky in hydrated["deckies"]:
|
|
cfg = decky.get("decky_config") or {}
|
|
decky_name = cfg.get("name") or decky.get("name")
|
|
if not decky_name:
|
|
continue
|
|
services = decky.get("services") or []
|
|
container = resolve_topology_container(topology_id, decky_name, services)
|
|
# MazeNET deckies don't carry an OS persona today; default to
|
|
# linux (every base image we ship is Linux).
|
|
rows = await seed_baseline(
|
|
decky_name, repo,
|
|
persona="linux", created_by=created_by, bus=bus,
|
|
container=container,
|
|
)
|
|
for r in rows:
|
|
r["decky_name"] = decky_name
|
|
out.append(r)
|
|
return out
|