feat(realism): EditAction read-modify-write of planted files

Stage 3b of the realism migration. A TODO.md planted on Monday gets a
checkbox flipped on Tuesday; a notes file grows a follow-up line; a
cron log gets a fresh entry tacked on. The synthetic_files row's
edit_count, last_modified, and content_hash advance.

New surface:

- EditAction dataclass (peer of FileAction in scheduler.py): carries
  decky, path, persona, content_class, previous_body, mtime, and
  synthetic_file_uuid for the worker's update path.
- realism.bodies.next_iteration(cls, persona, prev, rng): per-class
  deterministic mutators. TODO flips an unchecked box and/or appends;
  notes/drafts/scripts append; logs are append-only (mirroring real
  log behaviour). Canary, cache_tmp, email raise KeyError —
  unsupported.
- realism.planner.pick gains an edit branch: 60% create, 30% edit
  (when an edit_candidate is supplied), 10% leave-alone. Returns
  None on leave-alone — quiet ticks are realism too.
- scheduler.pick_file pre-fetches a single edit candidate via
  repo.pick_random_synthetic_file_for_edit ~50% of ticks; the
  planner decides whether to use it.
- SSHDriver._run_edit: turns next_iteration output into a
  plant_file call (mtime-bumped, mode 0o644). Stashes new_body in
  result.payload so the worker can hash it for synthetic_files.
- worker._bump_synthetic_file_after_edit: patches edit_count + 1,
  last_modified=now, content_hash, last_body for the row UUID.
  No-op when the row was pruned mid-flight.
- events.to_row / topic_for / event_type_for now recognise
  EditAction (kind="file", action="file:edit").
This commit is contained in:
2026-04-27 16:38:17 -04:00
parent 32eeb0c813
commit b321e29002
7 changed files with 484 additions and 29 deletions

View File

@@ -26,7 +26,12 @@ from datetime import datetime, timezone
from decnet.logging import get_logger
from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
from decnet.orchestrator.scheduler import (
Action,
EditAction,
FileAction,
TrafficAction,
)
log = get_logger("orchestrator.ssh")
@@ -109,6 +114,8 @@ class SSHDriver(ActivityDriver):
return await self._run_traffic(action)
if isinstance(action, FileAction):
return await self._run_file(action)
if isinstance(action, EditAction):
return await self._run_edit(action)
raise TypeError(f"unsupported action type: {type(action)!r}")
async def _run_traffic(self, action: TrafficAction) -> ActivityResult:
@@ -135,6 +142,61 @@ class SSHDriver(ActivityDriver):
)
return ActivityResult(success=success, payload=payload)
async def _run_edit(self, action: EditAction) -> ActivityResult:
    """Rewrite an already-planted synthetic file with its next iteration.

    The new body is derived from ``action.previous_body`` -- the body
    the realism planner loaded from the ``synthetic_files`` row -- and
    is not re-fetched from the decky via ``read_file``. Mutating the
    exact body the planner saw avoids a TOCTOU window between pick and
    apply (the realism worker is the only writer in the MVP, but the
    contract should still be tight).

    Returns a failed :class:`ActivityResult` (no file is planted) when
    ``action.content_class`` is not a known ``ContentClass`` or when
    ``next_iteration`` raises ``KeyError`` for that class. Otherwise
    returns whatever ``plant_file`` returned; on success its payload is
    extended with ``new_body``, ``new_body_bytes`` and
    ``synthetic_file_uuid``.
    """
    from decnet.realism.bodies import next_iteration as _next_iteration
    from decnet.realism.taxonomy import ContentClass

    def _failed(reason: str) -> ActivityResult:
        # Both rejection paths report the same payload shape.
        return ActivityResult(
            success=False,
            payload={
                "dst_decky": action.dst_name,
                "path": action.path,
                "error": reason,
            },
        )

    try:
        cls = ContentClass(action.content_class)
    except ValueError:
        return _failed(f"unknown content_class: {action.content_class!r}")

    try:
        new_body = _next_iteration(cls, action.persona, action.previous_body)
    except KeyError:
        return _failed(f"content_class={cls!s} does not support edits")

    # Encode once: the bytes that get planted are the bytes we measure.
    encoded = new_body.encode("utf-8")
    outcome = await self.plant_file(
        action.dst_name,
        action.path,
        encoded,
        mode=0o644,
        mtime=action.mtime,
    )

    # Carry edit-specific metadata through to the orchestrator event
    # payload so the worker's synthetic_files bump (and the dashboard's
    # lineage view) sees what actually landed.
    if outcome.success:
        outcome.payload["new_body"] = new_body
        outcome.payload["new_body_bytes"] = len(encoded)
        outcome.payload["synthetic_file_uuid"] = action.synthetic_file_uuid
    return outcome
