refactor(orchestrator): extract ActivityDriver ABC + driver factory

Stage 4 of the realism migration. Lifts the driver Protocol into a
proper ABC with default plant_file/read_file methods (raise
NotImplementedError), and adds get_driver_for(action) so the
orchestrator worker can dispatch by action shape without isinstance
chains.

SSHDriver now inherits ActivityDriver and implements:
- plant_file: streams base64 via stdin (ARG_MAX-safe, mirrors
  decnet.canary.planter; commit c17b9e0). Honours mtime via touch -d
  so realism-planned files don't all stamp at wall-clock-now.
- read_file: docker exec cat with FileNotFoundError on rc=1, used by
  the upcoming EditAction (stage 3b).

EmailDriver inherits ActivityDriver. Driver alias kept for back-compat
during the migration; removed once realism stages 5-7 land.
This commit is contained in:
2026-04-27 16:09:46 -04:00
parent 0b9873982d
commit 636c057cc5
5 changed files with 329 additions and 50 deletions

View File

@@ -1,5 +1,74 @@
"""Activity drivers for the orchestrator (MVP: SSH only).""" """Activity drivers for the orchestrator.
from decnet.orchestrator.drivers.base import ActivityResult, Driver
from decnet.orchestrator.drivers.ssh import SSHDriver
__all__ = ["ActivityResult", "Driver", "SSHDriver"] Concrete drivers register dispatch in :func:`get_driver_for`. Same
lazy-import pattern as :mod:`decnet.canary.factory`: the import-time
cost of :mod:`decnet.orchestrator.drivers` stays low for callers that
only need :class:`ActivityResult` / :class:`ActivityDriver`.
"""
from __future__ import annotations
from decnet.orchestrator.drivers.base import (
ActivityDriver,
ActivityResult,
Driver,
)
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
__all__ = [
"ActivityDriver",
"ActivityResult",
"Driver",
"SSHDriver",
"get_driver_for",
]
def __getattr__(name: str): # pragma: no cover - import passthrough
"""Lazy access to concrete drivers.
Avoids dragging the docker-exec / email-driver code into every
consumer that only needs the ABC.
"""
if name == "SSHDriver":
from decnet.orchestrator.drivers.ssh import SSHDriver
return SSHDriver
if name == "EmailDriver":
from decnet.orchestrator.drivers.email import EmailDriver
return EmailDriver
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
def get_driver_for(action: Action) -> ActivityDriver:
"""Return the concrete driver that handles *action*.
Stage 4 of the realism migration adds this seam so the orchestrator
worker can dispatch by action type without an isinstance chain in
``_one_tick``. Stage 5 wires the worker to call this function
instead of holding a single ``SSHDriver`` instance.
The set of action shapes the orchestrator can plan grows with the
migration:
* :class:`TrafficAction` / :class:`FileAction` → :class:`SSHDriver`
* :class:`EmailAction` (post-stage-5) → ``EmailDriver``
* :class:`EditAction` (post-stage-3b) → :class:`SSHDriver`
"""
# Lazy imports keep the side-effecting docker-exec / email-driver
# modules out of every importer's graph.
from decnet.orchestrator.drivers.ssh import SSHDriver
if isinstance(action, (TrafficAction, FileAction)):
return SSHDriver()
# EmailAction lands in stage 5; reachable only after that import is
# added to scheduler. Importing inside the branch avoids a cycle
# with realism.llm at module load time.
try:
from decnet.orchestrator.emailgen.scheduler import EmailAction
except ImportError: # pragma: no cover - scheduler always exists
EmailAction = None # type: ignore[assignment]
if EmailAction is not None and isinstance(action, EmailAction):
from decnet.orchestrator.drivers.email import EmailDriver
return EmailDriver()
raise TypeError(
f"no driver registered for action type {type(action).__name__}"
)

View File

