DECNET/decnet/orchestrator/emailgen/global_pool.py
anti 4badc75fb2 feat(emailgen): global persona pool + Date-stamped EML mtimes
Two changes that unwind earlier MazeNET-only assumptions and fix a
realism tell:

1. Persona resolution is now per-decky-source, not topology-only.  The
   scheduler walks the union view (list_running_deckies, including
   fleet MACVLAN/IPVLAN + SWARM shards) and picks the right persona
   list for each source:
     * topology decky -> Topology.email_personas (per-topology richness
       preserved)
     * fleet / shard  -> a single host-wide pool loaded from disk
       (DECNET_EMAILGEN_PERSONAS, /etc/decnet/email_personas.json, or
       ~/.decnet/email_personas.json)
   Operators install the global pool via 'decnet emailgen
   import-personas <file>' which validates with the same Pydantic
   schema the worker uses.

2. The driver now runs 'touch -d <Date>' inside the docker exec right
   after the EML write so file mtime matches the email's RFC 2822
   Date: header.  Without this, an attacker running 'ls -lt' on the
   spool sees every email clustered inside the worker's tick window;
   the cluster itself was a timing tell.
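
A minimal local sketch of the mtime stamping described in item 2, assuming the Date value is already available as a string. The commit itself shells out to 'touch -d' inside the container; this sketch swaps in os.utime on a local file to show the same effect, and the helper name stamp_mtime_from_date_header is hypothetical.

```python
import os
import tempfile
from email.utils import parsedate_to_datetime
from pathlib import Path


def stamp_mtime_from_date_header(eml_path: Path, date_header: str) -> float:
    """Set the file's atime and mtime to the instant named by an RFC 2822 Date value.

    Hypothetical helper: a local stand-in for running 'touch -d <Date>'.
    """
    # parsedate_to_datetime returns a timezone-aware datetime when the
    # header carries a numeric offset, so timestamp() is unambiguous.
    ts = parsedate_to_datetime(date_header).timestamp()
    os.utime(eml_path, (ts, ts))
    return ts


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        eml = Path(tmp) / "msg.eml"
        date = "Fri, 24 Apr 2026 09:15:00 -0400"
        eml.write_text(f"Date: {date}\r\n\r\nbody\r\n", encoding="utf-8")
        ts = stamp_mtime_from_date_header(eml, date)
        # The file now sorts by its Date header, not by when it was written.
        print(abs(eml.stat().st_mtime - ts) < 1)
```

With mtimes set this way, a listing sorted by time follows the Date headers rather than the moment the worker wrote each file.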

CLI now exposes 'decnet emailgen' as a sub-app with 'run' (default,
backwards-compatible with bare 'decnet emailgen') and 'import-personas'.
list_running_deckies carries topology_id through so consumers can resolve
the parent topology without a second round-trip.
2026-04-26 22:39:16 -04:00

137 lines
4.7 KiB
Python

"""Global persona pool: non-topology mail deckies.

DECNET runs in three deployment shapes that emit running deckies:

* **MazeNET topologies**: each topology owns its own
  :attr:`Topology.email_personas` JSON list; the scheduler walks back
  from the mail decky to its parent topology row.
* **Unihost fleet**: MACVLAN/IPVLAN deckies that have no
  parent topology row at all. They share one host-wide pool.
* **SWARM shards**: DeckyShard rows on enrolled workers.
  Same shape as fleet for emailgen purposes (no parent topology row),
  so they read the same global pool.

This module owns the global pool: a JSON file on disk that operators
populate via ``decnet emailgen import-personas <file>`` (or by editing
the file directly). The file is loaded lazily on first read and
re-loaded on mtime change so a CLI import takes effect for the running
worker without a restart.

Path resolution order:

1. ``DECNET_EMAILGEN_PERSONAS`` environment variable: explicit override.
2. ``/etc/decnet/email_personas.json``: canonical master path; this is
   what ``decnet init`` will eventually own.
3. ``~/.decnet/email_personas.json``: dev fallback so a developer can
   exercise the worker without root or ``decnet init``.

When the file is missing / empty / unparseable, the pool is empty and
the scheduler skips fleet/shard mail deckies the same way it skips a
topology with too few personas. No silent fallback to dummy personas;
silence is correct when there's no opinion to convey.
"""
from __future__ import annotations

import os
import threading
from pathlib import Path
from typing import Optional

from decnet.logging import get_logger
from decnet.orchestrator.emailgen.personas import EmailPersona, parse_personas

logger = get_logger("orchestrator.emailgen")

_ENV_VAR = "DECNET_EMAILGEN_PERSONAS"
_SYSTEM_PATH = Path("/etc/decnet/email_personas.json")


def _user_path() -> Path:
    return Path(os.path.expanduser("~/.decnet/email_personas.json"))


def resolve_path() -> Path:
    """Return the path the global pool would load from right now.

    The file may not exist; callers are expected to handle that. The
    function has no side effects (it only reads the environment and
    checks whether the system path exists) so the ``decnet emailgen
    import-personas`` CLI can ask "where would I write to?" without
    modifying the disk.
    """
    override = os.environ.get(_ENV_VAR, "").strip()
    if override:
        return Path(override)
    if _SYSTEM_PATH.parent.exists() or _SYSTEM_PATH.exists():
        return _SYSTEM_PATH
    return _user_path()


# ── Cache ────────────────────────────────────────────────────────────────────
# Lock-protected because two scheduler ticks could race on the first load,
# and the read path is hot enough (every tick, every fleet/shard mail
# decky) that re-parsing on every call is wasteful.
_lock = threading.Lock()
_cache: list[EmailPersona] = []
_cache_path: Optional[Path] = None
_cache_mtime: float = 0.0


def load(*, language_default: str = "en") -> list[EmailPersona]:
    """Return the parsed global persona pool.

    *language_default* fills in any persona missing a ``language`` field;
    fleet/shard sources have no topology-level default, so callers
    should pass the worker's best guess (typically ``"en"``).

    Thread-safe and cheap in the steady state (one ``stat()`` plus a
    path/mtime comparison); expensive only when the file changed since
    the last call. The cache is keyed on path and mtime only, so a
    cached pool is returned as-is even if *language_default* differs
    from the call that populated it.
    """
    global _cache, _cache_path, _cache_mtime

    path = resolve_path()
    try:
        st = path.stat()
    except OSError:
        # Missing or unreadable file: drop any stale cache and return empty.
        with _lock:
            _cache = []
            _cache_path = path
            _cache_mtime = 0.0
        return []

    with _lock:
        if (
            _cache_path == path
            and _cache_mtime == st.st_mtime
            and _cache  # non-empty cache; empty re-parses cheaply anyway
        ):
            return _cache

    # Read and parse outside the lock so a slow disk does not block readers.
    try:
        raw = path.read_text(encoding="utf-8")
    except OSError as exc:
        logger.warning("emailgen global pool: read failed path=%s: %s", path, exc)
        return []

    parsed = parse_personas(raw, language_default=language_default)
    with _lock:
        _cache = parsed
        _cache_path = path
        _cache_mtime = st.st_mtime
    if parsed:
        logger.info(
            "emailgen global pool: loaded %d personas from %s", len(parsed), path,
        )
    return parsed


def reset_cache() -> None:
    """Clear the in-process cache.

    Test-only helper: avoids stale state when several tests in the
    same process exercise different on-disk pools.
    """
    global _cache, _cache_path, _cache_mtime
    with _lock:
        _cache = []
        _cache_path = None
        _cache_mtime = 0.0
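
The reload-on-mtime behaviour of the module above can be illustrated in isolation. The following is a self-contained sketch, not the module's code: the class MtimeCachedJson and its parse_count counter are hypothetical, json.loads stands in for parse_personas, and the persona field "name" is invented for the demo.

```python
import json
import os
import tempfile
import threading
from pathlib import Path


class MtimeCachedJson:
    """Hypothetical stand-in for the module-level cache: reload when mtime changes."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._lock = threading.Lock()
        self._mtime = None  # mtime of the file when _value was parsed
        self._value: list = []
        self.parse_count = 0  # exposed only so the demo can observe reloads

    def load(self) -> list:
        try:
            mtime = self._path.stat().st_mtime
        except OSError:
            # Missing file: forget any cached value and report an empty pool.
            with self._lock:
                self._mtime, self._value = None, []
            return []
        with self._lock:
            if self._mtime == mtime:
                return self._value
        # Parse outside the lock; json.loads stands in for parse_personas.
        try:
            value = json.loads(self._path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return []
        with self._lock:
            self._mtime, self._value = mtime, value
            self.parse_count += 1
        return value


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pool_file = Path(tmp) / "email_personas.json"
        pool_file.write_text('[{"name": "alice"}]', encoding="utf-8")
        pool = MtimeCachedJson(pool_file)
        pool.load()
        pool.load()  # steady state: served from the cache, no re-parse
        pool_file.write_text('[{"name": "alice"}, {"name": "bob"}]', encoding="utf-8")
        os.utime(pool_file, (1, 1))  # force a distinct mtime for the demo
        print(len(pool.load()), pool.parse_count)
```

The explicit os.utime call in the demo exists only because two writes in quick succession can land on the same mtime on filesystems with coarse timestamps; a cache keyed on mtime alone would miss such a change.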