async def _run_file(self, action: FileAction) -> ActivityResult:
# FileAction's content is a string; the realism path uses
# bytes-typed plant_file so binary blobs (DOCX/PDF, future

View File

@@ -6,7 +6,12 @@ from typing import Any
from decnet.bus import topics as _topics
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
from decnet.orchestrator.scheduler import (
Action,
EditAction,
FileAction,
TrafficAction,
)
def to_row(action: Action, result: ActivityResult) -> dict[str, Any]:
@@ -31,6 +36,16 @@ def to_row(action: Action, result: ActivityResult) -> dict[str, Any]:
src_decky_uuid=None,
dst_decky_uuid=action.dst_uuid,
)
elif isinstance(action, EditAction):
# EditAction shares the "file" kind (same dashboard view, same
# bus topic family) but action="file:edit" lets queries
# discriminate when needed.
base.update(
kind="file",
action=action.description,
src_decky_uuid=None,
dst_decky_uuid=action.dst_uuid,
)
else:
raise TypeError(f"unsupported action type: {type(action)!r}")
return base
@@ -40,7 +55,7 @@ def topic_for(action: Action) -> str:
"""Map an action to its bus topic."""
if isinstance(action, TrafficAction):
return _topics.orchestrator(_topics.ORCHESTRATOR_TRAFFIC, action.dst_uuid)
if isinstance(action, FileAction):
if isinstance(action, (FileAction, EditAction)):
return _topics.orchestrator(_topics.ORCHESTRATOR_FILE, action.dst_uuid)
raise TypeError(f"unsupported action type: {type(action)!r}")
@@ -48,6 +63,6 @@ def topic_for(action: Action) -> str:
def event_type_for(action: Action) -> str:
if isinstance(action, TrafficAction):
return _topics.ORCHESTRATOR_TRAFFIC
if isinstance(action, FileAction):
if isinstance(action, (FileAction, EditAction)):
return _topics.ORCHESTRATOR_FILE
raise TypeError(f"unsupported action type: {type(action)!r}")

View File

@@ -58,7 +58,28 @@ class FileAction:
description: str = "file:create"
Action = TrafficAction | FileAction
@dataclass(frozen=True)
class EditAction:
    """Read-modify-write of an existing synthetic file.

    Stage 3b of the realism migration: a previously-planted ``TODO.md``
    gets a checkbox flipped, a notes file gets a new line appended, a
    cron log gets a fresh entry tacked on. ``synthetic_file_uuid`` is
    the row in ``synthetic_files`` to update; ``previous_body`` is
    what the planner already saw so the driver doesn't double-fetch.

    Instances are immutable (``frozen=True``). ``pick_file`` builds one
    from a planner ``Plan`` whose ``action`` is ``"edit"``.
    """
    # UUID of the target decky (``plan.decky_uuid``).
    dst_uuid: str
    # Name of the target decky (``plan.decky_name``); passed to ``plant_file``.
    dst_name: str
    # Path of the file to rewrite on the decky (``plan.target_path``).
    path: str
    # Persona handed to ``next_iteration`` when computing the new body.
    persona: str
    # String value of the ``ContentClass`` enum member for this file.
    content_class: str
    # Body the planner saw; ``pick_file`` substitutes "" when the plan has none.
    previous_body: str
    # ``synthetic_files`` row UUID; "" when no candidate UUID was available.
    synthetic_file_uuid: str
    # Modification time forwarded to ``plant_file``; ``None`` leaves it unset.
    mtime: Optional[datetime] = None
    # Used as the event row's ``action`` value by ``events.to_row``.
    description: str = "file:edit"