@@ -1,13 +1,27 @@
"""Driver protocol for orchestrator actions. """Driver ABC for orchestrator actions.
Future protocols (HTTP, SMB, MySQL, …) implement this interface alongside Each concrete driver (SSH, Email, future HTTP/SMB/MySQL) maps one
the SSH driver. Kept deliberately minimal — the orchestrator only needs :class:`~decnet.orchestrator.scheduler.Action` shape to a side effect
"run this action and tell me how it went". on a target decky and returns an :class:`ActivityResult` the
orchestrator persists.
The ABC lives here, the dispatch factory in
:mod:`decnet.orchestrator.drivers` ``__init__``, and the impls in
sibling modules — same pattern as :mod:`decnet.canary.factory`,
:mod:`decnet.web.db.factory`, and :mod:`decnet.bus.factory`.
Why ABC and not :class:`Protocol`: drivers also expose lower-level
helpers (``plant_file``, ``read_file``) that the planner-driven
realism path will call directly without going through ``run``.
Inheritance pins the contract for those helpers; a structural
protocol would let a typo silently produce a half-implemented driver.
""" """
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Protocol from datetime import datetime
from typing import Any
from decnet.orchestrator.scheduler import Action from decnet.orchestrator.scheduler import Action
@@ -16,12 +30,63 @@ from decnet.orchestrator.scheduler import Action
class ActivityResult: class ActivityResult:
"""Outcome of one driver invocation. """Outcome of one driver invocation.
``payload`` is the per-action JSON envelope the worker writes to the ``payload`` is the per-action JSON envelope the worker writes to
``OrchestratorEvent.payload`` column and to the bus event body. the ``OrchestratorEvent.payload`` column and to the bus event
body.
""" """
success: bool success: bool
payload: dict[str, Any] = field(default_factory=dict) payload: dict[str, Any] = field(default_factory=dict)
class Driver(Protocol): class ActivityDriver(ABC):
async def run(self, action: Action) -> ActivityResult: ... """Base class every concrete orchestrator driver inherits.
Subclasses MUST implement :meth:`run` — the action-shape dispatch.
Subclasses that interact with files on the target decky SHOULD
implement :meth:`plant_file` and :meth:`read_file` so the realism
edit-in-place path can read existing artifacts before mutating
them. Drivers that don't touch files (e.g. a future pure-traffic
driver) raise :class:`NotImplementedError` from those, and the
planner avoids picking ``EditAction`` for them.
"""
@abstractmethod
async def run(self, action: Action) -> ActivityResult:
"""Execute the action against its target decky."""
async def plant_file(
self,
decky_name: str,
path: str,
content: bytes,
*,
mode: int = 0o600,
mtime: datetime | None = None,
) -> ActivityResult:
"""Write *content* to *path* inside *decky_name*.
Default raises :class:`NotImplementedError`; concrete drivers
that have a write transport (docker exec, ssh, etc.) override.
Bytes-typed so binary artifacts (DOCX/PDF) survive the wire.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support plant_file"
)
async def read_file(self, decky_name: str, path: str) -> bytes:
"""Read *path* from inside *decky_name*.
Required for the realism edit-in-place flow (stage 3b of the
realism migration): the driver reads the previous body, the
realism engine produces the next iteration, the driver writes
it back. Default raises :class:`NotImplementedError`.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support read_file"
)
# Back-compat alias so existing imports of ``Driver`` keep working
# while consumers transition to ``ActivityDriver``. Removed once the
# realism migration is complete.
Driver = ActivityDriver

View File

