feat(emailgen): Ollama-driven fake email worker for IMAP/POP3 deckies

Second orchestrator worker (decnet emailgen) that drips persona-driven,
threaded, multi-language fake emails into running mail deckies.  Personas
live on Topology.email_personas; topology-wide language_default falls
through to any persona that doesn't pin its own.  Em-dashes are
suppressed at the prompt layer by default and only lifted for personas
explicitly marked uses_llms_heavily — em-dashes are an LLM tell and a
flat corpus of em-dashed mail is a giveaway.

EML delivery writes into /var/spool/decnet-emails/<thread>/<msg>.eml on
the mail decky via docker exec; wiring the IMAP/POP3 templates to read
from that spool (replacing the hardcoded _BAIT_EMAILS) is the next step.
This commit is contained in:
2026-04-26 22:16:19 -04:00
parent 674028d476
commit 3ee55ec341
25 changed files with 2343 additions and 1 deletions

View File

@@ -12,6 +12,7 @@ Token structure (NATS-style, dot-separated):
decky.{decky_id}.traffic
orchestrator.traffic.{decky_id}
orchestrator.file.{decky_id}
orchestrator.email.{decky_id}
attacker.observed
attacker.scored
attacker.session.started
@@ -172,6 +173,11 @@ CREDENTIAL_REUSE_DETECTED = "reuse.detected"
# stream via ``orchestrator.*.<decky_uuid>``.
ORCHESTRATOR_TRAFFIC = "traffic"
ORCHESTRATOR_FILE = "file"
# Emailgen — published by the ``decnet emailgen`` worker once per generated
# fake email delivered into a mail decky's maildir. Third token is the
# destination mail-decky uuid (the IMAP/POP3 host serving the mailbox),
# matching the ``orchestrator.*.<decky_uuid>`` subscription pattern.
ORCHESTRATOR_EMAIL = "email"
# System event types.
SYSTEM_LOG = "log"

View File

