From 3ee55ec3418370215929f03250ba2c92f156b84b Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 22:16:19 -0400 Subject: [PATCH] feat(emailgen): Ollama-driven fake email worker for IMAP/POP3 deckies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second orchestrator worker (decnet emailgen) that drips persona-driven, threaded, multi-language fake emails into running mail deckies. Personas live on Topology.email_personas; topology-wide language_default falls through to any persona that doesn't pin its own. Em-dashes are suppressed at the prompt layer by default and only lifted for personas explicitly marked uses_llms_heavily — em-dashes are an LLM tell and a flat corpus of em-dashed mail is a giveaway. EML delivery writes into /var/spool/decnet-emails//.eml on the mail decky via docker exec; wiring the IMAP/POP3 templates to read from that spool (replacing the hardcoded _BAIT_EMAILS) is the next step. --- decnet/bus/topics.py | 6 + decnet/cli/__init__.py | 3 +- decnet/cli/emailgen.py | 66 +++++ decnet/orchestrator/drivers/email.py | 272 ++++++++++++++++++ decnet/orchestrator/emailgen/__init__.py | 33 +++ decnet/orchestrator/emailgen/events.py | 49 ++++ decnet/orchestrator/emailgen/personas.py | 119 ++++++++ decnet/orchestrator/emailgen/prompt.py | 151 ++++++++++ decnet/orchestrator/emailgen/scheduler.py | 228 +++++++++++++++ decnet/orchestrator/emailgen/threads.py | 75 +++++ decnet/orchestrator/emailgen/worker.py | 131 +++++++++ decnet/web/db/models/__init__.py | 4 + decnet/web/db/models/orchestrator.py | 49 ++++ decnet/web/db/models/topology.py | 12 + decnet/web/db/repository.py | 54 ++++ decnet/web/db/sqlmodel_repo.py | 111 +++++++ tests/orchestrator/emailgen/__init__.py | 0 tests/orchestrator/emailgen/test_driver.py | 169 +++++++++++ tests/orchestrator/emailgen/test_events.py | 72 +++++ tests/orchestrator/emailgen/test_personas.py | 101 +++++++ tests/orchestrator/emailgen/test_prompt.py | 152 ++++++++++ tests/orchestrator/emailgen/test_repo.py | 129 +++++++++ tests/orchestrator/emailgen/test_scheduler.py | 161 +++++++++++ tests/orchestrator/emailgen/test_threads.py | 61 ++++ .../emailgen/test_worker_integration.py | 136 +++++++++ 25 files changed, 2343 insertions(+), 1 deletion(-) create mode 100644 decnet/cli/emailgen.py create mode 100644 decnet/orchestrator/drivers/email.py create mode 100644 decnet/orchestrator/emailgen/__init__.py create mode 100644 decnet/orchestrator/emailgen/events.py create mode 100644 decnet/orchestrator/emailgen/personas.py create mode 100644 decnet/orchestrator/emailgen/prompt.py create mode 100644 decnet/orchestrator/emailgen/scheduler.py create mode 100644 decnet/orchestrator/emailgen/threads.py create mode 100644 decnet/orchestrator/emailgen/worker.py create mode 100644 tests/orchestrator/emailgen/__init__.py create mode 100644 tests/orchestrator/emailgen/test_driver.py create mode 100644 tests/orchestrator/emailgen/test_events.py create mode 100644 tests/orchestrator/emailgen/test_personas.py create mode 100644 tests/orchestrator/emailgen/test_prompt.py create mode 100644 tests/orchestrator/emailgen/test_repo.py create mode 100644 tests/orchestrator/emailgen/test_scheduler.py create mode 100644 tests/orchestrator/emailgen/test_threads.py create mode 100644 tests/orchestrator/emailgen/test_worker_integration.py diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 9f456efb..611fe3cc 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -12,6 +12,7 @@ Token structure (NATS-style, dot-separated): decky.{decky_id}.traffic orchestrator.traffic.{decky_id} orchestrator.file.{decky_id} + orchestrator.email.{decky_id} attacker.observed attacker.scored attacker.session.started @@ -172,6 +173,11 @@ CREDENTIAL_REUSE_DETECTED = "reuse.detected" # stream via ``orchestrator.*.``. ORCHESTRATOR_TRAFFIC = "traffic" ORCHESTRATOR_FILE = "file" +# Emailgen — published by the ``decnet emailgen`` worker once per generated +# fake email delivered into a mail decky's maildir. Third token is the +# destination mail-decky uuid (the IMAP/POP3 host serving the mailbox), +# matching the ``orchestrator.*.`` subscription pattern. +ORCHESTRATOR_EMAIL = "email" # System event types. SYSTEM_LOG = "log" diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 2231bea1..39bfe3b7 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -24,6 +24,7 @@ from . import ( bus, db, deploy, + emailgen, forwarder, geoip, init, @@ -56,7 +57,7 @@ for _mod in ( api, swarmctl, agent, updater, listener, forwarder, swarm, deploy, lifecycle, workers, inventory, - web, profiler, orchestrator, reconciler, sniffer, db, + web, profiler, orchestrator, emailgen, reconciler, sniffer, db, topology, bus, geoip, init, webhook, ): _mod.register(app) diff --git a/decnet/cli/emailgen.py b/decnet/cli/emailgen.py new file mode 100644 index 00000000..60e332a3 --- /dev/null +++ b/decnet/cli/emailgen.py @@ -0,0 +1,66 @@ +"""``decnet emailgen`` — second orchestrator worker. + +Sibling of :mod:`decnet.cli.orchestrator`. Two distinct CLI entrypoints +match the "workers are independent, never coupled" principle: a wedged +ollama call in emailgen does not stall the SSH-flavoured orchestrator, +and systemd supervises each loop separately. +""" +from __future__ import annotations + +import os + +import typer + +from . import utils as _utils +from .utils import console, log + + +def register(app: typer.Typer) -> None: + @app.command(name="emailgen") + def emailgen_cmd( + interval: int = typer.Option( + 300, "--interval", "-i", + help="Seconds between fake-email generation ticks (default 5m)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + model: str = typer.Option( + "", "--model", "-m", + help="Ollama model override (defaults to $DECNET_EMAILGEN_MODEL " + "or 'llama3.1')", + ), + ) -> None: + """Drip fake corporate emails into running IMAP/POP3 mail deckies.""" + import asyncio + from decnet.orchestrator.emailgen import emailgen_worker + from decnet.web.dependencies import repo + + if daemon: + log.info("emailgen daemonizing interval=%d", interval) + _utils._daemonize() + + # Honour the env var when the flag was left empty so systemd unit + # files can configure the model centrally without per-host CLI + # tweaks. Empty -> let the worker apply its own default. + resolved_model = model or os.environ.get("DECNET_EMAILGEN_MODEL", "") + log.info( + "emailgen starting interval=%d model=%s", + interval, resolved_model or "default", + ) + console.print( + f"[bold cyan]Emailgen starting[/] (interval: {interval}s" + f"{', model: ' + resolved_model if resolved_model else ''})" + ) + + async def _run() -> None: + await repo.initialize() + await emailgen_worker( + repo, interval=interval, model=resolved_model or None, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Emailgen stopped.[/]") diff --git a/decnet/orchestrator/drivers/email.py b/decnet/orchestrator/drivers/email.py new file mode 100644 index 00000000..c30fe228 --- /dev/null +++ b/decnet/orchestrator/drivers/email.py @@ -0,0 +1,272 @@ +"""Email driver — Ollama-backed EML generation + decky-side delivery. + +One :class:`EmailAction` becomes one EML written into the mail decky's +configured emailgen spool directory (``/var/spool/decnet-emails/`` by +default). An integration follow-up wires the IMAP/POP3 service templates +to read EMLs from that spool at request time so attackers see the +generated mail in their MUA. + +The Ollama call shells out via ``ollama run `` — the prototype at +``DECNET-EMAILs/main.py`` proved the round-trip works. Output is +parsed-and-repaired into a valid EML using :mod:`email.mime.*`; the +worker then ``docker exec``\\s a ``tee`` to drop the file inside the +target container. + +Per CLAUDE.md "no shell strings": every subprocess invocation uses an +argv list, never ``shell=True``. Ollama prompts and EML payloads are +piped via ``stdin``, not interpolated into argv. +""" +from __future__ import annotations + +import asyncio +import os +import shlex +import time +from datetime import datetime, timezone +from email.mime.text import MIMEText +from email.utils import formatdate +from typing import Any, Optional + +from decnet.logging import get_logger +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.emailgen.prompt import PromptInputs, build as build_prompt +from decnet.orchestrator.emailgen.scheduler import EmailAction +from decnet.orchestrator.emailgen.threads import new_message_id + +log = get_logger("orchestrator.email") + +_DOCKER = "docker" +_OLLAMA = "ollama" +# Wall-clock cap for the LLM call. Big enough for a 4070 running +# llama3.1; small enough that a stuck Ollama server doesn't wedge the +# emailgen tick. +_DEFAULT_OLLAMA_TIMEOUT = float(os.environ.get("DECNET_EMAILGEN_TIMEOUT", "60")) +_DEFAULT_MODEL = os.environ.get("DECNET_EMAILGEN_MODEL", "llama3.1") +# docker-exec wall-clock cap for the per-EML write. +_DOCKER_TIMEOUT = 8.0 +# Container suffix for the IMAP service on a mail decky. +_IMAP_CONTAINER_SUFFIX = "-imap" +_POP3_CONTAINER_SUFFIX = "-pop3" +# Spool path inside the container. Match the IMAP template's stubbed +# IMAP_EMAIL_SEED location once wiring lands; shipping the constant now +# lets that integration land independently. +_SPOOL_DIR = "/var/spool/decnet-emails" + + +async def _run_capture( + argv: list[str], + *, + stdin_data: Optional[bytes] = None, + timeout: float = _DOCKER_TIMEOUT, +) -> tuple[int, str, str]: + """Spawn *argv*, optionally feeding *stdin_data*. Never raises.""" + try: + proc = await asyncio.create_subprocess_exec( + *argv, + stdin=asyncio.subprocess.PIPE if stdin_data is not None else None, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except FileNotFoundError as exc: + return 127, "", f"argv[0] not found: {exc}" + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(stdin_data), timeout=timeout, + ) + except asyncio.TimeoutError: + try: + proc.kill() + except ProcessLookupError: + pass + return 124, "", "timeout" + return ( + proc.returncode if proc.returncode is not None else -1, + stdout.decode("utf-8", "replace"), + stderr.decode("utf-8", "replace"), + ) + + +def _container_for(decky_name: str, services: list[str]) -> str: + """Pick the IMAP container if present, else POP3. Names follow the + ``-`` convention from the service templates.""" + if "imap" in services: + return f"{decky_name}{_IMAP_CONTAINER_SUFFIX}" + return f"{decky_name}{_POP3_CONTAINER_SUFFIX}" + + +def _parse_subject_and_body(ollama_output: str) -> tuple[str, str]: + """Split LLM output into (subject, body). + + The prompt asks for ``Subject: \\n\\n``. When the + model misbehaves (e.g. wraps in markdown fences or skips the + Subject line), fall back to a generic subject and treat the whole + output as body. Never raises. + """ + text = ollama_output.strip() + # Strip code fences if the model wrapped output. + if text.startswith("```"): + nl = text.find("\n") + if nl > 0: + text = text[nl + 1:] + if text.endswith("```"): + text = text[: -3] + text = text.strip() + lines = text.splitlines() + if lines and lines[0].lower().startswith("subject:"): + subject = lines[0].split(":", 1)[1].strip() + # Drop the (possibly empty) blank line after Subject. + body_lines = lines[1:] + if body_lines and not body_lines[0].strip(): + body_lines = body_lines[1:] + body = "\n".join(body_lines).strip() + if not subject: + subject = "Business Communication" + return subject, body + return "Business Communication", text + + +def _build_eml( + *, + sender_name: str, + sender_email: str, + recipient_name: str, + recipient_email: str, + subject: str, + body: str, + message_id: str, + in_reply_to: Optional[str], + references: str, + ts: datetime, +) -> bytes: + """Assemble a valid plain-text RFC 2822 EML.""" + msg = MIMEText(body, "plain", "utf-8") + msg["From"] = f"{sender_name} <{sender_email}>" + msg["To"] = f"{recipient_name} <{recipient_email}>" + msg["Subject"] = subject + msg["Date"] = formatdate(ts.timestamp(), localtime=False) + msg["Message-ID"] = message_id + if in_reply_to: + msg["In-Reply-To"] = in_reply_to + if references: + msg["References"] = references + msg["MIME-Version"] = "1.0" + return msg.as_bytes() + + +class EmailDriver: + """Concrete driver for :class:`EmailAction`. + + Stateless across calls — Ollama model + timeout are constructor + args, not per-call. The driver does *not* know about the bus or + DB; it returns an :class:`ActivityResult` that the worker pipes + onward. + """ + + def __init__( + self, + *, + model: str = _DEFAULT_MODEL, + ollama_timeout: float = _DEFAULT_OLLAMA_TIMEOUT, + spool_dir: str = _SPOOL_DIR, + ) -> None: + self.model = model + self.ollama_timeout = ollama_timeout + self.spool_dir = spool_dir + + async def run(self, action: EmailAction) -> ActivityResult: + # Look up the mail-decky container name + services. The driver + # receives a denormalised view via the action — the worker + # populates it from the same list the scheduler used. + return await self._run_email(action) + + async def _run_email(self, action: EmailAction) -> ActivityResult: + t0 = time.monotonic() + prompt, mannerisms_used = build_prompt( + PromptInputs( + sender=action.sender, + recipient=action.recipient, + context_hint=action.context_hint, + parent_subject=action.subject_hint, + parent_excerpt=action.parent_excerpt, + ) + ) + rc, stdout, stderr = await _run_capture( + [_OLLAMA, "run", self.model], + stdin_data=prompt.encode("utf-8"), + timeout=self.ollama_timeout, + ) + gen_ms = int((time.monotonic() - t0) * 1000) + if rc != 0 or not stdout.strip(): + log.warning( + "emailgen ollama failed rc=%d stderr=%r model=%s", + rc, stderr[:200], self.model, + ) + return ActivityResult( + success=False, + payload={ + "stage": "ollama", + "rc": rc, + "stderr": stderr.strip()[:256], + "generation_ms": gen_ms, + "model": self.model, + "thread_id": action.thread_id, + }, + ) + + subject, body = _parse_subject_and_body(stdout) + message_id = new_message_id(action.sender.email.split("@", 1)[1]) + ts = datetime.now(timezone.utc) + eml_bytes = _build_eml( + sender_name=action.sender.name, + sender_email=action.sender.email, + recipient_name=action.recipient.name, + recipient_email=action.recipient.email, + subject=subject, + body=body, + message_id=message_id, + in_reply_to=action.parent_message_id, + references=action.references, + ts=ts, + ) + + # Drop the EML into the mail decky's spool dir over docker exec. + # File path: //.eml. + # Per-thread sub-directory keeps `ls` in the spool readable by + # operators inspecting the running decoy. + eml_filename = message_id.strip("<>").replace("@", "_at_") + ".eml" + eml_dir = f"{self.spool_dir.rstrip('/')}/{action.thread_id}" + eml_path = f"{eml_dir}/{eml_filename}" + container = _container_for( + action.mail_decky_name, list(action.mail_decky_services), + ) + sh_cmd = ( + f"mkdir -p {shlex.quote(eml_dir)} && " + f"tee {shlex.quote(eml_path)} >/dev/null" + ) + argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd] + rc2, _stdout2, stderr2 = await _run_capture( + argv, stdin_data=eml_bytes, timeout=_DOCKER_TIMEOUT, + ) + success = rc2 == 0 + payload: dict[str, Any] = { + "stage": "delivered" if success else "delivery", + "model": self.model, + "generation_ms": gen_ms, + "bytes": len(eml_bytes), + "thread_id": action.thread_id, + "message_id": message_id, + "subject": subject, + "language": action.sender.language or "en", + "mannerisms_used": mannerisms_used, + "is_reply": action.is_reply, + "container": container, + "eml_path": eml_path, + "rc": rc2, + "stderr": stderr2.strip()[:256] if not success else None, + } + if not success: + log.warning( + "emailgen delivery failed container=%s rc=%d stderr=%r", + container, rc2, stderr2[:200], + ) + return ActivityResult(success=success, payload=payload) diff --git a/decnet/orchestrator/emailgen/__init__.py b/decnet/orchestrator/emailgen/__init__.py new file mode 100644 index 00000000..c8f90a5b --- /dev/null +++ b/decnet/orchestrator/emailgen/__init__.py @@ -0,0 +1,33 @@ +"""Emailgen — second orchestrator worker. + +Generates fake corporate emails (multi-language, threaded, persona-driven) +and drops them into mail-decky maildirs so attackers landing on +IMAP/POP3 honeypots find believable mailboxes instead of empty inboxes. + +The module is intentionally a sibling of :mod:`decnet.orchestrator` (not +a flag on it) — separate worker, separate CLI command +(``decnet emailgen``), separate systemd-supervised lifecycle. Shares the +heartbeat / control-listener scaffolding via :mod:`decnet.bus.publish`. + +Lazy worker re-export: :func:`emailgen_worker` is loaded on first +attribute access so that submodules can import package-level names +(``decnet.orchestrator.emailgen.prompt``) without triggering an eager +load of the worker — and through it, the email driver, which imports +back into this package. Without lazy loading the package + driver + +worker form a cycle. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: # pragma: no cover - typing only + from decnet.orchestrator.emailgen.worker import emailgen_worker # noqa: F401 + +__all__ = ["emailgen_worker"] + + +def __getattr__(name: str) -> Any: + if name == "emailgen_worker": + from decnet.orchestrator.emailgen.worker import emailgen_worker as _w + return _w + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/decnet/orchestrator/emailgen/events.py b/decnet/orchestrator/emailgen/events.py new file mode 100644 index 00000000..dccaac9e --- /dev/null +++ b/decnet/orchestrator/emailgen/events.py @@ -0,0 +1,49 @@ +"""DB-row + bus-topic helpers for the emailgen worker. + +Mirror of :mod:`decnet.orchestrator.events` for the email action class. +Kept in its own module so the SSH-flavoured orchestrator and the +emailgen worker don't accumulate cross-imports of each other's action +types. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from decnet.bus import topics as _topics +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.emailgen.scheduler import EmailAction + + +def to_row(action: EmailAction, result: ActivityResult) -> dict[str, Any]: + """Build the kwargs dict for ``OrchestratorEmail(**...)``. + + Pulls ``message_id`` / ``subject`` / ``language`` out of the + driver's ``payload`` rather than off the action — the EML's + Message-ID is generated inside the driver after the LLM call so + we know it matches what landed on disk. + """ + payload = result.payload or {} + return { + "ts": datetime.now(timezone.utc), + "mail_decky_uuid": action.mail_decky_uuid, + "thread_id": action.thread_id, + "message_id": payload.get("message_id", ""), + "in_reply_to": action.parent_message_id, + "sender_email": action.sender.email, + "recipient_email": action.recipient.email, + "subject": payload.get("subject", ""), + "language": payload.get("language", action.sender.language or "en"), + "eml_path": payload.get("eml_path", ""), + "success": result.success, + "payload": payload, # repo serialises dict→json + } + + +def topic_for(action: EmailAction) -> str: + """Map an email action to its bus topic.""" + return _topics.orchestrator(_topics.ORCHESTRATOR_EMAIL, action.mail_decky_uuid) + + +def event_type_for(action: EmailAction) -> str: # noqa: ARG001 — symmetry + return _topics.ORCHESTRATOR_EMAIL diff --git a/decnet/orchestrator/emailgen/personas.py b/decnet/orchestrator/emailgen/personas.py new file mode 100644 index 00000000..4f909f3c --- /dev/null +++ b/decnet/orchestrator/emailgen/personas.py @@ -0,0 +1,119 @@ +"""Persona schema for the emailgen worker. + +Stored as a JSON list on :attr:`Topology.email_personas`. Each persona +describes one fictional employee whose mailbox lives on the topology's +IMAP/POP3 decky. The schema deliberately stays narrow: the LLM gets +*enough* differentiation to write distinct voices, no more. + +Invalid entries are dropped with a warning (returned alongside the +parsed list) rather than raising — a single typo in one persona must +not stall the entire emailgen tick. +""" +from __future__ import annotations + +import json +from typing import Literal, Optional + +from pydantic import BaseModel, Field, ValidationError, field_validator + +from decnet.logging import get_logger + +logger = get_logger("orchestrator.emailgen") + +Tone = Literal["formal", "direct", "casual", "technical"] +ReplyLatency = Literal["fast", "normal", "slow"] + + +class EmailPersona(BaseModel): + """One fake mailbox owner. + + ``language`` is ISO 639-1 (``en``, ``es``, ``pt``…); when unset on the + persona it falls back to the topology's ``language_default``. + ``uses_llms_heavily`` lifts the prompt-layer em-dash suppression for + that persona — em-dashes are an LLM tell, but a persona explicitly + pegged as a heavy LLM user should *naturally* produce them. + """ + name: str = Field(min_length=1, max_length=128) + email: str = Field(min_length=3, max_length=255) + role: str = Field(min_length=1, max_length=128) + tone: Tone = "formal" + mannerisms: list[str] = Field(default_factory=list, max_length=12) + language: Optional[str] = Field(default=None, max_length=8) + signature: Optional[str] = Field(default=None, max_length=512) + active_hours: str = Field(default="09:00-18:00", max_length=32) + reply_latency: ReplyLatency = "normal" + uses_llms_heavily: bool = False + + @field_validator("email") + @classmethod + def _email_shape(cls, v: str) -> str: + # Cheap structural check — full RFC 5322 isn't worth the + # dependency. We only need ``user@domain`` with non-empty parts + # for the prompt builder + Message-ID generator. + if "@" not in v: + raise ValueError("email must contain '@'") + local, _, domain = v.rpartition("@") + if not local or not domain or "." not in domain: + raise ValueError("email must look like user@domain.tld") + return v + + +def parse_personas( + raw: str | list | None, + *, + language_default: str = "en", +) -> list[EmailPersona]: + """Parse the JSON-or-list ``email_personas`` value into models. + + Resolves ``language`` against *language_default* so downstream + consumers (prompt builder, scheduler) never need to know about + fallback semantics. + """ + if not raw: + return [] + if isinstance(raw, str): + try: + raw = json.loads(raw) + except json.JSONDecodeError as exc: + logger.warning("emailgen personas: invalid JSON, skipping: %s", exc) + return [] + if not isinstance(raw, list): + logger.warning( + "emailgen personas: expected list, got %s", type(raw).__name__ + ) + return [] + out: list[EmailPersona] = [] + for i, entry in enumerate(raw): + try: + persona = EmailPersona.model_validate(entry) + except ValidationError as exc: + logger.warning( + "emailgen personas: dropping invalid entry index=%d: %s", + i, exc.errors(include_url=False), + ) + continue + if persona.language is None: + persona = persona.model_copy(update={"language": language_default}) + out.append(persona) + return out + + +def in_active_hours(persona: EmailPersona, now_hour: int) -> bool: + """Return True if *now_hour* (0–23) falls in the persona's window. + + Format: ``"HH:MM-HH:MM"``. Wrap-around windows (``"22:00-06:00"``) + are supported. Invalid windows treat the persona as always-on so a + config typo never silences the whole fleet. + """ + try: + start_s, end_s = persona.active_hours.split("-") + start_h = int(start_s.split(":")[0]) + end_h = int(end_s.split(":")[0]) + except (ValueError, IndexError): + return True + if start_h == end_h: + return True + if start_h < end_h: + return start_h <= now_hour < end_h + # Wrap-around (e.g. 22:00-06:00). + return now_hour >= start_h or now_hour < end_h diff --git a/decnet/orchestrator/emailgen/prompt.py b/decnet/orchestrator/emailgen/prompt.py new file mode 100644 index 00000000..0e678d5a --- /dev/null +++ b/decnet/orchestrator/emailgen/prompt.py @@ -0,0 +1,151 @@ +"""Ollama prompt builder for emailgen. + +The LLM gets a tightly-scoped instruction and a small handful of +deterministic constraints. Persona mannerisms are *pre-selected* in +Python (1–2 of the persona's full list) and injected as hard rules — +small models otherwise treat the mannerism list as flavour text and +ignore it, and the corpus collapses into one voice. + +**Em-dash suppression** is on by default; suppression is lifted only +for personas that opt in via ``uses_llms_heavily``. Em-dashes are a +strong stylometric tell for LLM-authored prose, and a honeypot mailbox +where every author uses them is a tell. +""" +from __future__ import annotations + +import secrets +from dataclasses import dataclass +from typing import Optional + +from decnet.orchestrator.emailgen.personas import EmailPersona + + +@dataclass(frozen=True) +class PromptInputs: + sender: EmailPersona + recipient: EmailPersona + context_hint: str + parent_subject: Optional[str] = None # set when replying + parent_excerpt: Optional[str] = None # short snippet of last msg + + +_LANGUAGE_NAMES = { + "en": "English", + "es": "Spanish", + "pt": "Portuguese", + "fr": "French", + "de": "German", + "it": "Italian", + "nl": "Dutch", + "ja": "Japanese", + "zh": "Chinese", +} + + +def _lang_label(code: str) -> str: + return _LANGUAGE_NAMES.get(code.lower(), code) + + +def select_mannerisms( + persona: EmailPersona, + *, + rng: Optional[secrets.SystemRandom] = None, + n: int = 2, +) -> list[str]: + """Pick *n* mannerisms deterministically given *rng*. + + Returns up to *n*; falls back to the full list when the persona + declares fewer. Determinism (under a seeded RNG) is what makes + tests practical — otherwise mannerism injection is unverifiable. + """ + rnd = rng or secrets.SystemRandom() + pool = list(persona.mannerisms) + if not pool: + return [] + if len(pool) <= n: + return pool + rnd.shuffle(pool) + return pool[:n] + + +def build( + inputs: PromptInputs, + *, + rng: Optional[secrets.SystemRandom] = None, +) -> tuple[str, list[str]]: + """Return ``(prompt, mannerisms_used)``. + + ``mannerisms_used`` flows back into the persisted ``payload`` JSON + so an analyst can see *why* a given email reads the way it does. + """ + sender = inputs.sender + recipient = inputs.recipient + language = _lang_label(sender.language or "en") + mannerisms = select_mannerisms(sender, rng=rng) + mannerism_block = ( + "\n".join(f"- {m}" for m in mannerisms) + if mannerisms + else "- (no specific mannerisms; write in the persona's tone)" + ) + + if sender.uses_llms_heavily: + em_dash_rule = ( + "Em-dashes are fine — this persona uses them naturally. " + "Write in your usual style." + ) + else: + em_dash_rule = ( + "Do NOT use em-dashes (—). Use commas, periods, or " + "parentheses instead. Em-dashes are a tell." + ) + + sig_block = ( + f"Use this exact signature block:\n{sender.signature}" + if sender.signature + else "End with a short, plausible signature for the persona's role." + ) + + if inputs.parent_subject: + thread_block = ( + f"This is a REPLY in an ongoing thread.\n" + f"- Parent subject: {inputs.parent_subject}\n" + f"- Parent excerpt: {inputs.parent_excerpt or '(no excerpt)'}\n" + f"- Begin the body assuming the recipient already read the parent.\n" + ) + subject_rule = ( + "Subject must be the parent subject prefixed with 'Re: ' " + "(no double 'Re: Re:')." + ) + else: + thread_block = "This is a NEW thread (no prior context)." + subject_rule = ( + "Generate a short, specific subject line (≤ 80 chars) " + "appropriate to the context." + ) + + prompt = f"""You are writing one corporate email, RFC 2822 plain-text body only. + +Persona — sender: +- Name: {sender.name} +- Role: {sender.role} +- Tone: {sender.tone} +- Mannerisms (must show through): +{mannerism_block} + +Persona — recipient: +- Name: {recipient.name} +- Role: {recipient.role} + +Context hint: {inputs.context_hint} + +Thread context: +{thread_block} + +Hard rules: +1. Write the email body in {language}. Do not translate or code-switch. +2. {em_dash_rule} +3. {subject_rule} +4. {sig_block} +5. Output ONLY the email — first line is "Subject: ", then a blank line, then the body. No commentary, no markdown fences, no preamble. +""" + return prompt.strip(), mannerisms diff --git a/decnet/orchestrator/emailgen/scheduler.py b/decnet/orchestrator/emailgen/scheduler.py new file mode 100644 index 00000000..cbaec9c0 --- /dev/null +++ b/decnet/orchestrator/emailgen/scheduler.py @@ -0,0 +1,228 @@ +"""Action picker for the emailgen worker. + +One tick = one (mail-decky, sender, recipient, [thread]) decision. + +Scope (v1): +- Only TopologyDeckies are eligible mail hosts. Fleet / SWARM-shard + mail-deckies are out of scope per the plan; they get covered when the + forwarder pattern lands for emailgen. +- Mail decky = a running TopologyDecky whose ``services`` includes + ``imap`` or ``pop3``. +- Personas come from ``Topology.email_personas`` (JSON list of + :class:`EmailPersona`). Topology-wide ``language_default`` fills in + any persona that didn't set its own. + +Returns ``None`` (skip tick) when: +- no running mail decky, +- the mail decky's topology has fewer than two valid personas, +- nobody is in their ``active_hours`` window right now. +""" +from __future__ import annotations + +import secrets +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Optional + +from decnet.logging import get_logger +from decnet.orchestrator.emailgen.personas import ( + EmailPersona, + in_active_hours, + parse_personas, +) +from decnet.orchestrator.emailgen.threads import ( + ThreadChain, + new_thread_id, + references_for_reply, + reply_subject, +) + +logger = get_logger("orchestrator.emailgen") + +_MAIL_SERVICES = ("imap", "pop3") +# Probability of replying on an existing thread when one exists. The +# inverse starts a fresh thread. 0.6 mirrors what mailbox studies find +# for active corporate inboxes — most messages are replies, but not +# overwhelmingly so. +_REPLY_PROBABILITY = 0.6 + +# Generic context hints fed to the LLM when starting a new thread. +# Deliberately broad — the persona's tone + role is what shapes the +# email; the hint just gives the model a topic to riff on. +_CONTEXT_HINTS: tuple[str, ...] = ( + "Q3 budget review and approval", + "Client presentation feedback", + "Project deadline extension request", + "Team building event planning", + "IT system maintenance notification", + "Quarterly performance review", + "Vendor onboarding process", + "Holiday schedule announcement", + "Training session invitation", + "Department restructuring update", + "Client contract negotiation", + "Security audit findings", + "Sales strategy meeting", + "Product launch timeline", + "Office relocation update", + "Travel reimbursement policy change", +) + + +@dataclass(frozen=True) +class EmailAction: + """One emailgen tick's decision. + + ``thread_id`` is non-None whenever this action is a reply; the + worker writes it back to the DB so future ticks can chain further + replies. ``in_reply_to`` / ``references`` mirror the RFC 2822 + headers we'll set on the EML. + + ``mail_decky_name`` / ``mail_decky_services`` are denormalised onto + the action so the driver doesn't need a second repo round-trip just + to resolve the container name. + """ + mail_decky_uuid: str + mail_decky_name: str + mail_decky_services: tuple[str, ...] + sender: EmailPersona + recipient: EmailPersona + thread_id: str + parent_message_id: Optional[str] + references: str + subject_hint: Optional[str] # used as parent subject when replying + parent_excerpt: Optional[str] # excerpt from the parent body + context_hint: str # only meaningful on new threads + is_reply: bool + description: str = "email:send" + + +def _is_mail_decky(decky: dict[str, Any]) -> bool: + services = decky.get("services") or [] + if isinstance(services, str): + return False + return any(s in services for s in _MAIL_SERVICES) + + +async def pick( + repo: Any, + *, + rand: Optional[secrets.SystemRandom] = None, + now: Optional[datetime] = None, +) -> Optional[EmailAction]: + """Pick one email action against the running fleet. + + *repo* is a :class:`BaseRepository`; we fetch running topology + deckies + their parent topology row directly. *now* is the + wall-clock used for ``active_hours`` filtering — injected so tests + can pin the hour deterministically. + """ + rng = rand or secrets.SystemRandom() + now_dt = now or datetime.now() + + deckies = await repo.list_running_topology_deckies() + mail_deckies = [d for d in deckies if _is_mail_decky(d)] + if not mail_deckies: + logger.debug("emailgen pick: no running mail decky") + return None + + mail_decky = rng.choice(mail_deckies) + topology_id = mail_decky.get("topology_id") + if not topology_id: + logger.debug("emailgen pick: mail decky has no topology_id") + return None + + topology = await repo.get_topology(topology_id) + if not topology: + logger.debug("emailgen pick: topology %s not found", topology_id) + return None + + personas = parse_personas( + topology.get("email_personas"), + language_default=topology.get("language_default") or "en", + ) + if len(personas) < 2: + logger.debug( + "emailgen pick: topology=%s has only %d personas; need >=2", + topology_id, len(personas), + ) + return None + + active = [p for p in personas if in_active_hours(p, now_dt.hour)] + if len(active) < 2: + logger.debug( + "emailgen pick: topology=%s only %d personas in-hours", + topology_id, len(active), + ) + return None + + sender = rng.choice(active) + recipient = rng.choice([p for p in active if p.email != sender.email]) + + # Look up open threads between this pair on this mail decky. + chain = await _maybe_pick_chain( + repo, mail_decky["uuid"], sender, recipient, rng=rng, + ) + + services = tuple(mail_decky.get("services") or ()) + decky_name = mail_decky.get("name") or "" + + if chain is not None: + return EmailAction( + mail_decky_uuid=mail_decky["uuid"], + mail_decky_name=decky_name, + mail_decky_services=services, + sender=sender, + recipient=recipient, + thread_id=chain.thread_id, + parent_message_id=chain.parent_message_id, + references=references_for_reply(chain), + subject_hint=chain.parent_subject, + parent_excerpt=None, # repo can populate later if useful + context_hint=chain.parent_subject, + is_reply=True, + ) + + return EmailAction( + mail_decky_uuid=mail_decky["uuid"], + mail_decky_name=decky_name, + mail_decky_services=services, + sender=sender, + recipient=recipient, + thread_id=new_thread_id(), + parent_message_id=None, + references="", + subject_hint=None, + parent_excerpt=None, + context_hint=rng.choice(_CONTEXT_HINTS), + is_reply=False, + ) + + +async def _maybe_pick_chain( + repo: Any, + mail_decky_uuid: str, + sender: EmailPersona, + recipient: EmailPersona, + *, + rng: secrets.SystemRandom, +) -> Optional[ThreadChain]: + """Probabilistically pick an open thread between the pair, or None.""" + if rng.random() >= _REPLY_PROBABILITY: + return None + threads = await repo.list_orchestrator_email_threads( + mail_decky_uuid, sender.email, recipient.email, limit=20, + ) + if not threads: + return None + head = threads[0] + return ThreadChain( + thread_id=head["thread_id"], + parent_message_id=head["message_id"], + # We don't reconstruct the full ancestry from row history here — + # the parent's References + parent's Message-ID would do that. + # For v1, single-step references is fine; mail clients still + # group correctly by (Subject + In-Reply-To). + references=tuple(), + parent_subject=reply_subject(head["subject"]), + ) diff --git a/decnet/orchestrator/emailgen/threads.py b/decnet/orchestrator/emailgen/threads.py new file mode 100644 index 00000000..c62665a0 --- /dev/null +++ b/decnet/orchestrator/emailgen/threads.py @@ -0,0 +1,75 @@ +"""RFC 2822 thread-chain bookkeeping. + +A thread is a worker-side UUID that groups one or more emails between +the same two personas. ``In-Reply-To`` carries the immediate parent's +``Message-ID``; ``References`` carries the full ancestry chain. + +The emailgen scheduler queries the repository for the most recent email +in any thread between (sender, recipient); if it finds one, it emits a +reply (continuing the chain). Otherwise it starts a new thread. +""" +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from typing import Optional + + +@dataclass(frozen=True) +class ThreadChain: + """Immutable view of a thread's chain at a point in time. + + ``thread_id`` is opaque (UUID). ``parent_message_id`` is the most + recent message in the chain — the new reply's ``In-Reply-To`` field. + ``references`` is the dot-separated history fed into the + ``References:`` header (oldest-first per RFC 2822 §3.6.4). + ``parent_subject`` carries the subject we're replying to, so the + reply can prepend ``Re:`` correctly. + """ + thread_id: str + parent_message_id: str + references: tuple[str, ...] + parent_subject: str + + +def new_thread_id() -> str: + return str(uuid.uuid4()) + + +def reply_subject(parent_subject: str) -> str: + """Prepend ``Re:`` to *parent_subject* if not already a reply. + + Folds repeat ``Re: Re: Re:`` into a single ``Re:`` — Outlook / + Thunderbird both do this and an attacker reading the maildir would + notice the corpus's missing convention immediately. + """ + s = parent_subject.strip() + lowered = s.lower() + while lowered.startswith("re:"): + s = s[3:].lstrip() + lowered = s.lower() + return f"Re: {s}" + + +def references_for_reply(chain: Optional[ThreadChain]) -> str: + """Build the ``References:`` header value for a reply. + + Returns a space-separated list of message-ids, oldest-first, with + the parent appended. Empty string when *chain* is None (root). + """ + if chain is None: + return "" + refs = list(chain.references) + [chain.parent_message_id] + return " ".join(refs) + + +def new_message_id(domain: str) -> str: + """Build an RFC 2822 ``Message-ID`` value (incl. angle brackets). + + Worker side — the value is also stored in the DB so a future reply + can be threaded against it. Domain mirrors the sender's email + domain so an attacker grepping for tells doesn't find every + fake-corp email tagged with ``@example.com``. + """ + safe_domain = domain.strip() or "localhost" + return f"<{uuid.uuid4().hex}@{safe_domain}>" diff --git a/decnet/orchestrator/emailgen/worker.py b/decnet/orchestrator/emailgen/worker.py new file mode 100644 index 00000000..315c97e7 --- /dev/null +++ b/decnet/orchestrator/emailgen/worker.py @@ -0,0 +1,131 @@ +"""Emailgen main loop. + +Mirrors :mod:`decnet.orchestrator.worker` shape: same heartbeat, same +control listener, same fire-and-forget bus publish, same prune knob. +A wedged ollama call stalls only this worker, never the SSH-flavoured +orchestrator running alongside. +""" +from __future__ import annotations + +import asyncio +import contextlib + +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + publish_safely, + run_control_listener, + run_health_heartbeat, +) +from decnet.logging import get_logger +from decnet.orchestrator.drivers.email import EmailDriver +from decnet.orchestrator.emailgen import events, scheduler +from decnet.web.db.repository import BaseRepository + +logger = get_logger("orchestrator.emailgen") + +# Periodic-prune knobs — same shape as orchestrator/worker.py. +_PRUNE_EVERY_TICKS = 100 +_PRUNE_PER_DECKY_CAP = 5000 + + +async def emailgen_worker( + repo: BaseRepository, + *, + interval: int = 300, + model: str | None = None, +) -> None: + """Periodically generate one fake email into a running mail decky. + + Default interval is 5 minutes — emails are expensive (LLM round + trip) and don't need to fire every minute to look natural. Honors + ``system.emailgen.control`` for graceful shutdown. + """ + logger.info("emailgen worker started interval=%ds model=%s", interval, model) + + bus = None + try: + bus = get_bus(client_name="emailgen") + await bus.connect() + except Exception as exc: # noqa: BLE001 + logger.warning( + "emailgen: bus unavailable, continuing without publish: %s", exc + ) + bus = None + + driver = EmailDriver(model=model) if model else EmailDriver() + shutdown = asyncio.Event() + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "emailgen")) + control_task = asyncio.create_task( + run_control_listener(bus, "emailgen", shutdown), + ) + tick_n = 0 + try: + while not shutdown.is_set(): + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + except asyncio.TimeoutError: + pass # normal tick + if shutdown.is_set(): + break + try: + await _one_tick(repo, driver, bus) + except Exception as exc: # noqa: BLE001 + logger.error("emailgen tick failed: %s", exc) + tick_n += 1 + if tick_n % _PRUNE_EVERY_TICKS == 0: + try: + deleted = await repo.prune_orchestrator_emails( + per_decky_cap=_PRUNE_PER_DECKY_CAP, + ) + if deleted: + logger.info( + "emailgen prune deleted=%d cap=%d", + deleted, _PRUNE_PER_DECKY_CAP, + ) + except Exception as exc: # noqa: BLE001 + logger.error("emailgen prune failed: %s", exc) + finally: + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _one_tick(repo: BaseRepository, driver: EmailDriver, bus) -> None: + action = await scheduler.pick(repo) + if action is None: + logger.debug("emailgen: no actionable mail decky / personas this tick") + return + + result = await driver.run(action) + row = events.to_row(action, result) + await repo.record_orchestrator_email(row) + + if bus is not None: + topic = events.topic_for(action) + # Mirror the orchestrator-event SSE-friendly payload shape: ts + # as iso8601, payload as already-serialised dict. + bus_payload = { + "kind": "email", + "mail_decky_uuid": row["mail_decky_uuid"], + "thread_id": row["thread_id"], + "message_id": row["message_id"], + "in_reply_to": row["in_reply_to"], + "sender_email": row["sender_email"], + "recipient_email": row["recipient_email"], + "subject": row["subject"], + "language": row["language"], + "success": row["success"], + "ts": row["ts"].isoformat(), + } + await publish_safely( + bus, topic, bus_payload, event_type=events.event_type_for(action), + ) + + logger.info( + "emailgen tick mail_decky=%s thread=%s success=%s reply=%s", + row["mail_decky_uuid"], row["thread_id"], row["success"], action.is_reply, + ) diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index f99d396e..79ece47b 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -58,6 +58,8 @@ from .health import ( HealthResponse, ) from .orchestrator import ( + OrchestratorEmail, + OrchestratorEmailsResponse, OrchestratorEvent, OrchestratorEventsResponse, ) @@ -193,6 +195,8 @@ __all__ = [ "ComponentHealth", "HealthResponse", # orchestrator + "OrchestratorEmail", + "OrchestratorEmailsResponse", "OrchestratorEvent", "OrchestratorEventsResponse", # logs diff --git a/decnet/web/db/models/orchestrator.py b/decnet/web/db/models/orchestrator.py index 5d217eab..87e1548d 100644 --- a/decnet/web/db/models/orchestrator.py +++ b/decnet/web/db/models/orchestrator.py @@ -60,3 +60,52 @@ class OrchestratorEventsResponse(BaseModel): limit: int offset: int data: List[dict[str, Any]] + + +class OrchestratorEmail(SQLModel, table=True): + """One fake email generated by the ``decnet emailgen`` worker. + + Sibling table to :class:`OrchestratorEvent` — kept disjoint because + email rows carry domain-specific fields (subject, message_id, + in_reply_to, language) that have no analogue in the SSH/file events + and would otherwise bloat ``OrchestratorEvent.payload``. + + The mail decky's UUID lives in ``mail_decky_uuid`` (the host serving + the IMAP/POP3 mailbox). ``thread_id`` is a worker-side UUID used to + chain replies; ``in_reply_to`` is the parent email's RFC 2822 + Message-ID header value (or ``None`` for thread roots). + + ``payload`` follows the same loose-JSON convention as + :class:`OrchestratorEvent`: ``bytes``, ``generation_ms``, ``model``, + ``mannerisms_used``, etc. The worker can extend it without a + migration. + """ + __tablename__ = "orchestrator_emails" + __table_args__ = ( + Index("ix_orchestrator_emails_mail_ts", "mail_decky_uuid", "ts"), + Index("ix_orchestrator_emails_thread", "thread_id"), + ) + uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + ts: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + mail_decky_uuid: str = Field(index=True) + thread_id: str = Field(index=True) + message_id: str = Field(max_length=255) + in_reply_to: Optional[str] = Field(default=None, max_length=255) + sender_email: str = Field(max_length=255, index=True) + recipient_email: str = Field(max_length=255, index=True) + subject: str = Field(max_length=512) + language: str = Field(max_length=8, default="en") + eml_path: str = Field(max_length=1024) + success: bool = Field(default=False, index=True) + payload: str = Field( + sa_column=Column("payload", Text, nullable=False, default="{}") + ) + + +class OrchestratorEmailsResponse(BaseModel): + total: int + limit: int + offset: int + data: List[dict[str, Any]] diff --git a/decnet/web/db/models/topology.py b/decnet/web/db/models/topology.py index 20e8423d..8fae8f69 100644 --- a/decnet/web/db/models/topology.py +++ b/decnet/web/db/models/topology.py @@ -47,6 +47,18 @@ class Topology(SQLModel, table=True): # running. Drained by the mutator watch loop, which re-pushes via # AgentClient and clears the flag. NULL for unihost topologies. needs_resync: bool = Field(default=False, nullable=False) + # JSON-serialised list of EmailPersona dicts consumed by the + # ``decnet emailgen`` worker. Empty list = no fake mailbox owners + # configured for this topology, the worker skips it. + email_personas: str = Field( + sa_column=Column( + "email_personas", _BIG_TEXT, nullable=False, default="[]" + ) + ) + # ISO 639-1 language code applied to any persona that doesn't override + # ``language`` itself. English by default; ANTI's deployments default + # to "es" by editing this column. + language_default: str = Field(default="en", max_length=8) class LAN(SQLModel, table=True): diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index b4c2d866..451977da 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -952,3 +952,57 @@ class BaseRepository(ABC): unbounded growth without paying the cost on every write. """ raise NotImplementedError + + async def record_orchestrator_email(self, data: dict[str, Any]) -> str: + """Insert one orchestrator-generated email row, returning its uuid.""" + raise NotImplementedError + + async def list_orchestrator_emails( + self, + limit: int = 100, + offset: int = 0, + *, + mail_decky_uuid: Optional[str] = None, + thread_id: Optional[str] = None, + since_ts: Optional[Any] = None, + ) -> list[dict[str, Any]]: + """Paginated orchestrator emails newest-first. + + Optional filters narrow to a single mail decky or to one thread, + used by the dashboard's mailbox-inspector view. + """ + raise NotImplementedError + + async def count_orchestrator_emails( + self, + *, + mail_decky_uuid: Optional[str] = None, + ) -> int: + """Total orchestrator-email rows, optionally filtered by mail decky.""" + raise NotImplementedError + + async def list_orchestrator_email_threads( + self, + mail_decky_uuid: str, + sender_email: str, + recipient_email: str, + *, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Open threads between *sender_email* and *recipient_email* on + *mail_decky_uuid*, newest-first. + + Used by the emailgen scheduler to decide whether to start a new + thread or reply on an existing one. Each entry is one row's + worth of dict — the worker only needs ``thread_id`` and the most + recent ``message_id`` / ``subject`` to build the reply. + """ + raise NotImplementedError + + async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int: + """Trim per-``mail_decky_uuid`` rows to a cap. Returns deleted count. + + Mirrors :meth:`prune_orchestrator_events`; emailgen worker calls + this on a periodic tick. + """ + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index b23e46e7..274c9e52 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -51,6 +51,7 @@ from decnet.web.db.models import ( TopologyEdge, TopologyStatusEvent, TopologyMutation, + OrchestratorEmail, OrchestratorEvent, WebhookSubscription, ) @@ -3003,3 +3004,113 @@ class SQLModelRepository(BaseRepository): deleted += res.rowcount or 0 await session.commit() return deleted + + # ---------------------------------------------------------- emailgen + + async def record_orchestrator_email(self, data: dict[str, Any]) -> str: + payload = data.get("payload") + if isinstance(payload, (dict, list)): + data = {**data, "payload": json.dumps(payload)} + async with self._session() as session: + row = OrchestratorEmail(**data) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + async def list_orchestrator_emails( + self, + limit: int = 100, + offset: int = 0, + *, + mail_decky_uuid: Optional[str] = None, + thread_id: Optional[str] = None, + since_ts: Optional[datetime] = None, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(OrchestratorEmail) + if mail_decky_uuid is not None: + stmt = stmt.where( + OrchestratorEmail.mail_decky_uuid == mail_decky_uuid + ) + if thread_id is not None: + stmt = stmt.where(OrchestratorEmail.thread_id == thread_id) + if since_ts is not None: + stmt = stmt.where(OrchestratorEmail.ts >= since_ts) + stmt = ( + stmt.order_by(desc(OrchestratorEmail.ts)) + .offset(offset) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def count_orchestrator_emails( + self, + *, + mail_decky_uuid: Optional[str] = None, + ) -> int: + stmt = select(func.count()).select_from(OrchestratorEmail) + if mail_decky_uuid is not None: + stmt = stmt.where(OrchestratorEmail.mail_decky_uuid == mail_decky_uuid) + async with self._session() as session: + result = await session.execute(stmt) + return result.scalar() or 0 + + async def list_orchestrator_email_threads( + self, + mail_decky_uuid: str, + sender_email: str, + recipient_email: str, + *, + limit: int = 50, + ) -> list[dict[str, Any]]: + # Most-recent row per (sender, recipient) pair under this mail decky. + # The scheduler only needs the latest message_id/subject/thread_id to + # construct a reply; older rows in the same thread aren't relevant + # for the "do we reply or start fresh" decision. + async with self._session() as session: + stmt = ( + select(OrchestratorEmail) + .where( + OrchestratorEmail.mail_decky_uuid == mail_decky_uuid, + or_( + (OrchestratorEmail.sender_email == sender_email) + & (OrchestratorEmail.recipient_email == recipient_email), + (OrchestratorEmail.sender_email == recipient_email) + & (OrchestratorEmail.recipient_email == sender_email), + ), + OrchestratorEmail.success.is_(True), + ) + .order_by(desc(OrchestratorEmail.ts)) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int: + """Trim per-mail-decky rows to *per_decky_cap*, oldest-first.""" + deleted = 0 + async with self._session() as session: + decky_rows = await session.execute( + select(OrchestratorEmail.mail_decky_uuid).distinct() + ) + for (mail_uuid,) in decky_rows.all(): + keep = await session.execute( + select(OrchestratorEmail.uuid) + .where(OrchestratorEmail.mail_decky_uuid == mail_uuid) + .order_by(desc(OrchestratorEmail.ts)) + .limit(per_decky_cap) + ) + keep_uuids = [u for (u,) in keep.all()] + if not keep_uuids: + continue + from sqlalchemy import delete as _delete + stmt = _delete(OrchestratorEmail).where( + OrchestratorEmail.mail_decky_uuid == mail_uuid, + OrchestratorEmail.uuid.notin_(keep_uuids), + ) + res = await session.execute(stmt) + deleted += res.rowcount or 0 + await session.commit() + return deleted diff --git a/tests/orchestrator/emailgen/__init__.py b/tests/orchestrator/emailgen/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/orchestrator/emailgen/test_driver.py b/tests/orchestrator/emailgen/test_driver.py new file mode 100644 index 00000000..5be05e84 --- /dev/null +++ b/tests/orchestrator/emailgen/test_driver.py @@ -0,0 +1,169 @@ +"""EmailDriver: stub the Ollama subprocess + docker exec; verify EML +parse-and-repair and payload metadata.""" +from __future__ import annotations + +import pytest + +from decnet.orchestrator.drivers import email as email_driver +from decnet.orchestrator.emailgen.personas import EmailPersona +from decnet.orchestrator.emailgen.scheduler import EmailAction + + +def _persona(name="John", email="john@corp.com"): + return EmailPersona( + name=name, + email=email, + role="COO", + tone="formal", + mannerisms=["uses 'Best regards'"], + language="en", + ) + + +def _action(is_reply=False): + return EmailAction( + mail_decky_uuid="d1", + mail_decky_name="mailhost", + mail_decky_services=("imap",), + sender=_persona(), + recipient=_persona(name="Sarah", email="sarah@corp.com"), + thread_id="thr1", + parent_message_id="" if is_reply else None, + references="" if not is_reply else "", + subject_hint="Re: budget" if is_reply else None, + parent_excerpt=None, + context_hint="Q3 budget" if not is_reply else "Re: budget", + is_reply=is_reply, + ) + + +def test_parse_subject_and_body_extracts_subject_line(): + out = "Subject: Quick update\n\nHi Sarah,\nNumbers attached.\n" + subject, body = email_driver._parse_subject_and_body(out) + assert subject == "Quick update" + assert body.startswith("Hi Sarah") + + +def test_parse_subject_strips_code_fences(): + out = "```\nSubject: Quick update\n\nbody\n```\n" + subject, body = email_driver._parse_subject_and_body(out) + assert subject == "Quick update" + assert body == "body" + + +def test_parse_subject_falls_back_when_missing(): + out = "Just a body, no subject\n" + subject, body = email_driver._parse_subject_and_body(out) + assert subject == "Business Communication" + assert "body" in body.lower() + + +def test_build_eml_includes_required_headers(): + from datetime import datetime, timezone + + eml = email_driver._build_eml( + sender_name="John", + sender_email="john@corp.com", + recipient_name="Sarah", + recipient_email="sarah@corp.com", + subject="Q3 budget", + body="Hi Sarah,\nNumbers attached.", + message_id="", + in_reply_to=None, + references="", + ts=datetime(2026, 4, 26, 12, 0, tzinfo=timezone.utc), + ).decode("utf-8") + assert "From: John " in eml + assert "To: Sarah " in eml + assert "Subject: Q3 budget" in eml + assert "Message-ID: " in eml + assert "MIME-Version: 1.0" in eml + assert "In-Reply-To" not in eml + + +def test_build_eml_threads_carry_in_reply_to_and_references(): + from datetime import datetime, timezone + + eml = email_driver._build_eml( + sender_name="John", + sender_email="john@corp.com", + recipient_name="Sarah", + recipient_email="sarah@corp.com", + subject="Re: Q3", + body="Following up.", + message_id="", + in_reply_to="", + references="", + ts=datetime(2026, 4, 26, 12, 0, tzinfo=timezone.utc), + ).decode("utf-8") + assert "In-Reply-To: " in eml + assert "References: " in eml + + +def test_container_for_imap_takes_priority(): + assert email_driver._container_for("mailhost", ["imap", "pop3"]) == "mailhost-imap" + + +def test_container_for_pop3_only(): + assert email_driver._container_for("mailhost", ["pop3"]) == "mailhost-pop3" + + +@pytest.mark.asyncio +async def test_driver_run_success_path(monkeypatch): + """Stub both subprocess calls (ollama + docker exec) as success.""" + calls: list[list[str]] = [] + + async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0): + calls.append(list(argv)) + if argv[0] == "ollama": + return 0, "Subject: Q3 budget\n\nHi Sarah,\nNumbers attached.\n", "" + # docker exec + return 0, "", "" + + monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture) + + drv = email_driver.EmailDriver(model="llama3.1", ollama_timeout=1.0) + result = await drv.run(_action()) + assert result.success is True + assert result.payload["model"] == "llama3.1" + assert result.payload["subject"] == "Q3 budget" + assert result.payload["language"] == "en" + assert result.payload["mannerisms_used"] + assert result.payload["message_id"].startswith("<") + assert result.payload["eml_path"].endswith(".eml") + assert result.payload["container"] == "mailhost-imap" + # Two subprocess calls: ollama, then docker exec. + assert calls[0][0] == "ollama" + assert calls[1][0] == "docker" + + +@pytest.mark.asyncio +async def test_driver_run_ollama_failure_short_circuits(monkeypatch): + async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0): + if argv[0] == "ollama": + return 1, "", "ollama: model not found" + return 0, "", "" + + monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture) + + drv = email_driver.EmailDriver() + result = await drv.run(_action()) + assert result.success is False + assert result.payload["stage"] == "ollama" + assert "model not found" in result.payload["stderr"] + + +@pytest.mark.asyncio +async def test_driver_run_delivery_failure(monkeypatch): + async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0): + if argv[0] == "ollama": + return 0, "Subject: hi\n\nbody\n", "" + return 1, "", "no such container" + + monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture) + + drv = email_driver.EmailDriver() + result = await drv.run(_action()) + assert result.success is False + assert result.payload["stage"] == "delivery" + assert "no such container" in result.payload["stderr"] diff --git a/tests/orchestrator/emailgen/test_events.py b/tests/orchestrator/emailgen/test_events.py new file mode 100644 index 00000000..960000e4 --- /dev/null +++ b/tests/orchestrator/emailgen/test_events.py @@ -0,0 +1,72 @@ +"""events.to_row / topic_for / event_type_for.""" +from __future__ import annotations + +from decnet.bus import topics as _topics +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.emailgen import events +from decnet.orchestrator.emailgen.personas import EmailPersona +from decnet.orchestrator.emailgen.scheduler import EmailAction + + +def _persona(email="john@corp.com"): + return EmailPersona( + name="John", email=email, role="COO", tone="formal", + mannerisms=[], language="en", + ) + + +def _action(): + return EmailAction( + mail_decky_uuid="d1", + mail_decky_name="mailhost", + mail_decky_services=("imap",), + sender=_persona(), + recipient=_persona(email="sarah@corp.com"), + thread_id="thr1", + parent_message_id=None, + references="", + subject_hint=None, + parent_excerpt=None, + context_hint="Q3 budget", + is_reply=False, + ) + + +def test_to_row_pulls_message_id_subject_from_payload(): + res = ActivityResult( + success=True, + payload={ + "message_id": "", + "subject": "Q3 budget", + "language": "en", + "eml_path": "/var/spool/decnet-emails/thr1/m1.eml", + "model": "llama3.1", + }, + ) + row = events.to_row(_action(), res) + assert row["mail_decky_uuid"] == "d1" + assert row["thread_id"] == "thr1" + assert row["message_id"] == "" + assert row["subject"] == "Q3 budget" + assert row["sender_email"] == "john@corp.com" + assert row["recipient_email"] == "sarah@corp.com" + assert row["language"] == "en" + assert row["eml_path"].endswith(".eml") + assert row["success"] is True + assert row["payload"]["model"] == "llama3.1" + + +def test_to_row_falls_back_to_persona_language(): + res = ActivityResult(success=True, payload={}) + row = events.to_row(_action(), res) + assert row["language"] == "en" + assert row["message_id"] == "" + + +def test_topic_for_uses_orchestrator_email_root(): + topic = events.topic_for(_action()) + assert topic == f"orchestrator.{_topics.ORCHESTRATOR_EMAIL}.d1" + + +def test_event_type_for_returns_email_constant(): + assert events.event_type_for(_action()) == _topics.ORCHESTRATOR_EMAIL diff --git a/tests/orchestrator/emailgen/test_personas.py b/tests/orchestrator/emailgen/test_personas.py new file mode 100644 index 00000000..a4ba80ba --- /dev/null +++ b/tests/orchestrator/emailgen/test_personas.py @@ -0,0 +1,101 @@ +"""Persona schema parsing + active-hours window tests.""" +from __future__ import annotations + +import json + +from decnet.orchestrator.emailgen.personas import ( + EmailPersona, + in_active_hours, + parse_personas, +) + + +def _persona(**over) -> dict: + base = { + "name": "John Smith", + "email": "john@corp.com", + "role": "COO", + "tone": "formal", + "mannerisms": ["uses 'Best regards'"], + } + base.update(over) + return base + + +def test_parse_empty_inputs(): + assert parse_personas(None) == [] + assert parse_personas("") == [] + assert parse_personas([]) == [] + + +def test_parse_invalid_json_returns_empty_no_raise(): + assert parse_personas("{not json") == [] + + +def test_parse_invalid_top_level_shape_returns_empty(): + assert parse_personas('{"not": "a list"}') == [] + + +def test_parse_drops_invalid_entry_keeps_valid(): + raw = json.dumps([ + _persona(), + {"name": "broken", "email": "not-an-email"}, + _persona(name="Sarah", email="sarah@corp.com"), + ]) + parsed = parse_personas(raw) + assert len(parsed) == 2 + assert {p.name for p in parsed} == {"John Smith", "Sarah"} + + +def test_parse_resolves_language_default_when_unset(): + raw = json.dumps([_persona()]) + parsed = parse_personas(raw, language_default="es") + assert parsed[0].language == "es" + + +def test_parse_persona_language_overrides_default(): + raw = json.dumps([_persona(language="pt")]) + parsed = parse_personas(raw, language_default="es") + assert parsed[0].language == "pt" + + +def test_parse_accepts_python_list_directly(): + parsed = parse_personas([_persona()]) + assert len(parsed) == 1 + + +def test_uses_llms_heavily_default_false(): + parsed = parse_personas([_persona()]) + assert parsed[0].uses_llms_heavily is False + + +def test_uses_llms_heavily_can_be_set(): + parsed = parse_personas([_persona(uses_llms_heavily=True)]) + assert parsed[0].uses_llms_heavily is True + + +def test_active_hours_normal_window(): + p = EmailPersona(**_persona(active_hours="09:00-18:00")) + assert in_active_hours(p, 12) is True + assert in_active_hours(p, 8) is False + assert in_active_hours(p, 18) is False + assert in_active_hours(p, 9) is True + + +def test_active_hours_wraparound_window(): + p = EmailPersona(**_persona(active_hours="22:00-06:00")) + assert in_active_hours(p, 23) is True + assert in_active_hours(p, 0) is True + assert in_active_hours(p, 5) is True + assert in_active_hours(p, 7) is False + + +def test_active_hours_malformed_treats_as_always_on(): + p = EmailPersona(**_persona(active_hours="garbage")) + assert in_active_hours(p, 0) is True + assert in_active_hours(p, 23) is True + + +def test_active_hours_equal_window_treated_as_always_on(): + p = EmailPersona(**_persona(active_hours="10:00-10:00")) + assert in_active_hours(p, 5) is True diff --git a/tests/orchestrator/emailgen/test_prompt.py b/tests/orchestrator/emailgen/test_prompt.py new file mode 100644 index 00000000..723b76d6 --- /dev/null +++ b/tests/orchestrator/emailgen/test_prompt.py @@ -0,0 +1,152 @@ +"""Prompt builder behaviour: language constraint, em-dash suppression, +deterministic mannerism injection.""" +from __future__ import annotations + +import random + +from decnet.orchestrator.emailgen.personas import EmailPersona +from decnet.orchestrator.emailgen.prompt import ( + PromptInputs, + build, + select_mannerisms, +) + + +def _persona(**over) -> EmailPersona: + base = dict( + name="John Smith", + email="john@corp.com", + role="COO", + tone="formal", + mannerisms=[ + "opens with 'I hope this finds you well'", + "uses 'Best regards' exclusively", + "references policy by number", + "ccs legal", + ], + language="en", + ) + base.update(over) + return EmailPersona(**base) + + +class _SeededRng: + """Adapter so prompt code thinks it has a SystemRandom.""" + + def __init__(self, seed: int): + self._r = random.Random(seed) + + def shuffle(self, seq): + self._r.shuffle(seq) + + def random(self): + return self._r.random() + + def choice(self, seq): + return self._r.choice(seq) + + +def test_select_mannerisms_returns_subset_of_pool(): + persona = _persona() + picks = select_mannerisms(persona, rng=_SeededRng(0), n=2) + assert len(picks) == 2 + assert all(m in persona.mannerisms for m in picks) + + +def test_select_mannerisms_deterministic_under_same_seed(): + persona = _persona() + a = select_mannerisms(persona, rng=_SeededRng(42), n=2) + b = select_mannerisms(persona, rng=_SeededRng(42), n=2) + assert a == b + + +def test_select_mannerisms_returns_all_when_pool_smaller_than_n(): + persona = _persona(mannerisms=["a"]) + picks = select_mannerisms(persona, rng=_SeededRng(0), n=2) + assert picks == ["a"] + + +def test_select_mannerisms_empty_pool(): + persona = _persona(mannerisms=[]) + assert select_mannerisms(persona) == [] + + +def test_build_includes_language_constraint_english(): + sender = _persona(language="en") + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(0), + ) + assert "in English" in prompt + + +def test_build_includes_language_constraint_spanish(): + sender = _persona(language="es") + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(0), + ) + assert "in Spanish" in prompt + + +def test_build_em_dash_suppression_default(): + sender = _persona() + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(0), + ) + assert "Do NOT use em-dashes" in prompt + + +def test_build_em_dash_lifted_for_llm_heavy_persona(): + sender = _persona(uses_llms_heavily=True) + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(0), + ) + assert "Do NOT use em-dashes" not in prompt + assert "fine" in prompt.lower() + + +def test_build_reply_thread_block_prefixes_re(): + sender = _persona() + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs( + sender=sender, + recipient=recip, + context_hint="budget", + parent_subject="Re: Q3 budget", + parent_excerpt="Numbers attached.", + ), + rng=_SeededRng(0), + ) + assert "REPLY in an ongoing thread" in prompt + assert "Re: Q3 budget" in prompt + assert "Numbers attached" in prompt + assert "prefixed with 'Re: '" in prompt + + +def test_build_returns_mannerisms_used_metadata(): + sender = _persona() + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + _, used = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(7), + ) + assert used + assert all(m in sender.mannerisms for m in used) + + +def test_build_uses_explicit_signature_when_provided(): + sender = _persona(signature="-- John\\nCOO") + recip = _persona(name="Sarah", email="sarah@corp.com", role="PM") + prompt, _ = build( + PromptInputs(sender=sender, recipient=recip, context_hint="budget"), + rng=_SeededRng(0), + ) + assert "Use this exact signature block" in prompt diff --git a/tests/orchestrator/emailgen/test_repo.py b/tests/orchestrator/emailgen/test_repo.py new file mode 100644 index 00000000..068e4ce0 --- /dev/null +++ b/tests/orchestrator/emailgen/test_repo.py @@ -0,0 +1,129 @@ +"""record / list / count / prune orchestrator_emails on a real SQLite repo.""" +from __future__ import annotations + +import json +from datetime import datetime, timedelta, timezone + +import pytest +import pytest_asyncio + +from decnet.web.db.sqlite.repository import SQLiteRepository + + +@pytest_asyncio.fixture +async def repo(tmp_path): + r = SQLiteRepository(db_path=str(tmp_path / "decnet.db")) + await r.initialize() + yield r + await r.engine.dispose() + + +def _row( + mail="d1", + thread="thr1", + msg="", + sender="john@corp.com", + recipient="sarah@corp.com", + subject="Q3 budget", + success=True, + in_reply_to=None, + ts=None, +): + return { + "ts": ts or datetime.now(timezone.utc), + "mail_decky_uuid": mail, + "thread_id": thread, + "message_id": msg, + "in_reply_to": in_reply_to, + "sender_email": sender, + "recipient_email": recipient, + "subject": subject, + "language": "en", + "eml_path": f"/var/spool/decnet-emails/{thread}/{msg}.eml", + "success": success, + "payload": {"model": "llama3.1"}, + } + + +@pytest.mark.asyncio +async def test_record_returns_uuid_and_serialises_payload(repo): + uuid = await repo.record_orchestrator_email(_row()) + assert isinstance(uuid, str) and len(uuid) == 36 + rows = await repo.list_orchestrator_emails() + assert len(rows) == 1 + # payload is stored as JSON text, list endpoint hands it back as the + # raw column value — we just verify it round-trips intact. + assert json.loads(rows[0]["payload"])["model"] == "llama3.1" + + +@pytest.mark.asyncio +async def test_list_filters_by_thread_and_mail_decky(repo): + await repo.record_orchestrator_email(_row(thread="t1", msg="")) + await repo.record_orchestrator_email(_row(thread="t2", msg="")) + await repo.record_orchestrator_email(_row(mail="d2", msg="")) + + by_thread = await repo.list_orchestrator_emails(thread_id="t1") + assert {r["message_id"] for r in by_thread} == {""} + + by_mail = await repo.list_orchestrator_emails(mail_decky_uuid="d1") + assert len(by_mail) == 2 + + everything = await repo.list_orchestrator_emails() + assert len(everything) == 3 + + +@pytest.mark.asyncio +async def test_count_orchestrator_emails(repo): + for i in range(3): + await repo.record_orchestrator_email(_row(msg=f"")) + assert await repo.count_orchestrator_emails() == 3 + assert await repo.count_orchestrator_emails(mail_decky_uuid="d1") == 3 + assert await repo.count_orchestrator_emails(mail_decky_uuid="other") == 0 + + +@pytest.mark.asyncio +async def test_thread_lookup_only_returns_pair_threads(repo): + await repo.record_orchestrator_email( + _row(sender="john@corp.com", recipient="sarah@corp.com", msg="") + ) + # Reverse direction (Sarah → John) should still match the same pair. + await repo.record_orchestrator_email( + _row(sender="sarah@corp.com", recipient="john@corp.com", msg="") + ) + # Unrelated pair must not match. + await repo.record_orchestrator_email( + _row(sender="mike@corp.com", recipient="sarah@corp.com", msg="") + ) + threads = await repo.list_orchestrator_email_threads( + "d1", "john@corp.com", "sarah@corp.com", + ) + assert {t["message_id"] for t in threads} == {"", ""} + + +@pytest.mark.asyncio +async def test_thread_lookup_excludes_failed_rows(repo): + await repo.record_orchestrator_email(_row(msg="", success=True)) + await repo.record_orchestrator_email(_row(msg="", success=False)) + threads = await repo.list_orchestrator_email_threads( + "d1", "john@corp.com", "sarah@corp.com", + ) + assert {t["message_id"] for t in threads} == {""} + + +@pytest.mark.asyncio +async def test_prune_caps_per_decky(repo): + # Insert 5 rows on d1 with strictly-increasing timestamps so the + # prune's "newest-first keep, drop the rest" deterministically picks + # the older two. + base = datetime.now(timezone.utc) - timedelta(hours=10) + for i in range(5): + await repo.record_orchestrator_email( + _row(msg=f"", ts=base + timedelta(minutes=i)) + ) + # Cap at 3 — expect 2 deleted. + deleted = await repo.prune_orchestrator_emails(per_decky_cap=3) + assert deleted == 2 + remaining = await repo.list_orchestrator_emails() + assert len(remaining) == 3 + # The three newest survived. + assert {r["message_id"] for r in remaining} == {"", "", ""} diff --git a/tests/orchestrator/emailgen/test_scheduler.py b/tests/orchestrator/emailgen/test_scheduler.py new file mode 100644 index 00000000..a0519a3e --- /dev/null +++ b/tests/orchestrator/emailgen/test_scheduler.py @@ -0,0 +1,161 @@ +"""Scheduler.pick() — async, takes a repo-shaped object.""" +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any + +import pytest + +from decnet.orchestrator.emailgen import scheduler + + +_PERSONAS_TWO = [ + { + "name": "John Smith", + "email": "john@corp.com", + "role": "COO", + "tone": "formal", + "mannerisms": ["uses 'Best regards'"], + }, + { + "name": "Sarah Johnson", + "email": "sarah@corp.com", + "role": "PM", + "tone": "direct", + "mannerisms": ["uses bullets"], + }, +] + + +class _FakeRepo: + """Minimal repo stub matching the methods scheduler.pick() uses.""" + + def __init__( + self, + *, + deckies: list[dict[str, Any]] | None = None, + topologies: dict[str, dict[str, Any]] | None = None, + threads: list[dict[str, Any]] | None = None, + ): + self.deckies = deckies or [] + self.topologies = topologies or {} + self.threads = threads or [] + self.thread_calls = 0 + + async def list_running_topology_deckies(self): + return self.deckies + + async def get_topology(self, topology_id: str): + return self.topologies.get(topology_id) + + async def list_orchestrator_email_threads(self, *args, **kwargs): + self.thread_calls += 1 + return list(self.threads) + + +def _decky(uuid="d1", name="mailhost", services=("imap",), topology_id="t1"): + return { + "uuid": uuid, + "name": name, + "services": list(services), + "topology_id": topology_id, + } + + +def _topology(personas=_PERSONAS_TWO, language_default="en"): + return { + "id": "t1", + "email_personas": json.dumps(personas), + "language_default": language_default, + } + + +@pytest.mark.asyncio +async def test_pick_no_mail_decky_returns_none(): + repo = _FakeRepo(deckies=[_decky(services=("ssh",))]) + assert await scheduler.pick(repo) is None + + +@pytest.mark.asyncio +async def test_pick_unknown_topology_returns_none(): + repo = _FakeRepo(deckies=[_decky()]) + # No topology row for "t1" — scheduler should bail. + assert await scheduler.pick(repo) is None + + +@pytest.mark.asyncio +async def test_pick_topology_with_one_persona_returns_none(): + repo = _FakeRepo( + deckies=[_decky()], + topologies={"t1": _topology(personas=_PERSONAS_TWO[:1])}, + ) + assert await scheduler.pick(repo) is None + + +@pytest.mark.asyncio +async def test_pick_returns_action_for_valid_setup(): + repo = _FakeRepo( + deckies=[_decky()], + topologies={"t1": _topology()}, + ) + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) + assert action is not None + assert action.mail_decky_uuid == "d1" + assert action.sender.email != action.recipient.email + assert action.thread_id # populated for both new and reply branches + + +@pytest.mark.asyncio +async def test_pick_active_hours_filter_kicks_in_at_midnight(): + repo = _FakeRepo( + deckies=[_decky()], + topologies={"t1": _topology()}, + ) + # Default active_hours is 09:00-18:00; midnight => everyone out of office. + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 3, 0, 0)) + assert action is None + + +@pytest.mark.asyncio +async def test_pick_uses_pop3_decky_too(): + repo = _FakeRepo( + deckies=[_decky(services=("pop3",))], + topologies={"t1": _topology()}, + ) + action = await scheduler.pick(repo, now=datetime(2026, 4, 26, 12, 0, 0)) + assert action is not None + + +@pytest.mark.asyncio +async def test_pick_reply_chain_sets_in_reply_to(): + threads = [{ + "thread_id": "thr1", + "message_id": "", + "subject": "Q3 budget", + }] + repo = _FakeRepo( + deckies=[_decky()], + topologies={"t1": _topology()}, + threads=threads, + ) + + # Force the "reply" branch by stubbing the RNG: random() < 0.6 is True. + class _Rng: + def __init__(self): + self.calls = 0 + + def choice(self, seq): + return seq[0] + + def random(self): + return 0.0 # always reply + + action = await scheduler.pick( + repo, rand=_Rng(), now=datetime(2026, 4, 26, 12, 0, 0), + ) + assert action is not None + assert action.is_reply is True + assert action.parent_message_id == "" + assert action.thread_id == "thr1" + assert action.subject_hint == "Re: Q3 budget" diff --git a/tests/orchestrator/emailgen/test_threads.py b/tests/orchestrator/emailgen/test_threads.py new file mode 100644 index 00000000..17e64f5b --- /dev/null +++ b/tests/orchestrator/emailgen/test_threads.py @@ -0,0 +1,61 @@ +"""Thread-chain helpers.""" +from __future__ import annotations + +from decnet.orchestrator.emailgen.threads import ( + ThreadChain, + new_message_id, + new_thread_id, + references_for_reply, + reply_subject, +) + + +def test_new_thread_id_is_uuid_string(): + tid = new_thread_id() + assert len(tid) == 36 + assert tid.count("-") == 4 + + +def test_new_message_id_format_with_domain(): + mid = new_message_id("example.com") + assert mid.startswith("<") and mid.endswith(">") + assert "@example.com" in mid + + +def test_new_message_id_handles_blank_domain(): + mid = new_message_id(" ") + assert "@localhost" in mid + + +def test_reply_subject_prepends_re(): + assert reply_subject("Q3 budget") == "Re: Q3 budget" + + +def test_reply_subject_collapses_existing_re(): + assert reply_subject("Re: Re: Q3 budget") == "Re: Q3 budget" + assert reply_subject("RE: Q3 budget") == "Re: Q3 budget" + + +def test_references_for_reply_root_is_empty(): + assert references_for_reply(None) == "" + + +def test_references_for_reply_appends_parent(): + chain = ThreadChain( + thread_id="t1", + parent_message_id="", + references=("",), + parent_subject="Re: budget", + ) + refs = references_for_reply(chain) + assert refs == " " + + +def test_references_empty_chain_starts_with_parent_only(): + chain = ThreadChain( + thread_id="t1", + parent_message_id="", + references=(), + parent_subject="budget", + ) + assert references_for_reply(chain) == "" diff --git a/tests/orchestrator/emailgen/test_worker_integration.py b/tests/orchestrator/emailgen/test_worker_integration.py new file mode 100644 index 00000000..4aef25ba --- /dev/null +++ b/tests/orchestrator/emailgen/test_worker_integration.py @@ -0,0 +1,136 @@ +"""End-to-end-ish: one emailgen tick against a real SQLite repo + FakeBus, +with the Ollama + docker-exec subprocess stubbed.""" +from __future__ import annotations + +import json + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.orchestrator.drivers import email as email_driver +from decnet.orchestrator.emailgen import worker as eg_worker +from decnet.orchestrator.emailgen.scheduler import EmailAction # noqa: F401 +from decnet.web.db.models import Topology, TopologyDecky +from decnet.web.db.sqlite.repository import SQLiteRepository + + +_PERSONAS = [ + { + "name": "John Smith", + "email": "john@corp.com", + "role": "COO", + "tone": "formal", + "mannerisms": ["uses 'Best regards'"], + "active_hours": "00:00-00:00", # always-on so test is hour-independent + }, + { + "name": "Sarah Johnson", + "email": "sarah@corp.com", + "role": "PM", + "tone": "direct", + "mannerisms": ["uses bullets"], + "active_hours": "00:00-00:00", + }, +] + + +@pytest_asyncio.fixture +async def repo(tmp_path): + r = SQLiteRepository(db_path=str(tmp_path / "decnet.db")) + await r.initialize() + yield r + await r.engine.dispose() + + +@pytest_asyncio.fixture +async def fake_bus(): + bus = FakeBus() + await bus.connect() + try: + yield bus + finally: + await bus.close() + + +async def _seed_mail_topology(repo: SQLiteRepository) -> str: + async with repo._session() as session: + topo = Topology( + name="t-mail", + config_snapshot="{}", + status="active", + email_personas=json.dumps(_PERSONAS), + language_default="en", + ) + session.add(topo) + await session.commit() + await session.refresh(topo) + decky = TopologyDecky( + topology_id=topo.id, + name="mailhost", + services=json.dumps(["imap"]), + ip="10.0.0.10", + state="running", + ) + session.add(decky) + await session.commit() + await session.refresh(decky) + return decky.uuid + + +@pytest.mark.asyncio +async def test_one_tick_records_and_publishes(repo, fake_bus, monkeypatch): + decky_uuid = await _seed_mail_topology(repo) + + async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0): + if argv[0] == "ollama": + return 0, "Subject: Hi\n\nBody here.\n", "" + return 0, "", "" + + monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture) + + received: list = [] + + async def collect(): + async with fake_bus.subscribe(f"orchestrator.email.{decky_uuid}") as sub: + async for ev in sub: + received.append(ev) + return + + import asyncio + collector = asyncio.create_task(collect()) + await asyncio.sleep(0) + + driver = email_driver.EmailDriver() + await eg_worker._one_tick(repo, driver, fake_bus) + await asyncio.wait_for(collector, timeout=2.0) + + rows = await repo.list_orchestrator_emails() + assert len(rows) == 1 + row = rows[0] + assert row["success"] is True + assert row["mail_decky_uuid"] == decky_uuid + assert row["subject"] == "Hi" + assert row["language"] == "en" + + assert len(received) == 1 + assert received[0].topic == f"orchestrator.email.{decky_uuid}" + assert received[0].payload["kind"] == "email" + assert received[0].payload["success"] is True + + +@pytest.mark.asyncio +async def test_one_tick_noop_when_no_mail_decky(repo, fake_bus, monkeypatch): + called = False + + async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0): + nonlocal called + called = True + return 0, "Subject: x\n\nb\n", "" + + monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture) + + driver = email_driver.EmailDriver() + await eg_worker._one_tick(repo, driver, fake_bus) + assert called is False + assert await repo.list_orchestrator_emails() == []