@@ -28,7 +28,7 @@ from email.utils import formatdate
from typing import Any, Optional from typing import Any, Optional
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.orchestrator.drivers.base import ActivityResult from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult
from decnet.orchestrator.emailgen.scheduler import EmailAction from decnet.orchestrator.emailgen.scheduler import EmailAction
from decnet.orchestrator.emailgen.threads import new_message_id from decnet.orchestrator.emailgen.threads import new_message_id
from decnet.realism.llm import LLMBackend, LLMTimeout, get_llm from decnet.realism.llm import LLMBackend, LLMTimeout, get_llm
@@ -148,7 +148,7 @@ def _build_eml(
return msg.as_bytes() return msg.as_bytes()
class EmailDriver: class EmailDriver(ActivityDriver):
"""Concrete driver for :class:`EmailAction`. """Concrete driver for :class:`EmailAction`.
Stateless across calls — the LLM backend is constructed once at Stateless across calls — the LLM backend is constructed once at

View File

@@ -21,8 +21,11 @@ import asyncio
import shlex import shlex
from typing import Any from typing import Any
import base64
from datetime import datetime, timezone
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.orchestrator.drivers.base import ActivityResult from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
log = get_logger("orchestrator.ssh") log = get_logger("orchestrator.ssh")
@@ -48,16 +51,31 @@ async def _run(argv: list[str]) -> tuple[int, str, str]:
raises — orchestrator success/failure is a payload attribute, not raises — orchestrator success/failure is a payload attribute, not
an exception. an exception.
""" """
return await _run_with_stdin(argv, None)
async def _run_with_stdin(
argv: list[str], stdin_bytes: bytes | None,
) -> tuple[int, str, str]:
"""Spawn *argv*, optionally feeding *stdin_bytes*, and capture rc+output.
Used by :meth:`SSHDriver.plant_file` to stream base64 payloads via
stdin (avoids ARG_MAX on large blobs — same fix as the canary
planter in commit c17b9e0). Same failure semantics as :func:`_run`.
"""
try: try:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*argv, *argv,
stdin=asyncio.subprocess.PIPE if stdin_bytes is not None else None,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
) )
except FileNotFoundError as exc: except FileNotFoundError as exc:
return 127, "", f"argv[0] not found: {exc}" return 127, "", f"argv[0] not found: {exc}"
try: try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT) stdout, stderr = await asyncio.wait_for(
proc.communicate(stdin_bytes), timeout=_TIMEOUT,
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
try: try:
proc.kill() proc.kill()
@@ -83,8 +101,8 @@ _PROBE_PY = (
) )
class SSHDriver: class SSHDriver(ActivityDriver):
"""Concrete :class:`Driver` for the MVP.""" """Concrete :class:`ActivityDriver` for SSH-flavoured actions."""
async def run(self, action: Action) -> ActivityResult: async def run(self, action: Action) -> ActivityResult:
if isinstance(action, TrafficAction): if isinstance(action, TrafficAction):
@@ -118,30 +136,83 @@ class SSHDriver:
return ActivityResult(success=success, payload=payload) return ActivityResult(success=success, payload=payload)
async def _run_file(self, action: FileAction) -> ActivityResult: async def _run_file(self, action: FileAction) -> ActivityResult:
container = _container_for(action.dst_name) # FileAction's content is a string; the realism path uses
# `tee` is in coreutils on every base image; using it (instead of # bytes-typed plant_file so binary blobs (DOCX/PDF, future
# `>` redirection) keeps the argv free of shell metacharacters # canary artifacts) survive the wire. Encode-once here.
# the dst_name/path could otherwise weaponise. Path validation return await self.plant_file(
# still belongs upstream — the scheduler's templates are fixed. action.dst_name,
# We do invoke `sh -c` so the parent dir gets mkdir'd in one action.path,
# call; the sh argv stays trivially auditable. action.content.encode("utf-8"),
sh_cmd = ( mode=0o644,
f"mkdir -p {shlex.quote(_dirname(action.path))} && "
f"printf %s {shlex.quote(action.content)} > {shlex.quote(action.path)} && "
f"touch {shlex.quote(action.path)}"
) )
argv = [_DOCKER, "exec", container, "sh", "-c", sh_cmd]
rc, stdout, stderr = await _run(argv) async def plant_file(
self,
decky_name: str,
path: str,
content: bytes,
*,
mode: int = 0o600,
mtime: datetime | None = None,
) -> ActivityResult:
"""Write *content* to *path* inside *decky_name*'s ssh container.
Streams base64 via stdin (mirrors :mod:`decnet.canary.planter`'s
ARG_MAX-safe write — see commit c17b9e0). Sets file mode and,
when *mtime* is provided, ``touch -d`` to backdate the file so
it doesn't all stamp at wall-clock-now (the realism failure
this migration is fixing).
"""
container = _container_for(decky_name)
b64 = base64.b64encode(content).decode("ascii")
# touch -d accepts ISO 8601; we always emit UTC so the
# container's local TZ doesn't drift the mtime.
if mtime is not None:
ts = mtime.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
touch_cmd = f"touch -d {shlex.quote(ts)} {shlex.quote(path)}"
else:
touch_cmd = f"touch {shlex.quote(path)}"
sh_cmd = (
f"mkdir -p {shlex.quote(_dirname(path))} && "
f"base64 -d > {shlex.quote(path)} && "
f"chmod {mode:o} {shlex.quote(path)} && "
f"{touch_cmd}"
)
argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd]
rc, _stdout, stderr = await _run_with_stdin(argv, b64.encode("ascii"))
success = rc == 0 success = rc == 0
payload: dict[str, Any] = { payload: dict[str, Any] = {
"dst_decky": action.dst_name, "dst_decky": decky_name,
"path": action.path, "path": path,
"bytes": len(action.content.encode("utf-8")), "bytes": len(content),
"rc": rc, "rc": rc,
"stderr": stderr.strip()[:256] if not success else None, "stderr": stderr.strip()[:256] if not success else None,
} }
return ActivityResult(success=success, payload=payload) return ActivityResult(success=success, payload=payload)
async def read_file(self, decky_name: str, path: str) -> bytes:
"""Read *path* from inside *decky_name*'s ssh container.
Used by the realism edit-in-place flow: the driver fetches
the previous body, the realism engine produces the next
iteration, the driver re-plants it via :meth:`plant_file`.
Raises :class:`FileNotFoundError` when the container path
doesn't exist (rc=1 from ``cat`` with stderr ``No such
file``). Other failures raise :class:`RuntimeError` carrying
the docker stderr.
"""
container = _container_for(decky_name)
argv = [_DOCKER, "exec", container, "cat", path]
rc, stdout, stderr = await _run(argv)
if rc == 0:
return stdout.encode("utf-8") if isinstance(stdout, str) else stdout
if "No such file" in stderr or "no such file" in stderr.lower():
raise FileNotFoundError(f"{path} not present in {decky_name}")
raise RuntimeError(
f"docker exec cat failed rc={rc} stderr={stderr.strip()[:256]!r}"
)
def _dirname(path: str) -> str: def _dirname(path: str) -> str:
"""Pure-string dirname. We can't trust ``os.path.dirname`` on the """Pure-string dirname. We can't trust ``os.path.dirname`` on the