Action = TrafficAction | FileAction | EditAction
def _has_ssh(decky: dict[str, Any]) -> bool:
@@ -100,26 +121,64 @@ async def pick_file(
*,
now: Optional[datetime] = None,
rand: Optional[secrets.SystemRandom] = None,
) -> Optional[FileAction]:
"""Realism-driven file action.
) -> Optional[Action]:
"""Realism-driven file action — create or edit.
Resolves personas per decky (topology pool when the decky has a
parent topology; global pool otherwise), filters to deckies in any
persona's work hours, asks :func:`decnet.realism.planner.pick` to
pick the (decky, persona, content_class, path, body, mtime), and
maps the resulting :class:`Plan` to a :class:`FileAction` the
persona's work hours, optionally fetches an edit candidate from
the synthetic_files table, and asks
:func:`decnet.realism.planner.pick` to choose between create / edit
/ leave-alone. Maps the resulting :class:`Plan` to a
:class:`FileAction` (create) or :class:`EditAction` (edit) the
SSH driver can dispatch.
Returns ``None`` when no decky has a non-empty persona pool with a
persona currently in its active-hours window.
persona currently in its active-hours window, or when the planner
rolled "leave alone."
"""
rng = rand or secrets.SystemRandom()
when = now or datetime.now(timezone.utc)
enriched = await _resolve_personas(deckies, repo)
plan = _realism_pick(enriched, when, rand=rng)
if not enriched:
return None
# Pre-fetch a single edit candidate from a random eligible decky,
# so the planner can decide whether to use it. We pick the decky
# client-side (cheap) and ask the repo for one row; if there's
# nothing editable, planner falls back to create.
edit_candidate = None
if rng.random() < 0.5 and enriched:
# Only half the ticks pre-fetch an edit candidate, so the
# effective edit rate ends up below the planner's 30% edit
# weight. That is on purpose — the repo lookup is the
# expensive part, no point doing it on every tick.
candidate_decky = rng.choice(enriched)
try:
row = await repo.pick_random_synthetic_file_for_edit(
candidate_decky["uuid"],
)
except Exception: # noqa: BLE001
row = None
if row is not None:
row = {**row, "decky_name": candidate_decky["name"]}
edit_candidate = row
plan = _realism_pick(enriched, when, edit_candidate=edit_candidate, rand=rng)
if plan is None:
return None
if plan.action == "edit":
return EditAction(
dst_uuid=plan.decky_uuid,
dst_name=plan.decky_name,
path=plan.target_path,
persona=plan.persona,
content_class=plan.content_class.value,
previous_body=plan.previous_body or "",
synthetic_file_uuid=(edit_candidate or {}).get("uuid", ""),
mtime=plan.mtime,
)
return FileAction(
dst_uuid=plan.decky_uuid,
dst_name=plan.decky_name,
@@ -203,6 +262,7 @@ def _topology_personas(topology: Optional[dict[str, Any]]) -> list[EmailPersona]
# ``Plan`` from the scheduler keep working through the migration.
__all__ = [
"Action",
"EditAction",
"FileAction",
"Plan",
"TrafficAction",

View File

@@ -212,14 +212,24 @@ async def _one_tick(repo: BaseRepository, bus) -> None:
await _persist_email(repo, action, result, bus)
else:
await _persist_event(repo, action, result, bus)
if isinstance(action, scheduler.FileAction) and result.success:
try:
await _record_synthetic_file(repo, action)
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: synthetic_files write failed dst=%s path=%s: %s",
action.dst_uuid, action.path, exc,
)
if result.success:
if isinstance(action, scheduler.FileAction):
try:
await _record_synthetic_file(repo, action)
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: synthetic_files write failed dst=%s path=%s: %s",
action.dst_uuid, action.path, exc,
)
elif isinstance(action, scheduler.EditAction):
try:
await _bump_synthetic_file_after_edit(repo, action, result)
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: synthetic_files edit-bump failed "
"dst=%s path=%s: %s",
action.dst_uuid, action.path, exc,
)
async def _persist_event(repo, action, result, bus) -> None:
@@ -284,6 +294,41 @@ async def _persist_email(repo, action: EmailAction, result, bus) -> None:
)
async def _bump_synthetic_file_after_edit(repo, action, result) -> None:
"""Patch ``synthetic_files`` after a successful EditAction.
Bumps ``edit_count`` + ``last_modified`` + ``content_hash`` so the
dashboard's lineage view shows the change. When the row's UUID
isn't on the action (planner produced an edit plan from a stale
candidate that the repo pruned in between), the update is a no-op
— resurrecting a pruned row isn't this layer's job.
The new body comes from ``result.payload["new_body"]`` (the SSH
driver stashes it on success); we re-hash here so the orchestrator,
not the driver, owns the canonical hash field.
"""
if not action.synthetic_file_uuid:
return
new_body = result.payload.get("new_body", "")
rows = await repo.list_synthetic_files(decky_uuid=action.dst_uuid, limit=200)
existing = next(
(r for r in rows if r.get("uuid") == action.synthetic_file_uuid),
None,
)
if existing is None:
return # candidate was pruned mid-flight; skip silently
patch: dict = {
"last_modified": datetime.now(timezone.utc),
"edit_count": int(existing.get("edit_count", 0)) + 1,
}
if new_body:
patch["content_hash"] = hashlib.sha256(
new_body.encode("utf-8"),
).hexdigest()
patch["last_body"] = new_body[:65536]
await repo.update_synthetic_file(action.synthetic_file_uuid, patch)
async def _record_synthetic_file(repo, action) -> None:
"""Persist (or patch) a synthetic_files row after a FileAction plant.