feat(emailgen): global persona pool + Date-stamped EML mtimes
Two changes that unwind earlier MazeNET-only assumptions and fix a
realism tell:
1. Persona resolution is now per-decky-source, not topology-only. The
scheduler walks the union view (list_running_deckies, including
fleet MACVLAN/IPVLAN + SWARM shards) and picks the right persona
list for each source:
* topology decky -> Topology.email_personas (per-topology richness
preserved)
* fleet / shard -> a single host-wide pool loaded from disk
(DECNET_EMAILGEN_PERSONAS, /etc/decnet/email_personas.json, or
~/.decnet/email_personas.json)
Operators install the global pool via 'decnet emailgen
import-personas <file>' which validates with the same Pydantic
schema the worker uses.
2. The driver now runs 'touch -d <Date>' inside the docker exec right
after the EML write so file mtime matches the email's RFC 2822
Date: header. Without this an attacker 'ls -lt'ing the spool sees
every email clustered inside the worker's tick window — the
cluster itself was a stylometric tell.
CLI now exposes 'decnet emailgen' as a sub-app with 'run' (default,
backwards-compatible with bare 'decnet emailgen') and 'import-personas'.
list_running_deckies carries topology_id through so consumers can resolve
the parent topology without a second round-trip.
This commit is contained in:
@@ -239,9 +239,17 @@ class EmailDriver:
|
||||
container = _container_for(
|
||||
action.mail_decky_name, list(action.mail_decky_services),
|
||||
)
|
||||
# Stamp the file's mtime + atime to match the EML's Date: header
|
||||
# so an attacker `ls -lt`'ing the spool doesn't see a wall of
|
||||
# files all created within the worker's tick window — the cluster
|
||||
# itself is a tell. ``touch -d`` on GNU coreutils accepts RFC
|
||||
# 2822 dates directly via the same formatdate() string we wrote
|
||||
# into the header, so no extra parsing on the container side.
|
||||
eml_date_header = formatdate(ts.timestamp(), localtime=False)
|
||||
sh_cmd = (
|
||||
f"mkdir -p {shlex.quote(eml_dir)} && "
|
||||
f"tee {shlex.quote(eml_path)} >/dev/null"
|
||||
f"tee {shlex.quote(eml_path)} >/dev/null && "
|
||||
f"touch -d {shlex.quote(eml_date_header)} {shlex.quote(eml_path)}"
|
||||
)
|
||||
argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd]
|
||||
rc2, _stdout2, stderr2 = await _run_capture(
|
||||
|
||||
136
decnet/orchestrator/emailgen/global_pool.py
Normal file
136
decnet/orchestrator/emailgen/global_pool.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Global persona pool — non-topology mail deckies.
|
||||
|
||||
DECNET runs in three deployment shapes that emit running deckies:
|
||||
|
||||
* **MazeNET topologies** — each topology owns its own
|
||||
:attr:`Topology.email_personas` JSON list; the scheduler walks back
|
||||
from the mail decky to its parent topology row.
|
||||
* **Unihost fleet** — MACVLAN/IPVLAN deckies that have no
|
||||
parent topology row at all. They share one host-wide pool.
|
||||
* **SWARM shards** — DeckyShard rows on enrolled workers.
|
||||
Same shape as fleet for emailgen purposes (no parent topology row),
|
||||
so they read the same global pool.
|
||||
|
||||
This module owns the global pool: a JSON file on disk that operators
|
||||
populate via ``decnet emailgen import-personas <file>`` (or by editing
|
||||
the file directly). The file is loaded lazily on first read and
|
||||
re-loaded on mtime change so a CLI import takes effect for the running
|
||||
worker without a restart.
|
||||
|
||||
Path resolution order:
|
||||
|
||||
1. ``DECNET_EMAILGEN_PERSONAS`` environment variable — explicit override.
|
||||
2. ``/etc/decnet/email_personas.json`` — canonical master path; this is
|
||||
what ``decnet init`` will eventually own.
|
||||
3. ``~/.decnet/email_personas.json`` — dev fallback so a developer can
|
||||
exercise the worker without root or ``decnet init``.
|
||||
|
||||
When the file is missing / empty / unparseable, the pool is empty and
|
||||
the scheduler skips fleet/shard mail deckies the same way it skips a
|
||||
topology with too few personas. No silent fallback to dummy personas;
|
||||
silence is correct when there's no opinion to convey.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.orchestrator.emailgen.personas import EmailPersona, parse_personas
|
||||
|
||||
logger = get_logger("orchestrator.emailgen")
|
||||
|
||||
_ENV_VAR = "DECNET_EMAILGEN_PERSONAS"
|
||||
_SYSTEM_PATH = Path("/etc/decnet/email_personas.json")
|
||||
|
||||
|
||||
def _user_path() -> Path:
    """Return the per-user pool location, ``~/.decnet/email_personas.json``.

    The leading ``~`` is expanded with :func:`os.path.expanduser` at call
    time, so the result follows whatever home directory is in effect when
    the function runs rather than when the module was imported.
    """
    expanded = os.path.expanduser("~/.decnet/email_personas.json")
    return Path(expanded)
|
||||
|
||||
|
||||
def resolve_path() -> Path:
    """Work out which file the global persona pool is read from right now.

    Precedence:

    1. A non-blank ``DECNET_EMAILGEN_PERSONAS`` value (surrounding
       whitespace stripped) is used verbatim.
    2. The system path, selected as soon as either the file itself or its
       parent directory exists.
    3. The per-user path returned by :func:`_user_path`.

    The returned file is not guaranteed to exist; callers must cope with
    a missing file. Nothing is read or written here, although step 2 does
    probe the filesystem for existence.
    """
    env_value = os.environ.get(_ENV_VAR, "").strip()
    if env_value:
        return Path(env_value)

    # Parent directory first: an existing /etc/decnet is enough to claim
    # the system location even before the pool file has been written.
    system_candidates = (_SYSTEM_PATH.parent, _SYSTEM_PATH)
    if any(candidate.exists() for candidate in system_candidates):
        return _SYSTEM_PATH

    return _user_path()
|
||||
|
||||
|
||||
# ── Cache ────────────────────────────────────────────────────────────────────
|
||||
# Lock-protected because two scheduler ticks could race on the first load,
|
||||
# and the read path is hot enough (every tick, every fleet/shard mail
|
||||
# decky) that re-parsing on every call is wasteful.
|
||||
|
||||
_lock = threading.Lock()
_cache: list[EmailPersona] = []
_cache_path: Optional[Path] = None
_cache_mtime: float = 0.0
# ``language_default`` the cached list was parsed with.  Part of the cache
# key: it fills in personas that lack a ``language`` field, so a list parsed
# under one default is not valid for a caller asking for another.
_cache_language: Optional[str] = None


def load(*, language_default: str = "en") -> list[EmailPersona]:
    """Return the parsed global persona pool.

    *language_default* fills in any persona missing a ``language`` field;
    fleet/shard sources have no topology-level default, so callers
    should pass the worker's best guess (typically ``"en"``).

    The parsed list is cached per ``(path, mtime, language_default)``.
    On the steady state a call costs one ``stat()`` plus a comparison
    under the lock; the file is re-read and re-parsed only when the
    resolved path, its mtime, or *language_default* changed since the
    last successful load.  An empty result is never served from the
    cache, so an empty or unparseable file is re-parsed on every call.

    Returns:
        A fresh list on every call, so callers may mutate the result
        without corrupting the shared cache.  The list is empty when the
        file cannot be stat'ed or read, or when parsing yields nothing.
    """
    global _cache, _cache_path, _cache_mtime, _cache_language

    path = resolve_path()
    try:
        st = path.stat()
    except OSError:
        # File missing or inaccessible: drop whatever was cached so a pool
        # that has been removed stops being served.
        with _lock:
            _cache = []
            _cache_path = path
            _cache_mtime = 0.0
            _cache_language = None
        return []

    with _lock:
        if (
            _cache  # non-empty cache; empty re-parses cheaply anyway
            and _cache_path == path
            and _cache_mtime == st.st_mtime
            and _cache_language == language_default
        ):
            return list(_cache)

    # Read and parse outside the lock so a slow disk does not stall other
    # callers; a concurrent loader at worst repeats the same work.
    try:
        raw = path.read_text(encoding="utf-8")
    except OSError as exc:
        logger.warning("emailgen global pool: read failed path=%s: %s", path, exc)
        return []

    parsed = parse_personas(raw, language_default=language_default)
    with _lock:
        _cache = parsed
        _cache_path = path
        _cache_mtime = st.st_mtime
        _cache_language = language_default
    if parsed:
        logger.info(
            "emailgen global pool: loaded %d personas from %s", len(parsed), path,
        )
    return list(parsed)
|
||||
|
||||
|
||||
def reset_cache() -> None:
    """Forget whatever pool is cached in this process.

    Intended for tests: when several tests in one process point the
    loader at different on-disk pools, calling this between them stops
    one test's cached personas from leaking into the next.
    """
    global _cache, _cache_path, _cache_mtime
    with _lock:
        # Back to the same "nothing loaded yet" values the module starts with.
        _cache, _cache_path, _cache_mtime = [], None, 0.0
|
||||
@@ -25,6 +25,7 @@ from datetime import datetime
|
||||
from typing import Any, Optional
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.orchestrator.emailgen import global_pool
|
||||
from decnet.orchestrator.emailgen.personas import (
|
||||
EmailPersona,
|
||||
in_active_hours,
|
||||
@@ -104,55 +105,81 @@ def _is_mail_decky(decky: dict[str, Any]) -> bool:
|
||||
return any(s in services for s in _MAIL_SERVICES)
|
||||
|
||||
|
||||
async def _resolve_personas(
    repo: Any, mail_decky: dict[str, Any],
) -> tuple[list[EmailPersona], str]:
    """Choose the persona list that applies to *mail_decky*.

    Returns ``(personas, source_label)``.  The label is the decky row's
    ``"source"`` value (``"unknown"`` when absent or falsy) and is handed
    back so callers can say in their logs which source produced the list.

    Resolution:

    * ``"topology"`` — fetch the parent row via ``repo.get_topology`` using
      the decky's ``topology_id`` and parse its ``email_personas``, with the
      topology's ``language_default`` (falling back to ``"en"``).  A missing
      ``topology_id`` or an unknown topology yields an empty list.
    * anything else (fleet, shard, unknown) — these have no parent topology
      row to consult, so the host-wide pool from :mod:`global_pool` is used.
    """
    source = mail_decky.get("source") or "unknown"

    # Fleet / shard / anything else -> global pool.
    if source != "topology":
        return global_pool.load(), source

    topology_id = mail_decky.get("topology_id")
    if not topology_id:
        return [], source

    topology = await repo.get_topology(topology_id)
    if not topology:
        return [], source

    raw_personas = topology.get("email_personas")
    language = topology.get("language_default") or "en"
    personas = parse_personas(raw_personas, language_default=language)
    return personas, source
|
||||
|
||||
|
||||
async def pick(
|
||||
repo: Any,
|
||||
*,
|
||||
rand: Optional[secrets.SystemRandom] = None,
|
||||
now: Optional[datetime] = None,
|
||||
) -> Optional[EmailAction]:
|
||||
"""Pick one email action against the running fleet.
|
||||
"""Pick one email action against any running mail decky.
|
||||
|
||||
*repo* is a :class:`BaseRepository`; we fetch running topology
|
||||
deckies + their parent topology row directly. *now* is the
|
||||
wall-clock used for ``active_hours`` filtering — injected so tests
|
||||
can pin the hour deterministically.
|
||||
Mail-decky discovery uses the **union view** (``list_running_deckies``):
|
||||
MazeNET topology deckies, unihost fleet deckies, and SWARM shards are
|
||||
all eligible. Persona source is per-decky-source; see
|
||||
:func:`_resolve_personas`. *now* is the wall-clock used for
|
||||
``active_hours`` filtering — injected so tests can pin the hour
|
||||
deterministically.
|
||||
"""
|
||||
rng = rand or secrets.SystemRandom()
|
||||
now_dt = now or datetime.now()
|
||||
|
||||
deckies = await repo.list_running_topology_deckies()
|
||||
deckies = await repo.list_running_deckies()
|
||||
mail_deckies = [d for d in deckies if _is_mail_decky(d)]
|
||||
if not mail_deckies:
|
||||
logger.debug("emailgen pick: no running mail decky")
|
||||
return None
|
||||
|
||||
mail_decky = rng.choice(mail_deckies)
|
||||
topology_id = mail_decky.get("topology_id")
|
||||
if not topology_id:
|
||||
logger.debug("emailgen pick: mail decky has no topology_id")
|
||||
return None
|
||||
|
||||
topology = await repo.get_topology(topology_id)
|
||||
if not topology:
|
||||
logger.debug("emailgen pick: topology %s not found", topology_id)
|
||||
return None
|
||||
|
||||
personas = parse_personas(
|
||||
topology.get("email_personas"),
|
||||
language_default=topology.get("language_default") or "en",
|
||||
)
|
||||
personas, source = await _resolve_personas(repo, mail_decky)
|
||||
if len(personas) < 2:
|
||||
logger.debug(
|
||||
"emailgen pick: topology=%s has only %d personas; need >=2",
|
||||
topology_id, len(personas),
|
||||
"emailgen pick: source=%s mail_decky=%s only %d personas; need >=2",
|
||||
source, mail_decky.get("uuid"), len(personas),
|
||||
)
|
||||
return None
|
||||
|
||||
active = [p for p in personas if in_active_hours(p, now_dt.hour)]
|
||||
if len(active) < 2:
|
||||
logger.debug(
|
||||
"emailgen pick: topology=%s only %d personas in-hours",
|
||||
topology_id, len(active),
|
||||
"emailgen pick: source=%s mail_decky=%s only %d personas in-hours",
|
||||
source, mail_decky.get("uuid"), len(active),
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user