View File

@@ -56,13 +56,17 @@ async def test_traffic_failure_when_banner_missing(monkeypatch):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_file_action_invokes_docker_exec_on_dst(monkeypatch): async def test_file_action_invokes_docker_exec_on_dst(monkeypatch):
captured_argv: list[list[str]] = [] captured: list[tuple[list[str], bytes | None]] = []
async def fake_run(argv): async def fake_run_with_stdin(argv, stdin_bytes):
captured_argv.append(argv) captured.append((argv, stdin_bytes))
return 0, "", "" return 0, "", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run) # plant_file streams base64 content via stdin to avoid ARG_MAX
# (mirrors decnet.canary.planter; see commit c17b9e0). The test
# patches _run_with_stdin instead of _run because that's the
# codepath FileAction now exercises.
monkeypatch.setattr(ssh_driver, "_run_with_stdin", fake_run_with_stdin)
drv = ssh_driver.SSHDriver() drv = ssh_driver.SSHDriver()
action = FileAction( action = FileAction(
dst_uuid="u2", dst_name="decky-02", dst_uuid="u2", dst_name="decky-02",
@@ -71,19 +75,20 @@ async def test_file_action_invokes_docker_exec_on_dst(monkeypatch):
) )
result = await drv.run(action) result = await drv.run(action)
assert result.success is True assert result.success is True
assert result.payload["bytes"] == len("session=1700000000\n".encode()) assert result.payload["bytes"] == len(b"session=1700000000\n")
argv = captured_argv[0] argv, stdin_bytes = captured[0]
assert argv[:3] == ["docker", "exec", "decky-02-ssh"] assert argv[:4] == ["docker", "exec", "-i", "decky-02-ssh"]
assert argv[3] == "sh" assert argv[4] == "sh"
assert argv[4] == "-c" assert argv[5] == "-c"
# The shell payload must single-quote both the content and the path — sh_cmd = argv[6]
# any unquoted ``;`` or ``$`` here would mean a shell-injection bug.
sh_cmd = argv[5]
# Path appears (shlex.quote leaves safe paths unquoted) and content
# is single-quoted — that's the shell-injection-safe contract.
assert "/tmp/.cache-1700000000.tmp" in sh_cmd assert "/tmp/.cache-1700000000.tmp" in sh_cmd
assert "'session=1700000000\n'" in sh_cmd assert "base64 -d" in sh_cmd
assert "mkdir -p /tmp" in sh_cmd assert "mkdir -p /tmp" in sh_cmd
# Content travels base64-encoded on stdin, not interpolated into
# argv — that's the ARG_MAX-safe + shell-injection-safe contract.
import base64
assert stdin_bytes is not None
assert base64.b64decode(stdin_bytes) == b"session=1700000000\n"
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -97,3 +102,72 @@ async def test_run_handles_missing_docker_binary(monkeypatch):
rc, out, err = await ssh_driver._run(["docker", "exec", "x", "true"]) rc, out, err = await ssh_driver._run(["docker", "exec", "x", "true"])
assert rc == 127 assert rc == 127
assert "not found" in err assert "not found" in err
@pytest.mark.asyncio
async def test_plant_file_applies_mtime_via_touch_d(monkeypatch):
from datetime import datetime, timezone
captured: list[tuple[list[str], bytes | None]] = []
async def fake_run_with_stdin(argv, stdin_bytes):
captured.append((argv, stdin_bytes))
return 0, "", ""
monkeypatch.setattr(ssh_driver, "_run_with_stdin", fake_run_with_stdin)
drv = ssh_driver.SSHDriver()
mtime = datetime(2026, 4, 20, 11, 30, 0, tzinfo=timezone.utc)
result = await drv.plant_file(
"decky-03", "/home/admin/TODO.md", b"- [ ] rotate keys\n",
mode=0o644, mtime=mtime,
)
assert result.success is True
sh_cmd = captured[0][0][6]
# Backdated mtime appears in the touch -d argument.
assert "touch -d '2026-04-20 11:30:00 UTC'" in sh_cmd
assert "chmod 644" in sh_cmd
@pytest.mark.asyncio
async def test_read_file_returns_bytes(monkeypatch):
async def fake_run(argv):
assert argv[:3] == ["docker", "exec", "decky-04-ssh"]
assert argv[3] == "cat"
return 0, "previous body\n", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = ssh_driver.SSHDriver()
body = await drv.read_file("decky-04", "/home/admin/notes.txt")
assert body == b"previous body\n"
@pytest.mark.asyncio
async def test_read_file_raises_file_not_found(monkeypatch):
async def fake_run(argv):
return 1, "", "cat: /nope: No such file or directory"
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = ssh_driver.SSHDriver()
with pytest.raises(FileNotFoundError):
await drv.read_file("decky-04", "/nope")
@pytest.mark.asyncio
async def test_get_driver_for_dispatches_by_action_type():
from decnet.orchestrator.drivers import get_driver_for, SSHDriver
traffic = TrafficAction(
src_uuid="u1", src_name="a", dst_uuid="u2", dst_name="b",
dst_ip="10.0.0.1",
)
file_a = FileAction(
dst_uuid="u2", dst_name="b", path="/tmp/x", content="y",
)
assert isinstance(get_driver_for(traffic), SSHDriver)
assert isinstance(get_driver_for(file_a), SSHDriver)
def test_get_driver_for_unknown_action_raises():
from decnet.orchestrator.drivers import get_driver_for
class _Bogus:
pass
with pytest.raises(TypeError, match="no driver registered"):
get_driver_for(_Bogus()) # type: ignore[arg-type]