diff --git a/decnet/cli/emailgen.py b/decnet/cli/emailgen.py index 60e332a3..59817e81 100644 --- a/decnet/cli/emailgen.py +++ b/decnet/cli/emailgen.py @@ -1,13 +1,23 @@ -"""``decnet emailgen`` — second orchestrator worker. +"""``decnet emailgen ...`` — orchestrator-sibling email generator. -Sibling of :mod:`decnet.cli.orchestrator`. Two distinct CLI entrypoints -match the "workers are independent, never coupled" principle: a wedged -ollama call in emailgen does not stall the SSH-flavoured orchestrator, -and systemd supervises each loop separately. +Sub-commands: + +* ``decnet emailgen run`` — start the long-running worker + (default when invoked with no sub-command, so the historical + ``decnet emailgen`` invocation still works). +* ``decnet emailgen import-personas`` — validate a JSON file and + install it as the host-wide global persona pool consumed by fleet + (MACVLAN/IPVLAN) and SWARM-shard mail deckies. + +The worker itself stays in :mod:`decnet.orchestrator.emailgen.worker`; +this module only owns the CLI surface. """ from __future__ import annotations +import json import os +from pathlib import Path +from typing import Optional import typer @@ -16,8 +26,26 @@ from .utils import console, log def register(app: typer.Typer) -> None: - @app.command(name="emailgen") - def emailgen_cmd( + emailgen_app = typer.Typer( + name="emailgen", + help=( + "Drip persona-driven fake corporate email into running " + "IMAP/POP3 mail deckies." + ), + invoke_without_command=True, + no_args_is_help=False, + ) + app.add_typer(emailgen_app, name="emailgen") + + @emailgen_app.callback() + def _default(ctx: typer.Context) -> None: + # Calling ``decnet emailgen`` with no sub-command defers to ``run`` + # so the documented (and shipped) invocation stays valid. + if ctx.invoked_subcommand is None: + ctx.invoke(emailgen_run) + + @emailgen_app.command("run") + def emailgen_run( interval: int = typer.Option( 300, "--interval", "-i", help="Seconds between fake-email generation ticks (default 5m)", @@ -32,7 +60,7 @@ def register(app: typer.Typer) -> None: "or 'llama3.1')", ), ) -> None: - """Drip fake corporate emails into running IMAP/POP3 mail deckies.""" + """Start the long-running email-generation worker.""" import asyncio from decnet.orchestrator.emailgen import emailgen_worker from decnet.web.dependencies import repo @@ -64,3 +92,88 @@ def register(app: typer.Typer) -> None: asyncio.run(_run()) except KeyboardInterrupt: console.print("\n[yellow]Emailgen stopped.[/]") + + @emailgen_app.command("import-personas") + def emailgen_import_personas( + path: Path = typer.Argument( + ..., exists=True, file_okay=True, dir_okay=False, readable=True, + help="JSON file containing a list of EmailPersona objects", + ), + output: Optional[Path] = typer.Option( + None, "--output", "-o", + help=( + "Override the destination path. Defaults to the canonical " + "global pool (DECNET_EMAILGEN_PERSONAS, /etc/decnet/" + "email_personas.json, or ~/.decnet/email_personas.json)." + ), + ), + ) -> None: + """Validate + install a personas JSON file as the global pool. + + Use this when deploying with IMAP/POP3 services on fleet + (MACVLAN/IPVLAN) or SWARM-shard mail deckies — those have no + parent topology row, so they read this host-wide list. MazeNET + topology mail deckies use ``Topology.email_personas`` instead and + this command does not touch them. + """ + from decnet.orchestrator.emailgen import global_pool + from decnet.orchestrator.emailgen.personas import parse_personas + + try: + raw = path.read_text(encoding="utf-8") + except OSError as exc: + console.print(f"[red]Cannot read {path}:[/] {exc}") + raise typer.Exit(code=1) from exc + + # Validate by parsing — we want operators to find out about + # broken personas at import time, not at the next worker tick. + try: + payload = json.loads(raw) + except json.JSONDecodeError as exc: + console.print(f"[red]Invalid JSON in {path}:[/] {exc}") + raise typer.Exit(code=1) from exc + if not isinstance(payload, list): + console.print( + f"[red]{path} must contain a JSON list of personas, " + f"got {type(payload).__name__}[/]" + ) + raise typer.Exit(code=1) + + personas = parse_personas(payload) + if not personas: + console.print( + f"[red]No valid personas in {path}.[/] " + "Check the schema (name, email, role, tone, mannerisms)." + ) + raise typer.Exit(code=1) + if len(personas) < 2: + console.print( + f"[yellow]Warning: only {len(personas)} valid persona(s) — " + "the worker requires at least 2 to send mail; importing " + "anyway in case more are added later.[/]" + ) + + dest = output or global_pool.resolve_path() + dest.parent.mkdir(parents=True, exist_ok=True) + # Re-serialise from the parsed-and-validated objects rather than + # copying the source file: drops invalid entries, normalises + # whitespace, and gives operators a single canonical layout to + # eyeball after the import. + dest.write_text( + json.dumps( + [p.model_dump(exclude_none=False) for p in personas], + indent=2, + ensure_ascii=False, + ), + encoding="utf-8", + ) + # Cache invalidation happens automatically on next ``load()`` + # via the mtime check, but reset the in-process cache too in + # case the CLI process is the same as the worker (uncommon but + # cheap to be correct about). + global_pool.reset_cache() + console.print( + f"[green]Imported {len(personas)} personas to[/] {dest}" + ) + if path != dest: + log.info("emailgen import-personas src=%s dest=%s", path, dest) diff --git a/decnet/orchestrator/drivers/email.py b/decnet/orchestrator/drivers/email.py index c30fe228..2ccd1c48 100644 --- a/decnet/orchestrator/drivers/email.py +++ b/decnet/orchestrator/drivers/email.py @@ -239,9 +239,17 @@ class EmailDriver: container = _container_for( action.mail_decky_name, list(action.mail_decky_services), ) + # Stamp the file's mtime + atime to match the EML's Date: header + # so an attacker `ls -lt`'ing the spool doesn't see a wall of + # files all created within the worker's tick window — the cluster + # itself is a tell. ``touch -d`` on GNU coreutils accepts RFC + # 2822 dates directly via the same formatdate() string we wrote + # into the header, so no extra parsing on the container side. + eml_date_header = formatdate(ts.timestamp(), localtime=False) sh_cmd = ( f"mkdir -p {shlex.quote(eml_dir)} && " - f"tee {shlex.quote(eml_path)} >/dev/null" + f"tee {shlex.quote(eml_path)} >/dev/null && " + f"touch -d {shlex.quote(eml_date_header)} {shlex.quote(eml_path)}" ) argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd] rc2, _stdout2, stderr2 = await _run_capture( diff --git a/decnet/orchestrator/emailgen/global_pool.py b/decnet/orchestrator/emailgen/global_pool.py new file mode 100644 index 00000000..a5d6d4d9 --- /dev/null +++ b/decnet/orchestrator/emailgen/global_pool.py @@ -0,0 +1,136 @@ +"""Global persona pool — non-topology mail deckies. + +DECNET runs in three deployment shapes that emit running deckies: + +* **MazeNET topologies** — each topology owns its own + :attr:`Topology.email_personas` JSON list; the scheduler walks back + from the mail decky to its parent topology row. +* **Unihost fleet** — MACVLAN/IPVLAN deckies that have no + parent topology row at all. They share one host-wide pool. +* **SWARM shards** — DeckyShard rows on enrolled workers. + Same shape as fleet for emailgen purposes (no parent topology row), + so they read the same global pool. + +This module owns the global pool: a JSON file on disk that operators +populate via ``decnet emailgen import-personas `` (or by editing +the file directly). The file is loaded lazily on first read and +re-loaded on mtime change so a CLI import takes effect for the running +worker without a restart. + +Path resolution order: + +1. ``DECNET_EMAILGEN_PERSONAS`` environment variable — explicit override. +2. ``/etc/decnet/email_personas.json`` — canonical master path; this is + what ``decnet init`` will eventually own. +3. ``~/.decnet/email_personas.json`` — dev fallback so a developer can + exercise the worker without root or ``decnet init``. + +When the file is missing / empty / unparseable, the pool is empty and +the scheduler skips fleet/shard mail deckies the same way it skips a +topology with too few personas. No silent fallback to dummy personas; +silence is correct when there's no opinion to convey. +""" +from __future__ import annotations + +import os +import threading +from pathlib import Path +from typing import Optional + +from decnet.logging import get_logger +from decnet.orchestrator.emailgen.personas import EmailPersona, parse_personas + +logger = get_logger("orchestrator.emailgen") + +_ENV_VAR = "DECNET_EMAILGEN_PERSONAS" +_SYSTEM_PATH = Path("/etc/decnet/email_personas.json") + + +def _user_path() -> Path: + return Path(os.path.expanduser("~/.decnet/email_personas.json")) + + +def resolve_path() -> Path: + """Return the path the global pool would load from right now. + + The file may not exist; callers are expected to handle that. The + function is pure (no I/O) so the ``decnet emailgen import-personas`` + CLI can ask "where would I write to?" without touching the disk. + """ + override = os.environ.get(_ENV_VAR, "").strip() + if override: + return Path(override) + if _SYSTEM_PATH.parent.exists() or _SYSTEM_PATH.exists(): + return _SYSTEM_PATH + return _user_path() + + +# ── Cache ──────────────────────────────────────────────────────────────────── +# Lock-protected because two scheduler ticks could race on the first load, +# and the read path is hot enough (every tick, every fleet/shard mail +# decky) that re-parsing on every call is wasteful. + +_lock = threading.Lock() +_cache: list[EmailPersona] = [] +_cache_path: Optional[Path] = None +_cache_mtime: float = 0.0 + + +def load(*, language_default: str = "en") -> list[EmailPersona]: + """Return the parsed global persona pool. + + *language_default* fills in any persona missing a ``language`` field; + fleet/shard sources have no topology-level default, so callers + should pass the worker's best guess (typically ``"en"``). + + Threadsafe and cheap on the steady state (mtime check + dict lookup); + expensive only when the file changed since the last call. + """ + path = resolve_path() + try: + st = path.stat() + except OSError: + with _lock: + global _cache, _cache_path, _cache_mtime + _cache = [] + _cache_path = path + _cache_mtime = 0.0 + return [] + + with _lock: + if ( + _cache_path == path + and _cache_mtime == st.st_mtime + and _cache # non-empty cache; empty re-parses cheaply anyway + ): + return _cache + + try: + raw = path.read_text(encoding="utf-8") + except OSError as exc: + logger.warning("emailgen global pool: read failed path=%s: %s", path, exc) + return [] + + parsed = parse_personas(raw, language_default=language_default) + with _lock: + _cache = parsed + _cache_path = path + _cache_mtime = st.st_mtime + if parsed: + logger.info( + "emailgen global pool: loaded %d personas from %s", len(parsed), path, + ) + return parsed + + +def reset_cache() -> None: + """Clear the in-process cache. + + Test-only helper — avoids stale state when several tests in the + same process exercise different on-disk pools. + """ + global _cache, _cache_path, _cache_mtime + with _lock: + _cache = [] + _cache_path = None + _cache_mtime = 0.0 diff --git a/decnet/orchestrator/emailgen/scheduler.py b/decnet/orchestrator/emailgen/scheduler.py index cbaec9c0..e0cfa276 100644 --- a/decnet/orchestrator/emailgen/scheduler.py +++ b/decnet/orchestrator/emailgen/scheduler.py @@ -25,6 +25,7 @@ from datetime import datetime from typing import Any, Optional from decnet.logging import get_logger +from decnet.orchestrator.emailgen import global_pool from decnet.orchestrator.emailgen.personas import ( EmailPersona, in_active_hours, @@ -104,55 +105,81 @@ def _is_mail_decky(decky: dict[str, Any]) -> bool: return any(s in services for s in _MAIL_SERVICES) +async def _resolve_personas( + repo: Any, mail_decky: dict[str, Any], +) -> tuple[list[EmailPersona], str]: + """Pick the right persona source for *mail_decky* and return the list. + + Returns ``(personas, source_label)`` so logs can disambiguate why a + tick was skipped. Source label is the same string ``list_running_deckies`` + sets on the row (``"topology" | "fleet" | "shard"``) so the logger + reads consistently against the rest of the orchestrator. + + Resolution rules (matches the design discussion): + * **topology** source → walk to ``Topology.email_personas``; the + topology owns its own list. Each topology can have different + personas. + * **fleet** / **shard** source → unihost MACVLAN/IPVLAN deckies and + SWARM shards have no parent topology row, so they share a single + host-wide pool loaded from disk by :mod:`global_pool`. + """ + source = mail_decky.get("source") or "unknown" + if source == "topology": + topology_id = mail_decky.get("topology_id") + if not topology_id: + return [], source + topology = await repo.get_topology(topology_id) + if not topology: + return [], source + return ( + parse_personas( + topology.get("email_personas"), + language_default=topology.get("language_default") or "en", + ), + source, + ) + # Fleet / shard / anything else → global pool. + return global_pool.load(), source + + async def pick( repo: Any, *, rand: Optional[secrets.SystemRandom] = None, now: Optional[datetime] = None, ) -> Optional[EmailAction]: - """Pick one email action against the running fleet. + """Pick one email action against any running mail decky. - *repo* is a :class:`BaseRepository`; we fetch running topology - deckies + their parent topology row directly. *now* is the - wall-clock used for ``active_hours`` filtering — injected so tests - can pin the hour deterministically. + Mail-decky discovery uses the **union view** (``list_running_deckies``): + MazeNET topology deckies, unihost fleet deckies, and SWARM shards are + all eligible. Persona source is per-decky-source; see + :func:`_resolve_personas`. *now* is the wall-clock used for + ``active_hours`` filtering — injected so tests can pin the hour + deterministically. """ rng = rand or secrets.SystemRandom() now_dt = now or datetime.now() - deckies = await repo.list_running_topology_deckies() + deckies = await repo.list_running_deckies() mail_deckies = [d for d in deckies if _is_mail_decky(d)] if not mail_deckies: logger.debug("emailgen pick: no running mail decky") return None mail_decky = rng.choice(mail_deckies) - topology_id = mail_decky.get("topology_id") - if not topology_id: - logger.debug("emailgen pick: mail decky has no topology_id") - return None - - topology = await repo.get_topology(topology_id) - if not topology: - logger.debug("emailgen pick: topology %s not found", topology_id) - return None - - personas = parse_personas( - topology.get("email_personas"), - language_default=topology.get("language_default") or "en", - ) + personas, source = await _resolve_personas(repo, mail_decky) if len(personas) < 2: logger.debug( - "emailgen pick: topology=%s has only %d personas; need >=2", - topology_id, len(personas), + "emailgen pick: source=%s mail_decky=%s only %d personas; need >=2", + source, mail_decky.get("uuid"), len(personas), ) return None active = [p for p in personas if in_active_hours(p, now_dt.hour)] if len(active) < 2: logger.debug( - "emailgen pick: topology=%s only %d personas in-hours", - topology_id, len(active), + "emailgen pick: source=%s mail_decky=%s only %d personas in-hours", + source, mail_decky.get("uuid"), len(active), ) return None diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 274c9e52..9a43eaec 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -2119,13 +2119,17 @@ class SQLModelRepository(BaseRepository): async def list_running_deckies(self) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] - # MazeNET — already shaped {uuid, name, ip, services} + # MazeNET — already shaped {uuid, name, ip, services}. We carry + # topology_id through so consumers (emailgen scheduler) can walk + # back to the parent topology row without a second round-trip; + # fleet/shard rows never have one, hence Optional. for d in await self.list_running_topology_deckies(): out.append({ "uuid": d.get("uuid"), "name": d.get("name"), "ip": d.get("ip"), "services": d.get("services") or [], + "topology_id": d.get("topology_id"), "source": "topology", }) # Fleet — column is `decky_ip`, PK is composite (host_uuid, name) diff --git a/tests/cli/test_emailgen_import_personas.py b/tests/cli/test_emailgen_import_personas.py new file mode 100644 index 00000000..326493a6 --- /dev/null +++ b/tests/cli/test_emailgen_import_personas.py @@ -0,0 +1,129 @@ +"""``decnet emailgen import-personas`` CLI command.""" +from __future__ import annotations + +import json + +import pytest +from typer.testing import CliRunner + +from decnet.cli import app +from decnet.orchestrator.emailgen import global_pool + + +@pytest.fixture(autouse=True) +def _reset_pool(): + global_pool.reset_cache() + yield + global_pool.reset_cache() + + +_TWO = [ + { + "name": "John Smith", + "email": "john@corp.com", + "role": "COO", + "tone": "formal", + "mannerisms": ["uses 'Best regards'"], + }, + { + "name": "Sarah Johnson", + "email": "sarah@corp.com", + "role": "PM", + "tone": "direct", + "mannerisms": ["uses bullets"], + }, +] + + +def test_import_personas_writes_canonical_file(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps(_TWO)) + dest = tmp_path / "global_pool.json" + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(dest)) + + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code == 0, result.stdout + assert dest.exists() + written = json.loads(dest.read_text()) + assert {p["email"] for p in written} == {"john@corp.com", "sarah@corp.com"} + + +def test_import_personas_explicit_output_overrides_env(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps(_TWO)) + env_dest = tmp_path / "env.json" + explicit = tmp_path / "explicit.json" + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(env_dest)) + + result = CliRunner().invoke( + app, + ["emailgen", "import-personas", str(src), "--output", str(explicit)], + ) + assert result.exit_code == 0, result.stdout + assert explicit.exists() + assert not env_dest.exists() + + +def test_import_personas_rejects_invalid_json(tmp_path): + src = tmp_path / "src.json" + src.write_text("{not valid") + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code != 0 + assert "Invalid JSON" in result.stdout + + +def test_import_personas_rejects_non_list(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps({"not": "a list"})) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(tmp_path / "out.json")) + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code != 0 + assert "list" in result.stdout.lower() + + +def test_import_personas_rejects_all_invalid_entries(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps([ + {"name": "broken", "email": "no-at-symbol"}, + ])) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(tmp_path / "out.json")) + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code != 0 + assert "No valid personas" in result.stdout + + +def test_import_personas_warns_on_single_persona(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps(_TWO[:1])) + dest = tmp_path / "out.json" + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(dest)) + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code == 0, result.stdout + assert "Warning" in result.stdout + assert dest.exists() + + +def test_imported_personas_load_via_global_pool(tmp_path, monkeypatch): + src = tmp_path / "src.json" + src.write_text(json.dumps(_TWO)) + dest = tmp_path / "out.json" + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(dest)) + + result = CliRunner().invoke( + app, ["emailgen", "import-personas", str(src)] + ) + assert result.exit_code == 0, result.stdout + + personas = global_pool.load() + assert len(personas) == 2 + assert {p.email for p in personas} == {"john@corp.com", "sarah@corp.com"} diff --git a/tests/orchestrator/emailgen/test_driver.py b/tests/orchestrator/emailgen/test_driver.py index 5be05e84..88b8d913 100644 --- a/tests/orchestrator/emailgen/test_driver.py +++ b/tests/orchestrator/emailgen/test_driver.py @@ -135,6 +135,15 @@ async def test_driver_run_success_path(monkeypatch): # Two subprocess calls: ollama, then docker exec. assert calls[0][0] == "ollama" assert calls[1][0] == "docker" + # docker exec shell command must include `touch -d` so the file's + # mtime matches the EML's Date: header — otherwise the spool's + # `ls -lt` clusters every email inside the worker tick window. + docker_sh = calls[1][-1] + assert "touch -d" in docker_sh + assert "tee" in docker_sh + # And tee must come before touch so we don't touch a file that + # doesn't exist yet. + assert docker_sh.index("tee") < docker_sh.index("touch -d") @pytest.mark.asyncio diff --git a/tests/orchestrator/emailgen/test_global_pool.py b/tests/orchestrator/emailgen/test_global_pool.py new file mode 100644 index 00000000..0d0cdb9c --- /dev/null +++ b/tests/orchestrator/emailgen/test_global_pool.py @@ -0,0 +1,99 @@ +"""Global persona pool — disk-backed source for fleet/shard mail deckies.""" +from __future__ import annotations + +import json + +import pytest + +from decnet.orchestrator.emailgen import global_pool + + +@pytest.fixture(autouse=True) +def _reset(): + global_pool.reset_cache() + yield + global_pool.reset_cache() + + +_TWO = [ + { + "name": "John Smith", + "email": "john@corp.com", + "role": "COO", + "tone": "formal", + "mannerisms": ["uses 'Best regards'"], + }, + { + "name": "Sarah Johnson", + "email": "sarah@corp.com", + "role": "PM", + "tone": "direct", + "mannerisms": ["uses bullets"], + }, +] + + +def test_load_returns_empty_when_file_missing(tmp_path, monkeypatch): + monkeypatch.setenv( + "DECNET_EMAILGEN_PERSONAS", str(tmp_path / "does-not-exist.json") + ) + assert global_pool.load() == [] + + +def test_load_returns_parsed_personas(tmp_path, monkeypatch): + f = tmp_path / "personas.json" + f.write_text(json.dumps(_TWO)) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(f)) + personas = global_pool.load() + assert len(personas) == 2 + assert {p.email for p in personas} == {"john@corp.com", "sarah@corp.com"} + + +def test_load_resolves_language_default(tmp_path, monkeypatch): + f = tmp_path / "personas.json" + f.write_text(json.dumps(_TWO)) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(f)) + personas = global_pool.load(language_default="es") + assert all(p.language == "es" for p in personas) + + +def test_load_invalid_json_returns_empty(tmp_path, monkeypatch): + f = tmp_path / "personas.json" + f.write_text("{not valid") + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(f)) + assert global_pool.load() == [] + + +def test_load_caches_until_mtime_changes(tmp_path, monkeypatch): + f = tmp_path / "personas.json" + f.write_text(json.dumps(_TWO)) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(f)) + + first = global_pool.load() + assert len(first) == 2 + + # Re-write with a single persona; bump mtime so the cache invalidates. + import time as _time + _time.sleep(0.01) + f.write_text(json.dumps(_TWO[:1])) + import os + os.utime(f, None) + + second = global_pool.load() + assert len(second) == 1 + + +def test_resolve_path_honours_env_override(tmp_path, monkeypatch): + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(tmp_path / "x.json")) + assert global_pool.resolve_path() == tmp_path / "x.json" + + +def test_resolve_path_falls_back_to_user_path_when_system_missing(monkeypatch): + monkeypatch.delenv("DECNET_EMAILGEN_PERSONAS", raising=False) + # In a typical dev box /etc/decnet/ doesn't exist; the resolver + # should pick ~/.decnet/email_personas.json. + p = global_pool.resolve_path() + # We don't assert the exact path (depends on whether /etc/decnet + # exists on the test host), only that it ends with the canonical + # filename and isn't an empty path. + assert p.name == "email_personas.json" diff --git a/tests/orchestrator/emailgen/test_scheduler.py b/tests/orchestrator/emailgen/test_scheduler.py index a0519a3e..f3010d40 100644 --- a/tests/orchestrator/emailgen/test_scheduler.py +++ b/tests/orchestrator/emailgen/test_scheduler.py @@ -7,7 +7,14 @@ from typing import Any import pytest -from decnet.orchestrator.emailgen import scheduler +from decnet.orchestrator.emailgen import global_pool, scheduler + + +@pytest.fixture(autouse=True) +def _reset_global_pool(): + global_pool.reset_cache() + yield + global_pool.reset_cache() _PERSONAS_TWO = [ @@ -43,7 +50,7 @@ class _FakeRepo: self.threads = threads or [] self.thread_calls = 0 - async def list_running_topology_deckies(self): + async def list_running_deckies(self): return self.deckies async def get_topology(self, topology_id: str): @@ -54,12 +61,19 @@ class _FakeRepo: return list(self.threads) -def _decky(uuid="d1", name="mailhost", services=("imap",), topology_id="t1"): +def _decky( + uuid="d1", + name="mailhost", + services=("imap",), + topology_id="t1", + source="topology", +): return { "uuid": uuid, "name": name, "services": list(services), "topology_id": topology_id, + "source": source, } @@ -127,6 +141,70 @@ async def test_pick_uses_pop3_decky_too(): assert action is not None +@pytest.mark.asyncio +async def test_pick_for_fleet_source_uses_global_pool(tmp_path, monkeypatch): + """Fleet (MACVLAN/IPVLAN) mail decky has no parent topology row; + personas come from the host-wide JSON file.""" + pool_file = tmp_path / "personas.json" + pool_file.write_text(json.dumps(_PERSONAS_TWO)) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(pool_file)) + + repo = _FakeRepo( + deckies=[_decky(source="fleet", topology_id=None)], + # No topology row — confirms we never walk back to the topology. + ) + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) + assert action is not None + assert action.mail_decky_uuid == "d1" + + +@pytest.mark.asyncio +async def test_pick_for_shard_source_uses_global_pool(tmp_path, monkeypatch): + """SWARM shards are non-topology too — same path as fleet.""" + pool_file = tmp_path / "personas.json" + pool_file.write_text(json.dumps(_PERSONAS_TWO)) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(pool_file)) + + repo = _FakeRepo( + deckies=[_decky(source="shard", topology_id=None)], + ) + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) + assert action is not None + + +@pytest.mark.asyncio +async def test_pick_fleet_with_empty_global_pool_returns_none(tmp_path, monkeypatch): + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(tmp_path / "missing.json")) + repo = _FakeRepo(deckies=[_decky(source="fleet", topology_id=None)]) + assert await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) is None + + +@pytest.mark.asyncio +async def test_topology_personas_isolated_from_global_pool(tmp_path, monkeypatch): + """A topology with its own personas must NOT leak into / pull from + the global pool — per-topology richness is the whole point.""" + pool_file = tmp_path / "personas.json" + pool_file.write_text(json.dumps([{ + "name": "Pool Persona", + "email": "pool@corp.com", + "role": "Pooler", + "tone": "casual", + "mannerisms": [], + }])) + monkeypatch.setenv("DECNET_EMAILGEN_PERSONAS", str(pool_file)) + + repo = _FakeRepo( + deckies=[_decky()], + topologies={"t1": _topology()}, # topology has _PERSONAS_TWO + ) + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) + assert action is not None + # The chosen sender + recipient must come from the topology's pool, + # not the global one — pool@corp.com would be a leak. + assert action.sender.email != "pool@corp.com" + assert action.recipient.email != "pool@corp.com" + + @pytest.mark.asyncio async def test_pick_reply_chain_sets_in_reply_to(): threads = [{