From cb1872c52f4e05659c87ec1fb40c32a44dbdacb8 Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 27 Apr 2026 16:22:07 -0400 Subject: [PATCH] feat(realism): synthetic_files table + planner wiring + scheduler swap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 3 of the realism migration. Replaces orchestrator/scheduler.py's hardcoded _FILE_TEMPLATES/_USERS (3 templates emitting epoch-suffixed filenames like notes-1777315854.txt with identical bodies per template) with a persona-driven realism engine. New surface: - SyntheticFile SQLModel (synthetic_files table, UNIQUE on decky_uuid+path) — per-(decky, path) state for the future edit-in-place flow. Pre-v1, no _migrate_* helper. - BaseRepository methods: record_synthetic_file, update_synthetic_file, list_synthetic_files, pick_random_synthetic_file_for_edit (used by stage 3b). - realism/naming.py: per-content-class filename templates, persona-conditioned. /var/log/cron.log + logrotate skeleton for system-class; /home//TODO.md, scratch.md, etc. for user-class. Anti-regression test pins "no 8+ digit decimals in basenames" (the realism failure today). - realism/bodies.py: deterministic body templates per content_class. TODO body uses checkbox markdown, script body has a shebang, cron body matches syslog cron shape ("CRON[PID]: (user) CMD (...)"). - realism/planner.py: pick(deckies, now, rng) returns a Plan. Diurnal-gated, weighted user/system content split (70/30 user bias). Create-only in stage 3; edit branch lands in stage 3b. Scheduler split: - scheduler.pick is now traffic-only (sync). - scheduler.pick_file is async, takes a repo, resolves personas (Topology.email_personas for topology-source deckies; global realism.personas_pool otherwise), and maps Plan -> FileAction. - FileAction gains persona/content_class/mtime fields. Worker: - _one_tick rolls 50/50 between traffic and file each tick. After a successful FileAction plant, _record_synthetic_file persists or patches the synthetic_files row (catching the unique-constraint collision on re-plant of the same path). - SSHDriver._run_file passes action.mtime through to plant_file so files don't all stamp at wall-clock-now. --- decnet/orchestrator/drivers/ssh.py | 3 + decnet/orchestrator/scheduler.py | 198 +++++++++++++---- decnet/orchestrator/worker.py | 86 +++++++- decnet/realism/bodies.py | 233 +++++++++++++++++++++ decnet/realism/naming.py | 192 +++++++++++++++++ decnet/realism/planner.py | 143 ++++++++++--- decnet/web/db/models/__init__.py | 7 + decnet/web/db/models/realism.py | 72 +++++++ decnet/web/db/repository.py | 57 +++++ decnet/web/db/sqlmodel_repo.py | 78 +++++++ tests/orchestrator/test_scheduler.py | 197 ++++++++++++++--- tests/realism/test_bodies.py | 68 ++++++ tests/realism/test_naming.py | 95 +++++++++ tests/realism/test_planner.py | 101 +++++++++ tests/realism/test_synthetic_files_repo.py | 116 ++++++++++ 15 files changed, 1541 insertions(+), 105 deletions(-) create mode 100644 decnet/realism/bodies.py create mode 100644 decnet/realism/naming.py create mode 100644 decnet/web/db/models/realism.py create mode 100644 tests/realism/test_bodies.py create mode 100644 tests/realism/test_naming.py create mode 100644 tests/realism/test_planner.py create mode 100644 tests/realism/test_synthetic_files_repo.py diff --git a/decnet/orchestrator/drivers/ssh.py b/decnet/orchestrator/drivers/ssh.py index 73289b73..276264aa 100644 --- a/decnet/orchestrator/drivers/ssh.py +++ b/decnet/orchestrator/drivers/ssh.py @@ -139,11 +139,14 @@ class SSHDriver(ActivityDriver): # FileAction's content is a string; the realism path uses # bytes-typed plant_file so binary blobs (DOCX/PDF, future # canary artifacts) survive the wire. Encode-once here. + # mtime carries through from the realism planner so the file + # doesn't stamp at wall-clock-now (the realism failure today). return await self.plant_file( action.dst_name, action.path, action.content.encode("utf-8"), mode=0o644, + mtime=action.mtime, ) async def plant_file( diff --git a/decnet/orchestrator/scheduler.py b/decnet/orchestrator/scheduler.py index 1d1e68e7..35bb5703 100644 --- a/decnet/orchestrator/scheduler.py +++ b/decnet/orchestrator/scheduler.py @@ -1,28 +1,30 @@ """Action picker for the orchestrator. -MVP policy: flat random — pick one (src, dst) pair where both deckies -expose SSH, then choose one of {ssh-traffic, file-touch}. No diurnal -shaping, no role-aware pairing — those land in v1. +Stage-3 realism: file actions are sourced from +:func:`decnet.realism.planner.pick`, not the old hardcoded +``_FILE_TEMPLATES``/``_USERS`` constants. Persona resolution per +decky still belongs here (the realism planner is pure of +:class:`~decnet.web.db.repository.BaseRepository` knowledge) — we +walk each decky to either ``Topology.email_personas`` or the +``decnet.realism.personas_pool`` global pool, depending on +``decky["source"]``, then hand the resolved set to the planner. + +TrafficAction stays untouched: still a flat random pair-pick of +SSH-capable deckies. Email actions land in stage 5 of the realism +migration when the emailgen worker collapses into the orchestrator. """ from __future__ import annotations +import json import secrets from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, Optional, Sequence -# A small set of plausible filenames the orchestrator drops or refreshes. -# Scope on purpose: the file driver is "prove the docker-exec write path -# works", not "generate believable user activity". Realism is v2. -# Paths target the filesystem *inside* a decoy container, not the host. -# Bandit B108 is a host-side concern; suppressed at the data definition. -_FILE_TEMPLATES: tuple[tuple[str, str], ...] = ( # nosec B108 - ("/tmp/.cache-{ts}.tmp", "session={ts}\n"), # nosec B108 - ("/var/log/cron-{ts}.log", "{ts} CRON[{n}]: ({user}) CMD (run-parts /etc/cron.daily)\n"), - ("/home/{user}/notes-{ts}.txt", "todo: rotate keys; check on backup task\n"), -) - -_USERS = ("admin", "ubuntu", "service") +from decnet.realism import personas_pool +from decnet.realism.personas import EmailPersona, parse_personas +from decnet.realism.planner import pick as _realism_pick +from decnet.realism.taxonomy import ContentClass, Plan @dataclass(frozen=True) @@ -38,10 +40,21 @@ class TrafficAction: @dataclass(frozen=True) class FileAction: + """One file plant request the SSH driver materialises. + + Stage-3 realism: ``persona`` / ``content_class`` / ``mtime`` are + populated when the action came through :func:`pick_file`. Older + direct constructions (tests, manual operator drives) leave them + at the defaults — back-compat for the pre-realism call sites + that haven't migrated yet. + """ dst_uuid: str dst_name: str path: str content: str + persona: str = "" + content_class: str = ContentClass.NOTE.value + mtime: Optional[datetime] = None description: str = "file:create" @@ -60,38 +73,139 @@ def pick( *, rand: Optional[secrets.SystemRandom] = None, ) -> Optional[Action]: - """Pick one action against the given decky set. + """Pick one *traffic* action against the given decky set. - Returns ``None`` when no action is possible (fewer than two SSH-capable - deckies for traffic, or no deckies at all for file ops). The worker - treats ``None`` as "skip this tick". + Returns ``None`` when no SSH-capable pair is available. File + actions are produced by :func:`pick_file` (async — needs the repo + for persona resolution). The orchestrator worker calls one or the + other per tick, weighted 50/50. """ rng = rand or secrets.SystemRandom() ssh_deckies = [d for d in deckies if _has_ssh(d) and d.get("ip")] - if not ssh_deckies: + if len(ssh_deckies) < 2: return None - - kind = "traffic" if (len(ssh_deckies) >= 2 and rng.random() < 0.5) else "file" - - if kind == "traffic": - src, dst = rng.sample(ssh_deckies, 2) - return TrafficAction( - src_uuid=src["uuid"], - src_name=src["name"], - dst_uuid=dst["uuid"], - dst_name=dst["name"], - dst_ip=dst["ip"], - ) - - dst = rng.choice(ssh_deckies) - template, content_template = rng.choice(_FILE_TEMPLATES) - ts = int(datetime.now(timezone.utc).timestamp()) - user = rng.choice(_USERS) - path = template.format(ts=ts, user=user) - content = content_template.format(ts=ts, user=user, n=rng.randint(1000, 99999)) - return FileAction( + src, dst = rng.sample(ssh_deckies, 2) + return TrafficAction( + src_uuid=src["uuid"], + src_name=src["name"], dst_uuid=dst["uuid"], dst_name=dst["name"], - path=path, - content=content, + dst_ip=dst["ip"], ) + + +async def pick_file( + deckies: Sequence[dict[str, Any]], + repo: Any, + *, + now: Optional[datetime] = None, + rand: Optional[secrets.SystemRandom] = None, +) -> Optional[FileAction]: + """Realism-driven file action. + + Resolves personas per decky (topology pool when the decky has a + parent topology; global pool otherwise), filters to deckies in any + persona's work hours, asks :func:`decnet.realism.planner.pick` to + pick the (decky, persona, content_class, path, body, mtime), and + maps the resulting :class:`Plan` to a :class:`FileAction` the + SSH driver can dispatch. + + Returns ``None`` when no decky has a non-empty persona pool with a + persona currently in its active-hours window. + """ + rng = rand or secrets.SystemRandom() + when = now or datetime.now(timezone.utc) + + enriched = await _resolve_personas(deckies, repo) + plan = _realism_pick(enriched, when, rand=rng) + if plan is None: + return None + return FileAction( + dst_uuid=plan.decky_uuid, + dst_name=plan.decky_name, + path=plan.target_path, + content=plan.body_hint or "", + persona=plan.persona, + content_class=plan.content_class.value, + mtime=plan.mtime, + ) + + +async def _resolve_personas( + deckies: Sequence[dict[str, Any]], + repo: Any, +) -> list[dict[str, Any]]: + """Attach a resolved persona list to each decky dict. + + The realism planner expects each decky to carry + ``_realism_personas`` (list of :class:`EmailPersona`). We do the + repo lookups here so the planner stays pure-of-DB. + + Topology-source deckies pull from ``Topology.email_personas``. + Fleet/shard deckies pull from the global pool + (:func:`decnet.realism.personas_pool.load`). Decky source unknown + → fall back to global pool too; better noisy than silent. + """ + enriched: list[dict[str, Any]] = [] + topology_cache: dict[str, list[EmailPersona]] = {} + global_personas: Optional[list[EmailPersona]] = None + + for decky in deckies: + # Files are planted via the SSH service, same as TrafficAction. + # A decky without ssh has no realism file path today (windows + # personas / SMB writes land in a future stage). + if not _has_ssh(decky): + continue + + source = (decky.get("source") or "").lower() + topology_id = decky.get("topology_id") + + personas: list[EmailPersona] = [] + if source == "topology" and topology_id: + if topology_id not in topology_cache: + try: + topology = await repo.get_topology(topology_id) + except Exception: # noqa: BLE001 + topology = None + topology_cache[topology_id] = _topology_personas(topology) + personas = topology_cache[topology_id] + else: + if global_personas is None: + # Lazy-load once per call; the global-pool cache inside + # personas_pool already mtime-checks. + global_personas = personas_pool.load() + personas = global_personas + + if not personas: + continue + enriched.append({**decky, "_realism_personas": personas}) + + return enriched + + +def _topology_personas(topology: Optional[dict[str, Any]]) -> list[EmailPersona]: + if not topology: + return [] + raw = topology.get("email_personas") + if raw is None: + return [] + if isinstance(raw, list): + return parse_personas(raw, language_default=topology.get("language_default") or "en") + if isinstance(raw, str): + try: + return parse_personas(json.loads(raw), language_default=topology.get("language_default") or "en") + except json.JSONDecodeError: + return [] + return [] + + +# Lightweight no-op alias kept so external callers that already import +# ``Plan`` from the scheduler keep working through the migration. +__all__ = [ + "Action", + "FileAction", + "Plan", + "TrafficAction", + "pick", + "pick_file", +] diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 14b7cf9c..56750fe7 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -98,11 +98,29 @@ async def orchestrator_worker( async def _one_tick(repo: BaseRepository, driver, bus) -> None: + import secrets as _secrets + # Union view: MazeNET topology + unihost fleet + SWARM shards. Pre-fleet # this only saw topology_deckies and was permanently blind to MACVLAN / # IPVLAN unihost decoys. deckies = await repo.list_running_deckies() - action = scheduler.pick(deckies) + rng = _secrets.SystemRandom() + + # Action-kind roll: 50/50 traffic vs file. Stage 5 of the realism + # migration adds an email branch (when emailgen folds in). When a + # roll yields nothing actionable (e.g. file branch with no personas + # in any persona's work hours), we fall through to the other side + # so a quiet half doesn't silence the whole tick. + action = None + if rng.random() < 0.5: + action = scheduler.pick(deckies, rand=rng) + if action is None: + action = await scheduler.pick_file(deckies, repo, rand=rng) + else: + action = await scheduler.pick_file(deckies, repo, rand=rng) + if action is None: + action = scheduler.pick(deckies, rand=rng) + if action is None: # Report the actual SSH-eligible count (what the scheduler filters # to), not just len(deckies) — the old "running+ssh count=N" line @@ -128,6 +146,18 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: result = await driver.run(action) row = events.to_row(action, result) await repo.record_orchestrator_event(row) + # Persist realism state for FileAction so stage 3b's edit-in-place + # has something to read back. Failure here is logged but doesn't + # tank the tick — the orchestrator event is the source of truth + # for "this action happened." + if isinstance(action, scheduler.FileAction) and result.success: + try: + await _record_synthetic_file(repo, action, result) + except Exception as exc: # noqa: BLE001 + logger.warning( + "orchestrator: synthetic_files write failed dst=%s path=%s: %s", + action.dst_uuid, action.path, exc, + ) if bus is not None: topic = events.topic_for(action) @@ -151,3 +181,57 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: "orchestrator tick kind=%s success=%s dst=%s", row["kind"], row["success"], row["dst_decky_uuid"], ) + + +async def _record_synthetic_file(repo, action, result) -> None: + """Persist a synthetic_files row after a successful FileAction plant. + + Idempotent on ``(decky_uuid, path)``: when the unique constraint + fires (the file existed already), we instead patch the existing + row's ``last_modified`` / ``content_hash`` / ``last_body`` / bump + ``edit_count`` so the dashboard's "files this decky has grown" + view stays accurate even when the orchestrator re-plants the same + location. + """ + import hashlib + from datetime import datetime, timezone + + body = action.content or "" + content_hash = hashlib.sha256(body.encode("utf-8")).hexdigest() + now = datetime.now(timezone.utc) + row = { + "decky_uuid": action.dst_uuid, + "path": action.path, + "persona": action.persona, + "content_class": action.content_class, + "created_at": now, + "last_modified": now, + "edit_count": 0, + "content_hash": content_hash, + # Cap the persisted body — large blobs (DOCX/PDF/canary + # artifacts in stage 7) are wasted disk on this side; the + # decky filesystem holds the canonical bytes. + "last_body": body[:65536], + } + try: + await repo.record_synthetic_file(row) + except Exception: # noqa: BLE001 + # Most likely the unique constraint on (decky_uuid, path) + # fired — flip to update mode by looking up the existing row. + existing = await repo.list_synthetic_files( + decky_uuid=action.dst_uuid, limit=200, + ) + match = next( + (r for r in existing if r.get("path") == action.path), None, + ) + if match is None: + raise + await repo.update_synthetic_file( + match["uuid"], + { + "last_modified": now, + "content_hash": content_hash, + "last_body": body[:65536], + "edit_count": int(match.get("edit_count", 0)) + 1, + }, + ) diff --git a/decnet/realism/bodies.py b/decnet/realism/bodies.py new file mode 100644 index 00000000..801337ab --- /dev/null +++ b/decnet/realism/bodies.py @@ -0,0 +1,233 @@ +"""Per-content-class body generators (deterministic templates). + +Stage 3 of the realism migration ships deterministic per-class +templates — varied enough that two notes on the same decky aren't +identical, formulaic enough that system-class files (cron logs, +journal entries) look like cron actually wrote them. + +Stage 6 wires LLM enrichment for user-classes; the templates here +remain the fallback path so the orchestrator tick never blocks on +Ollama. + +Determinism: every namer/body takes a :class:`SystemRandom` (from +:mod:`secrets`). Tests pin the RNG seed for reproducibility; the +orchestrator passes a fresh RNG per tick so production picks are +unpredictable. + +The factory mirrors :mod:`decnet.realism.naming`: caller passes a +:class:`~decnet.realism.taxonomy.ContentClass`; we return the body +generator registered for it. Email + canary classes raise — +those bodies come from the email driver and canary cultivator +respectively, not from realism.bodies. +""" +from __future__ import annotations + +import secrets +from datetime import datetime, timezone +from typing import Callable, Optional + +from decnet.realism.taxonomy import ContentClass + + +# ── User-class body generators ───────────────────────────────────────────── + + +_NOTE_TEMPLATES: tuple[str, ...] = ( + "follow up with the team on this", + "remember to ping the on-call", + "ask about the staging migration timeline", + "double-check the runbook before next shift", + "todo: rotate keys; check on backup task", + "meeting notes from yesterday — copy onto wiki when free", + "this is broken in prod; talk to ops monday", + "draft response to the auditor — keep it short", +) + + +def _body_note(persona: str, rng: secrets.SystemRandom) -> str: + n = rng.randint(2, 5) + lines = rng.sample(_NOTE_TEMPLATES, k=min(n, len(_NOTE_TEMPLATES))) + return "\n".join(lines) + "\n" + + +_TODO_VERBS: tuple[str, ...] = ( + "rotate keys", "review pr", + "clean up logs", "update docs", + "follow up on ticket", + "test backup restore", + "deploy to staging", + "ack auditor email", + "patch CVE backlog", +) + + +def _body_todo(persona: str, rng: secrets.SystemRandom) -> str: + n = rng.randint(3, 7) + items = rng.sample(_TODO_VERBS, k=min(n, len(_TODO_VERBS))) + # Roughly a third pre-checked — looks like a list that's been + # touched at least once. + out = [] + for item in items: + marker = "[x]" if rng.random() < 0.33 else "[ ]" + out.append(f"- {marker} {item}") + return "\n".join(out) + "\n" + + +_DRAFT_PARAGRAPHS: tuple[str, ...] = ( + "Hi team,\n\nQuick update on the project. We're tracking ahead of schedule " + "on the migration but the staging soak revealed a regression in the " + "auth path. I'll have a fix in by end of week.\n\nThanks,\n", + "Hi,\n\nFollowing up on yesterday's meeting. Action items below:\n\n" + "- Engineering owns the deployment plan\n" + "- Ops will draft the runbook update\n" + "- We sync again Friday\n\n", + "All,\n\nProposal attached. Key points:\n\n" + "1. We are not changing the data model in this release\n" + "2. The new endpoint is opt-in via feature flag\n" + "3. Rollback path is one config flip\n\n" + "Feedback by EOD?\n\n", +) + + +def _body_draft(persona: str, rng: secrets.SystemRandom) -> str: + return rng.choice(_DRAFT_PARAGRAPHS) + + +_SCRIPT_TEMPLATES: tuple[str, ...] = ( + "#!/usr/bin/env bash\nset -euo pipefail\n\n" + "BACKUP_DIR=/var/backups\n" + "STAMP=$(date +%Y%m%d-%H%M)\n" + "echo \"backup start $STAMP\"\n" + "tar czf \"$BACKUP_DIR/db-$STAMP.tar.gz\" /var/lib/mysql\n" + "echo \"backup done\"\n", + "#!/usr/bin/env bash\nset -e\n\n" + "# clean up old logs\n" + "find /var/log -name '*.log.*.gz' -mtime +30 -delete\n", + "#!/usr/bin/env python3\n\"\"\"Quick fix for the reporting job.\"\"\"\n" + "import sys\n\n" + "def main():\n print('todo: real fix here')\n\n" + "if __name__ == '__main__':\n sys.exit(main())\n", +) + + +def _body_script(persona: str, rng: secrets.SystemRandom) -> str: + return rng.choice(_SCRIPT_TEMPLATES) + + +# ── System-class body generators ─────────────────────────────────────────── + + +_CRON_COMMANDS: tuple[str, ...] = ( + "(root) CMD (run-parts /etc/cron.daily)", + "(root) CMD (run-parts /etc/cron.hourly)", + "(www-data) CMD (cd /var/www && /usr/bin/php artisan schedule:run)", + "(backup) CMD (/usr/local/bin/backup.sh)", + "(root) CMD (test -x /usr/sbin/anacron || ( cd / && run-parts --report /etc/cron.daily ))", +) + + +def _body_log_cron(persona: str, rng: secrets.SystemRandom) -> str: + n = rng.randint(8, 24) + base = datetime.now(timezone.utc) + lines = [] + for i in range(n): + hour = (base.hour - i) % 24 + minute = rng.randint(0, 59) + pid = rng.randint(1000, 99999) + cmd = rng.choice(_CRON_COMMANDS) + # ISO-ish "Apr 27 09:13:44 host CRON[1234]: ..." cron syslog shape. + date_s = base.strftime("%b %d") + lines.append( + f"{date_s} {hour:02d}:{minute:02d}:{rng.randint(0,59):02d} " + f"hostname CRON[{pid}]: {cmd}" + ) + return "\n".join(lines) + "\n" + + +_DAEMON_LINES: tuple[str, ...] = ( + "systemd[1]: Started Daily apt download activities.", + "systemd[1]: apt-daily.service: Succeeded.", + "systemd[1]: Reached target Multi-User System.", + "kernel: [UFW BLOCK] IN=eth0 OUT= MAC=…", + "sshd[2103]: pam_unix(sshd:session): session opened for user admin by (uid=0)", + "sshd[2103]: Received disconnect from 10.0.0.4 port 47282:11: disconnected by user", + "CRON[1894]: pam_unix(cron:session): session closed for user root", +) + + +def _body_log_daemon(persona: str, rng: secrets.SystemRandom) -> str: + n = rng.randint(10, 30) + lines = [] + base = datetime.now(timezone.utc) + for _ in range(n): + lines.append( + f"{base.strftime('%b %d %H:%M:%S')} hostname " + f"{rng.choice(_DAEMON_LINES)}" + ) + return "\n".join(lines) + "\n" + + +def _body_cache_tmp(persona: str, rng: secrets.SystemRandom) -> str: + # ~64-256 bytes of opaque session-ish payload — most /tmp/.cache-* + # files in the wild are short binary or k=v dumps. We emit ASCII + # so docker exec write paths don't need binary-safety acrobatics. + nbytes = rng.randint(64, 256) + chars = "abcdefghijklmnopqrstuvwxyz0123456789" + return "session=" + "".join(rng.choice(chars) for _ in range(nbytes)) + "\n" + + +def _body_email(persona: str, rng: secrets.SystemRandom) -> str: + raise NotImplementedError( + "email bodies come from the email driver, not realism.bodies" + ) + + +def _body_canary(persona: str, rng: secrets.SystemRandom) -> str: + raise NotImplementedError( + "canary bodies come from the canary cultivator (stage 7), " + "not realism.bodies" + ) + + +# ── Dispatch ─────────────────────────────────────────────────────────────── + + +_BODIES: dict[ContentClass, Callable[[str, secrets.SystemRandom], str]] = { + ContentClass.NOTE: _body_note, + ContentClass.TODO: _body_todo, + ContentClass.DRAFT: _body_draft, + ContentClass.SCRIPT: _body_script, + ContentClass.LOG_CRON: _body_log_cron, + ContentClass.LOG_DAEMON: _body_log_daemon, + ContentClass.CACHE_TMP: _body_cache_tmp, + ContentClass.EMAIL: _body_email, + ContentClass.CANARY_AWS_CREDS: _body_canary, + ContentClass.CANARY_ENV_FILE: _body_canary, + ContentClass.CANARY_GIT_CONFIG: _body_canary, + ContentClass.CANARY_SSH_KEY: _body_canary, + ContentClass.CANARY_HONEYDOC: _body_canary, + ContentClass.CANARY_HONEYDOC_DOCX: _body_canary, + ContentClass.CANARY_HONEYDOC_PDF: _body_canary, + ContentClass.CANARY_MYSQL_DUMP: _body_canary, +} + + +def make_body( + content_class: ContentClass, + persona: str, + *, + rand: Optional[secrets.SystemRandom] = None, +) -> str: + """Return deterministic body bytes (utf-8 string) for *content_class*. + + Stage 3 ships templates only; stage 6 adds an optional + ``LLMBackend`` parameter that, when supplied and the breaker is + closed, replaces the template return for user-classes. + """ + rng = rand or secrets.SystemRandom() + gen = _BODIES.get(content_class) + if gen is None: + raise KeyError( + f"no body generator registered for content_class={content_class!r}" + ) + return gen(persona, rng) diff --git a/decnet/realism/naming.py b/decnet/realism/naming.py new file mode 100644 index 00000000..7193581f --- /dev/null +++ b/decnet/realism/naming.py @@ -0,0 +1,192 @@ +"""Per-content-class filename generators. + +The pre-realism orchestrator emitted ``notes-1777315854.txt`` +(unix-epoch suffix) — a tell on first glance. Real users name +``notes.txt``, ``TODO.md``, ``backup-2025-04.sql.gz``. Real systems +write ``cron.log``, ``cron.log.1``, ``cron.log.2.gz`` (logrotate +shape, no epoch). + +Stage 3 ships **deterministic templates only**, persona-conditioned. +Stage 6 wires LLM enrichment for the user-classes (``note``, ``todo``, +``draft``, ``script``); the deterministic templates remain the +fallback when LLM is disabled or times out. + +The factory mirrors :func:`decnet.canary.factory.get_generator`: +caller passes a :class:`~decnet.realism.taxonomy.ContentClass`; we +return the namer registered for it. Renaming a content_class is a +schema change and would invalidate ``synthetic_files.path`` lookups, +so the dispatch is exhaustive — no silent fallbacks for unknown +classes. +""" +from __future__ import annotations + +import secrets +import string +from typing import Callable, Optional + +from decnet.realism.taxonomy import ContentClass + + +# Persona → home-dir convention. Most personas are linux-style; the +# rare "windows" persona gets ``C:\\Users\\\\Documents`` style +# paths (out of scope until per-OS personas land). For now everything +# is POSIX. +def _home(persona: str) -> str: + """Return the canonical home directory for *persona*. + + The persona's ``name`` is used as the linux username when it's a + plausible login (lowercase, no spaces); otherwise we fall back to + a generic ``user`` so the path doesn't reveal a persona display + name on the decky filesystem. + """ + candidate = persona.lower().replace(" ", "") + if candidate.isalnum() and candidate.isascii() and candidate: + return f"/home/{candidate}" + return "/home/user" + + +def _random_token(rng: secrets.SystemRandom, length: int = 6) -> str: + """Lowercase-alphanum token of length *length* — like ``mkstemp``.""" + return "".join(rng.choice(string.ascii_lowercase + string.digits) for _ in range(length)) + + +# ── User-class namers ────────────────────────────────────────────────────── + + +_NOTE_NAMES: tuple[str, ...] = ( + "notes.txt", "scratch.md", "ideas.txt", "Untitled-3.txt", + "draft.md", "keys.txt", "passwords.txt", "TODO.md", +) + +_TODO_NAMES: tuple[str, ...] = ( + "TODO.md", "todo.txt", "things.md", "tasks.txt", "punchlist.md", +) + +_DRAFT_NAMES: tuple[str, ...] = ( + "Q3-budget-DRAFT.md", "proposal.md", "letter.txt", + "rfc-internal.md", "memo.txt", "1on1-notes.md", +) + +_SCRIPT_NAMES: tuple[str, ...] = ( + "backup.sh", "deploy.sh", "cleanup.sh", "rotate.sh", + "fix.py", "tmp.py", "scratch.py", +) + + +def _name_user( + persona: str, names: tuple[str, ...], rng: secrets.SystemRandom, +) -> str: + return f"{_home(persona)}/{rng.choice(names)}" + + +def _name_note(persona: str, rng: secrets.SystemRandom) -> str: + return _name_user(persona, _NOTE_NAMES, rng) + + +def _name_todo(persona: str, rng: secrets.SystemRandom) -> str: + return _name_user(persona, _TODO_NAMES, rng) + + +def _name_draft(persona: str, rng: secrets.SystemRandom) -> str: + return _name_user(persona, _DRAFT_NAMES, rng) + + +def _name_script(persona: str, rng: secrets.SystemRandom) -> str: + return _name_user(persona, _SCRIPT_NAMES, rng) + + +# ── System-class namers ──────────────────────────────────────────────────── + + +# logrotate skeleton: cron.log, cron.log.1, cron.log.2.gz. No epoch +# suffix — the realism failure today is `cron-1777317867.log`. +_CRON_LOGROTATE: tuple[str, ...] = ( + "/var/log/cron.log", "/var/log/cron.log.1", "/var/log/cron.log.2.gz", +) +_DAEMON_LOGROTATE: tuple[str, ...] = ( + "/var/log/daemon.log", "/var/log/syslog", "/var/log/messages", + "/var/log/auth.log", "/var/log/auth.log.1", +) + + +def _name_log_cron(persona: str, rng: secrets.SystemRandom) -> str: + return rng.choice(_CRON_LOGROTATE) + + +def _name_log_daemon(persona: str, rng: secrets.SystemRandom) -> str: + return rng.choice(_DAEMON_LOGROTATE) + + +def _name_cache_tmp(persona: str, rng: secrets.SystemRandom) -> str: + # mkstemp shape: /tmp/.cache-XXXXXX with random alphanumerics. + # Hidden dot keeps it out of `ls` by default — same as glibc/python. + # Bandit B108 fires on the literal "/tmp/" path; suppressed at the + # site because this is a path we are *generating for a target + # decky*, not a file we are opening on the host. + return f"/tmp/.cache-{_random_token(rng, 6)}" # nosec B108 + + +# ── Email + canary placeholders ──────────────────────────────────────────── +# Email "names" (paths) are produced by the email driver's spool logic, +# not by realism naming. Canary paths are advisory — operators usually +# specify ``placement_path`` directly. Stage 7 of the realism migration +# refines canary placement based on persona + content_class. + + +def _name_email(persona: str, rng: secrets.SystemRandom) -> str: + raise NotImplementedError( + "email paths come from the email driver's spool logic, not " + "realism.naming" + ) + + +def _name_canary(persona: str, rng: secrets.SystemRandom) -> str: + raise NotImplementedError( + "canary placement is set by the canary cultivator (stage 7), " + "not realism.naming" + ) + + +# ── Dispatch ─────────────────────────────────────────────────────────────── + + +_NAMERS: dict[ContentClass, Callable[[str, secrets.SystemRandom], str]] = { + ContentClass.NOTE: _name_note, + ContentClass.TODO: _name_todo, + ContentClass.DRAFT: _name_draft, + ContentClass.SCRIPT: _name_script, + ContentClass.LOG_CRON: _name_log_cron, + ContentClass.LOG_DAEMON: _name_log_daemon, + ContentClass.CACHE_TMP: _name_cache_tmp, + ContentClass.EMAIL: _name_email, + ContentClass.CANARY_AWS_CREDS: _name_canary, + ContentClass.CANARY_ENV_FILE: _name_canary, + ContentClass.CANARY_GIT_CONFIG: _name_canary, + ContentClass.CANARY_SSH_KEY: _name_canary, + ContentClass.CANARY_HONEYDOC: _name_canary, + ContentClass.CANARY_HONEYDOC_DOCX: _name_canary, + ContentClass.CANARY_HONEYDOC_PDF: _name_canary, + ContentClass.CANARY_MYSQL_DUMP: _name_canary, +} + + +def make_path( + content_class: ContentClass, + persona: str, + *, + rand: Optional[secrets.SystemRandom] = None, +) -> str: + """Return a plausible absolute container-side path for *content_class*. + + Persona-conditioned for user-classes (``/home//…``). + System-classes ignore persona and pick from a logrotate-shaped + skeleton. Email and canary classes raise — those paths come + from the respective drivers, not from realism naming. + """ + rng = rand or secrets.SystemRandom() + namer = _NAMERS.get(content_class) + if namer is None: + raise KeyError( + f"no namer registered for content_class={content_class!r}" + ) + return namer(persona, rng) diff --git a/decnet/realism/planner.py b/decnet/realism/planner.py index 3d22e92f..1000cdc9 100644 --- a/decnet/realism/planner.py +++ b/decnet/realism/planner.py @@ -1,13 +1,21 @@ """Realism planner — picks the next ``(decky, persona, class, action)`` tuple. -Stage-1 stub: the public signature is in place so the orchestrator -worker (stage 3) can import it, but the body returns ``None`` ("nothing -to do this tick") until stage 3 wires the synthetic_files table and -naming/body generators. +Stage 3: returns ``create``-only plans (the edit branch lands in +stage 3b). Pure-function, deterministic given the same inputs: +caller passes deckies (with personas pre-resolved on each row), +``now``, and an RNG. -The eventual policy lives entirely in :func:`pick`; downstream -consumers should not branch on ``ContentClass`` themselves — let the -planner decide weights and rate-limits in one place. +The persona resolution split — topology-pool vs. global-pool — is +the orchestrator's job, not the planner's. Each decky dict reaching +:func:`pick` carries a ``_realism_personas`` key with the resolved +:class:`~decnet.realism.personas.EmailPersona` list. Keeps the +planner test-isolated and avoids forcing it to know about the +:class:`~decnet.web.db.repository.BaseRepository` / topology pool / +global pool. + +Diurnal gating uses :func:`decnet.realism.diurnal.in_work_hours` per +persona; we filter the (decky, persona) pairs *before* picking, so a +persona outside its window is never considered. """ from __future__ import annotations @@ -15,39 +23,110 @@ import secrets from datetime import datetime from typing import Any, Optional, Sequence -from decnet.realism.taxonomy import Plan +from decnet.realism import bodies, naming +from decnet.realism.diurnal import in_work_hours, sample_mtime +from decnet.realism.personas import EmailPersona +from decnet.realism.taxonomy import ContentClass, Plan + + +# Stage-3 weighted sampling: +# * User content (notes/todo/draft/script) gets the bulk — those are +# the realism win when a persona "looks busy." +# * System content (cron/daemon/cache) is plausible filler. +# * Email + canary are owned by other paths and not picked here. +_USER_CLASS_WEIGHTS: tuple[tuple[ContentClass, int], ...] = ( + (ContentClass.NOTE, 30), + (ContentClass.TODO, 20), + (ContentClass.DRAFT, 15), + (ContentClass.SCRIPT, 10), +) +_SYSTEM_CLASS_WEIGHTS: tuple[tuple[ContentClass, int], ...] = ( + (ContentClass.LOG_CRON, 12), + (ContentClass.LOG_DAEMON, 8), + (ContentClass.CACHE_TMP, 5), +) + + +def _weighted_pick( + weights: tuple[tuple[ContentClass, int], ...], + rng: secrets.SystemRandom, +) -> ContentClass: + total = sum(w for _, w in weights) + target = rng.randint(1, total) + running = 0 + for cls, w in weights: + running += w + if target <= running: + return cls + return weights[-1][0] # unreachable, satisfy mypy + + +def _eligible_pairs( + deckies: Sequence[dict[str, Any]], + now: datetime, +) -> list[tuple[dict[str, Any], EmailPersona]]: + """Cross-product of deckies × resolved personas, diurnal-filtered. + + A decky with no personas (empty ``_realism_personas``) is skipped + entirely; same fail-quiet semantics as the emailgen scheduler. + """ + out: list[tuple[dict[str, Any], EmailPersona]] = [] + for decky in deckies: + personas: list[EmailPersona] = decky.get("_realism_personas") or [] + for persona in personas: + if in_work_hours(persona.active_hours, now): + out.append((decky, persona)) + return out def pick( deckies: Sequence[dict[str, Any]], now: datetime, *, - repo: Any = None, rand: Optional[secrets.SystemRandom] = None, ) -> Optional[Plan]: - """Return the next :class:`Plan` for the orchestrator's tick. + """Return a single :class:`Plan` for the orchestrator's tick. - Stage-1 stub returns ``None`` unconditionally so the orchestrator - can import this function before the real implementation lands. The - full policy (diurnal gate, action distribution 60/30/10 - create/edit/leave, content-class weights, canary rate-limit) lands - in stage 3 of the realism migration. + Stage-3 policy: create-only. Stage 3b extends with the + create/edit/leave roll and the synthetic_files lookup for edits. - Parameters - ---------- - deckies : - Output of :meth:`BaseRepository.list_running_deckies`. Each - entry must carry ``uuid``, ``name``, ``services``, - ``email_personas`` (topology-pool JSON or list). - now : - Tick timestamp. Injected so tests don't need to monkey-patch - :func:`datetime.utcnow`. - repo : - :class:`BaseRepository` for synthetic_files lookup (edit - action). Optional in stage 1; required from stage 3 onward. - rand : - RNG for sampling. Defaults to a fresh - :class:`secrets.SystemRandom`. + Returns ``None`` when no eligible (decky, persona) pair exists — + the orchestrator treats that as "skip this tick" the same way the + pre-realism scheduler did. """ - _ = (deckies, now, repo, rand) # silence unused-arg until stage 3 - return None + rng = rand or secrets.SystemRandom() + + eligible = _eligible_pairs(deckies, now) + if not eligible: + return None + + decky, persona = rng.choice(eligible) + + # User vs system content — biased toward user (realism wins are + # bigger there). Once stage 3b ships edit-in-place, the edit + # branch will reuse the same content_class as the existing row; + # the create branch picks fresh here. + if rng.random() < 0.7: + content_class = _weighted_pick(_USER_CLASS_WEIGHTS, rng) + else: + content_class = _weighted_pick(_SYSTEM_CLASS_WEIGHTS, rng) + + target_path = naming.make_path(content_class, persona.name, rand=rng) + body_hint = bodies.make_body(content_class, persona.name, rand=rng) + mtime = sample_mtime(persona.active_hours, now, rand=rng) + + return Plan( + decky_uuid=decky["uuid"], + decky_name=decky["name"], + persona=persona.name, + content_class=content_class, + action="create", + target_path=target_path, + mtime=mtime, + body_hint=body_hint, + notes=( + f"persona={persona.name}", + f"class={content_class.value}", + f"window={persona.active_hours}", + ), + ) diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 17f73cf1..3cbb3fb3 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -77,6 +77,10 @@ from .orchestrator import ( OrchestratorEvent, OrchestratorEventsResponse, ) +from .realism import ( + SyntheticFile, + SyntheticFilesResponse, +) from .logs import ( Bounty, BountyResponse, @@ -226,6 +230,9 @@ __all__ = [ "OrchestratorEmailsResponse", "OrchestratorEvent", "OrchestratorEventsResponse", + # realism + "SyntheticFile", + "SyntheticFilesResponse", # logs "Bounty", "BountyResponse", diff --git a/decnet/web/db/models/realism.py b/decnet/web/db/models/realism.py new file mode 100644 index 00000000..f7fba84a --- /dev/null +++ b/decnet/web/db/models/realism.py @@ -0,0 +1,72 @@ +"""Realism — synthetic-file state across orchestrator ticks. + +The orchestrator's pre-realism file generator forgot every file the +moment it was planted: each tick wrote a brand-new ``notes-{ts}.txt`` +with a literal unix-epoch suffix. No edits, no rotation, no diurnal +shape — three of the realism failures the migration is fixing. + +:class:`SyntheticFile` is the per-(decky, path) memory that lets the +realism engine read back yesterday's ``TODO.md``, mutate it, write +back the new body, and let the dashboard inspect the lineage. + +Pre-v1: schema lives directly in the SQLModel; no ``_migrate_*`` +helper (per the project's "no new migrations pre-v1" rule — +``feedback_no_new_migrations_prev1.md``). Alembic lands at v1. +""" +from datetime import datetime, timezone +from typing import Any, List +from uuid import uuid4 + +from pydantic import BaseModel +from sqlalchemy import Column, Index, Text, UniqueConstraint +from sqlmodel import Field, SQLModel + + +class SyntheticFile(SQLModel, table=True): + """One realism-planted file on one decky. + + The unique key is ``(decky_uuid, path)`` — there's at most one + realism record per location, even if the planter has rotated the + file (rotation updates ``edit_count`` and ``last_modified``, not + a new row). + + ``last_body`` is capped — large blobs (DOCX/PDF, future canary + artifacts) are truncated at write time. The edit-in-place flow + (stage 3b) only needs the body when the content class supports + body-level mutation (``note``, ``todo``, ``draft``, ``script``), + so storing the canonical bytes for binary blobs would be wasted. + + ``content_hash`` is sha256 of the *body bytes only* — never of + metadata or wrapper headers — so a hash compare is a cheap + "did the body change?" check across edits. + """ + __tablename__ = "synthetic_files" + __table_args__ = ( + UniqueConstraint( + "decky_uuid", "path", name="uq_synthetic_files_decky_path", + ), + Index("ix_synthetic_files_decky_modified", "decky_uuid", "last_modified"), + ) + uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + decky_uuid: str = Field(index=True, max_length=64) + path: str = Field(max_length=1024) + persona: str = Field(max_length=128) # EmailPersona.name + content_class: str = Field(max_length=32, index=True) # ContentClass enum value + created_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True, + ) + last_modified: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + ) + edit_count: int = Field(default=0) + content_hash: str = Field(max_length=64) # sha256 hex + last_body: str = Field( + sa_column=Column("last_body", Text, nullable=False, default="") + ) + + +class SyntheticFilesResponse(BaseModel): + total: int + limit: int + offset: int + data: List[dict[str, Any]] diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index d4813742..869bacb1 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1100,3 +1100,60 @@ class BaseRepository(ABC): this on a periodic tick. """ raise NotImplementedError + + # ------------------------------------------------------------- realism + + async def record_synthetic_file(self, data: dict[str, Any]) -> str: + """Insert a new synthetic_files row, returning its uuid. + + The ``(decky_uuid, path)`` pair has a UNIQUE constraint, so two + creates for the same target raise — callers either use this for + first-time plants and :meth:`update_synthetic_file` for edits, + or wrap in a transaction that catches the conflict. + """ + raise NotImplementedError + + async def update_synthetic_file( + self, uuid: str, data: dict[str, Any], + ) -> None: + """Patch an existing synthetic_files row. + + Used by the realism edit-in-place flow (stage 3b): bumps + ``last_body``, ``content_hash``, ``last_modified``, and + ``edit_count``. No-op when *uuid* doesn't exist (the row may + have been pruned between pick and apply). + """ + raise NotImplementedError + + async def list_synthetic_files( + self, + *, + decky_uuid: Optional[str] = None, + persona: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict[str, Any]]: + """Paginated synthetic_files newest-first. + + Optional filters narrow to one decky and/or one persona, used by + the dashboard's "files this decky has grown" view. + """ + raise NotImplementedError + + async def pick_random_synthetic_file_for_edit( + self, + decky_uuid: str, + *, + max_age_days: int = 30, + ) -> Optional[dict[str, Any]]: + """Return a random eligible synthetic_files row for re-edit. + + "Eligible" = belongs to *decky_uuid*, last_modified within + *max_age_days*, content_class supports body-level mutation + (``note``, ``todo``, ``draft``, ``script``, ``log_*``). + Returns ``None`` when nothing matches. + + Used by the realism planner's ``action="edit"`` branch + (stage 3b). + """ + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index e0fc56b6..175213d9 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -53,6 +53,7 @@ from decnet.web.db.models import ( TopologyMutation, OrchestratorEmail, OrchestratorEvent, + SyntheticFile, WebhookSubscription, CanaryBlob, CanaryToken, @@ -3330,3 +3331,80 @@ class SQLModelRepository(BaseRepository): deleted += res.rowcount or 0 await session.commit() return deleted + + # ------------------------------------------------------------ realism + + async def record_synthetic_file(self, data: dict[str, Any]) -> str: + async with self._session() as session: + row = SyntheticFile(**data) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + async def update_synthetic_file( + self, row_uuid: str, data: dict[str, Any], + ) -> None: + async with self._session() as session: + stmt = ( + update(SyntheticFile) + .where(SyntheticFile.uuid == row_uuid) + .values(**data) + ) + await session.execute(stmt) + await session.commit() + + async def list_synthetic_files( + self, + *, + decky_uuid: Optional[str] = None, + persona: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(SyntheticFile) + if decky_uuid is not None: + stmt = stmt.where(SyntheticFile.decky_uuid == decky_uuid) + if persona is not None: + stmt = stmt.where(SyntheticFile.persona == persona) + stmt = ( + stmt.order_by(desc(SyntheticFile.last_modified)) + .offset(offset) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def pick_random_synthetic_file_for_edit( + self, + decky_uuid: str, + *, + max_age_days: int = 30, + ) -> Optional[dict[str, Any]]: + # Editable classes: anything whose body is plain text we can + # mutate idempotently. Binary canary artifacts are out — they + # rotate via a fresh plant, not an edit. + editable = ( + "note", "todo", "draft", "script", "log_cron", "log_daemon", + ) + from datetime import timedelta + cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days) + async with self._session() as session: + stmt = ( + select(SyntheticFile) + .where( + SyntheticFile.decky_uuid == decky_uuid, + SyntheticFile.content_class.in_(editable), # type: ignore[attr-defined] + SyntheticFile.last_modified >= cutoff, + ) + # SQLite + MySQL both support func.random() / RAND() — + # SQLAlchemy's func.random() compiles per-dialect. + .order_by(func.random()) + .limit(1) + ) + result = await session.execute(stmt) + row = result.scalars().first() + if row is None: + return None + return row.model_dump(mode="json") diff --git a/tests/orchestrator/test_scheduler.py b/tests/orchestrator/test_scheduler.py index 1ef34c3a..607c9d35 100644 --- a/tests/orchestrator/test_scheduler.py +++ b/tests/orchestrator/test_scheduler.py @@ -1,60 +1,197 @@ -"""Picker policy tests for the orchestrator scheduler.""" +"""Picker policy tests for the orchestrator scheduler. + +Stage-3 realism split: + +* :func:`scheduler.pick` is now traffic-only — sync, returns + :class:`TrafficAction` or ``None``. +* :func:`scheduler.pick_file` is async, takes a repo (for persona + resolution), and returns a :class:`FileAction` driven by + :func:`decnet.realism.planner.pick`. + +Pre-realism behavior (one ``pick()`` returning either kind) is gone; +the orchestrator worker rolls per tick. +""" from __future__ import annotations -import secrets +from datetime import datetime, timezone import pytest from decnet.orchestrator import scheduler -def _decky(uuid: str, name: str, ip: str | None, services: list[str] | str): - return {"uuid": uuid, "name": name, "ip": ip, "services": services} +def _decky( + uuid: str = "u1", + name: str = "decky-01", + ip: str | None = "10.0.0.1", + services: list[str] | str = ("ssh",), + *, + source: str = "topology", + topology_id: str | None = "t1", +) -> dict: + return { + "uuid": uuid, + "name": name, + "ip": ip, + "services": list(services) if not isinstance(services, str) else services, + "source": source, + "topology_id": topology_id, + } + + +# --------------------------------------------------------------------------- +# Sync pick() — traffic only. +# --------------------------------------------------------------------------- def test_pick_returns_none_when_no_ssh_deckies(): deckies = [ - _decky("u1", "decky-01", "10.0.0.1", ["http"]), - _decky("u2", "decky-02", "10.0.0.2", ["smb"]), + _decky("u1", services=["http"]), + _decky("u2", services=["smb"]), ] assert scheduler.pick(deckies) is None +def test_pick_returns_none_with_single_ssh_decky(): + # Traffic needs a pair; one decky alone can't generate inter-decky + # SSH probes. Realism file actions reach this single decky via the + # async pick_file() entry point instead. + deckies = [_decky()] + assert scheduler.pick(deckies) is None + + def test_pick_returns_none_when_ssh_decky_has_no_ip(): - deckies = [_decky("u1", "decky-01", None, ["ssh"])] + deckies = [_decky(ip=None)] assert scheduler.pick(deckies) is None -def test_pick_file_action_with_single_ssh_decky(): - deckies = [_decky("u1", "decky-01", "10.0.0.1", ["ssh"])] - rng = secrets.SystemRandom() - rng.seed = lambda *_: None # SystemRandom doesn't seed; ignore - action = scheduler.pick(deckies, rand=rng) - assert isinstance(action, scheduler.FileAction) - assert action.dst_uuid == "u1" - assert action.path.startswith("/") - assert action.content - - -def test_pick_traffic_or_file_with_two_ssh_deckies(): +def test_pick_traffic_with_two_ssh_deckies(): deckies = [ _decky("u1", "decky-01", "10.0.0.1", ["ssh"]), _decky("u2", "decky-02", "10.0.0.2", ["ssh"]), ] - seen_kinds: set[str] = set() - # 50/50 split — 40 trials makes both kinds essentially certain - for _ in range(40): + for _ in range(20): action = scheduler.pick(deckies) - assert action is not None - seen_kinds.add("traffic" if isinstance(action, scheduler.TrafficAction) else "file") - if isinstance(action, scheduler.TrafficAction): - assert action.src_uuid != action.dst_uuid - assert action.dst_ip in {"10.0.0.1", "10.0.0.2"} - assert action.protocol == "ssh" - assert seen_kinds == {"traffic", "file"} + assert isinstance(action, scheduler.TrafficAction) + assert action.src_uuid != action.dst_uuid + assert action.dst_ip in {"10.0.0.1", "10.0.0.2"} + assert action.protocol == "ssh" def test_pick_skips_non_deserialised_services(): """If services is still a JSON string (defensive), the decky is excluded.""" - deckies = [_decky("u1", "decky-01", "10.0.0.1", '["ssh"]')] + deckies = [_decky(services='["ssh"]')] assert scheduler.pick(deckies) is None + + +# --------------------------------------------------------------------------- +# Async pick_file() — realism-driven file actions. +# --------------------------------------------------------------------------- + + +_PERSONAS_TWO = [ + { + "name": "admin", + "email": "admin@corp.com", + "role": "ops", + "tone": "direct", + "mannerisms": [], + "active_hours": "00:00-00:00", # always-on for predictability + }, + { + "name": "ubuntu", + "email": "ubuntu@corp.com", + "role": "service", + "tone": "casual", + "mannerisms": [], + "active_hours": "00:00-00:00", + }, +] + + +class _FakeRepo: + """Minimal repo with just the methods scheduler.pick_file needs.""" + + def __init__(self, *, topologies=None, fleet_pool=None): + self._topologies = topologies or {} + # Fleet/global pool gets read via realism.personas_pool.load(); + # the test pins the pool path via env in fleet-source tests. + + async def get_topology(self, topology_id): + return self._topologies.get(topology_id) + + +def _topology_row(personas): + import json + return { + "id": "t1", + "email_personas": json.dumps(personas), + "language_default": "en", + } + + +@pytest.mark.asyncio +async def test_pick_file_returns_none_when_no_ssh_deckies(): + repo = _FakeRepo(topologies={"t1": _topology_row(_PERSONAS_TWO)}) + deckies = [_decky(services=["http"])] + assert await scheduler.pick_file(deckies, repo) is None + + +@pytest.mark.asyncio +async def test_pick_file_returns_none_when_topology_has_no_personas(): + repo = _FakeRepo(topologies={"t1": _topology_row([])}) + deckies = [_decky()] + assert await scheduler.pick_file(deckies, repo) is None + + +@pytest.mark.asyncio +async def test_pick_file_produces_file_action_for_topology_decky(): + repo = _FakeRepo(topologies={"t1": _topology_row(_PERSONAS_TWO)}) + deckies = [_decky()] + action = await scheduler.pick_file( + deckies, repo, + now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc), + ) + assert isinstance(action, scheduler.FileAction) + assert action.dst_uuid == "u1" + assert action.persona in {"admin", "ubuntu"} + assert action.path.startswith("/") + assert action.content + assert action.mtime is not None + # mtime must be in the past (the realism failure today is + # wall-clock-now stamps). + assert action.mtime < datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc) + + +@pytest.mark.asyncio +async def test_pick_file_skips_decky_when_personas_outside_window(): + out_of_hours = [{**p, "active_hours": "01:00-02:00"} for p in _PERSONAS_TWO] + repo = _FakeRepo(topologies={"t1": _topology_row(out_of_hours)}) + deckies = [_decky()] + action = await scheduler.pick_file( + deckies, repo, + now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc), + ) + assert action is None + + +@pytest.mark.asyncio +async def test_pick_file_uses_global_pool_for_fleet_source(tmp_path, monkeypatch): + import json + pool = tmp_path / "personas.json" + pool.write_text(json.dumps(_PERSONAS_TWO)) + monkeypatch.setenv("DECNET_REALISM_PERSONAS", str(pool)) + + # Reset the global cache so the new pool path takes effect. + from decnet.realism import personas_pool + personas_pool.reset_cache() + + repo = _FakeRepo() # no topology rows — fleet path + deckies = [_decky(source="fleet", topology_id=None)] + + action = await scheduler.pick_file( + deckies, repo, + now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc), + ) + assert isinstance(action, scheduler.FileAction) + assert action.dst_uuid == "u1" diff --git a/tests/realism/test_bodies.py b/tests/realism/test_bodies.py new file mode 100644 index 00000000..52e26f6f --- /dev/null +++ b/tests/realism/test_bodies.py @@ -0,0 +1,68 @@ +"""Body templates produce realistic, non-empty output per content class.""" +from __future__ import annotations + +import secrets + +import pytest + +from decnet.realism.bodies import make_body +from decnet.realism.taxonomy import ContentClass + + +_INERT_CLASSES = ( + ContentClass.NOTE, + ContentClass.TODO, + ContentClass.DRAFT, + ContentClass.SCRIPT, + ContentClass.LOG_CRON, + ContentClass.LOG_DAEMON, + ContentClass.CACHE_TMP, +) + + +@pytest.mark.parametrize("cls", _INERT_CLASSES) +def test_body_is_nonempty(cls: ContentClass) -> None: + body = make_body(cls, "admin", rand=secrets.SystemRandom()) + assert isinstance(body, str) + assert body.strip() + + +def test_todo_body_uses_checkbox_markdown() -> None: + body = make_body(ContentClass.TODO, "admin") + # Each line should look like a markdown checkbox; we don't pin the + # exact distribution because the % checked is randomised. + for line in body.strip().splitlines(): + assert line.startswith("- [") + + +def test_script_body_starts_with_shebang() -> None: + seen_shebangs: set[str] = set() + rng = secrets.SystemRandom() + for _ in range(20): + body = make_body(ContentClass.SCRIPT, "admin", rand=rng) + assert body.startswith("#!") + seen_shebangs.add(body.splitlines()[0]) + # We should pick from at least two interpreter shebangs across 20 + # trials; if not, the template list collapsed. + assert len(seen_shebangs) >= 2 + + +def test_log_cron_body_has_cron_syslog_shape() -> None: + body = make_body(ContentClass.LOG_CRON, "admin", rand=secrets.SystemRandom()) + for line in body.strip().splitlines(): + assert "CRON[" in line + assert "CMD (" in line + + +@pytest.mark.parametrize( + "cls", + [c for c in ContentClass if c.value.startswith("canary_")], +) +def test_canary_classes_raise_in_bodies(cls: ContentClass) -> None: + with pytest.raises(NotImplementedError, match="canary"): + make_body(cls, "admin") + + +def test_email_class_raises_in_bodies() -> None: + with pytest.raises(NotImplementedError, match="email"): + make_body(ContentClass.EMAIL, "admin") diff --git a/tests/realism/test_naming.py b/tests/realism/test_naming.py new file mode 100644 index 00000000..9a67dde9 --- /dev/null +++ b/tests/realism/test_naming.py @@ -0,0 +1,95 @@ +"""Filename realism contracts. + +The pre-realism orchestrator emitted ``notes-1777315854.txt`` — +unix-epoch suffix, instant tell. This file pins the anti-regression: +no namer is allowed to drop a raw decimal timestamp into a filename. +""" +from __future__ import annotations + +import re +import secrets + +import pytest + +from decnet.realism.naming import make_path +from decnet.realism.taxonomy import ContentClass + + +_USER_CLASSES = ( + ContentClass.NOTE, + ContentClass.TODO, + ContentClass.DRAFT, + ContentClass.SCRIPT, +) +_SYSTEM_CLASSES = ( + ContentClass.LOG_CRON, + ContentClass.LOG_DAEMON, + ContentClass.CACHE_TMP, +) + + +@pytest.mark.parametrize("cls", _USER_CLASSES) +def test_user_class_paths_live_under_persona_home(cls: ContentClass) -> None: + p = make_path(cls, "admin", rand=secrets.SystemRandom()) + assert p.startswith("/home/admin/"), p + + +@pytest.mark.parametrize("cls", _SYSTEM_CLASSES) +def test_system_class_paths_have_no_epoch_suffix(cls: ContentClass) -> None: + rng = secrets.SystemRandom() + for _ in range(20): + p = make_path(cls, "admin", rand=rng) + # The realism failure today: filenames carry raw unix epochs. + # 8+ consecutive digits in the basename is the tell. + basename = p.rsplit("/", 1)[-1] + assert not re.search(r"\d{8,}", basename), ( + f"epoch-shaped suffix found in {p!r}" + ) + + +def test_log_cron_uses_logrotate_skeleton() -> None: + seen: set[str] = set() + rng = secrets.SystemRandom() + for _ in range(40): + seen.add(make_path(ContentClass.LOG_CRON, "admin", rand=rng)) + # Real cron only ever writes a fixed set of names; anything outside + # the logrotate cycle is a realism bug. + expected = {"/var/log/cron.log", "/var/log/cron.log.1", "/var/log/cron.log.2.gz"} + assert seen <= expected + # And we should see at least the canonical name across 40 trials. + assert "/var/log/cron.log" in seen + + +def test_cache_tmp_uses_mkstemp_shape() -> None: + p = make_path(ContentClass.CACHE_TMP, "admin") + assert re.match(r"^/tmp/\.cache-[a-z0-9]{6}$", p), p + + +@pytest.mark.parametrize( + "cls", + [c for c in ContentClass if c.value.startswith("canary_")], +) +def test_canary_classes_raise_in_naming(cls: ContentClass) -> None: + with pytest.raises(NotImplementedError, match="canary"): + make_path(cls, "admin") + + +def test_email_class_raises_in_naming() -> None: + with pytest.raises(NotImplementedError, match="email"): + make_path(ContentClass.EMAIL, "admin") + + +def test_persona_with_spaces_normalises_to_login() -> None: + # "John Smith" → "johnsmith" is a plausible login, so the namer + # collapses spaces rather than falling back. This pins that + # behaviour against a future overcorrection. + p = make_path(ContentClass.NOTE, "John Smith") + assert p.startswith("/home/johnsmith/") + + +def test_persona_with_punctuation_falls_back_to_user_home() -> None: + # A persona name with punctuation (or non-ASCII letters) can't + # cleanly become a username; the namer must fall back to + # /home/user rather than leak weird chars onto the filesystem. + p = make_path(ContentClass.NOTE, "C-3PO!") + assert p.startswith("/home/user/") diff --git a/tests/realism/test_planner.py b/tests/realism/test_planner.py new file mode 100644 index 00000000..c55e3926 --- /dev/null +++ b/tests/realism/test_planner.py @@ -0,0 +1,101 @@ +"""Realism planner — picks (decky, persona, class, action, mtime). + +Stage 3 ships create-only plans; the edit branch lands in 3b. Tests +pin the diurnal gate, the eligibility filter, and the create +contract. +""" +from __future__ import annotations + +import random +from datetime import datetime, timezone + +import pytest + +from decnet.realism.personas import EmailPersona +from decnet.realism.planner import pick +from decnet.realism.taxonomy import ContentClass + + +def _persona(name: str = "admin", window: str = "00:00-00:00") -> EmailPersona: + return EmailPersona( + name=name, + email=f"{name}@corp.com", + role="ops", + tone="direct", + active_hours=window, + ) + + +def _decky(uuid: str = "u1", name: str = "decky-01", personas=None) -> dict: + return { + "uuid": uuid, + "name": name, + "_realism_personas": personas or [_persona()], + } + + +_NOW = datetime(2026, 4, 27, 14, 0, tzinfo=timezone.utc) + + +def test_pick_returns_none_when_no_deckies() -> None: + assert pick([], _NOW) is None + + +def test_pick_returns_none_when_decky_has_no_personas() -> None: + assert pick([{"uuid": "u1", "name": "d", "_realism_personas": []}], _NOW) is None + + +def test_pick_filters_personas_outside_window() -> None: + # A persona pegged to 01:00-02:00 with now=14:00 must not be picked. + out_of_hours = _persona(window="01:00-02:00") + deckies = [_decky(personas=[out_of_hours])] + assert pick(deckies, _NOW) is None + + +def test_pick_returns_create_plan_with_mtime_in_past() -> None: + deckies = [_decky()] + plan = pick(deckies, _NOW, rand=random.Random(0)) + assert plan is not None + assert plan.action == "create" + assert plan.decky_uuid == "u1" + assert plan.persona == "admin" + assert plan.target_path.startswith("/") + assert plan.body_hint + assert plan.mtime < _NOW + + +def test_pick_distributes_across_user_and_system_classes() -> None: + deckies = [_decky()] + seen: set[ContentClass] = set() + for seed in range(80): + plan = pick(deckies, _NOW, rand=random.Random(seed)) + if plan is not None: + seen.add(plan.content_class) + # Across 80 seeds we should hit both buckets — at least one user + # class and at least one system class — otherwise the weights or + # the 70/30 split is broken. + user_classes = {c for c in seen if c.is_user_class()} + system_classes = {c for c in seen if c.is_system_class()} + assert user_classes, f"no user-class plans in 80 trials: {seen}" + assert system_classes, f"no system-class plans in 80 trials: {seen}" + + +def test_pick_never_returns_canary_class_in_stage3() -> None: + deckies = [_decky()] + for seed in range(40): + plan = pick(deckies, _NOW, rand=random.Random(seed)) + if plan is None: + continue + assert not plan.content_class.is_canary(), ( + "canary class slipped into the realism planner; cultivator " + "lands in stage 7" + ) + + +def test_pick_persists_persona_window_in_notes() -> None: + plan = pick([_decky()], _NOW, rand=random.Random(0)) + assert plan is not None + # The plan's notes carry the persona name and the window — useful + # for the dashboard's "why this file" inspector. + assert any("persona=admin" in n for n in plan.notes) + assert any("window=" in n for n in plan.notes) diff --git a/tests/realism/test_synthetic_files_repo.py b/tests/realism/test_synthetic_files_repo.py new file mode 100644 index 00000000..9b61b4b3 --- /dev/null +++ b/tests/realism/test_synthetic_files_repo.py @@ -0,0 +1,116 @@ +"""record / update / list / pick-for-edit on the synthetic_files table. + +Stage 3 of the realism migration introduces the synthetic_files +table for per-(decky, path) state. Tests pin the contract on a +real :class:`SQLiteRepository` so SQLModel schema bugs surface here +rather than in production. +""" +from __future__ import annotations + +import hashlib +from datetime import datetime, timedelta, timezone + +import pytest +import pytest_asyncio + +from decnet.web.db.sqlite.repository import SQLiteRepository + + +@pytest_asyncio.fixture +async def repo(tmp_path): + r = SQLiteRepository(db_path=str(tmp_path / "decnet.db")) + await r.initialize() + yield r + await r.engine.dispose() + + +def _row( + decky: str = "d1", + path: str = "/home/admin/TODO.md", + persona: str = "admin", + cls: str = "todo", + body: str = "- [ ] rotate keys\n", + ts: datetime | None = None, +) -> dict: + now = ts or datetime.now(timezone.utc) + return { + "decky_uuid": decky, + "path": path, + "persona": persona, + "content_class": cls, + "created_at": now, + "last_modified": now, + "edit_count": 0, + "content_hash": hashlib.sha256(body.encode()).hexdigest(), + "last_body": body, + } + + +@pytest.mark.asyncio +async def test_record_returns_uuid(repo): + uuid = await repo.record_synthetic_file(_row()) + assert isinstance(uuid, str) and uuid + + +@pytest.mark.asyncio +async def test_unique_constraint_on_decky_path(repo): + await repo.record_synthetic_file(_row()) + with pytest.raises(Exception): + await repo.record_synthetic_file(_row()) + + +@pytest.mark.asyncio +async def test_update_synthetic_file_patches_fields(repo): + uuid = await repo.record_synthetic_file(_row()) + await repo.update_synthetic_file( + uuid, + {"edit_count": 1, "last_body": "- [x] rotate keys\n"}, + ) + listing = await repo.list_synthetic_files(decky_uuid="d1") + assert len(listing) == 1 + assert listing[0]["edit_count"] == 1 + assert listing[0]["last_body"].startswith("- [x]") + + +@pytest.mark.asyncio +async def test_list_filters_by_decky_and_persona(repo): + await repo.record_synthetic_file(_row(decky="d1", path="/a")) + await repo.record_synthetic_file(_row(decky="d1", path="/b", persona="ubuntu")) + await repo.record_synthetic_file(_row(decky="d2", path="/c")) + + by_decky = await repo.list_synthetic_files(decky_uuid="d1") + assert {r["path"] for r in by_decky} == {"/a", "/b"} + + by_persona = await repo.list_synthetic_files(decky_uuid="d1", persona="ubuntu") + assert {r["path"] for r in by_persona} == {"/b"} + + +@pytest.mark.asyncio +async def test_pick_random_returns_none_when_empty(repo): + assert await repo.pick_random_synthetic_file_for_edit("d-empty") is None + + +@pytest.mark.asyncio +async def test_pick_random_excludes_canary_classes(repo): + # Canary-class files are stored on the same table (stage 7) but + # the editor must skip them — their bodies are binary blobs. + await repo.record_synthetic_file(_row(cls="canary_aws_creds")) + picked = await repo.pick_random_synthetic_file_for_edit("d1") + assert picked is None + + +@pytest.mark.asyncio +async def test_pick_random_excludes_too_old_rows(repo): + old = datetime.now(timezone.utc) - timedelta(days=120) + await repo.record_synthetic_file(_row(ts=old)) + picked = await repo.pick_random_synthetic_file_for_edit("d1", max_age_days=30) + assert picked is None + + +@pytest.mark.asyncio +async def test_pick_random_returns_eligible_row(repo): + await repo.record_synthetic_file(_row(cls="todo")) + picked = await repo.pick_random_synthetic_file_for_edit("d1") + assert picked is not None + assert picked["content_class"] == "todo" + assert picked["path"] == "/home/admin/TODO.md"