@@ -24,6 +24,7 @@ from . import (
bus,
db,
deploy,
emailgen,
forwarder,
geoip,
init,
@@ -56,7 +57,7 @@ for _mod in (
api, swarmctl, agent, updater, listener, forwarder,
swarm,
deploy, lifecycle, workers, inventory,
web, profiler, orchestrator, reconciler, sniffer, db,
web, profiler, orchestrator, emailgen, reconciler, sniffer, db,
topology, bus, geoip, init, webhook,
):
_mod.register(app)

66
decnet/cli/emailgen.py Normal file
View File

@@ -0,0 +1,66 @@
"""``decnet emailgen`` — second orchestrator worker.
Sibling of :mod:`decnet.cli.orchestrator`. Two distinct CLI entrypoints
match the "workers are independent, never coupled" principle: a wedged
ollama call in emailgen does not stall the SSH-flavoured orchestrator,
and systemd supervises each loop separately.
"""
from __future__ import annotations
import os
import typer
from . import utils as _utils
from .utils import console, log
def register(app: typer.Typer) -> None:
@app.command(name="emailgen")
def emailgen_cmd(
interval: int = typer.Option(
300, "--interval", "-i",
help="Seconds between fake-email generation ticks (default 5m)",
),
daemon: bool = typer.Option(
False, "--daemon", "-d",
help="Detach to background as a daemon process",
),
model: str = typer.Option(
"", "--model", "-m",
help="Ollama model override (defaults to $DECNET_EMAILGEN_MODEL "
"or 'llama3.1')",
),
) -> None:
"""Drip fake corporate emails into running IMAP/POP3 mail deckies."""
import asyncio
from decnet.orchestrator.emailgen import emailgen_worker
from decnet.web.dependencies import repo
if daemon:
log.info("emailgen daemonizing interval=%d", interval)
_utils._daemonize()
# Honour the env var when the flag was left empty so systemd unit
# files can configure the model centrally without per-host CLI
# tweaks. Empty -> let the worker apply its own default.
resolved_model = model or os.environ.get("DECNET_EMAILGEN_MODEL", "")
log.info(
"emailgen starting interval=%d model=%s",
interval, resolved_model or "default",
)
console.print(
f"[bold cyan]Emailgen starting[/] (interval: {interval}s"
f"{', model: ' + resolved_model if resolved_model else ''})"
)
async def _run() -> None:
await repo.initialize()
await emailgen_worker(
repo, interval=interval, model=resolved_model or None,
)
try:
asyncio.run(_run())
except KeyboardInterrupt:
console.print("\n[yellow]Emailgen stopped.[/]")

View File

@@ -0,0 +1,272 @@
"""Email driver — Ollama-backed EML generation + decky-side delivery.
One :class:`EmailAction` becomes one EML written into the mail decky's
configured emailgen spool directory (``/var/spool/decnet-emails/`` by
default). An integration follow-up wires the IMAP/POP3 service templates
to read EMLs from that spool at request time so attackers see the
generated mail in their MUA.
The Ollama call shells out via ``ollama run <model>`` — the prototype at
``DECNET-EMAILs/main.py`` proved the round-trip works. Output is
parsed-and-repaired into a valid EML using :mod:`email.mime.*`; the
worker then ``docker exec``\\s a ``tee`` to drop the file inside the
target container.
Per CLAUDE.md "no shell strings": every subprocess invocation uses an
argv list, never ``shell=True``. Ollama prompts and EML payloads are
piped via ``stdin``, not interpolated into argv.
"""
from __future__ import annotations
import asyncio
import os
import shlex
import time
from datetime import datetime, timezone
from email.mime.text import MIMEText
from email.utils import formatdate
from typing import Any, Optional
from decnet.logging import get_logger
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.emailgen.prompt import PromptInputs, build as build_prompt
from decnet.orchestrator.emailgen.scheduler import EmailAction
from decnet.orchestrator.emailgen.threads import new_message_id
log = get_logger("orchestrator.email")
_DOCKER = "docker"
_OLLAMA = "ollama"
# Wall-clock cap for the LLM call. Big enough for a 4070 running
# llama3.1; small enough that a stuck Ollama server doesn't wedge the
# emailgen tick.
_DEFAULT_OLLAMA_TIMEOUT = float(os.environ.get("DECNET_EMAILGEN_TIMEOUT", "60"))
_DEFAULT_MODEL = os.environ.get("DECNET_EMAILGEN_MODEL", "llama3.1")
# docker-exec wall-clock cap for the per-EML write.
_DOCKER_TIMEOUT = 8.0
# Container suffix for the IMAP service on a mail decky.
_IMAP_CONTAINER_SUFFIX = "-imap"
_POP3_CONTAINER_SUFFIX = "-pop3"
# Spool path inside the container. Match the IMAP template's stubbed
# IMAP_EMAIL_SEED location once wiring lands; shipping the constant now
# lets that integration land independently.
_SPOOL_DIR = "/var/spool/decnet-emails"
async def _run_capture(
argv: list[str],
*,
stdin_data: Optional[bytes] = None,
timeout: float = _DOCKER_TIMEOUT,
) -> tuple[int, str, str]:
"""Spawn *argv*, optionally feeding *stdin_data*. Never raises."""
try:
proc = await asyncio.create_subprocess_exec(
*argv,
stdin=asyncio.subprocess.PIPE if stdin_data is not None else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as exc:
return 127, "", f"argv[0] not found: {exc}"
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(stdin_data), timeout=timeout,
)
except asyncio.TimeoutError:
try:
proc.kill()
except ProcessLookupError:
pass
return 124, "", "timeout"
return (
proc.returncode if proc.returncode is not None else -1,
stdout.decode("utf-8", "replace"),
stderr.decode("utf-8", "replace"),
)
def _container_for(decky_name: str, services: list[str]) -> str:
"""Pick the IMAP container if present, else POP3. Names follow the
``<decky_name>-<service>`` convention from the service templates."""
if "imap" in services:
return f"{decky_name}{_IMAP_CONTAINER_SUFFIX}"
return f"{decky_name}{_POP3_CONTAINER_SUFFIX}"
def _parse_subject_and_body(ollama_output: str) -> tuple[str, str]:
"""Split LLM output into (subject, body).
The prompt asks for ``Subject: <subject>\\n\\n<body>``. When the
model misbehaves (e.g. wraps in markdown fences or skips the
Subject line), fall back to a generic subject and treat the whole
output as body. Never raises.
"""
text = ollama_output.strip()
# Strip code fences if the model wrapped output.
if text.startswith("```"):
nl = text.find("\n")
if nl > 0:
text = text[nl + 1:]
if text.endswith("```"):
text = text[: -3]
text = text.strip()
lines = text.splitlines()
if lines and lines[0].lower().startswith("subject:"):
subject = lines[0].split(":", 1)[1].strip()
# Drop the (possibly empty) blank line after Subject.
body_lines = lines[1:]
if body_lines and not body_lines[0].strip():
body_lines = body_lines[1:]
body = "\n".join(body_lines).strip()
if not subject:
subject = "Business Communication"
return subject, body
return "Business Communication", text
def _build_eml(
*,
sender_name: str,
sender_email: str,
recipient_name: str,
recipient_email: str,
subject: str,
body: str,
message_id: str,
in_reply_to: Optional[str],
references: str,
ts: datetime,
) -> bytes:
"""Assemble a valid plain-text RFC 2822 EML."""
msg = MIMEText(body, "plain", "utf-8")
msg["From"] = f"{sender_name} <{sender_email}>"
msg["To"] = f"{recipient_name} <{recipient_email}>"
msg["Subject"] = subject
msg["Date"] = formatdate(ts.timestamp(), localtime=False)
msg["Message-ID"] = message_id
if in_reply_to:
msg["In-Reply-To"] = in_reply_to
if references:
msg["References"] = references
msg["MIME-Version"] = "1.0"
return msg.as_bytes()
class EmailDriver:
"""Concrete driver for :class:`EmailAction`.
Stateless across calls — Ollama model + timeout are constructor
args, not per-call. The driver does *not* know about the bus or
DB; it returns an :class:`ActivityResult` that the worker pipes
onward.
"""
def __init__(
self,
*,
model: str = _DEFAULT_MODEL,
ollama_timeout: float = _DEFAULT_OLLAMA_TIMEOUT,
spool_dir: str = _SPOOL_DIR,
) -> None:
self.model = model
self.ollama_timeout = ollama_timeout
self.spool_dir = spool_dir
async def run(self, action: EmailAction) -> ActivityResult:
# Look up the mail-decky container name + services. The driver
# receives a denormalised view via the action — the worker
# populates it from the same list the scheduler used.
return await self._run_email(action)
async def _run_email(self, action: EmailAction) -> ActivityResult:
t0 = time.monotonic()
prompt, mannerisms_used = build_prompt(
PromptInputs(
sender=action.sender,
recipient=action.recipient,
context_hint=action.context_hint,
parent_subject=action.subject_hint,
parent_excerpt=action.parent_excerpt,
)
)
rc, stdout, stderr = await _run_capture(
[_OLLAMA, "run", self.model],
stdin_data=prompt.encode("utf-8"),
timeout=self.ollama_timeout,
)
gen_ms = int((time.monotonic() - t0) * 1000)
if rc != 0 or not stdout.strip():
log.warning(
"emailgen ollama failed rc=%d stderr=%r model=%s",
rc, stderr[:200], self.model,
)
return ActivityResult(
success=False,
payload={
"stage": "ollama",
"rc": rc,
"stderr": stderr.strip()[:256],
"generation_ms": gen_ms,
"model": self.model,
"thread_id": action.thread_id,
},
)
subject, body = _parse_subject_and_body(stdout)
message_id = new_message_id(action.sender.email.split("@", 1)[1])
ts = datetime.now(timezone.utc)
eml_bytes = _build_eml(
sender_name=action.sender.name,
sender_email=action.sender.email,
recipient_name=action.recipient.name,
recipient_email=action.recipient.email,
subject=subject,
body=body,
message_id=message_id,
in_reply_to=action.parent_message_id,
references=action.references,
ts=ts,
)
# Drop the EML into the mail decky's spool dir over docker exec.
# File path: <spool>/<thread_id>/<uuid-from-message-id>.eml.
# Per-thread sub-directory keeps `ls` in the spool readable by
# operators inspecting the running decoy.
eml_filename = message_id.strip("<>").replace("@", "_at_") + ".eml"
eml_dir = f"{self.spool_dir.rstrip('/')}/{action.thread_id}"
eml_path = f"{eml_dir}/{eml_filename}"
container = _container_for(
action.mail_decky_name, list(action.mail_decky_services),
)
sh_cmd = (
f"mkdir -p {shlex.quote(eml_dir)} && "
f"tee {shlex.quote(eml_path)} >/dev/null"
)
argv = [_DOCKER, "exec", "-i", container, "sh", "-c", sh_cmd]
rc2, _stdout2, stderr2 = await _run_capture(
argv, stdin_data=eml_bytes, timeout=_DOCKER_TIMEOUT,
)
success = rc2 == 0
payload: dict[str, Any] = {
"stage": "delivered" if success else "delivery",
"model": self.model,
"generation_ms": gen_ms,
"bytes": len(eml_bytes),
"thread_id": action.thread_id,
"message_id": message_id,
"subject": subject,
"language": action.sender.language or "en",
"mannerisms_used": mannerisms_used,
"is_reply": action.is_reply,
"container": container,
"eml_path": eml_path,
"rc": rc2,
"stderr": stderr2.strip()[:256] if not success else None,
}
if not success:
log.warning(
"emailgen delivery failed container=%s rc=%d stderr=%r",
container, rc2, stderr2[:200],
)
return ActivityResult(success=success, payload=payload)

View File

@@ -0,0 +1,33 @@
"""Emailgen — second orchestrator worker.
Generates fake corporate emails (multi-language, threaded, persona-driven)
and drops them into mail-decky maildirs so attackers landing on
IMAP/POP3 honeypots find believable mailboxes instead of empty inboxes.
The module is intentionally a sibling of :mod:`decnet.orchestrator` (not
a flag on it) — separate worker, separate CLI command
(``decnet emailgen``), separate systemd-supervised lifecycle. Shares the
heartbeat / control-listener scaffolding via :mod:`decnet.bus.publish`.
Lazy worker re-export: :func:`emailgen_worker` is loaded on first
attribute access so that submodules can import package-level names
(``decnet.orchestrator.emailgen.prompt``) without triggering an eager
load of the worker — and through it, the email driver, which imports
back into this package. Without lazy loading the package + driver +
worker form a cycle.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING: # pragma: no cover - typing only
from decnet.orchestrator.emailgen.worker import emailgen_worker # noqa: F401
__all__ = ["emailgen_worker"]
def __getattr__(name: str) -> Any:
if name == "emailgen_worker":
from decnet.orchestrator.emailgen.worker import emailgen_worker as _w
return _w
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,49 @@
"""DB-row + bus-topic helpers for the emailgen worker.
Mirror of :mod:`decnet.orchestrator.events` for the email action class.
Kept in its own module so the SSH-flavoured orchestrator and the
emailgen worker don't accumulate cross-imports of each other's action
types.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from decnet.bus import topics as _topics
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.emailgen.scheduler import EmailAction
def to_row(action: EmailAction, result: ActivityResult) -> dict[str, Any]:
"""Build the kwargs dict for ``OrchestratorEmail(**...)``.
Pulls ``message_id`` / ``subject`` / ``language`` out of the
driver's ``payload`` rather than off the action — the EML's
Message-ID is generated inside the driver after the LLM call so
we know it matches what landed on disk.
"""
payload = result.payload or {}
return {
"ts": datetime.now(timezone.utc),
"mail_decky_uuid": action.mail_decky_uuid,
"thread_id": action.thread_id,
"message_id": payload.get("message_id", ""),
"in_reply_to": action.parent_message_id,
"sender_email": action.sender.email,
"recipient_email": action.recipient.email,
"subject": payload.get("subject", ""),
"language": payload.get("language", action.sender.language or "en"),
"eml_path": payload.get("eml_path", ""),
"success": result.success,
"payload": payload, # repo serialises dict→json
}
def topic_for(action: EmailAction) -> str:
"""Map an email action to its bus topic."""
return _topics.orchestrator(_topics.ORCHESTRATOR_EMAIL, action.mail_decky_uuid)
def event_type_for(action: EmailAction) -> str: # noqa: ARG001 — symmetry
return _topics.ORCHESTRATOR_EMAIL

View File

@@ -0,0 +1,119 @@
"""Persona schema for the emailgen worker.
Stored as a JSON list on :attr:`Topology.email_personas`. Each persona
describes one fictional employee whose mailbox lives on the topology's
IMAP/POP3 decky. The schema deliberately stays narrow: the LLM gets
*enough* differentiation to write distinct voices, no more.
Invalid entries are dropped with a warning (returned alongside the
parsed list) rather than raising — a single typo in one persona must
not stall the entire emailgen tick.
"""
from __future__ import annotations
import json
from typing import Literal, Optional
from pydantic import BaseModel, Field, ValidationError, field_validator
from decnet.logging import get_logger
logger = get_logger("orchestrator.emailgen")
Tone = Literal["formal", "direct", "casual", "technical"]
ReplyLatency = Literal["fast", "normal", "slow"]
class EmailPersona(BaseModel):
"""One fake mailbox owner.
``language`` is ISO 639-1 (``en``, ``es``, ``pt``…); when unset on the
persona it falls back to the topology's ``language_default``.
``uses_llms_heavily`` lifts the prompt-layer em-dash suppression for
that persona — em-dashes are an LLM tell, but a persona explicitly
pegged as a heavy LLM user should *naturally* produce them.
"""
name: str = Field(min_length=1, max_length=128)
email: str = Field(min_length=3, max_length=255)
role: str = Field(min_length=1, max_length=128)
tone: Tone = "formal"
mannerisms: list[str] = Field(default_factory=list, max_length=12)
language: Optional[str] = Field(default=None, max_length=8)
signature: Optional[str] = Field(default=None, max_length=512)
active_hours: str = Field(default="09:00-18:00", max_length=32)
reply_latency: ReplyLatency = "normal"
uses_llms_heavily: bool = False
@field_validator("email")
@classmethod
def _email_shape(cls, v: str) -> str:
# Cheap structural check — full RFC 5322 isn't worth the
# dependency. We only need ``user@domain`` with non-empty parts
# for the prompt builder + Message-ID generator.
if "@" not in v:
raise ValueError("email must contain '@'")
local, _, domain = v.rpartition("@")
if not local or not domain or "." not in domain:
raise ValueError("email must look like user@domain.tld")
return v
def parse_personas(
raw: str | list | None,
*,
language_default: str = "en",
) -> list[EmailPersona]:
"""Parse the JSON-or-list ``email_personas`` value into models.
Resolves ``language`` against *language_default* so downstream
consumers (prompt builder, scheduler) never need to know about
fallback semantics.
"""
if not raw:
return []
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError as exc:
logger.warning("emailgen personas: invalid JSON, skipping: %s", exc)
return []
if not isinstance(raw, list):
logger.warning(
"emailgen personas: expected list, got %s", type(raw).__name__
)
return []
out: list[EmailPersona] = []
for i, entry in enumerate(raw):
try:
persona = EmailPersona.model_validate(entry)
except ValidationError as exc:
logger.warning(
"emailgen personas: dropping invalid entry index=%d: %s",
i, exc.errors(include_url=False),
)
continue
if persona.language is None:
persona = persona.model_copy(update={"language": language_default})
out.append(persona)
return out
def in_active_hours(persona: EmailPersona, now_hour: int) -> bool:
"""Return True if *now_hour* (023) falls in the persona's window.
Format: ``"HH:MM-HH:MM"``. Wrap-around windows (``"22:00-06:00"``)
are supported. Invalid windows treat the persona as always-on so a
config typo never silences the whole fleet.
"""
try:
start_s, end_s = persona.active_hours.split("-")
start_h = int(start_s.split(":")[0])
end_h = int(end_s.split(":")[0])
except (ValueError, IndexError):
return True
if start_h == end_h:
return True
if start_h < end_h:
return start_h <= now_hour < end_h
# Wrap-around (e.g. 22:00-06:00).
return now_hour >= start_h or now_hour < end_h

View File

@@ -0,0 +1,151 @@
"""Ollama prompt builder for emailgen.
The LLM gets a tightly-scoped instruction and a small handful of
deterministic constraints. Persona mannerisms are *pre-selected* in
Python (12 of the persona's full list) and injected as hard rules —
small models otherwise treat the mannerism list as flavour text and
ignore it, and the corpus collapses into one voice.
**Em-dash suppression** is on by default; suppression is lifted only
for personas that opt in via ``uses_llms_heavily``. Em-dashes are a
strong stylometric tell for LLM-authored prose, and a honeypot mailbox
where every author uses them is a tell.
"""
from __future__ import annotations
import secrets
from dataclasses import dataclass
from typing import Optional
from decnet.orchestrator.emailgen.personas import EmailPersona
@dataclass(frozen=True)
class PromptInputs:
sender: EmailPersona
recipient: EmailPersona
context_hint: str
parent_subject: Optional[str] = None # set when replying
parent_excerpt: Optional[str] = None # short snippet of last msg
_LANGUAGE_NAMES = {
"en": "English",
"es": "Spanish",
"pt": "Portuguese",
"fr": "French",
"de": "German",
"it": "Italian",
"nl": "Dutch",
"ja": "Japanese",
"zh": "Chinese",
}
def _lang_label(code: str) -> str:
return _LANGUAGE_NAMES.get(code.lower(), code)
def select_mannerisms(
persona: EmailPersona,
*,
rng: Optional[secrets.SystemRandom] = None,
n: int = 2,
) -> list[str]:
"""Pick *n* mannerisms deterministically given *rng*.
Returns up to *n*; falls back to the full list when the persona
declares fewer. Determinism (under a seeded RNG) is what makes
tests practical — otherwise mannerism injection is unverifiable.
"""
rnd = rng or secrets.SystemRandom()
pool = list(persona.mannerisms)
if not pool:
return []
if len(pool) <= n:
return pool
rnd.shuffle(pool)
return pool[:n]
def build(
inputs: PromptInputs,
*,
rng: Optional[secrets.SystemRandom] = None,
) -> tuple[str, list[str]]:
"""Return ``(prompt, mannerisms_used)``.
``mannerisms_used`` flows back into the persisted ``payload`` JSON
so an analyst can see *why* a given email reads the way it does.
"""
sender = inputs.sender
recipient = inputs.recipient
language = _lang_label(sender.language or "en")
mannerisms = select_mannerisms(sender, rng=rng)
mannerism_block = (
"\n".join(f"- {m}" for m in mannerisms)
if mannerisms
else "- (no specific mannerisms; write in the persona's tone)"
)
if sender.uses_llms_heavily:
em_dash_rule = (
"Em-dashes are fine — this persona uses them naturally. "
"Write in your usual style."
)
else:
em_dash_rule = (
"Do NOT use em-dashes (—). Use commas, periods, or "
"parentheses instead. Em-dashes are a tell."
)
sig_block = (
f"Use this exact signature block:\n{sender.signature}"
if sender.signature
else "End with a short, plausible signature for the persona's role."
)
if inputs.parent_subject:
thread_block = (
f"This is a REPLY in an ongoing thread.\n"
f"- Parent subject: {inputs.parent_subject}\n"
f"- Parent excerpt: {inputs.parent_excerpt or '(no excerpt)'}\n"
f"- Begin the body assuming the recipient already read the parent.\n"
)
subject_rule = (
"Subject must be the parent subject prefixed with 'Re: ' "
"(no double 'Re: Re:')."
)
else:
thread_block = "This is a NEW thread (no prior context)."
subject_rule = (
"Generate a short, specific subject line (≤ 80 chars) "
"appropriate to the context."
)
prompt = f"""You are writing one corporate email, RFC 2822 plain-text body only.
Persona — sender:
- Name: {sender.name}
- Role: {sender.role}
- Tone: {sender.tone}
- Mannerisms (must show through):
{mannerism_block}
Persona — recipient:
- Name: {recipient.name}
- Role: {recipient.role}
Context hint: {inputs.context_hint}
Thread context:
{thread_block}
Hard rules:
1. Write the email body in {language}. Do not translate or code-switch.
2. {em_dash_rule}
3. {subject_rule}
4. {sig_block}
5. Output ONLY the email — first line is "Subject: <subject>", then a blank line, then the body. No commentary, no markdown fences, no preamble.
"""
return prompt.strip(), mannerisms

View File

@@ -0,0 +1,228 @@
"""Action picker for the emailgen worker.
One tick = one (mail-decky, sender, recipient, [thread]) decision.
Scope (v1):
- Only TopologyDeckies are eligible mail hosts. Fleet / SWARM-shard
mail-deckies are out of scope per the plan; they get covered when the
forwarder pattern lands for emailgen.
- Mail decky = a running TopologyDecky whose ``services`` includes
``imap`` or ``pop3``.
- Personas come from ``Topology.email_personas`` (JSON list of
:class:`EmailPersona`). Topology-wide ``language_default`` fills in
any persona that didn't set its own.
Returns ``None`` (skip tick) when:
- no running mail decky,
- the mail decky's topology has fewer than two valid personas,
- nobody is in their ``active_hours`` window right now.
"""
from __future__ import annotations
import secrets
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
from decnet.logging import get_logger
from decnet.orchestrator.emailgen.personas import (
EmailPersona,
in_active_hours,
parse_personas,
)
from decnet.orchestrator.emailgen.threads import (
ThreadChain,
new_thread_id,
references_for_reply,
reply_subject,
)
logger = get_logger("orchestrator.emailgen")
_MAIL_SERVICES = ("imap", "pop3")
# Probability of replying on an existing thread when one exists. The
# inverse starts a fresh thread. 0.6 mirrors what mailbox studies find
# for active corporate inboxes — most messages are replies, but not
# overwhelmingly so.
_REPLY_PROBABILITY = 0.6
# Generic context hints fed to the LLM when starting a new thread.
# Deliberately broad — the persona's tone + role is what shapes the
# email; the hint just gives the model a topic to riff on.
_CONTEXT_HINTS: tuple[str, ...] = (
"Q3 budget review and approval",
"Client presentation feedback",
"Project deadline extension request",
"Team building event planning",
"IT system maintenance notification",
"Quarterly performance review",
"Vendor onboarding process",
"Holiday schedule announcement",
"Training session invitation",
"Department restructuring update",
"Client contract negotiation",
"Security audit findings",
"Sales strategy meeting",
"Product launch timeline",
"Office relocation update",
"Travel reimbursement policy change",
)
@dataclass(frozen=True)
class EmailAction:
"""One emailgen tick's decision.
``thread_id`` is non-None whenever this action is a reply; the
worker writes it back to the DB so future ticks can chain further
replies. ``in_reply_to`` / ``references`` mirror the RFC 2822
headers we'll set on the EML.
``mail_decky_name`` / ``mail_decky_services`` are denormalised onto
the action so the driver doesn't need a second repo round-trip just
to resolve the container name.
"""
mail_decky_uuid: str
mail_decky_name: str
mail_decky_services: tuple[str, ...]
sender: EmailPersona
recipient: EmailPersona
thread_id: str
parent_message_id: Optional[str]
references: str
subject_hint: Optional[str] # used as parent subject when replying
parent_excerpt: Optional[str] # excerpt from the parent body
context_hint: str # only meaningful on new threads
is_reply: bool
description: str = "email:send"
def _is_mail_decky(decky: dict[str, Any]) -> bool:
services = decky.get("services") or []
if isinstance(services, str):
return False
return any(s in services for s in _MAIL_SERVICES)
async def pick(
repo: Any,
*,
rand: Optional[secrets.SystemRandom] = None,
now: Optional[datetime] = None,
) -> Optional[EmailAction]:
"""Pick one email action against the running fleet.
*repo* is a :class:`BaseRepository`; we fetch running topology
deckies + their parent topology row directly. *now* is the
wall-clock used for ``active_hours`` filtering — injected so tests
can pin the hour deterministically.
"""
rng = rand or secrets.SystemRandom()
now_dt = now or datetime.now()
deckies = await repo.list_running_topology_deckies()
mail_deckies = [d for d in deckies if _is_mail_decky(d)]
if not mail_deckies:
logger.debug("emailgen pick: no running mail decky")
return None
mail_decky = rng.choice(mail_deckies)
topology_id = mail_decky.get("topology_id")
if not topology_id:
logger.debug("emailgen pick: mail decky has no topology_id")
return None
topology = await repo.get_topology(topology_id)
if not topology:
logger.debug("emailgen pick: topology %s not found", topology_id)
return None
personas = parse_personas(
topology.get("email_personas"),
language_default=topology.get("language_default") or "en",
)
if len(personas) < 2:
logger.debug(
"emailgen pick: topology=%s has only %d personas; need >=2",
topology_id, len(personas),
)
return None
active = [p for p in personas if in_active_hours(p, now_dt.hour)]
if len(active) < 2:
logger.debug(
"emailgen pick: topology=%s only %d personas in-hours",
topology_id, len(active),
)
return None
sender = rng.choice(active)
recipient = rng.choice([p for p in active if p.email != sender.email])
# Look up open threads between this pair on this mail decky.
chain = await _maybe_pick_chain(
repo, mail_decky["uuid"], sender, recipient, rng=rng,
)
services = tuple(mail_decky.get("services") or ())
decky_name = mail_decky.get("name") or ""
if chain is not None:
return EmailAction(
mail_decky_uuid=mail_decky["uuid"],
mail_decky_name=decky_name,
mail_decky_services=services,
sender=sender,
recipient=recipient,
thread_id=chain.thread_id,
parent_message_id=chain.parent_message_id,
references=references_for_reply(chain),
subject_hint=chain.parent_subject,
parent_excerpt=None, # repo can populate later if useful
context_hint=chain.parent_subject,
is_reply=True,
)
return EmailAction(
mail_decky_uuid=mail_decky["uuid"],
mail_decky_name=decky_name,
mail_decky_services=services,
sender=sender,
recipient=recipient,
thread_id=new_thread_id(),
parent_message_id=None,
references="",
subject_hint=None,
parent_excerpt=None,
context_hint=rng.choice(_CONTEXT_HINTS),
is_reply=False,
)
async def _maybe_pick_chain(
repo: Any,
mail_decky_uuid: str,
sender: EmailPersona,
recipient: EmailPersona,
*,
rng: secrets.SystemRandom,
) -> Optional[ThreadChain]:
"""Probabilistically pick an open thread between the pair, or None."""
if rng.random() >= _REPLY_PROBABILITY:
return None
threads = await repo.list_orchestrator_email_threads(
mail_decky_uuid, sender.email, recipient.email, limit=20,
)
if not threads:
return None
head = threads[0]
return ThreadChain(
thread_id=head["thread_id"],
parent_message_id=head["message_id"],
# We don't reconstruct the full ancestry from row history here —
# the parent's References + parent's Message-ID would do that.
# For v1, single-step references is fine; mail clients still
# group correctly by (Subject + In-Reply-To).
references=tuple(),
parent_subject=reply_subject(head["subject"]),
)

View File

@@ -0,0 +1,75 @@
"""RFC 2822 thread-chain bookkeeping.
A thread is a worker-side UUID that groups one or more emails between
the same two personas. ``In-Reply-To`` carries the immediate parent's
``Message-ID``; ``References`` carries the full ancestry chain.
The emailgen scheduler queries the repository for the most recent email
in any thread between (sender, recipient); if it finds one, it emits a
reply (continuing the chain). Otherwise it starts a new thread.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class ThreadChain:
"""Immutable view of a thread's chain at a point in time.
``thread_id`` is opaque (UUID). ``parent_message_id`` is the most
recent message in the chain — the new reply's ``In-Reply-To`` field.
``references`` is the dot-separated history fed into the
``References:`` header (oldest-first per RFC 2822 §3.6.4).
``parent_subject`` carries the subject we're replying to, so the
reply can prepend ``Re:`` correctly.
"""
thread_id: str
parent_message_id: str
references: tuple[str, ...]
parent_subject: str
def new_thread_id() -> str:
return str(uuid.uuid4())
def reply_subject(parent_subject: str) -> str:
"""Prepend ``Re:`` to *parent_subject* if not already a reply.
Folds repeat ``Re: Re: Re:`` into a single ``Re:`` — Outlook /
Thunderbird both do this and an attacker reading the maildir would
notice the corpus's missing convention immediately.
"""
s = parent_subject.strip()
lowered = s.lower()
while lowered.startswith("re:"):
s = s[3:].lstrip()
lowered = s.lower()
return f"Re: {s}"
def references_for_reply(chain: Optional[ThreadChain]) -> str:
"""Build the ``References:`` header value for a reply.
Returns a space-separated list of message-ids, oldest-first, with
the parent appended. Empty string when *chain* is None (root).
"""
if chain is None:
return ""
refs = list(chain.references) + [chain.parent_message_id]
return " ".join(refs)
def new_message_id(domain: str) -> str:
"""Build an RFC 2822 ``Message-ID`` value (incl. angle brackets).
Worker side — the value is also stored in the DB so a future reply
can be threaded against it. Domain mirrors the sender's email
domain so an attacker grepping for tells doesn't find every
fake-corp email tagged with ``@example.com``.
"""
safe_domain = domain.strip() or "localhost"
return f"<{uuid.uuid4().hex}@{safe_domain}>"

View File

@@ -0,0 +1,131 @@
"""Emailgen main loop.
Mirrors :mod:`decnet.orchestrator.worker` shape: same heartbeat, same
control listener, same fire-and-forget bus publish, same prune knob.
A wedged ollama call stalls only this worker, never the SSH-flavoured
orchestrator running alongside.
"""
from __future__ import annotations
import asyncio
import contextlib
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
publish_safely,
run_control_listener,
run_health_heartbeat,
)
from decnet.logging import get_logger
from decnet.orchestrator.drivers.email import EmailDriver
from decnet.orchestrator.emailgen import events, scheduler
from decnet.web.db.repository import BaseRepository
logger = get_logger("orchestrator.emailgen")
# Periodic-prune knobs — same shape as orchestrator/worker.py.
_PRUNE_EVERY_TICKS = 100
_PRUNE_PER_DECKY_CAP = 5000
async def emailgen_worker(
repo: BaseRepository,
*,
interval: int = 300,
model: str | None = None,
) -> None:
"""Periodically generate one fake email into a running mail decky.
Default interval is 5 minutes — emails are expensive (LLM round
trip) and don't need to fire every minute to look natural. Honors
``system.emailgen.control`` for graceful shutdown.
"""
logger.info("emailgen worker started interval=%ds model=%s", interval, model)
bus = None
try:
bus = get_bus(client_name="emailgen")
await bus.connect()
except Exception as exc: # noqa: BLE001
logger.warning(
"emailgen: bus unavailable, continuing without publish: %s", exc
)
bus = None
driver = EmailDriver(model=model) if model else EmailDriver()
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "emailgen"))
control_task = asyncio.create_task(
run_control_listener(bus, "emailgen", shutdown),
)
tick_n = 0
try:
while not shutdown.is_set():
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass # normal tick
if shutdown.is_set():
break
try:
await _one_tick(repo, driver, bus)
except Exception as exc: # noqa: BLE001
logger.error("emailgen tick failed: %s", exc)
tick_n += 1
if tick_n % _PRUNE_EVERY_TICKS == 0:
try:
deleted = await repo.prune_orchestrator_emails(
per_decky_cap=_PRUNE_PER_DECKY_CAP,
)
if deleted:
logger.info(
"emailgen prune deleted=%d cap=%d",
deleted, _PRUNE_PER_DECKY_CAP,
)
except Exception as exc: # noqa: BLE001
logger.error("emailgen prune failed: %s", exc)
finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
async def _one_tick(repo: BaseRepository, driver: EmailDriver, bus) -> None:
action = await scheduler.pick(repo)
if action is None:
logger.debug("emailgen: no actionable mail decky / personas this tick")
return
result = await driver.run(action)
row = events.to_row(action, result)
await repo.record_orchestrator_email(row)
if bus is not None:
topic = events.topic_for(action)
# Mirror the orchestrator-event SSE-friendly payload shape: ts
# as iso8601, payload as already-serialised dict.
bus_payload = {
"kind": "email",
"mail_decky_uuid": row["mail_decky_uuid"],
"thread_id": row["thread_id"],
"message_id": row["message_id"],
"in_reply_to": row["in_reply_to"],
"sender_email": row["sender_email"],
"recipient_email": row["recipient_email"],
"subject": row["subject"],
"language": row["language"],
"success": row["success"],
"ts": row["ts"].isoformat(),
}
await publish_safely(
bus, topic, bus_payload, event_type=events.event_type_for(action),
)
logger.info(
"emailgen tick mail_decky=%s thread=%s success=%s reply=%s",
row["mail_decky_uuid"], row["thread_id"], row["success"], action.is_reply,
)

View File

@@ -58,6 +58,8 @@ from .health import (
HealthResponse,
)
from .orchestrator import (
OrchestratorEmail,
OrchestratorEmailsResponse,
OrchestratorEvent,
OrchestratorEventsResponse,
)
@@ -193,6 +195,8 @@ __all__ = [
"ComponentHealth",
"HealthResponse",
# orchestrator
"OrchestratorEmail",
"OrchestratorEmailsResponse",
"OrchestratorEvent",
"OrchestratorEventsResponse",
# logs

View File

@@ -60,3 +60,52 @@ class OrchestratorEventsResponse(BaseModel):
limit: int
offset: int
data: List[dict[str, Any]]
class OrchestratorEmail(SQLModel, table=True):
"""One fake email generated by the ``decnet emailgen`` worker.
Sibling table to :class:`OrchestratorEvent` — kept disjoint because
email rows carry domain-specific fields (subject, message_id,
in_reply_to, language) that have no analogue in the SSH/file events
and would otherwise bloat ``OrchestratorEvent.payload``.
The mail decky's UUID lives in ``mail_decky_uuid`` (the host serving
the IMAP/POP3 mailbox). ``thread_id`` is a worker-side UUID used to
chain replies; ``in_reply_to`` is the parent email's RFC 2822
Message-ID header value (or ``None`` for thread roots).
``payload`` follows the same loose-JSON convention as
:class:`OrchestratorEvent`: ``bytes``, ``generation_ms``, ``model``,
``mannerisms_used``, etc. The worker can extend it without a
migration.
"""
__tablename__ = "orchestrator_emails"
__table_args__ = (
Index("ix_orchestrator_emails_mail_ts", "mail_decky_uuid", "ts"),
Index("ix_orchestrator_emails_thread", "thread_id"),
)
uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
ts: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
mail_decky_uuid: str = Field(index=True)
thread_id: str = Field(index=True)
message_id: str = Field(max_length=255)
in_reply_to: Optional[str] = Field(default=None, max_length=255)
sender_email: str = Field(max_length=255, index=True)
recipient_email: str = Field(max_length=255, index=True)
subject: str = Field(max_length=512)
language: str = Field(max_length=8, default="en")
eml_path: str = Field(max_length=1024)
success: bool = Field(default=False, index=True)
payload: str = Field(
sa_column=Column("payload", Text, nullable=False, default="{}")
)
class OrchestratorEmailsResponse(BaseModel):
total: int
limit: int
offset: int
data: List[dict[str, Any]]

View File

@@ -47,6 +47,18 @@ class Topology(SQLModel, table=True):
# running. Drained by the mutator watch loop, which re-pushes via
# AgentClient and clears the flag. NULL for unihost topologies.
needs_resync: bool = Field(default=False, nullable=False)
# JSON-serialised list of EmailPersona dicts consumed by the
# ``decnet emailgen`` worker. Empty list = no fake mailbox owners
# configured for this topology, the worker skips it.
email_personas: str = Field(
sa_column=Column(
"email_personas", _BIG_TEXT, nullable=False, default="[]"
)
)
# ISO 639-1 language code applied to any persona that doesn't override
# ``language`` itself. English by default; ANTI's deployments default
# to "es" by editing this column.
language_default: str = Field(default="en", max_length=8)
class LAN(SQLModel, table=True):

View File

@@ -952,3 +952,57 @@ class BaseRepository(ABC):
unbounded growth without paying the cost on every write.
"""
raise NotImplementedError
async def record_orchestrator_email(self, data: dict[str, Any]) -> str:
"""Insert one orchestrator-generated email row, returning its uuid."""
raise NotImplementedError
async def list_orchestrator_emails(
self,
limit: int = 100,
offset: int = 0,
*,
mail_decky_uuid: Optional[str] = None,
thread_id: Optional[str] = None,
since_ts: Optional[Any] = None,
) -> list[dict[str, Any]]:
"""Paginated orchestrator emails newest-first.
Optional filters narrow to a single mail decky or to one thread,
used by the dashboard's mailbox-inspector view.
"""
raise NotImplementedError
async def count_orchestrator_emails(
self,
*,
mail_decky_uuid: Optional[str] = None,
) -> int:
"""Total orchestrator-email rows, optionally filtered by mail decky."""
raise NotImplementedError
async def list_orchestrator_email_threads(
self,
mail_decky_uuid: str,
sender_email: str,
recipient_email: str,
*,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Open threads between *sender_email* and *recipient_email* on
*mail_decky_uuid*, newest-first.
Used by the emailgen scheduler to decide whether to start a new
thread or reply on an existing one. Each entry is one row's
worth of dict — the worker only needs ``thread_id`` and the most
recent ``message_id`` / ``subject`` to build the reply.
"""
raise NotImplementedError
async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int:
"""Trim per-``mail_decky_uuid`` rows to a cap. Returns deleted count.
Mirrors :meth:`prune_orchestrator_events`; emailgen worker calls
this on a periodic tick.
"""
raise NotImplementedError

View File

@@ -51,6 +51,7 @@ from decnet.web.db.models import (
TopologyEdge,
TopologyStatusEvent,
TopologyMutation,
OrchestratorEmail,
OrchestratorEvent,
WebhookSubscription,
)
@@ -3003,3 +3004,113 @@ class SQLModelRepository(BaseRepository):
deleted += res.rowcount or 0
await session.commit()
return deleted
# ---------------------------------------------------------- emailgen
async def record_orchestrator_email(self, data: dict[str, Any]) -> str:
payload = data.get("payload")
if isinstance(payload, (dict, list)):
data = {**data, "payload": json.dumps(payload)}
async with self._session() as session:
row = OrchestratorEmail(**data)
session.add(row)
await session.commit()
await session.refresh(row)
return row.uuid
async def list_orchestrator_emails(
self,
limit: int = 100,
offset: int = 0,
*,
mail_decky_uuid: Optional[str] = None,
thread_id: Optional[str] = None,
since_ts: Optional[datetime] = None,
) -> list[dict[str, Any]]:
async with self._session() as session:
stmt = select(OrchestratorEmail)
if mail_decky_uuid is not None:
stmt = stmt.where(
OrchestratorEmail.mail_decky_uuid == mail_decky_uuid
)
if thread_id is not None:
stmt = stmt.where(OrchestratorEmail.thread_id == thread_id)
if since_ts is not None:
stmt = stmt.where(OrchestratorEmail.ts >= since_ts)
stmt = (
stmt.order_by(desc(OrchestratorEmail.ts))
.offset(offset)
.limit(limit)
)
result = await session.execute(stmt)
return [r.model_dump(mode="json") for r in result.scalars().all()]
async def count_orchestrator_emails(
self,
*,
mail_decky_uuid: Optional[str] = None,
) -> int:
stmt = select(func.count()).select_from(OrchestratorEmail)
if mail_decky_uuid is not None:
stmt = stmt.where(OrchestratorEmail.mail_decky_uuid == mail_decky_uuid)
async with self._session() as session:
result = await session.execute(stmt)
return result.scalar() or 0
async def list_orchestrator_email_threads(
self,
mail_decky_uuid: str,
sender_email: str,
recipient_email: str,
*,
limit: int = 50,
) -> list[dict[str, Any]]:
# Most-recent row per (sender, recipient) pair under this mail decky.
# The scheduler only needs the latest message_id/subject/thread_id to
# construct a reply; older rows in the same thread aren't relevant
# for the "do we reply or start fresh" decision.
async with self._session() as session:
stmt = (
select(OrchestratorEmail)
.where(
OrchestratorEmail.mail_decky_uuid == mail_decky_uuid,
or_(
(OrchestratorEmail.sender_email == sender_email)
& (OrchestratorEmail.recipient_email == recipient_email),
(OrchestratorEmail.sender_email == recipient_email)
& (OrchestratorEmail.recipient_email == sender_email),
),
OrchestratorEmail.success.is_(True),
)
.order_by(desc(OrchestratorEmail.ts))
.limit(limit)
)
result = await session.execute(stmt)
return [r.model_dump(mode="json") for r in result.scalars().all()]
async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int:
"""Trim per-mail-decky rows to *per_decky_cap*, oldest-first."""
deleted = 0
async with self._session() as session:
decky_rows = await session.execute(
select(OrchestratorEmail.mail_decky_uuid).distinct()
)
for (mail_uuid,) in decky_rows.all():
keep = await session.execute(
select(OrchestratorEmail.uuid)
.where(OrchestratorEmail.mail_decky_uuid == mail_uuid)
.order_by(desc(OrchestratorEmail.ts))
.limit(per_decky_cap)
)
keep_uuids = [u for (u,) in keep.all()]
if not keep_uuids:
continue
from sqlalchemy import delete as _delete
stmt = _delete(OrchestratorEmail).where(
OrchestratorEmail.mail_decky_uuid == mail_uuid,
OrchestratorEmail.uuid.notin_(keep_uuids),
)
res = await session.execute(stmt)
deleted += res.rowcount or 0
await session.commit()
return deleted