Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
431 lines
16 KiB
Python
431 lines
16 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Per-content-class body generators (deterministic templates).
|
|
|
|
Stage 3 of the realism migration ships deterministic per-class
|
|
templates — varied enough that two notes on the same decky aren't
|
|
identical, formulaic enough that system-class files (cron logs,
|
|
journal entries) look like cron actually wrote them.
|
|
|
|
Stage 6 wires LLM enrichment for user-classes; the templates here
|
|
remain the fallback path so the orchestrator tick never blocks on
|
|
Ollama.
|
|
|
|
Determinism: every namer/body takes a :class:`SystemRandom` (from
|
|
:mod:`secrets`). Tests pin the RNG seed for reproducibility; the
|
|
orchestrator passes a fresh RNG per tick so production picks are
|
|
unpredictable.
|
|
|
|
The factory mirrors :mod:`decnet.realism.naming`: caller passes a
|
|
:class:`~decnet.realism.taxonomy.ContentClass`; we return the body
|
|
generator registered for it. Email + canary classes raise —
|
|
those bodies come from the email driver and canary cultivator
|
|
respectively, not from realism.bodies.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
from typing import TYPE_CHECKING, Callable, Optional
|
|
|
|
from decnet.logging import get_logger
|
|
from decnet.realism.taxonomy import ContentClass
|
|
|
|
if TYPE_CHECKING:
|
|
from decnet.realism.personas import EmailPersona
|
|
|
|
log = get_logger("realism.bodies")
|
|
|
|
|
|
# ── User-class body generators ─────────────────────────────────────────────
|
|
|
|
|
|
_NOTE_TEMPLATES: tuple[str, ...] = (
|
|
"follow up with the team on this",
|
|
"remember to ping the on-call",
|
|
"ask about the staging migration timeline",
|
|
"double-check the runbook before next shift",
|
|
"todo: rotate keys; check on backup task",
|
|
"meeting notes from yesterday — copy onto wiki when free",
|
|
"this is broken in prod; talk to ops monday",
|
|
"draft response to the auditor — keep it short",
|
|
)
|
|
|
|
|
|
def _body_note(persona: str, rng: secrets.SystemRandom) -> str:
|
|
n = rng.randint(2, 5)
|
|
lines = rng.sample(_NOTE_TEMPLATES, k=min(n, len(_NOTE_TEMPLATES)))
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
_TODO_VERBS: tuple[str, ...] = (
|
|
"rotate keys", "review pr",
|
|
"clean up logs", "update docs",
|
|
"follow up on ticket",
|
|
"test backup restore",
|
|
"deploy to staging",
|
|
"ack auditor email",
|
|
"patch CVE backlog",
|
|
)
|
|
|
|
|
|
def _body_todo(persona: str, rng: secrets.SystemRandom) -> str:
|
|
n = rng.randint(3, 7)
|
|
items = rng.sample(_TODO_VERBS, k=min(n, len(_TODO_VERBS)))
|
|
# Roughly a third pre-checked — looks like a list that's been
|
|
# touched at least once.
|
|
out = []
|
|
for item in items:
|
|
marker = "[x]" if rng.random() < 0.33 else "[ ]"
|
|
out.append(f"- {marker} {item}")
|
|
return "\n".join(out) + "\n"
|
|
|
|
|
|
_DRAFT_PARAGRAPHS: tuple[str, ...] = (
|
|
"Hi team,\n\nQuick update on the project. We're tracking ahead of schedule "
|
|
"on the migration but the staging soak revealed a regression in the "
|
|
"auth path. I'll have a fix in by end of week.\n\nThanks,\n",
|
|
"Hi,\n\nFollowing up on yesterday's meeting. Action items below:\n\n"
|
|
"- Engineering owns the deployment plan\n"
|
|
"- Ops will draft the runbook update\n"
|
|
"- We sync again Friday\n\n",
|
|
"All,\n\nProposal attached. Key points:\n\n"
|
|
"1. We are not changing the data model in this release\n"
|
|
"2. The new endpoint is opt-in via feature flag\n"
|
|
"3. Rollback path is one config flip\n\n"
|
|
"Feedback by EOD?\n\n",
|
|
)
|
|
|
|
|
|
def _body_draft(persona: str, rng: secrets.SystemRandom) -> str:
|
|
return rng.choice(_DRAFT_PARAGRAPHS)
|
|
|
|
|
|
_SCRIPT_TEMPLATES: tuple[str, ...] = (
|
|
"#!/usr/bin/env bash\nset -euo pipefail\n\n"
|
|
"BACKUP_DIR=/var/backups\n"
|
|
"STAMP=$(date +%Y%m%d-%H%M)\n"
|
|
"echo \"backup start $STAMP\"\n"
|
|
"tar czf \"$BACKUP_DIR/db-$STAMP.tar.gz\" /var/lib/mysql\n"
|
|
"echo \"backup done\"\n",
|
|
"#!/usr/bin/env bash\nset -e\n\n"
|
|
"# clean up old logs\n"
|
|
"find /var/log -name '*.log.*.gz' -mtime +30 -delete\n",
|
|
"#!/usr/bin/env python3\n\"\"\"Quick fix for the reporting job.\"\"\"\n"
|
|
"import sys\n\n"
|
|
"def main():\n print('todo: real fix here')\n\n"
|
|
"if __name__ == '__main__':\n sys.exit(main())\n",
|
|
)
|
|
|
|
|
|
def _body_script(persona: str, rng: secrets.SystemRandom) -> str:
|
|
return rng.choice(_SCRIPT_TEMPLATES)
|
|
|
|
|
|
# ── System-class body generators ───────────────────────────────────────────
|
|
|
|
|
|
_CRON_COMMANDS: tuple[str, ...] = (
|
|
"(root) CMD (run-parts /etc/cron.daily)",
|
|
"(root) CMD (run-parts /etc/cron.hourly)",
|
|
"(www-data) CMD (cd /var/www && /usr/bin/php artisan schedule:run)",
|
|
"(backup) CMD (/usr/local/bin/backup.sh)",
|
|
"(root) CMD (test -x /usr/sbin/anacron || ( cd / && run-parts --report /etc/cron.daily ))",
|
|
)
|
|
|
|
|
|
def _body_log_cron(persona: str, rng: secrets.SystemRandom) -> str:
|
|
n = rng.randint(8, 24)
|
|
base = datetime.now(timezone.utc)
|
|
lines = []
|
|
for i in range(n):
|
|
hour = (base.hour - i) % 24
|
|
minute = rng.randint(0, 59)
|
|
pid = rng.randint(1000, 99999)
|
|
cmd = rng.choice(_CRON_COMMANDS)
|
|
# ISO-ish "Apr 27 09:13:44 host CRON[1234]: ..." cron syslog shape.
|
|
date_s = base.strftime("%b %d")
|
|
lines.append(
|
|
f"{date_s} {hour:02d}:{minute:02d}:{rng.randint(0,59):02d} "
|
|
f"hostname CRON[{pid}]: {cmd}"
|
|
)
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
_DAEMON_LINES: tuple[str, ...] = (
|
|
"systemd[1]: Started Daily apt download activities.",
|
|
"systemd[1]: apt-daily.service: Succeeded.",
|
|
"systemd[1]: Reached target Multi-User System.",
|
|
"kernel: [UFW BLOCK] IN=eth0 OUT= MAC=…",
|
|
"sshd[2103]: pam_unix(sshd:session): session opened for user admin by (uid=0)",
|
|
"sshd[2103]: Received disconnect from 10.0.0.4 port 47282:11: disconnected by user",
|
|
"CRON[1894]: pam_unix(cron:session): session closed for user root",
|
|
)
|
|
|
|
|
|
def _body_log_daemon(persona: str, rng: secrets.SystemRandom) -> str:
|
|
n = rng.randint(10, 30)
|
|
lines = []
|
|
base = datetime.now(timezone.utc)
|
|
for _ in range(n):
|
|
lines.append(
|
|
f"{base.strftime('%b %d %H:%M:%S')} hostname "
|
|
f"{rng.choice(_DAEMON_LINES)}"
|
|
)
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def _body_cache_tmp(persona: str, rng: secrets.SystemRandom) -> str:
|
|
# ~64-256 bytes of opaque session-ish payload — most /tmp/.cache-*
|
|
# files in the wild are short binary or k=v dumps. We emit ASCII
|
|
# so docker exec write paths don't need binary-safety acrobatics.
|
|
nbytes = rng.randint(64, 256)
|
|
chars = "abcdefghijklmnopqrstuvwxyz0123456789"
|
|
return "session=" + "".join(rng.choice(chars) for _ in range(nbytes)) + "\n"
|
|
|
|
|
|
def _body_email(persona: str, rng: secrets.SystemRandom) -> str:
|
|
raise NotImplementedError(
|
|
"email bodies come from the email driver, not realism.bodies"
|
|
)
|
|
|
|
|
|
def _body_canary(persona: str, rng: secrets.SystemRandom) -> str:
|
|
raise NotImplementedError(
|
|
"canary bodies come from the canary cultivator (stage 7), "
|
|
"not realism.bodies"
|
|
)
|
|
|
|
|
|
# ── Dispatch ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
_BODIES: dict[ContentClass, Callable[[str, secrets.SystemRandom], str]] = {
|
|
ContentClass.NOTE: _body_note,
|
|
ContentClass.TODO: _body_todo,
|
|
ContentClass.DRAFT: _body_draft,
|
|
ContentClass.SCRIPT: _body_script,
|
|
ContentClass.LOG_CRON: _body_log_cron,
|
|
ContentClass.LOG_DAEMON: _body_log_daemon,
|
|
ContentClass.CACHE_TMP: _body_cache_tmp,
|
|
ContentClass.EMAIL: _body_email,
|
|
# All canary classes share one placeholder — content-class discriminant is the
|
|
# "what"; the real payload (token slug, DNS hook URL) is injected by the canary
|
|
# cultivator. Do not replace with distinct generators without updating cultivator.
|
|
ContentClass.CANARY_AWS_CREDS: _body_canary,
|
|
ContentClass.CANARY_ENV_FILE: _body_canary,
|
|
ContentClass.CANARY_GIT_CONFIG: _body_canary,
|
|
ContentClass.CANARY_SSH_KEY: _body_canary,
|
|
ContentClass.CANARY_HONEYDOC: _body_canary,
|
|
ContentClass.CANARY_HONEYDOC_DOCX: _body_canary,
|
|
ContentClass.CANARY_HONEYDOC_PDF: _body_canary,
|
|
ContentClass.CANARY_MYSQL_DUMP: _body_canary,
|
|
ContentClass.CANARY_FINGERPRINT_HTML: _body_canary,
|
|
ContentClass.CANARY_FINGERPRINT_SVG: _body_canary,
|
|
}
|
|
|
|
|
|
def make_body(
|
|
content_class: ContentClass,
|
|
persona: str,
|
|
*,
|
|
rand: Optional[secrets.SystemRandom] = None,
|
|
) -> str:
|
|
"""Return deterministic body bytes (utf-8 string) for *content_class*.
|
|
|
|
Stage 3 ships templates only. :func:`make_body_with_llm` is the
|
|
LLM-aware variant added in stage 6 — kept on a separate name so
|
|
the deterministic path stays trivially callable from tests and
|
|
from the LLM fallback itself.
|
|
"""
|
|
rng = rand or secrets.SystemRandom()
|
|
gen = _BODIES.get(content_class)
|
|
if gen is None:
|
|
raise KeyError(
|
|
f"no body generator registered for content_class={content_class!r}"
|
|
)
|
|
return gen(persona, rng)
|
|
|
|
|
|
async def make_body_with_llm(
|
|
content_class: ContentClass,
|
|
persona: "EmailPersona",
|
|
*,
|
|
llm=None, # LLMBackend | None
|
|
breaker=None, # LLMCircuitBreaker | None
|
|
timeout: float = 60.0,
|
|
rand: Optional[secrets.SystemRandom] = None,
|
|
) -> str:
|
|
"""LLM-enriched body for user-classes; deterministic fallback otherwise.
|
|
|
|
Falls back to :func:`make_body` whenever:
|
|
|
|
* ``llm`` is None,
|
|
* ``breaker.allow_call()`` returns False (sustained failure),
|
|
* the LLM call times out or returns empty,
|
|
* the content class isn't a user-class (system-class content
|
|
should look formulaic, so we never invoke LLM there).
|
|
|
|
Em-dash stripping runs on the LLM output as a belt-and-braces
|
|
guard (see :mod:`decnet.realism.prompts._style`). The function
|
|
is async because LLM calls are; the deterministic path returns
|
|
immediately so the orchestrator's tick doesn't pay async overhead
|
|
when LLM is disabled.
|
|
"""
|
|
rng = rand or secrets.SystemRandom()
|
|
|
|
# System / canary / email classes never touch the LLM.
|
|
if not content_class.is_user_class():
|
|
return make_body(content_class, persona.name, rand=rng)
|
|
|
|
if llm is None or (breaker is not None and not breaker.allow_call()):
|
|
return make_body(content_class, persona.name, rand=rng)
|
|
|
|
# Lazy imports keep the prompt + style modules out of the
|
|
# deterministic path's import graph.
|
|
from decnet.realism.llm.base import LLMTimeout
|
|
from decnet.realism.prompts import filebody as _filebody
|
|
from decnet.realism.prompts._style import strip_em_dashes
|
|
|
|
prompt = _filebody.build(content_class, persona)
|
|
try:
|
|
result = await asyncio.wait_for(llm.generate(prompt), timeout=timeout)
|
|
except (LLMTimeout, asyncio.TimeoutError):
|
|
log.debug("realism.bodies LLM timeout class=%s persona=%s",
|
|
content_class.value, persona.name)
|
|
if breaker is not None:
|
|
breaker.record_failure()
|
|
return make_body(content_class, persona.name, rand=rng)
|
|
except Exception as exc: # noqa: BLE001
|
|
log.warning("realism.bodies LLM error class=%s persona=%s: %s",
|
|
content_class.value, persona.name, exc)
|
|
if breaker is not None:
|
|
breaker.record_failure()
|
|
return make_body(content_class, persona.name, rand=rng)
|
|
|
|
if not result.success or not result.text.strip():
|
|
if breaker is not None:
|
|
breaker.record_failure()
|
|
return make_body(content_class, persona.name, rand=rng)
|
|
|
|
if breaker is not None:
|
|
breaker.record_success()
|
|
return strip_em_dashes(result.text.rstrip() + "\n", persona)
|
|
|
|
|
|
# ── Edit-in-place mutators ─────────────────────────────────────────────────
|
|
# Stage 3b: deterministic per-class mutations. The contract: take the
|
|
# previous body bytes, return a plausible *next* iteration (append a
|
|
# line, flip a checkbox, fix a typo). Append-only for logs; small
|
|
# in-place edits for user content. LLM enrichment in stage 6 wires
|
|
# next_iteration to ask "what would <persona> write next" with the
|
|
# previous body in the prompt; the deterministic path stays as the
|
|
# fallback.
|
|
|
|
|
|
def _edit_todo(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
"""Flip an unchecked box, append a new item, or both.
|
|
|
|
Real TODO files evolve: items get checked off as work happens, new
|
|
items get added, occasionally a sub-bullet appears under an
|
|
existing one. We pick one of those mutations per call.
|
|
"""
|
|
lines = prev.splitlines()
|
|
unchecked_indices = [
|
|
i for i, ln in enumerate(lines) if ln.startswith("- [ ]")
|
|
]
|
|
op = rng.choice(("flip", "append", "both") if unchecked_indices else ("append",))
|
|
if op in ("flip", "both") and unchecked_indices:
|
|
idx = rng.choice(unchecked_indices)
|
|
lines[idx] = lines[idx].replace("- [ ]", "- [x]", 1)
|
|
if op in ("append", "both"):
|
|
new_item = rng.choice(_TODO_VERBS)
|
|
marker = "[x]" if rng.random() < 0.15 else "[ ]"
|
|
lines.append(f"- {marker} {new_item}")
|
|
return "\n".join(lines) + ("" if prev.endswith("\n") else "\n")
|
|
|
|
|
|
def _edit_note(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
"""Append one new note line or insert a follow-up under an existing one."""
|
|
new_line = rng.choice(_NOTE_TEMPLATES)
|
|
if prev.endswith("\n"):
|
|
return prev + new_line + "\n"
|
|
return prev + "\n" + new_line + "\n"
|
|
|
|
|
|
def _edit_draft(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
"""Append a new short paragraph to the existing draft."""
|
|
addition = (
|
|
"\nFollow-up: I'll send the deck once finance signs off on the numbers.\n",
|
|
"\nP.S.: Looping in ops on the rollout sequence — they have context I don't.\n",
|
|
"\nLet me know if any of this needs another pass.\n",
|
|
)
|
|
return prev.rstrip() + "\n" + rng.choice(addition)
|
|
|
|
|
|
def _edit_script(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
"""Append a comment line — scripts evolve via comments and small fixes."""
|
|
comments = (
|
|
"# TODO: handle the empty-input case\n",
|
|
"# 2026-04-27: hardened error path after the prod incident\n",
|
|
"# noqa: shellcheck disagrees but this is what the runbook says\n",
|
|
)
|
|
return prev.rstrip() + "\n" + rng.choice(comments)
|
|
|
|
|
|
def _edit_log_cron(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
"""Append one new cron syslog line — logs only ever grow."""
|
|
extra = _body_log_cron(persona, rng)
|
|
return prev.rstrip() + "\n" + extra.splitlines()[-1] + "\n"
|
|
|
|
|
|
def _edit_log_daemon(
|
|
prev: str, persona: str, rng: secrets.SystemRandom,
|
|
) -> str:
|
|
extra = _body_log_daemon(persona, rng)
|
|
return prev.rstrip() + "\n" + extra.splitlines()[-1] + "\n"
|
|
|
|
|
|
_EDITORS: dict[ContentClass, Callable[[str, str, secrets.SystemRandom], str]] = {
|
|
ContentClass.NOTE: _edit_note,
|
|
ContentClass.TODO: _edit_todo,
|
|
ContentClass.DRAFT: _edit_draft,
|
|
ContentClass.SCRIPT: _edit_script,
|
|
ContentClass.LOG_CRON: _edit_log_cron,
|
|
ContentClass.LOG_DAEMON: _edit_log_daemon,
|
|
}
|
|
|
|
|
|
def next_iteration(
|
|
content_class: ContentClass,
|
|
persona: str,
|
|
previous_body: str,
|
|
*,
|
|
rand: Optional[secrets.SystemRandom] = None,
|
|
) -> str:
|
|
"""Return the next-iteration body for an edit-in-place mutation.
|
|
|
|
Raises :class:`KeyError` for content classes that don't support
|
|
editing (canary blobs, cache-tmp scratch files, email). The
|
|
planner filters those out before producing an :class:`EditAction`,
|
|
so reaching this branch with an unsupported class is a bug worth
|
|
surfacing loudly.
|
|
"""
|
|
rng = rand or secrets.SystemRandom()
|
|
editor = _EDITORS.get(content_class)
|
|
if editor is None:
|
|
raise KeyError(
|
|
f"content_class={content_class!r} does not support edits"
|
|
)
|
|
return editor(previous_body, persona, rng)
|