diff --git a/decnet/orchestrator/drivers/__init__.py b/decnet/orchestrator/drivers/__init__.py index cdfb8d0d..5d214c24 100644 --- a/decnet/orchestrator/drivers/__init__.py +++ b/decnet/orchestrator/drivers/__init__.py @@ -1,5 +1,74 @@ -"""Activity drivers for the orchestrator (MVP: SSH only).""" -from decnet.orchestrator.drivers.base import ActivityResult, Driver -from decnet.orchestrator.drivers.ssh import SSHDriver +"""Activity drivers for the orchestrator. -__all__ = ["ActivityResult", "Driver", "SSHDriver"] +Concrete drivers register dispatch in :func:`get_driver_for`. Same +lazy-import pattern as :mod:`decnet.canary.factory`: the import-time +cost of :mod:`decnet.orchestrator.drivers` stays low for callers that +only need :class:`ActivityResult` / :class:`ActivityDriver`. +""" +from __future__ import annotations + +from decnet.orchestrator.drivers.base import ( + ActivityDriver, + ActivityResult, + Driver, +) +from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction + +__all__ = [ + "ActivityDriver", + "ActivityResult", + "Driver", + "SSHDriver", + "get_driver_for", +] + + +def __getattr__(name: str): # pragma: no cover - import passthrough + """Lazy access to concrete drivers. + + Avoids dragging the docker-exec / email-driver code into every + consumer that only needs the ABC. + """ + if name == "SSHDriver": + from decnet.orchestrator.drivers.ssh import SSHDriver + return SSHDriver + if name == "EmailDriver": + from decnet.orchestrator.drivers.email import EmailDriver + return EmailDriver + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +def get_driver_for(action: Action) -> ActivityDriver: + """Return the concrete driver that handles *action*. + + Stage 4 of the realism migration adds this seam so the orchestrator + worker can dispatch by action type without an isinstance chain in + ``_one_tick``. Stage 5 wires the worker to call this function + instead of holding a single ``SSHDriver`` instance. + + The set of action shapes the orchestrator can plan grows with the + migration: + + * :class:`TrafficAction` / :class:`FileAction` → :class:`SSHDriver` + * :class:`EmailAction` (post-stage-5) → ``EmailDriver`` + * :class:`EditAction` (post-stage-3b) → :class:`SSHDriver` + """ + # Lazy imports keep the side-effecting docker-exec / email-driver + # modules out of every importer's graph. + from decnet.orchestrator.drivers.ssh import SSHDriver + + if isinstance(action, (TrafficAction, FileAction)): + return SSHDriver() + # EmailAction lands in stage 5; reachable only after that import is + # added to scheduler. Importing inside the branch avoids a cycle + # with realism.llm at module load time. + try: + from decnet.orchestrator.emailgen.scheduler import EmailAction + except ImportError: # pragma: no cover - scheduler always exists + EmailAction = None # type: ignore[assignment] + if EmailAction is not None and isinstance(action, EmailAction): + from decnet.orchestrator.drivers.email import EmailDriver + return EmailDriver() + raise TypeError( + f"no driver registered for action type {type(action).__name__}" + ) diff --git a/decnet/orchestrator/drivers/base.py b/decnet/orchestrator/drivers/base.py index cbad4db3..9d14abc8 100644 --- a/decnet/orchestrator/drivers/base.py +++ b/decnet/orchestrator/drivers/base.py @@ -1,13 +1,27 @@ -"""Driver protocol for orchestrator actions. +"""Driver ABC for orchestrator actions. -Future protocols (HTTP, SMB, MySQL, …) implement this interface alongside -the SSH driver. Kept deliberately minimal — the orchestrator only needs -"run this action and tell me how it went". +Each concrete driver (SSH, Email, future HTTP/SMB/MySQL) maps one +:class:`~decnet.orchestrator.scheduler.Action` shape to a side effect +on a target decky and returns an :class:`ActivityResult` the +orchestrator persists. + +The ABC lives here, the dispatch factory in +:mod:`decnet.orchestrator.drivers` ``__init__``, and the impls in +sibling modules — same pattern as :mod:`decnet.canary.factory`, +:mod:`decnet.web.db.factory`, and :mod:`decnet.bus.factory`. + +Why ABC and not :class:`Protocol`: drivers also expose lower-level +helpers (``plant_file``, ``read_file``) that the planner-driven +realism path will call directly without going through ``run``. +Inheritance pins the contract for those helpers; a structural +protocol would let a typo silently produce a half-implemented driver. """ from __future__ import annotations +from abc import ABC, abstractmethod from dataclasses import dataclass, field -from typing import Any, Protocol +from datetime import datetime +from typing import Any from decnet.orchestrator.scheduler import Action @@ -16,12 +30,63 @@ from decnet.orchestrator.scheduler import Action class ActivityResult: """Outcome of one driver invocation. - ``payload`` is the per-action JSON envelope the worker writes to the - ``OrchestratorEvent.payload`` column and to the bus event body. + ``payload`` is the per-action JSON envelope the worker writes to + the ``OrchestratorEvent.payload`` column and to the bus event + body. """ success: bool payload: dict[str, Any] = field(default_factory=dict) -class Driver(Protocol): - async def run(self, action: Action) -> ActivityResult: ... +class ActivityDriver(ABC): + """Base class every concrete orchestrator driver inherits. + + Subclasses MUST implement :meth:`run` — the action-shape dispatch. + Subclasses that interact with files on the target decky SHOULD + implement :meth:`plant_file` and :meth:`read_file` so the realism + edit-in-place path can read existing artifacts before mutating + them. Drivers that don't touch files (e.g. a future pure-traffic + driver) raise :class:`NotImplementedError` from those, and the + planner avoids picking ``EditAction`` for them. + """ + + @abstractmethod + async def run(self, action: Action) -> ActivityResult: + """Execute the action against its target decky.""" + + async def plant_file( + self, + decky_name: str, + path: str, + content: bytes, + *, + mode: int = 0o600, + mtime: datetime | None = None, + ) -> ActivityResult: + """Write *content* to *path* inside *decky_name*. + + Default raises :class:`NotImplementedError`; concrete drivers + that have a write transport (docker exec, ssh, etc.) override. + Bytes-typed so binary artifacts (DOCX/PDF) survive the wire. + """ + raise NotImplementedError( + f"{type(self).__name__} does not support plant_file" + ) + + async def read_file(self, decky_name: str, path: str) -> bytes: + """Read *path* from inside *decky_name*. + + Required for the realism edit-in-place flow (stage 3b of the + realism migration): the driver reads the previous body, the + realism engine produces the next iteration, the driver writes + it back. Default raises :class:`NotImplementedError`. + """ + raise NotImplementedError( + f"{type(self).__name__} does not support read_file" + ) + + +# Back-compat alias so existing imports of ``Driver`` keep working +# while consumers transition to ``ActivityDriver``. Removed once the +# realism migration is complete. +Driver = ActivityDriver diff --git a/decnet/orchestrator/drivers/email.py b/decnet/orchestrator/drivers/email.py index c44b82f6..b5746729 100644 --- a/decnet/orchestrator/drivers/email.py +++ b/decnet/orchestrator/drivers/email.py @@ -28,7 +28,7 @@ from email.utils import formatdate from typing import Any, Optional from decnet.logging import get_logger -from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult from decnet.orchestrator.emailgen.scheduler import EmailAction from decnet.orchestrator.emailgen.threads import new_message_id from decnet.realism.llm import LLMBackend, LLMTimeout, get_llm @@ -148,7 +148,7 @@ def _build_eml( return msg.as_bytes() -class EmailDriver: +class EmailDriver(ActivityDriver): """Concrete driver for :class:`EmailAction`. Stateless across calls — the LLM backend is constructed once at diff --git a/decnet/orchestrator/drivers/ssh.py b/decnet/orchestrator/drivers/ssh.py index 64ddced8..73289b73 100644 --- a/decnet/orchestrator/drivers/ssh.py +++ b/decnet/orchestrator/drivers/ssh.py @@ -21,8 +21,11 @@ import asyncio import shlex from typing import Any +import base64 +from datetime import datetime, timezone + from decnet.logging import get_logger -from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction log = get_logger("orchestrator.ssh") @@ -48,16 +51,31 @@ async def _run(argv: list[str]) -> tuple[int, str, str]: raises — orchestrator success/failure is a payload attribute, not an exception. """ + return await _run_with_stdin(argv, None) + + +async def _run_with_stdin( + argv: list[str], stdin_bytes: bytes | None, +) -> tuple[int, str, str]: + """Spawn *argv*, optionally feeding *stdin_bytes*, and capture rc+output. + + Used by :meth:`SSHDriver.plant_file` to stream base64 payloads via + stdin (avoids ARG_MAX on large blobs — same fix as the canary + planter in commit c17b9e0). Same failure semantics as :func:`_run`. + """ try: proc = await asyncio.create_subprocess_exec( *argv, + stdin=asyncio.subprocess.PIPE if stdin_bytes is not None else None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError as exc: return 127, "", f"argv[0] not found: {exc}" try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT) + stdout, stderr = await asyncio.wait_for( + proc.communicate(stdin_bytes), timeout=_TIMEOUT, + ) except asyncio.TimeoutError: try: proc.kill() @@ -83,8 +101,8 @@ _PROBE_PY = ( ) -class SSHDriver: - """Concrete :class:`Driver` for the MVP.""" +class SSHDriver(ActivityDriver): + """Concrete :class:`ActivityDriver` for SSH-flavoured actions.""" async def run(self, action: Action) -> ActivityResult: if isinstance(action, TrafficAction): @@ -118,30 +136,83 @@ class SSHDriver: return ActivityResult(success=success, payload=payload) async def _run_file(self, action: FileAction) -> ActivityResult: - container = _container_for(action.dst_name) - # `tee` is in coreutils on every base image; using it (instead of - # `>` redirection) keeps the argv free of shell metacharacters - # the dst_name/path could otherwise weaponise. Path validation - # still belongs upstream — the scheduler's templates are fixed. - # We do invoke `sh -c` so the parent dir gets mkdir'd in one - # call; the sh argv stays trivially auditable. - sh_cmd = ( - f"mkdir -p {shlex.quote(_dirname(action.path))} && " - f"printf %s {shlex.quote(action.content)} > {shlex.quote(action.path)} && " - f"touch {shlex.quote(action.path)}" + # FileAction's content is a string; the realism path uses + # bytes-typed plant_file so binary blobs (DOCX/PDF, future + # canary artifacts) survive the wire. Encode-once here. + return await self.plant_file( + action.dst_name, + action.path, + action.content.encode("utf-8"), + mode=0o644, ) - argv = [_DOCKER, "exec", container, "sh", "-c", sh_cmd] - rc, stdout, stderr = await _run(argv) + + async def plant_file( + self, + decky_name: str, + path: str, + content: bytes, + *, + mode: int = 0o600, + mtime: datetime | None = None, + ) -> ActivityResult: + """Write *content* to *path* inside *decky_name*'s ssh container. + + Streams base64 via stdin (mirrors :mod:`decnet.canary.planter`'s + ARG_MAX-safe write — see commit c17b9e0). Sets file mode and, + when *mtime* is provided, ``touch -d`` to backdate the file so + it doesn't all stamp at wall-clock-now (the realism failure + this migration is fixing). + """ + container = _container_for(decky_name) + b64 = base64.b64encode(content).decode("ascii") + # touch -d accepts ISO 8601; we always emit UTC so the + # container's local TZ doesn't drift the mtime. + if mtime is not None: + ts = mtime.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + touch_cmd = f"touch -d {shlex.quote(ts)} {shlex.quote(path)}" + else: + touch_cmd = f"touch {shlex.quote(path)}" + sh_cmd = ( + f"mkdir -p {shlex.quote(_dirname(path))} && " + f"base64 -d > {shlex.quote(path)} && " + f"chmod {mode:o} {shlex.quote(path)} && " + f"{touch_cmd}" + ) + argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd] + rc, _stdout, stderr = await _run_with_stdin(argv, b64.encode("ascii")) success = rc == 0 payload: dict[str, Any] = { - "dst_decky": action.dst_name, - "path": action.path, - "bytes": len(action.content.encode("utf-8")), + "dst_decky": decky_name, + "path": path, + "bytes": len(content), "rc": rc, "stderr": stderr.strip()[:256] if not success else None, } return ActivityResult(success=success, payload=payload) + async def read_file(self, decky_name: str, path: str) -> bytes: + """Read *path* from inside *decky_name*'s ssh container. + + Used by the realism edit-in-place flow: the driver fetches + the previous body, the realism engine produces the next + iteration, the driver re-plants it via :meth:`plant_file`. + + Raises :class:`FileNotFoundError` when the container path + doesn't exist (rc=1 from ``cat`` with stderr ``No such + file``). Other failures raise :class:`RuntimeError` carrying + the docker stderr. + """ + container = _container_for(decky_name) + argv = [_DOCKER, "exec", container, "cat", path] + rc, stdout, stderr = await _run(argv) + if rc == 0: + return stdout.encode("utf-8") if isinstance(stdout, str) else stdout + if "No such file" in stderr or "no such file" in stderr.lower(): + raise FileNotFoundError(f"{path} not present in {decky_name}") + raise RuntimeError( + f"docker exec cat failed rc={rc} stderr={stderr.strip()[:256]!r}" + ) + def _dirname(path: str) -> str: """Pure-string dirname. We can't trust ``os.path.dirname`` on the diff --git a/tests/orchestrator/test_ssh_driver.py b/tests/orchestrator/test_ssh_driver.py index b76d162b..1fb6873f 100644 --- a/tests/orchestrator/test_ssh_driver.py +++ b/tests/orchestrator/test_ssh_driver.py @@ -56,13 +56,17 @@ async def test_traffic_failure_when_banner_missing(monkeypatch): @pytest.mark.asyncio async def test_file_action_invokes_docker_exec_on_dst(monkeypatch): - captured_argv: list[list[str]] = [] + captured: list[tuple[list[str], bytes | None]] = [] - async def fake_run(argv): - captured_argv.append(argv) + async def fake_run_with_stdin(argv, stdin_bytes): + captured.append((argv, stdin_bytes)) return 0, "", "" - monkeypatch.setattr(ssh_driver, "_run", fake_run) + # plant_file streams base64 content via stdin to avoid ARG_MAX + # (mirrors decnet.canary.planter; see commit c17b9e0). The test + # patches _run_with_stdin instead of _run because that's the + # codepath FileAction now exercises. + monkeypatch.setattr(ssh_driver, "_run_with_stdin", fake_run_with_stdin) drv = ssh_driver.SSHDriver() action = FileAction( dst_uuid="u2", dst_name="decky-02", @@ -71,19 +75,20 @@ async def test_file_action_invokes_docker_exec_on_dst(monkeypatch): ) result = await drv.run(action) assert result.success is True - assert result.payload["bytes"] == len("session=1700000000\n".encode()) - argv = captured_argv[0] - assert argv[:3] == ["docker", "exec", "decky-02-ssh"] - assert argv[3] == "sh" - assert argv[4] == "-c" - # The shell payload must single-quote both the content and the path — - # any unquoted ``;`` or ``$`` here would mean a shell-injection bug. - sh_cmd = argv[5] - # Path appears (shlex.quote leaves safe paths unquoted) and content - # is single-quoted — that's the shell-injection-safe contract. + assert result.payload["bytes"] == len(b"session=1700000000\n") + argv, stdin_bytes = captured[0] + assert argv[:4] == ["docker", "exec", "-i", "decky-02-ssh"] + assert argv[4] == "sh" + assert argv[5] == "-c" + sh_cmd = argv[6] assert "/tmp/.cache-1700000000.tmp" in sh_cmd - assert "'session=1700000000\n'" in sh_cmd + assert "base64 -d" in sh_cmd assert "mkdir -p /tmp" in sh_cmd + # Content travels base64-encoded on stdin, not interpolated into + # argv — that's the ARG_MAX-safe + shell-injection-safe contract. + import base64 + assert stdin_bytes is not None + assert base64.b64decode(stdin_bytes) == b"session=1700000000\n" @pytest.mark.asyncio @@ -97,3 +102,72 @@ async def test_run_handles_missing_docker_binary(monkeypatch): rc, out, err = await ssh_driver._run(["docker", "exec", "x", "true"]) assert rc == 127 assert "not found" in err + + +@pytest.mark.asyncio +async def test_plant_file_applies_mtime_via_touch_d(monkeypatch): + from datetime import datetime, timezone + captured: list[tuple[list[str], bytes | None]] = [] + + async def fake_run_with_stdin(argv, stdin_bytes): + captured.append((argv, stdin_bytes)) + return 0, "", "" + + monkeypatch.setattr(ssh_driver, "_run_with_stdin", fake_run_with_stdin) + drv = ssh_driver.SSHDriver() + mtime = datetime(2026, 4, 20, 11, 30, 0, tzinfo=timezone.utc) + result = await drv.plant_file( + "decky-03", "/home/admin/TODO.md", b"- [ ] rotate keys\n", + mode=0o644, mtime=mtime, + ) + assert result.success is True + sh_cmd = captured[0][0][6] + # Backdated mtime appears in the touch -d argument. + assert "touch -d '2026-04-20 11:30:00 UTC'" in sh_cmd + assert "chmod 644" in sh_cmd + + +@pytest.mark.asyncio +async def test_read_file_returns_bytes(monkeypatch): + async def fake_run(argv): + assert argv[:3] == ["docker", "exec", "decky-04-ssh"] + assert argv[3] == "cat" + return 0, "previous body\n", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + drv = ssh_driver.SSHDriver() + body = await drv.read_file("decky-04", "/home/admin/notes.txt") + assert body == b"previous body\n" + + +@pytest.mark.asyncio +async def test_read_file_raises_file_not_found(monkeypatch): + async def fake_run(argv): + return 1, "", "cat: /nope: No such file or directory" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + drv = ssh_driver.SSHDriver() + with pytest.raises(FileNotFoundError): + await drv.read_file("decky-04", "/nope") + + +@pytest.mark.asyncio +async def test_get_driver_for_dispatches_by_action_type(): + from decnet.orchestrator.drivers import get_driver_for, SSHDriver + traffic = TrafficAction( + src_uuid="u1", src_name="a", dst_uuid="u2", dst_name="b", + dst_ip="10.0.0.1", + ) + file_a = FileAction( + dst_uuid="u2", dst_name="b", path="/tmp/x", content="y", + ) + assert isinstance(get_driver_for(traffic), SSHDriver) + assert isinstance(get_driver_for(file_a), SSHDriver) + + +def test_get_driver_for_unknown_action_raises(): + from decnet.orchestrator.drivers import get_driver_for + class _Bogus: + pass + with pytest.raises(TypeError, match="no driver registered"): + get_driver_for(_Bogus()) # type: ignore[arg-type]