diff --git a/decnet/orchestrator/drivers/ssh.py b/decnet/orchestrator/drivers/ssh.py index 276264aa..b94bb0dc 100644 --- a/decnet/orchestrator/drivers/ssh.py +++ b/decnet/orchestrator/drivers/ssh.py @@ -26,7 +26,12 @@ from datetime import datetime, timezone from decnet.logging import get_logger from decnet.orchestrator.drivers.base import ActivityDriver, ActivityResult -from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction +from decnet.orchestrator.scheduler import ( + Action, + EditAction, + FileAction, + TrafficAction, +) log = get_logger("orchestrator.ssh") @@ -109,6 +114,8 @@ class SSHDriver(ActivityDriver): return await self._run_traffic(action) if isinstance(action, FileAction): return await self._run_file(action) + if isinstance(action, EditAction): + return await self._run_edit(action) raise TypeError(f"unsupported action type: {type(action)!r}") async def _run_traffic(self, action: TrafficAction) -> ActivityResult: @@ -135,6 +142,61 @@ class SSHDriver(ActivityDriver): ) return ActivityResult(success=success, payload=payload) + async def _run_edit(self, action: EditAction) -> ActivityResult: + """Mutate an existing synthetic file in place. + + The realism planner already loaded the previous body from the + ``synthetic_files`` row, so we don't re-fetch via ``read_file``; + the body the planner saw is the body we mutate. This avoids a + TOCTOU window where the file changed between pick and apply + (the realism worker is the only writer in the MVP, but the + contract should still be tight). 
+ """ + from decnet.realism.bodies import next_iteration as _next_iteration + from decnet.realism.taxonomy import ContentClass + + try: + cls = ContentClass(action.content_class) + except ValueError: + return ActivityResult( + success=False, + payload={ + "dst_decky": action.dst_name, + "path": action.path, + "error": f"unknown content_class: {action.content_class!r}", + }, + ) + try: + new_body = _next_iteration( + cls, action.persona, action.previous_body, + ) + except KeyError: + return ActivityResult( + success=False, + payload={ + "dst_decky": action.dst_name, + "path": action.path, + "error": ( + f"content_class={cls!s} does not support edits" + ), + }, + ) + result = await self.plant_file( + action.dst_name, + action.path, + new_body.encode("utf-8"), + mode=0o644, + mtime=action.mtime, + ) + # Carry edit-specific metadata through to the orchestrator + # event payload so the worker's synthetic_files bump (and the + # dashboard's lineage view) sees what actually landed. + if result.success: + result.payload["new_body"] = new_body + result.payload["new_body_bytes"] = len(new_body.encode("utf-8")) + result.payload["synthetic_file_uuid"] = action.synthetic_file_uuid + return result + async def _run_file(self, action: FileAction) -> ActivityResult: # FileAction's content is a string; the realism path uses # bytes-typed plant_file so binary blobs (DOCX/PDF, future diff --git a/decnet/orchestrator/events.py b/decnet/orchestrator/events.py index c63af0b7..83497c4f 100644 --- a/decnet/orchestrator/events.py +++ b/decnet/orchestrator/events.py @@ -6,7 +6,12 @@ from typing import Any from decnet.bus import topics as _topics from decnet.orchestrator.drivers.base import ActivityResult -from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction +from decnet.orchestrator.scheduler import ( + Action, + EditAction, + FileAction, + TrafficAction, +) def to_row(action: Action, result: ActivityResult) -> dict[str, Any]: @@ -31,6 +36,16 @@ def to_row(action: 
Action, result: ActivityResult) -> dict[str, Any]: src_decky_uuid=None, dst_decky_uuid=action.dst_uuid, ) + elif isinstance(action, EditAction): + # EditAction shares the "file" kind (same dashboard view, same + # bus topic family) but action="file:edit" lets queries + # discriminate when needed. + base.update( + kind="file", + action=action.description, + src_decky_uuid=None, + dst_decky_uuid=action.dst_uuid, + ) else: raise TypeError(f"unsupported action type: {type(action)!r}") return base @@ -40,7 +55,7 @@ def topic_for(action: Action) -> str: """Map an action to its bus topic.""" if isinstance(action, TrafficAction): return _topics.orchestrator(_topics.ORCHESTRATOR_TRAFFIC, action.dst_uuid) - if isinstance(action, FileAction): + if isinstance(action, (FileAction, EditAction)): return _topics.orchestrator(_topics.ORCHESTRATOR_FILE, action.dst_uuid) raise TypeError(f"unsupported action type: {type(action)!r}") @@ -48,6 +63,6 @@ def topic_for(action: Action) -> str: def event_type_for(action: Action) -> str: if isinstance(action, TrafficAction): return _topics.ORCHESTRATOR_TRAFFIC - if isinstance(action, FileAction): + if isinstance(action, (FileAction, EditAction)): return _topics.ORCHESTRATOR_FILE raise TypeError(f"unsupported action type: {type(action)!r}") diff --git a/decnet/orchestrator/scheduler.py b/decnet/orchestrator/scheduler.py index 35bb5703..67233dff 100644 --- a/decnet/orchestrator/scheduler.py +++ b/decnet/orchestrator/scheduler.py @@ -58,7 +58,28 @@ class FileAction: description: str = "file:create" -Action = TrafficAction | FileAction +@dataclass(frozen=True) +class EditAction: + """Read-modify-write of an existing synthetic file. + + Stage 3b of the realism migration: a previously-planted ``TODO.md`` + gets a checkbox flipped, a notes file gets a new line appended, a + cron log gets a fresh entry tacked on. 
``synthetic_file_uuid`` is + the row in ``synthetic_files`` to update; ``previous_body`` is + what the planner already saw so the driver doesn't double-fetch. + """ + dst_uuid: str + dst_name: str + path: str + persona: str + content_class: str + previous_body: str + synthetic_file_uuid: str + mtime: Optional[datetime] = None + description: str = "file:edit" + + +Action = TrafficAction | FileAction | EditAction def _has_ssh(decky: dict[str, Any]) -> bool: @@ -100,26 +121,64 @@ async def pick_file( *, now: Optional[datetime] = None, rand: Optional[secrets.SystemRandom] = None, -) -> Optional[FileAction]: - """Realism-driven file action. +) -> Optional[Action]: + """Realism-driven file action — create or edit. Resolves personas per decky (topology pool when the decky has a parent topology; global pool otherwise), filters to deckies in any - persona's work hours, asks :func:`decnet.realism.planner.pick` to - pick the (decky, persona, content_class, path, body, mtime), and - maps the resulting :class:`Plan` to a :class:`FileAction` the + persona's work hours, optionally fetches an edit candidate from + the synthetic_files table, and asks + :func:`decnet.realism.planner.pick` to choose between create / edit + / leave-alone. Maps the resulting :class:`Plan` to a + :class:`FileAction` (create) or :class:`EditAction` (edit) the SSH driver can dispatch. Returns ``None`` when no decky has a non-empty persona pool with a - persona currently in its active-hours window. + persona currently in its active-hours window, or when the planner + rolled "leave alone." """ rng = rand or secrets.SystemRandom() when = now or datetime.now(timezone.utc) enriched = await _resolve_personas(deckies, repo) - plan = _realism_pick(enriched, when, rand=rng) + if not enriched: + return None + + # Pre-fetch a single edit candidate from a random eligible decky, + # so the planner can decide whether to use it. 
We pick the decky + # client-side (cheap) and ask the repo for one row; if there's + # nothing editable, planner falls back to create. + edit_candidate = None + if rng.random() < 0.5: + # Only half the ticks pay for the repo lookup (the expensive + # part). Combined with the planner's 30% edit weight, edits + # land on roughly 15% of ticks when a candidate exists. + candidate_decky = rng.choice(enriched) + try: + row = await repo.pick_random_synthetic_file_for_edit( + candidate_decky["uuid"], + ) + except Exception: # noqa: BLE001 + row = None + if row is not None: + row = {**row, "decky_name": candidate_decky["name"]} + edit_candidate = row + + plan = _realism_pick(enriched, when, edit_candidate=edit_candidate, rand=rng) if plan is None: return None + + if plan.action == "edit": + return EditAction( + dst_uuid=plan.decky_uuid, + dst_name=plan.decky_name, + path=plan.target_path, + persona=plan.persona, + content_class=plan.content_class.value, + previous_body=plan.previous_body or "", + synthetic_file_uuid=(edit_candidate or {}).get("uuid", ""), + mtime=plan.mtime, + ) return FileAction( dst_uuid=plan.decky_uuid, dst_name=plan.decky_name, @@ -203,6 +262,7 @@ def _topology_personas(topology: Optional[dict[str, Any]]) -> list[EmailPersona] # ``Plan`` from the scheduler keep working through the migration.
__all__ = [ "Action", + "EditAction", "FileAction", "Plan", "TrafficAction", diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 3d23afdd..f8e4d4a4 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -212,14 +212,24 @@ async def _one_tick(repo: BaseRepository, bus) -> None: await _persist_email(repo, action, result, bus) else: await _persist_event(repo, action, result, bus) - if isinstance(action, scheduler.FileAction) and result.success: - try: - await _record_synthetic_file(repo, action) - except Exception as exc: # noqa: BLE001 - logger.warning( - "orchestrator: synthetic_files write failed dst=%s path=%s: %s", - action.dst_uuid, action.path, exc, - ) + if result.success: + if isinstance(action, scheduler.FileAction): + try: + await _record_synthetic_file(repo, action) + except Exception as exc: # noqa: BLE001 + logger.warning( + "orchestrator: synthetic_files write failed dst=%s path=%s: %s", + action.dst_uuid, action.path, exc, + ) + elif isinstance(action, scheduler.EditAction): + try: + await _bump_synthetic_file_after_edit(repo, action, result) + except Exception as exc: # noqa: BLE001 + logger.warning( + "orchestrator: synthetic_files edit-bump failed " + "dst=%s path=%s: %s", + action.dst_uuid, action.path, exc, + ) async def _persist_event(repo, action, result, bus) -> None: @@ -284,6 +294,41 @@ async def _persist_email(repo, action: EmailAction, result, bus) -> None: ) +async def _bump_synthetic_file_after_edit(repo, action, result) -> None: + """Patch ``synthetic_files`` after a successful EditAction. + + Bumps ``edit_count`` + ``last_modified`` + ``content_hash`` so the + dashboard's lineage view shows the change. When the row's UUID + isn't on the action (planner produced an edit plan from a stale + candidate that the repo pruned in between), the update is a no-op + — resurrecting a pruned row isn't this layer's job. 
+ + The new body comes from ``result.payload["new_body"]`` (the SSH + driver stashes it on success); we re-hash here so the orchestrator, + not the driver, owns the canonical hash field. + """ + if not action.synthetic_file_uuid: + return + new_body = result.payload.get("new_body", "") + rows = await repo.list_synthetic_files(decky_uuid=action.dst_uuid, limit=200) + existing = next( + (r for r in rows if r.get("uuid") == action.synthetic_file_uuid), + None, + ) + if existing is None: + return # candidate was pruned mid-flight; skip silently + patch: dict = { + "last_modified": datetime.now(timezone.utc), + "edit_count": int(existing.get("edit_count") or 0) + 1, + } + if new_body: + patch["content_hash"] = hashlib.sha256( + new_body.encode("utf-8"), + ).hexdigest() + patch["last_body"] = new_body[:65536] + await repo.update_synthetic_file(action.synthetic_file_uuid, patch) + + async def _record_synthetic_file(repo, action) -> None: """Persist (or patch) a synthetic_files row after a FileAction plant. diff --git a/decnet/realism/bodies.py b/decnet/realism/bodies.py index 801337ab..ec96f47c 100644 --- a/decnet/realism/bodies.py +++ b/decnet/realism/bodies.py @@ -231,3 +231,120 @@ def make_body( f"no body generator registered for content_class={content_class!r}" ) return gen(persona, rng) + + +# ── Edit-in-place mutators ───────────────────────────────────────────────── +# Stage 3b: deterministic per-class mutations. The contract: take the +# previous body text, return a plausible *next* iteration (append a +# line, flip a checkbox, fix a typo). Append-only for logs; small +# in-place edits for user content. LLM enrichment in stage 6 wires +# next_iteration to ask "what would this persona write next" with the +# previous body in the prompt; the deterministic path stays as the +# fallback. + + +def _edit_todo( + prev: str, persona: str, rng: secrets.SystemRandom, +) -> str: + """Flip an unchecked box, append a new item, or both.
+ + Real TODO files evolve: items get checked off as work happens, new + items get added, occasionally a sub-bullet appears under an + existing one. Each call flips a box, appends an item, or both. + """ + lines = prev.splitlines() + unchecked_indices = [ + i for i, ln in enumerate(lines) if ln.startswith("- [ ]") + ] + op = rng.choice(("flip", "append", "both") if unchecked_indices else ("append",)) + if op in ("flip", "both") and unchecked_indices: + idx = rng.choice(unchecked_indices) + lines[idx] = lines[idx].replace("- [ ]", "- [x]", 1) + if op in ("append", "both"): + new_item = rng.choice(_TODO_VERBS) + marker = "[x]" if rng.random() < 0.15 else "[ ]" + lines.append(f"- {marker} {new_item}") + return "\n".join(lines) + "\n" + + +def _edit_note( + prev: str, persona: str, rng: secrets.SystemRandom, +) -> str: + """Append one new note line, adding a separating newline if needed.""" + new_line = rng.choice(_NOTE_TEMPLATES) + if prev.endswith("\n"): + return prev + new_line + "\n" + return prev + "\n" + new_line + "\n" + + +def _edit_draft( + prev: str, persona: str, rng: secrets.SystemRandom, +) -> str: + """Append a new short paragraph to the existing draft.""" + addition = ( + "\nFollow-up: I'll send the deck once finance signs off on the numbers.\n", + "\nP.S.: Looping in ops on the rollout sequence — they have context I don't.\n", + "\nLet me know if any of this needs another pass.\n", + ) + return prev.rstrip() + "\n" + rng.choice(addition) + + +def _edit_script( + prev: str, persona: str, rng: secrets.SystemRandom, +) -> str: + """Append a comment line — scripts evolve via comments and small fixes.""" + comments = ( + "# TODO: handle the empty-input case\n", + "# 2026-04-27: hardened error path after the prod incident\n", + "# noqa: shellcheck disagrees but this is what the runbook says\n", + ) + return prev.rstrip() + "\n" + rng.choice(comments) + + +def _edit_log_cron( + prev: str, persona: str, rng: secrets.SystemRandom,
+) -> str: + """Append one new cron syslog line — logs only ever grow.""" + extra = _body_log_cron(persona, rng) + return prev.rstrip() + "\n" + extra.splitlines()[-1] + "\n" + + +def _edit_log_daemon( + prev: str, persona: str, rng: secrets.SystemRandom, +) -> str: + extra = _body_log_daemon(persona, rng) + return prev.rstrip() + "\n" + extra.splitlines()[-1] + "\n" + + +_EDITORS: dict[ContentClass, Callable[[str, str, secrets.SystemRandom], str]] = { + ContentClass.NOTE: _edit_note, + ContentClass.TODO: _edit_todo, + ContentClass.DRAFT: _edit_draft, + ContentClass.SCRIPT: _edit_script, + ContentClass.LOG_CRON: _edit_log_cron, + ContentClass.LOG_DAEMON: _edit_log_daemon, +} + + +def next_iteration( + content_class: ContentClass, + persona: str, + previous_body: str, + *, + rand: Optional[secrets.SystemRandom] = None, +) -> str: + """Return the next-iteration body for an edit-in-place mutation. + + Raises :class:`KeyError` for content classes that don't support + editing (canary blobs, cache-tmp scratch files, email). The + planner filters those out before producing an :class:`EditAction`, + so reaching this branch with an unsupported class is a bug worth + surfacing loudly. 
+ """ + rng = rand or secrets.SystemRandom() + editor = _EDITORS.get(content_class) + if editor is None: + raise KeyError( + f"content_class={content_class!r} does not support edits" + ) + return editor(previous_body, persona, rng) diff --git a/decnet/realism/planner.py b/decnet/realism/planner.py index 1000cdc9..bc2fd4b3 100644 --- a/decnet/realism/planner.py +++ b/decnet/realism/planner.py @@ -26,7 +26,7 @@ from typing import Any, Optional, Sequence from decnet.realism import bodies, naming from decnet.realism.diurnal import in_work_hours, sample_mtime from decnet.realism.personas import EmailPersona -from decnet.realism.taxonomy import ContentClass, Plan +from decnet.realism.taxonomy import ContentClass, Plan, PlanAction # noqa: F401 # Stage-3 weighted sampling: @@ -83,16 +83,23 @@ def pick( deckies: Sequence[dict[str, Any]], now: datetime, *, + edit_candidate: Optional[dict[str, Any]] = None, rand: Optional[secrets.SystemRandom] = None, ) -> Optional[Plan]: """Return a single :class:`Plan` for the orchestrator's tick. - Stage-3 policy: create-only. Stage 3b extends with the - create/edit/leave roll and the synthetic_files lookup for edits. + Stage-3b policy: weighted action roll — 60% create, 30% edit, 10% + "leave alone" (planner returns ``None`` to skip). When the roll + is "edit" and *edit_candidate* is set (a row from + :meth:`BaseRepository.pick_random_synthetic_file_for_edit`), we + return an edit Plan; otherwise we fall through to create. + + The orchestrator scheduler is responsible for fetching the edit + candidate before calling — keeps this function pure-of-DB and + test-friendly. + + Returns ``None`` when no eligible (decky, persona) pair exists or + when the action roll lands on "leave alone."
""" rng = rand or secrets.SystemRandom() @@ -100,12 +107,20 @@ def pick( if not eligible: return None + # Action roll. Edit only fires when there's a usable candidate from + # the repo — otherwise the edit roll falls through to create. + roll = rng.random() + if roll < 0.10: + return None # "leave alone" — quiet tick is realism too + if roll < 0.40 and edit_candidate is not None: + edit = _edit_plan(edit_candidate, now, rng) + if edit is not None: + return edit + decky, persona = rng.choice(eligible) # User vs system content — biased toward user (realism wins are - # bigger there). Once stage 3b ships edit-in-place, the edit - # branch will reuse the same content_class as the existing row; - # the create branch picks fresh here. + # bigger there). if rng.random() < 0.7: content_class = _weighted_pick(_USER_CLASS_WEIGHTS, rng) else: @@ -130,3 +145,48 @@ def pick( f"window={persona.active_hours}", ), ) + + +def _edit_plan( + candidate: dict[str, Any], + now: datetime, + rng: secrets.SystemRandom, +) -> Optional[Plan]: + """Build an edit-action :class:`Plan` from a synthetic_files row. + + The candidate dict is the shape :meth:`BaseRepository.list_synthetic_files` + returns — we only need ``decky_uuid``, ``path``, ``persona``, + ``content_class``, ``last_body``, ``uuid``. Returns ``None`` if + the candidate's content_class is somehow not editable (defensive + — the repo query already filters those out). + """ + try: + cls = ContentClass(candidate["content_class"]) + except (KeyError, ValueError): + return None + if cls.is_canary() or cls in (ContentClass.CACHE_TMP, ContentClass.EMAIL): + return None + # mtime: edits bump forward by ~hours-to-days, but never past now. + # We model as "the file was edited some time after creation but + # before now" — sample_mtime with a tighter cap keeps it recent.
+ edit_mtime = sample_mtime( + "00:00-00:00", now, rand=rng, + backdate_min_hours=1.0, backdate_max_days=2.0, + ) + return Plan( + decky_uuid=candidate["decky_uuid"], + decky_name=candidate.get("decky_name", ""), + persona=candidate.get("persona", ""), + content_class=cls, + action="edit", + target_path=candidate["path"], + mtime=edit_mtime, + body_hint=None, # edit uses previous_body, not a fresh hint + previous_body=candidate.get("last_body", ""), + notes=( + f"persona={candidate.get('persona', '')}", + f"class={cls.value}", + "action=edit", + f"synthetic_file_uuid={candidate.get('uuid', '')}", + ), + ) diff --git a/tests/realism/test_edit.py b/tests/realism/test_edit.py new file mode 100644 index 00000000..f6d58700 --- /dev/null +++ b/tests/realism/test_edit.py @@ -0,0 +1,98 @@ +"""next_iteration mutators per content class. + +Stage 3b — read-modify-write contract: each editor takes a previous +body and returns a plausible next iteration. Append-only for logs; +small in-place edits for user content. +""" +from __future__ import annotations + +import random + +import pytest + +from decnet.realism.bodies import next_iteration +from decnet.realism.taxonomy import ContentClass + + +def test_todo_edit_can_flip_an_unchecked_box() -> None: + prev = "- [ ] rotate keys\n- [ ] review pr\n" + seen_flip = False + for seed in range(40): + new = next_iteration( + ContentClass.TODO, "admin", prev, rand=random.Random(seed), + ) + if "[x]" in new and "rotate" in new and "[x] rotate" in new: + seen_flip = True + if "[x]" in new and "[x] review" in new: + seen_flip = True + if seen_flip: + break + assert seen_flip, "no checkbox flip across 40 seeds — mutator broken" + + +def test_todo_edit_grows_or_holds_line_count() -> None: + prev = "- [ ] rotate keys\n" + new = next_iteration( + ContentClass.TODO, "admin", prev, rand=random.Random(0), + ) + # Mutators may flip a box (same line count) or append (more lines) + # — but never shrink the file. 
+ assert len(new.splitlines()) >= len(prev.splitlines()) + + +def test_log_cron_edit_is_append_only() -> None: + prev = ( + "Apr 27 09:00:01 hostname CRON[1234]: (root) CMD (run-parts /etc/cron.daily)\n" + ) + new = next_iteration( + ContentClass.LOG_CRON, "admin", prev, rand=random.Random(0), + ) + assert new.startswith(prev.rstrip()) + assert len(new.splitlines()) > len(prev.splitlines()) + + +def test_log_daemon_edit_is_append_only() -> None: + prev = "Apr 27 09:00:01 hostname systemd[1]: Started Daily apt download activities.\n" + new = next_iteration( + ContentClass.LOG_DAEMON, "admin", prev, rand=random.Random(0), + ) + assert new.startswith(prev.rstrip()) + + +def test_note_edit_grows_the_body() -> None: + prev = "remember to ping the on-call\n" + new = next_iteration( + ContentClass.NOTE, "admin", prev, rand=random.Random(0), + ) + assert prev in new + assert len(new) > len(prev) + + +def test_draft_edit_appends_paragraph() -> None: + prev = "Hi team,\n\nQuick update.\n" + new = next_iteration( + ContentClass.DRAFT, "admin", prev, rand=random.Random(0), + ) + assert new.startswith(prev.rstrip()) + assert len(new) > len(prev) + + +def test_script_edit_appends_comment() -> None: + prev = "#!/usr/bin/env bash\nset -e\necho 'hi'\n" + new = next_iteration( + ContentClass.SCRIPT, "admin", prev, rand=random.Random(0), + ) + assert new.startswith(prev.rstrip()) + # New tail must be a comment (the editor's contract); never a + # silently-injected new exec line. + new_tail = new[len(prev.rstrip()):].strip() + assert new_tail.startswith("#") + + +@pytest.mark.parametrize("cls", [ + ContentClass.CACHE_TMP, ContentClass.EMAIL, + ContentClass.CANARY_AWS_CREDS, ContentClass.CANARY_HONEYDOC, +]) +def test_unsupported_classes_raise_in_edit(cls: ContentClass) -> None: + with pytest.raises(KeyError): + next_iteration(cls, "admin", "anything")