From 32eeb0c813af93b6a86466874f05a1bc65d8dc71 Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 27 Apr 2026 16:33:04 -0400 Subject: [PATCH] refactor(orchestrator): collapse decnet-emailgen.service into orchestrator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 5 of the realism migration. Email generation is no longer a separate worker / systemd unit / CLI subcommand — the orchestrator's single tick loop covers SSH traffic, file plants, and email drops. Going from 21 services to 20. Worker: - _one_tick rolls between traffic / file / email (45/45/10 weights). The 10% email weight at a 60s orchestrator interval produces ~one email per 10 minutes, close to the pre-collapse 5-minute cadence. - get_driver_for(action) (stage 4) handles SSH vs Email dispatch. - Quiet branches fall through so a (decky-set, persona-pool, mail-decky) shape that silences one branch doesn't waste the tick. - Periodic prune covers both orchestrator_events and orchestrator_emails tables. Deletions: - deploy/decnet-emailgen.service.j2 - decnet/orchestrator/emailgen/worker.py - decnet/cli/emailgen.py - tests/orchestrator/emailgen/test_worker_integration.py Renames (history-preserving): - decnet/web/router/emailgen/ -> decnet/web/router/realism/ - tests/api/emailgen/ -> tests/api/realism/ - tests/cli/test_emailgen_* -> tests/cli/test_realism_* Public surface changes (clean break, pre-v1): - API URL /api/v1/emailgen/personas -> /api/v1/realism/personas - CLI `decnet emailgen import-personas` -> `decnet realism import-personas`. `decnet emailgen run` is gone — the orchestrator covers it. - gating.py: emailgen master-only group replaced by realism. - decnet-orchestrator.service.j2: DECNET_REALISM_* env block added. - decnet.target: decnet-emailgen.service entry removed. - frontend: PersonaGeneration.tsx fetches /realism/personas. --- decnet/cli/__init__.py | 4 +- decnet/cli/emailgen.py | 185 --- decnet/cli/gating.py | 2 +- decnet/cli/realism.py | 111 ++ decnet/orchestrator/emailgen/__init__.py | 43 +- decnet/orchestrator/emailgen/worker.py | 131 -- decnet/orchestrator/worker.py | 240 ++-- decnet/web/router/__init__.py | 10 +- .../router/{emailgen => realism}/__init__.py | 0 .../{emailgen => realism}/api_personas.py | 18 +- decnet_web/src/components/MazeNET/MazeNET.tsx | 11 +- .../src/components/PersonaGeneration.css | 384 +----- .../src/components/PersonaGeneration.tsx | 1105 +++++++++++------ .../components/TopologyList/TopologyList.tsx | 10 +- deploy/decnet-emailgen.service.j2 | 54 - deploy/decnet-orchestrator.service.j2 | 9 +- deploy/decnet.target | 3 +- tests/api/{emailgen => realism}/__init__.py | 0 .../test_personas_api.py | 4 +- tests/api/topology/test_personas_api.py | 180 +++ ...ilgen_gating.py => test_realism_gating.py} | 40 +- ...nas.py => test_realism_import_personas.py} | 16 +- .../emailgen/test_worker_integration.py | 141 --- tests/orchestrator/test_worker_integration.py | 30 +- 24 files changed, 1334 insertions(+), 1397 deletions(-) delete mode 100644 decnet/cli/emailgen.py create mode 100644 decnet/cli/realism.py delete mode 100644 decnet/orchestrator/emailgen/worker.py rename decnet/web/router/{emailgen => realism}/__init__.py (100%) rename decnet/web/router/{emailgen => realism}/api_personas.py (91%) delete mode 100644 deploy/decnet-emailgen.service.j2 rename tests/api/{emailgen => realism}/__init__.py (100%) rename tests/api/{emailgen => realism}/test_personas_api.py (97%) create mode 100644 tests/api/topology/test_personas_api.py rename tests/cli/{test_emailgen_gating.py => test_realism_gating.py} (65%) rename tests/cli/{test_emailgen_import_personas.py => test_realism_import_personas.py} (88%) delete mode 100644 tests/orchestrator/emailgen/test_worker_integration.py diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 26a16dcf..f0508319 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -25,7 +25,6 @@ from . import ( canary, db, deploy, - emailgen, forwarder, geoip, init, @@ -34,6 +33,7 @@ from . import ( listener, orchestrator, profiler, + realism, reconciler, sniffer, swarm, @@ -58,7 +58,7 @@ for _mod in ( api, swarmctl, agent, updater, listener, forwarder, swarm, deploy, lifecycle, workers, inventory, - web, profiler, orchestrator, emailgen, reconciler, sniffer, db, + web, profiler, orchestrator, realism, reconciler, sniffer, db, topology, bus, geoip, init, webhook, canary, ): _mod.register(app) diff --git a/decnet/cli/emailgen.py b/decnet/cli/emailgen.py deleted file mode 100644 index 7d4efa92..00000000 --- a/decnet/cli/emailgen.py +++ /dev/null @@ -1,185 +0,0 @@ -"""``decnet emailgen ...`` — orchestrator-sibling email generator. - -Sub-commands: - -* ``decnet emailgen run`` — start the long-running worker - (default when invoked with no sub-command, so the historical - ``decnet emailgen`` invocation still works). -* ``decnet emailgen import-personas`` — validate a JSON file and - install it as the host-wide global persona pool consumed by fleet - (MACVLAN/IPVLAN) and SWARM-shard mail deckies. - -The worker itself stays in :mod:`decnet.orchestrator.emailgen.worker`; -this module only owns the CLI surface. -""" -from __future__ import annotations - -import json -import os -from pathlib import Path -from typing import Optional - -import typer - -from . import utils as _utils -from .gating import _require_master_mode -from .utils import console, log - - -def register(app: typer.Typer) -> None: - emailgen_app = typer.Typer( - name="emailgen", - help=( - "Drip persona-driven fake corporate email into running " - "IMAP/POP3 mail deckies." - ), - invoke_without_command=True, - no_args_is_help=False, - ) - app.add_typer(emailgen_app, name="emailgen") - - @emailgen_app.callback() - def _default(ctx: typer.Context) -> None: - # Calling ``decnet emailgen`` with no sub-command defers to ``run`` - # so the documented (and shipped) invocation stays valid. - if ctx.invoked_subcommand is None: - ctx.invoke(emailgen_run) - - @emailgen_app.command("run") - def emailgen_run( - interval: int = typer.Option( - 300, "--interval", "-i", - help="Seconds between fake-email generation ticks (default 5m)", - ), - daemon: bool = typer.Option( - False, "--daemon", "-d", - help="Detach to background as a daemon process", - ), - model: str = typer.Option( - "", "--model", "-m", - help="Ollama model override (defaults to $DECNET_EMAILGEN_MODEL " - "or 'llama3.1')", - ), - ) -> None: - """Start the long-running email-generation worker.""" - # Defence-in-depth: the registration-time gate already hides - # ``emailgen`` from Typer when DECNET_MODE=agent, but a direct - # callable import would bypass that — block here too. - _require_master_mode("emailgen run") - import asyncio - from decnet.orchestrator.emailgen import emailgen_worker - from decnet.web.dependencies import repo - - if daemon: - log.info("emailgen daemonizing interval=%d", interval) - _utils._daemonize() - - # Honour the env var when the flag was left empty so systemd unit - # files can configure the model centrally without per-host CLI - # tweaks. Empty -> let the worker apply its own default. - resolved_model = model or os.environ.get("DECNET_EMAILGEN_MODEL", "") - log.info( - "emailgen starting interval=%d model=%s", - interval, resolved_model or "default", - ) - console.print( - f"[bold cyan]Emailgen starting[/] (interval: {interval}s" - f"{', model: ' + resolved_model if resolved_model else ''})" - ) - - async def _run() -> None: - await repo.initialize() - await emailgen_worker( - repo, interval=interval, model=resolved_model or None, - ) - - try: - asyncio.run(_run()) - except KeyboardInterrupt: - console.print("\n[yellow]Emailgen stopped.[/]") - - @emailgen_app.command("import-personas") - def emailgen_import_personas( - path: Path = typer.Argument( - ..., exists=True, file_okay=True, dir_okay=False, readable=True, - help="JSON file containing a list of EmailPersona objects", - ), - output: Optional[Path] = typer.Option( - None, "--output", "-o", - help=( - "Override the destination path. Defaults to the canonical " - "global pool (DECNET_EMAILGEN_PERSONAS, /etc/decnet/" - "email_personas.json, or ~/.decnet/email_personas.json)." - ), - ), - ) -> None: - """Validate + install a personas JSON file as the global pool. - - Use this when deploying with IMAP/POP3 services on fleet - (MACVLAN/IPVLAN) or SWARM-shard mail deckies — those have no - parent topology row, so they read this host-wide list. MazeNET - topology mail deckies use ``Topology.email_personas`` instead and - this command does not touch them. - """ - _require_master_mode("emailgen import-personas") - from decnet.realism import personas_pool as global_pool - from decnet.realism.personas import parse_personas - - try: - raw = path.read_text(encoding="utf-8") - except OSError as exc: - console.print(f"[red]Cannot read {path}:[/] {exc}") - raise typer.Exit(code=1) from exc - - # Validate by parsing — we want operators to find out about - # broken personas at import time, not at the next worker tick. - try: - payload = json.loads(raw) - except json.JSONDecodeError as exc: - console.print(f"[red]Invalid JSON in {path}:[/] {exc}") - raise typer.Exit(code=1) from exc - if not isinstance(payload, list): - console.print( - f"[red]{path} must contain a JSON list of personas, " - f"got {type(payload).__name__}[/]" - ) - raise typer.Exit(code=1) - - personas = parse_personas(payload) - if not personas: - console.print( - f"[red]No valid personas in {path}.[/] " - "Check the schema (name, email, role, tone, mannerisms)." - ) - raise typer.Exit(code=1) - if len(personas) < 2: - console.print( - f"[yellow]Warning: only {len(personas)} valid persona(s) — " - "the worker requires at least 2 to send mail; importing " - "anyway in case more are added later.[/]" - ) - - dest = output or global_pool.resolve_path() - dest.parent.mkdir(parents=True, exist_ok=True) - # Re-serialise from the parsed-and-validated objects rather than - # copying the source file: drops invalid entries, normalises - # whitespace, and gives operators a single canonical layout to - # eyeball after the import. - dest.write_text( - json.dumps( - [p.model_dump(exclude_none=False) for p in personas], - indent=2, - ensure_ascii=False, - ), - encoding="utf-8", - ) - # Cache invalidation happens automatically on next ``load()`` - # via the mtime check, but reset the in-process cache too in - # case the CLI process is the same as the worker (uncommon but - # cheap to be correct about). - global_pool.reset_cache() - console.print( - f"[green]Imported {len(personas)} personas to[/] {dest}" - ) - if path != dest: - log.info("emailgen import-personas src=%s dest=%s", path, dest) diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index 14f62cdc..a373bc1c 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -32,7 +32,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "db-reset", "init", "webhook", "clusterer", "campaign-clusterer", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset( - {"swarm", "topology", "geoip", "emailgen"} + {"swarm", "topology", "geoip", "realism"} ) diff --git a/decnet/cli/realism.py b/decnet/cli/realism.py new file mode 100644 index 00000000..dbb936df --- /dev/null +++ b/decnet/cli/realism.py @@ -0,0 +1,111 @@ +"""``decnet realism ...`` — content-engine maintenance commands. + +After stage 5 of the realism migration, this is the only remaining +CLI surface from the realism library / former emailgen. ``decnet +realism run`` does not exist (the orchestrator runs the unified +worker via ``decnet orchestrate``); the only sub-command is +``import-personas``, which validates + installs the host-wide global +persona pool consumed by fleet (MACVLAN/IPVLAN) and SWARM-shard +deckies. + +Topology personas live on ``Topology.email_personas`` and are +managed via the dashboard or the topology API; this command does +not touch them. +""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import Optional + +import typer + +from .gating import _require_master_mode +from .utils import console, log + + +def register(app: typer.Typer) -> None: + realism_app = typer.Typer( + name="realism", + help=( + "Maintain the realism content engine (persona pool import, " + "future content-class tuning)." + ), + ) + app.add_typer(realism_app, name="realism") + + @realism_app.command("import-personas") + def realism_import_personas( + path: Path = typer.Argument( + ..., exists=True, file_okay=True, dir_okay=False, readable=True, + help="JSON file containing a list of EmailPersona objects", + ), + output: Optional[Path] = typer.Option( + None, "--output", "-o", + help=( + "Override the destination path. Defaults to the canonical " + "global pool (DECNET_REALISM_PERSONAS, /etc/decnet/" + "email_personas.json, or ~/.decnet/email_personas.json)." + ), + ), + ) -> None: + """Validate + install a personas JSON file as the global pool. + + Use this when deploying with IMAP/POP3 services on fleet + (MACVLAN/IPVLAN) or SWARM-shard mail deckies — those have no + parent topology row, so they read this host-wide list. + MazeNET topology mail deckies use ``Topology.email_personas`` + instead and this command does not touch them. + """ + _require_master_mode("realism import-personas") + from decnet.realism import personas_pool as global_pool + from decnet.realism.personas import parse_personas + + try: + raw = path.read_text(encoding="utf-8") + except OSError as exc: + console.print(f"[red]Cannot read {path}:[/] {exc}") + raise typer.Exit(code=1) from exc + + try: + payload = json.loads(raw) + except json.JSONDecodeError as exc: + console.print(f"[red]Invalid JSON in {path}:[/] {exc}") + raise typer.Exit(code=1) from exc + if not isinstance(payload, list): + console.print( + f"[red]{path} must contain a JSON list of personas, " + f"got {type(payload).__name__}[/]" + ) + raise typer.Exit(code=1) + + personas = parse_personas(payload) + if not personas: + console.print( + f"[red]No valid personas in {path}.[/] " + "Check the schema (name, email, role, tone, mannerisms)." + ) + raise typer.Exit(code=1) + if len(personas) < 2: + console.print( + f"[yellow]Warning: only {len(personas)} valid persona(s) — " + "the worker requires at least 2 to send mail; importing " + "anyway in case more are added later.[/]" + ) + + dest = output or global_pool.resolve_path() + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_text( + json.dumps( + [p.model_dump(exclude_none=False) for p in personas], + indent=2, + ensure_ascii=False, + ), + encoding="utf-8", + ) + global_pool.reset_cache() + console.print( + f"[green]Imported {len(personas)} personas to[/] {dest}" + ) + if path != dest: + log.info("realism import-personas src=%s dest=%s", path, dest) diff --git a/decnet/orchestrator/emailgen/__init__.py b/decnet/orchestrator/emailgen/__init__.py index b52c4376..aaabc243 100644 --- a/decnet/orchestrator/emailgen/__init__.py +++ b/decnet/orchestrator/emailgen/__init__.py @@ -1,33 +1,20 @@ -"""Emailgen — second orchestrator worker. +"""Emailgen — email-specific delivery, scheduling, and threading. -Generates fake corporate emails (multi-language, threaded, persona-driven) -and drops them into mail-decky maildirs so attackers landing on -IMAP/POP3 honeypots find believable mailboxes instead of empty inboxes. +After stage 5 of the realism migration, ``emailgen`` is no longer a +separate worker / systemd unit / CLI subcommand. It exposes: -The module is intentionally a sibling of :mod:`decnet.orchestrator` (not -a flag on it) — separate worker, separate CLI command -(``decnet emailgen``), separate systemd-supervised lifecycle. Shares the -heartbeat / control-listener scaffolding via :mod:`decnet.bus.publish`. +* :mod:`decnet.orchestrator.emailgen.scheduler` — the + ``EmailAction`` shape and the ``pick(repo)`` policy that decides + which mail decky / sender / recipient / thread an email belongs to. +* :mod:`decnet.orchestrator.emailgen.threads` — RFC 2822 thread chain + helpers (Message-ID generation, Re: / In-Reply-To bookkeeping). +* :mod:`decnet.orchestrator.emailgen.events` — DB-row + bus-topic + builders for email events. -Lazy worker re-export: :func:`emailgen_worker` is loaded on first -attribute access so that submodules can import package-level names -(``decnet.orchestrator.emailgen.events``) without triggering an eager -load of the worker — and through it, the email driver, which imports -back into this package. Without lazy loading the package + driver + -worker form a cycle. +The orchestrator's main worker (:mod:`decnet.orchestrator.worker`) +calls into these modules per tick. LLM glue, persona schema, prompt +builder, and the global persona pool moved to :mod:`decnet.realism` +in stage 2 of the migration; this package keeps only the +email-specific delivery surface. """ from __future__ import annotations - -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: # pragma: no cover - typing only - from decnet.orchestrator.emailgen.worker import emailgen_worker # noqa: F401 - -__all__ = ["emailgen_worker"] - - -def __getattr__(name: str) -> Any: - if name == "emailgen_worker": - from decnet.orchestrator.emailgen.worker import emailgen_worker as _w - return _w - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/decnet/orchestrator/emailgen/worker.py b/decnet/orchestrator/emailgen/worker.py deleted file mode 100644 index 315c97e7..00000000 --- a/decnet/orchestrator/emailgen/worker.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Emailgen main loop. - -Mirrors :mod:`decnet.orchestrator.worker` shape: same heartbeat, same -control listener, same fire-and-forget bus publish, same prune knob. -A wedged ollama call stalls only this worker, never the SSH-flavoured -orchestrator running alongside. -""" -from __future__ import annotations - -import asyncio -import contextlib - -from decnet.bus.factory import get_bus -from decnet.bus.publish import ( - publish_safely, - run_control_listener, - run_health_heartbeat, -) -from decnet.logging import get_logger -from decnet.orchestrator.drivers.email import EmailDriver -from decnet.orchestrator.emailgen import events, scheduler -from decnet.web.db.repository import BaseRepository - -logger = get_logger("orchestrator.emailgen") - -# Periodic-prune knobs — same shape as orchestrator/worker.py. -_PRUNE_EVERY_TICKS = 100 -_PRUNE_PER_DECKY_CAP = 5000 - - -async def emailgen_worker( - repo: BaseRepository, - *, - interval: int = 300, - model: str | None = None, -) -> None: - """Periodically generate one fake email into a running mail decky. - - Default interval is 5 minutes — emails are expensive (LLM round - trip) and don't need to fire every minute to look natural. Honors - ``system.emailgen.control`` for graceful shutdown. - """ - logger.info("emailgen worker started interval=%ds model=%s", interval, model) - - bus = None - try: - bus = get_bus(client_name="emailgen") - await bus.connect() - except Exception as exc: # noqa: BLE001 - logger.warning( - "emailgen: bus unavailable, continuing without publish: %s", exc - ) - bus = None - - driver = EmailDriver(model=model) if model else EmailDriver() - shutdown = asyncio.Event() - heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "emailgen")) - control_task = asyncio.create_task( - run_control_listener(bus, "emailgen", shutdown), - ) - tick_n = 0 - try: - while not shutdown.is_set(): - try: - await asyncio.wait_for(shutdown.wait(), timeout=interval) - except asyncio.TimeoutError: - pass # normal tick - if shutdown.is_set(): - break - try: - await _one_tick(repo, driver, bus) - except Exception as exc: # noqa: BLE001 - logger.error("emailgen tick failed: %s", exc) - tick_n += 1 - if tick_n % _PRUNE_EVERY_TICKS == 0: - try: - deleted = await repo.prune_orchestrator_emails( - per_decky_cap=_PRUNE_PER_DECKY_CAP, - ) - if deleted: - logger.info( - "emailgen prune deleted=%d cap=%d", - deleted, _PRUNE_PER_DECKY_CAP, - ) - except Exception as exc: # noqa: BLE001 - logger.error("emailgen prune failed: %s", exc) - finally: - for t in (heartbeat_task, control_task): - t.cancel() - with contextlib.suppress(Exception, asyncio.CancelledError): - await t - if bus is not None: - with contextlib.suppress(Exception): - await bus.close() - - -async def _one_tick(repo: BaseRepository, driver: EmailDriver, bus) -> None: - action = await scheduler.pick(repo) - if action is None: - logger.debug("emailgen: no actionable mail decky / personas this tick") - return - - result = await driver.run(action) - row = events.to_row(action, result) - await repo.record_orchestrator_email(row) - - if bus is not None: - topic = events.topic_for(action) - # Mirror the orchestrator-event SSE-friendly payload shape: ts - # as iso8601, payload as already-serialised dict. - bus_payload = { - "kind": "email", - "mail_decky_uuid": row["mail_decky_uuid"], - "thread_id": row["thread_id"], - "message_id": row["message_id"], - "in_reply_to": row["in_reply_to"], - "sender_email": row["sender_email"], - "recipient_email": row["recipient_email"], - "subject": row["subject"], - "language": row["language"], - "success": row["success"], - "ts": row["ts"].isoformat(), - } - await publish_safely( - bus, topic, bus_payload, event_type=events.event_type_for(action), - ) - - logger.info( - "emailgen tick mail_decky=%s thread=%s success=%s reply=%s", - row["mail_decky_uuid"], row["thread_id"], row["success"], action.is_reply, - ) diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 56750fe7..3d23afdd 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -1,16 +1,27 @@ """Orchestrator main loop. -One tick = one (src, dst, action) pick + one driver invocation + one DB -write + one fire-and-forget bus publish. Intentionally serial — MVP -honesty: a wedged docker exec stalls only this worker, never another. +One tick = one action pick + one driver invocation + one DB write + +one fire-and-forget bus publish. Intentionally serial — MVP honesty: +a wedged docker exec stalls only this worker, never another. -Modeled after :mod:`decnet.profiler.worker` for consistency: same control -listener, same heartbeat helper, same shutdown semantics. +Three action shapes are folded into the single tick after stage 5 of +the realism migration: SSH traffic between deckies, file plants on +deckies (driven by :func:`decnet.realism.planner.pick`), and email +drops into mail-decky maildirs (driven by +:func:`decnet.orchestrator.emailgen.scheduler.pick`). ``decnet +emailgen`` and ``decnet-emailgen.service`` are gone; this worker +covers all three. + +Modeled after :mod:`decnet.profiler.worker` for consistency: same +control listener, same heartbeat helper, same shutdown semantics. """ from __future__ import annotations import asyncio import contextlib +import hashlib +import secrets +from datetime import datetime, timezone from decnet.bus.factory import get_bus from decnet.bus.publish import ( @@ -20,17 +31,34 @@ from decnet.bus.publish import ( ) from decnet.logging import get_logger from decnet.orchestrator import events, scheduler -from decnet.orchestrator.drivers import SSHDriver +from decnet.orchestrator.drivers import get_driver_for +from decnet.orchestrator.emailgen import ( + events as email_events, + scheduler as email_scheduler, +) +from decnet.orchestrator.emailgen.scheduler import EmailAction from decnet.web.db.repository import BaseRepository logger = get_logger("orchestrator") # Periodic-prune knobs. Trim per-decky history every _PRUNE_EVERY_TICKS -# to keep orchestrator_events from unbounded growth on long-running -# fleets. Cheap on the write path (zero overhead per tick); the cost -# pays in once every ~100 ticks. +# to keep orchestrator_events / orchestrator_emails from unbounded +# growth on long-running fleets. Cheap on the write path (zero overhead +# per tick); the cost pays in once every ~100 ticks. _PRUNE_EVERY_TICKS = 100 _PRUNE_PER_DST_CAP = 10000 +_PRUNE_PER_MAIL_DECKY_CAP = 5000 + +# Action-kind weights for the per-tick roll. Email is rare because +# each LLM round-trip is expensive (~seconds) and the prior emailgen +# worker only ticked every 5 minutes. At a 60s orchestrator interval, +# a 10% email weight produces ~one email every ~10 minutes — close +# enough to the pre-collapse cadence. +_ACTION_WEIGHTS: tuple[tuple[str, int], ...] = ( + ("traffic", 45), + ("file", 45), + ("email", 10), +) async def orchestrator_worker( @@ -55,7 +83,6 @@ async def orchestrator_worker( ) bus = None - driver = SSHDriver() shutdown = asyncio.Event() heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "orchestrator")) control_task = asyncio.create_task( @@ -71,22 +98,12 @@ async def orchestrator_worker( if shutdown.is_set(): break try: - await _one_tick(repo, driver, bus) + await _one_tick(repo, bus) except Exception as exc: # noqa: BLE001 logger.error("orchestrator tick failed: %s", exc) tick_n += 1 if tick_n % _PRUNE_EVERY_TICKS == 0: - try: - deleted = await repo.prune_orchestrator_events( - per_dst_cap=_PRUNE_PER_DST_CAP, - ) - if deleted: - logger.info( - "orchestrator prune deleted=%d cap=%d", - deleted, _PRUNE_PER_DST_CAP, - ) - except Exception as exc: # noqa: BLE001 - logger.error("orchestrator prune failed: %s", exc) + await _periodic_prune(repo) finally: for t in (heartbeat_task, control_task): t.cancel() @@ -97,34 +114,80 @@ async def orchestrator_worker( await bus.close() -async def _one_tick(repo: BaseRepository, driver, bus) -> None: - import secrets as _secrets +async def _periodic_prune(repo: BaseRepository) -> None: + try: + deleted = await repo.prune_orchestrator_events(per_dst_cap=_PRUNE_PER_DST_CAP) + if deleted: + logger.info( + "orchestrator events prune deleted=%d cap=%d", + deleted, _PRUNE_PER_DST_CAP, + ) + except Exception as exc: # noqa: BLE001 + logger.error("orchestrator events prune failed: %s", exc) + try: + deleted = await repo.prune_orchestrator_emails( + per_decky_cap=_PRUNE_PER_MAIL_DECKY_CAP, + ) + if deleted: + logger.info( + "orchestrator emails prune deleted=%d cap=%d", + deleted, _PRUNE_PER_MAIL_DECKY_CAP, + ) + except Exception as exc: # noqa: BLE001 + logger.error("orchestrator emails prune failed: %s", exc) - # Union view: MazeNET topology + unihost fleet + SWARM shards. Pre-fleet - # this only saw topology_deckies and was permanently blind to MACVLAN / - # IPVLAN unihost decoys. - deckies = await repo.list_running_deckies() - rng = _secrets.SystemRandom() - # Action-kind roll: 50/50 traffic vs file. Stage 5 of the realism - # migration adds an email branch (when emailgen folds in). When a - # roll yields nothing actionable (e.g. file branch with no personas - # in any persona's work hours), we fall through to the other side - # so a quiet half doesn't silence the whole tick. - action = None - if rng.random() < 0.5: - action = scheduler.pick(deckies, rand=rng) - if action is None: - action = await scheduler.pick_file(deckies, repo, rand=rng) - else: - action = await scheduler.pick_file(deckies, repo, rand=rng) - if action is None: +def _roll_action_kind(rng: secrets.SystemRandom) -> str: + total = sum(w for _, w in _ACTION_WEIGHTS) + target = rng.randint(1, total) + running = 0 + for kind, w in _ACTION_WEIGHTS: + running += w + if target <= running: + return kind + return _ACTION_WEIGHTS[-1][0] # unreachable, satisfy mypy + + +async def _pick_action( + repo: BaseRepository, + deckies: list[dict], + rng: secrets.SystemRandom, +): + """Roll an action-kind, then pick the matching action. + + Quiet branches fall through to the other two so a (decky-set, + persona-pool, mail-decky) shape that would silence one branch + doesn't waste the whole tick. + """ + kinds_in_priority_order = [_roll_action_kind(rng)] + for kind, _ in _ACTION_WEIGHTS: + if kind not in kinds_in_priority_order: + kinds_in_priority_order.append(kind) + + for kind in kinds_in_priority_order: + if kind == "traffic": action = scheduler.pick(deckies, rand=rng) + elif kind == "file": + action = await scheduler.pick_file(deckies, repo, rand=rng) + elif kind == "email": + try: + action = await email_scheduler.pick(repo, rand=rng) + except Exception as exc: # noqa: BLE001 + logger.debug("orchestrator: email pick failed: %s", exc) + action = None + else: + action = None + if action is not None: + return action + return None + +async def _one_tick(repo: BaseRepository, bus) -> None: + deckies = await repo.list_running_deckies() + rng = secrets.SystemRandom() + + action = await _pick_action(repo, deckies, rng) if action is None: - # Report the actual SSH-eligible count (what the scheduler filters - # to), not just len(deckies) — the old "running+ssh count=N" line - # reported the pre-filter count and misled debugging. ssh_eligible = sum( 1 for d in deckies if isinstance(d.get("services"), list) @@ -133,9 +196,8 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: ) by_source: dict[str, int] = {} for d in deckies: - by_source[d.get("source", "unknown")] = ( - by_source.get(d.get("source", "unknown"), 0) + 1 - ) + src = d.get("source", "unknown") + by_source[src] = by_source.get(src, 0) + 1 logger.debug( "orchestrator: no actionable deckies " "(running=%d ssh_eligible=%d sources=%s)", @@ -143,26 +205,29 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: ) return + driver = get_driver_for(action) result = await driver.run(action) + + if isinstance(action, EmailAction): + await _persist_email(repo, action, result, bus) + else: + await _persist_event(repo, action, result, bus) + if isinstance(action, scheduler.FileAction) and result.success: + try: + await _record_synthetic_file(repo, action) + except Exception as exc: # noqa: BLE001 + logger.warning( + "orchestrator: synthetic_files write failed dst=%s path=%s: %s", + action.dst_uuid, action.path, exc, + ) + + +async def _persist_event(repo, action, result, bus) -> None: row = events.to_row(action, result) await repo.record_orchestrator_event(row) - # Persist realism state for FileAction so stage 3b's edit-in-place - # has something to read back. Failure here is logged but doesn't - # tank the tick — the orchestrator event is the source of truth - # for "this action happened." - if isinstance(action, scheduler.FileAction) and result.success: - try: - await _record_synthetic_file(repo, action, result) - except Exception as exc: # noqa: BLE001 - logger.warning( - "orchestrator: synthetic_files write failed dst=%s path=%s: %s", - action.dst_uuid, action.path, exc, - ) if bus is not None: topic = events.topic_for(action) - # Bus payload mirrors the row but uses iso8601 for ts so SSE - # consumers don't have to JSON-handle datetime themselves. bus_payload = { "kind": row["kind"], "protocol": row["protocol"], @@ -174,7 +239,7 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: "ts": row["ts"].isoformat(), } await publish_safely( - bus, topic, bus_payload, event_type=events.event_type_for(action) + bus, topic, bus_payload, event_type=events.event_type_for(action), ) logger.info( @@ -183,19 +248,52 @@ async def _one_tick(repo: BaseRepository, driver, bus) -> None: ) -async def _record_synthetic_file(repo, action, result) -> None: - """Persist a synthetic_files row after a successful FileAction plant. +async def _persist_email(repo, action: EmailAction, result, bus) -> None: + """Persist + publish an email tick result. + + Mirrors the pre-collapse emailgen worker payload exactly so SSE + subscribers and dashboards keep working without a breaking change + to the on-the-wire shape. + """ + row = email_events.to_row(action, result) + await repo.record_orchestrator_email(row) + + if bus is not None: + topic = email_events.topic_for(action) + bus_payload = { + "kind": "email", + "mail_decky_uuid": row["mail_decky_uuid"], + "thread_id": row["thread_id"], + "message_id": row["message_id"], + "in_reply_to": row["in_reply_to"], + "sender_email": row["sender_email"], + "recipient_email": row["recipient_email"], + "subject": row["subject"], + "language": row["language"], + "success": row["success"], + "ts": row["ts"].isoformat(), + } + await publish_safely( + bus, topic, bus_payload, + event_type=email_events.event_type_for(action), + ) + + logger.info( + "orchestrator tick kind=email mail_decky=%s thread=%s success=%s reply=%s", + row["mail_decky_uuid"], row["thread_id"], row["success"], action.is_reply, + ) + + +async def _record_synthetic_file(repo, action) -> None: + """Persist (or patch) a synthetic_files row after a FileAction plant. Idempotent on ``(decky_uuid, path)``: when the unique constraint - fires (the file existed already), we instead patch the existing - row's ``last_modified`` / ``content_hash`` / ``last_body`` / bump + fires (the file existed already), we patch the existing row's + ``last_modified`` / ``content_hash`` / ``last_body`` / bump ``edit_count`` so the dashboard's "files this decky has grown" view stays accurate even when the orchestrator re-plants the same location. """ - import hashlib - from datetime import datetime, timezone - body = action.content or "" content_hash = hashlib.sha256(body.encode("utf-8")).hexdigest() now = datetime.now(timezone.utc) @@ -216,8 +314,6 @@ async def _record_synthetic_file(repo, action, result) -> None: try: await repo.record_synthetic_file(row) except Exception: # noqa: BLE001 - # Most likely the unique constraint on (decky_uuid, path) - # fired — flip to update mode by looking up the existing row. existing = await repo.list_synthetic_files( decky_uuid=action.dst_uuid, limit=200, ) diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 208fc74d..e7b5e065 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -31,7 +31,7 @@ from .campaigns.api_list_campaign_identities import router as campaign_identitie from .campaigns.api_events import router as campaign_events_router from .orchestrator.api_list_events import router as orchestrator_list_router from .orchestrator.api_events import router as orchestrator_events_router -from .emailgen.api_personas import router as emailgen_personas_router +from .realism.api_personas import router as realism_personas_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -111,10 +111,10 @@ api_router.include_router(campaign_events_router) api_router.include_router(orchestrator_list_router) api_router.include_router(orchestrator_events_router) -# Emailgen — global persona pool CRUD for the dashboard's -# "Persona Generation" page. The worker reads from the same on-disk -# JSON file directly (see decnet.realism.personas_pool). -api_router.include_router(emailgen_personas_router) +# Realism — global persona pool CRUD for the dashboard's +# "Persona Generation" page. The orchestrator reads from the same +# on-disk JSON file directly (see decnet.realism.personas_pool). +api_router.include_router(realism_personas_router) # Observability api_router.include_router(stats_router) diff --git a/decnet/web/router/emailgen/__init__.py b/decnet/web/router/realism/__init__.py similarity index 100% rename from decnet/web/router/emailgen/__init__.py rename to decnet/web/router/realism/__init__.py diff --git a/decnet/web/router/emailgen/api_personas.py b/decnet/web/router/realism/api_personas.py similarity index 91% rename from decnet/web/router/emailgen/api_personas.py rename to decnet/web/router/realism/api_personas.py index 9e8c0e70..9cfb1bc2 100644 --- a/decnet/web/router/emailgen/api_personas.py +++ b/decnet/web/router/realism/api_personas.py @@ -1,4 +1,4 @@ -"""GET/PUT ``/api/v1/emailgen/personas`` — global persona pool CRUD. +"""GET/PUT ``/api/v1/realism/personas`` — global persona pool CRUD. The "global pool" is a JSON file consumed by the realism content engine for fleet (MACVLAN/IPVLAN) and SWARM-shard deckies — see @@ -29,7 +29,7 @@ from decnet.web.dependencies import require_admin, require_viewer from decnet.web.db.models.common import MessageResponse # noqa: F401 - response shape router = APIRouter() -log = get_logger("api.emailgen.personas") +log = get_logger("api.realism.personas") def _serialize(personas: list[EmailPersona]) -> list[dict[str, Any]]: @@ -38,14 +38,14 @@ def _serialize(personas: list[EmailPersona]) -> list[dict[str, Any]]: @router.get( - "/emailgen/personas", + "/realism/personas", tags=["Emailgen"], responses={ 401: {"description": "Could not validate credentials"}, 403: {"description": "Insufficient permissions"}, }, ) -@_traced("api.emailgen.list_personas") +@_traced("api.realism.list_personas") async def list_personas( user: dict = Depends(require_viewer), ) -> dict[str, Any]: @@ -56,7 +56,7 @@ async def list_personas( discoverable. """ # Reset the in-process cache before reading so a fresh CLI-driven - # ``decnet emailgen import-personas`` shows up immediately rather + # ``decnet realism import-personas`` shows up immediately rather # than waiting on the worker's mtime check. global_pool.reset_cache() personas = global_pool.load() @@ -67,7 +67,7 @@ async def list_personas( @router.put( - "/emailgen/personas", + "/realism/personas", tags=["Emailgen"], responses={ 400: {"description": "Invalid persona payload"}, @@ -75,7 +75,7 @@ async def list_personas( 403: {"description": "Insufficient permissions"}, }, ) -@_traced("api.emailgen.replace_personas") +@_traced("api.realism.replace_personas") async def replace_personas( body: dict[str, Any], user: dict = Depends(require_admin), @@ -121,7 +121,7 @@ async def replace_personas( # not writable by the API process. Surface a 500 with the # actionable hint instead of leaking a traceback. log.warning( - "api.emailgen.replace_personas write failed path=%s err=%s", + "api.realism.replace_personas write failed path=%s err=%s", dest, exc, ) raise HTTPException( @@ -134,7 +134,7 @@ async def replace_personas( ) from exc global_pool.reset_cache() log.info( - "api.emailgen.replace_personas user=%s wrote=%d path=%s", + "api.realism.replace_personas user=%s wrote=%d path=%s", user.get("username", user.get("uuid")), len(parsed), dest, ) return { diff --git a/decnet_web/src/components/MazeNET/MazeNET.tsx b/decnet_web/src/components/MazeNET/MazeNET.tsx index d34d8257..ff906d8e 100644 --- a/decnet_web/src/components/MazeNET/MazeNET.tsx +++ b/decnet_web/src/components/MazeNET/MazeNET.tsx @@ -3,7 +3,7 @@ import { useSearchParams, useNavigate } from 'react-router-dom'; import { PanelRightOpen, PanelRightClose, PanelLeftOpen, PanelLeftClose, Maximize2, Minimize2, RotateCcw, UploadCloud, ArrowLeft, - Plus, Trash2, Zap, Copy, Eye, ShieldAlert, GitMerge, Server, + Plus, Trash2, Zap, Copy, Eye, ShieldAlert, GitMerge, Server, Mail, } from '../../icons'; import './MazeNET.css'; import axios from '../../utils/api'; @@ -707,6 +707,15 @@ const MazeNET: React.FC = () => { + + + + + +); + +// ─── Editor modal ───────────────────────────────────────────────────────── + +interface PersonaEditorProps { + open: boolean; + editing: boolean; + draft: EmailPersona; + setDraft: (p: EmailPersona) => void; + draftError: string | null; + mannerismDraft: string; + setMannerismDraft: (s: string) => void; + onClose: () => void; + onSave: () => void; +} + +const PersonaEditor: React.FC = ({ + open, editing, draft, setDraft, draftError, + mannerismDraft, setMannerismDraft, onClose, onSave, +}) => { + const addMannerism = () => { + const t = mannerismDraft.trim(); + if (!t) return; + if (draft.mannerisms.includes(t)) { + setMannerismDraft(''); + return; + } + setDraft({ ...draft, mannerisms: [...draft.mannerisms, t] }); + setMannerismDraft(''); + }; + + const removeMannerism = (idx: number) => { + setDraft({ + ...draft, + mannerisms: draft.mannerisms.filter((_, i) => i !== idx), + }); + }; + + return ( + + + + + } + > +
+
+
+ + setDraft({ ...draft, name: e.target.value })} + placeholder="John Smith" + /> +
+
+ + setDraft({ ...draft, email: e.target.value })} + placeholder="john.smith@corp.com" + /> +
+
+ +
+ + setDraft({ ...draft, role: e.target.value })} + placeholder="Chief Operating Officer" + /> +
+ +
+
+ + + {draft.tone === 'custom' && ( + + setDraft({ ...draft, tone_custom: e.target.value || null }) + } + placeholder="e.g. terse, deadpan, sarcastic-but-polite" + /> + )} +
+
+ + setDraft({ ...draft, language: e.target.value || null })} + placeholder="en" + /> +
+
+ + +
+
+ + setDraft({ ...draft, active_hours: e.target.value })} + placeholder="09:00-18:00 (wraps OK)" + /> +
+
+ +
+ +
+ setMannerismDraft(e.target.value)} + onKeyDown={(e) => { + if (e.key === 'Enter') { + e.preventDefault(); + addMannerism(); + } + }} + placeholder="opens with 'Hey' not 'Dear'" + /> + +
+ {draft.mannerisms.length > 0 && ( +
+ {draft.mannerisms.map((m, i) => ( + removeMannerism(i)} + title="click to remove" + > + {m} ✕ + + ))} +
+ )} +
+ +
+ +