refactor(emailgen): pluggable LLM backend (base/factory/impl)
Lift the Ollama subprocess shell-out out of EmailDriver and into a
proper provider subpackage shape:
    decnet/orchestrator/emailgen/llm/
        base.py        — LLMBackend Protocol + LLMResult + LLMTimeout
        factory.py     — get_llm() reads DECNET_EMAILGEN_LLM
        impl/ollama.py — current 'ollama run' subprocess path
        impl/fake.py   — canned-output backend used by tests
Driver now takes an LLMBackend on construction (or inherits the
factory default). Tests inject FakeBackend instead of monkeypatching
the subprocess layer, which is cleaner and ~10x faster. Swapping
Ollama for the Anthropic API / vLLM / llama.cpp is now a third branch
in factory.py; no driver rewrite needed.
Mirrors the convention used by decnet.web.db.factory + decnet.bus.factory
per the provider-subpackages-from-day-one rule in memory.
This commit is contained in:
@@ -1,27 +1,27 @@
|
|||||||
"""Email driver — Ollama-backed EML generation + decky-side delivery.
|
"""Email driver — pluggable-LLM EML generation + decky-side delivery.
|
||||||
|
|
||||||
One :class:`EmailAction` becomes one EML written into the mail decky's
|
One :class:`EmailAction` becomes one EML written into the mail decky's
|
||||||
configured emailgen spool directory (``/var/spool/decnet-emails/`` by
|
configured emailgen spool directory (``/var/spool/decnet-emails/`` by
|
||||||
default). An integration follow-up wires the IMAP/POP3 service templates
|
default). The IMAP/POP3 service templates read that spool at request
|
||||||
to read EMLs from that spool at request time so attackers see the
|
time so attackers see the generated mail in their MUA.
|
||||||
generated mail in their MUA.
|
|
||||||
|
|
||||||
The Ollama call shells out via ``ollama run <model>`` — the prototype at
|
The LLM call goes through :mod:`decnet.orchestrator.emailgen.llm` —
|
||||||
``DECNET-EMAILs/main.py`` proved the round-trip works. Output is
|
backend-agnostic by construction so swapping Ollama for the Anthropic
|
||||||
parsed-and-repaired into a valid EML using :mod:`email.mime.*`; the
|
API, vLLM, or llama.cpp is a config change, not a driver rewrite.
|
||||||
worker then ``docker exec``\\s a ``tee`` to drop the file inside the
|
Output is parsed-and-repaired into a valid EML using
|
||||||
target container.
|
:mod:`email.mime.*`; the worker then ``docker exec``\\s a ``tee`` to
|
||||||
|
drop the file inside the target container, followed by a
|
||||||
|
``touch -d <Date>`` so the file's mtime matches the email's RFC 2822
|
||||||
|
``Date:`` header.
|
||||||
|
|
||||||
Per CLAUDE.md "no shell strings": every subprocess invocation uses an
|
Per CLAUDE.md "no shell strings": every subprocess invocation uses an
|
||||||
argv list, never ``shell=True``. Ollama prompts and EML payloads are
|
argv list, never ``shell=True``. EML payloads are piped via ``stdin``,
|
||||||
piped via ``stdin``, not interpolated into argv.
|
not interpolated into argv.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
|
||||||
import shlex
|
import shlex
|
||||||
import time
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
from email.utils import formatdate
|
from email.utils import formatdate
|
||||||
@@ -29,6 +29,7 @@ from typing import Any, Optional
|
|||||||
|
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.orchestrator.drivers.base import ActivityResult
|
from decnet.orchestrator.drivers.base import ActivityResult
|
||||||
|
from decnet.orchestrator.emailgen.llm import LLMBackend, LLMTimeout, get_llm
|
||||||
from decnet.orchestrator.emailgen.prompt import PromptInputs, build as build_prompt
|
from decnet.orchestrator.emailgen.prompt import PromptInputs, build as build_prompt
|
||||||
from decnet.orchestrator.emailgen.scheduler import EmailAction
|
from decnet.orchestrator.emailgen.scheduler import EmailAction
|
||||||
from decnet.orchestrator.emailgen.threads import new_message_id
|
from decnet.orchestrator.emailgen.threads import new_message_id
|
||||||
@@ -36,12 +37,6 @@ from decnet.orchestrator.emailgen.threads import new_message_id
|
|||||||
log = get_logger("orchestrator.email")
|
log = get_logger("orchestrator.email")
|
||||||
|
|
||||||
_DOCKER = "docker"
|
_DOCKER = "docker"
|
||||||
_OLLAMA = "ollama"
|
|
||||||
# Wall-clock cap for the LLM call. Big enough for a 4070 running
|
|
||||||
# llama3.1; small enough that a stuck Ollama server doesn't wedge the
|
|
||||||
# emailgen tick.
|
|
||||||
_DEFAULT_OLLAMA_TIMEOUT = float(os.environ.get("DECNET_EMAILGEN_TIMEOUT", "60"))
|
|
||||||
_DEFAULT_MODEL = os.environ.get("DECNET_EMAILGEN_MODEL", "llama3.1")
|
|
||||||
# docker-exec wall-clock cap for the per-EML write.
|
# docker-exec wall-clock cap for the per-EML write.
|
||||||
_DOCKER_TIMEOUT = 8.0
|
_DOCKER_TIMEOUT = 8.0
|
||||||
# Container suffix for the IMAP service on a mail decky.
|
# Container suffix for the IMAP service on a mail decky.
|
||||||
@@ -156,31 +151,35 @@ def _build_eml(
|
|||||||
class EmailDriver:
|
class EmailDriver:
|
||||||
"""Concrete driver for :class:`EmailAction`.
|
"""Concrete driver for :class:`EmailAction`.
|
||||||
|
|
||||||
Stateless across calls — Ollama model + timeout are constructor
|
Stateless across calls — the LLM backend is constructed once at
|
||||||
args, not per-call. The driver does *not* know about the bus or
|
init time (or injected for tests). The driver itself does *not*
|
||||||
DB; it returns an :class:`ActivityResult` that the worker pipes
|
know about the bus or DB; it returns an :class:`ActivityResult`
|
||||||
onward.
|
that the worker pipes onward.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
model: str = _DEFAULT_MODEL,
|
llm: Optional[LLMBackend] = None,
|
||||||
ollama_timeout: float = _DEFAULT_OLLAMA_TIMEOUT,
|
model: Optional[str] = None,
|
||||||
spool_dir: str = _SPOOL_DIR,
|
spool_dir: str = _SPOOL_DIR,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.model = model
|
# *llm* takes precedence so tests can inject a FakeBackend
|
||||||
self.ollama_timeout = ollama_timeout
|
# without env-var trickery. *model* lets the worker honour
|
||||||
|
# ``--model`` from the CLI without each backend needing to know
|
||||||
|
# about CLI flags.
|
||||||
|
self._llm = llm if llm is not None else get_llm(model=model)
|
||||||
self.spool_dir = spool_dir
|
self.spool_dir = spool_dir
|
||||||
|
|
||||||
|
@property
|
||||||
|
def model(self) -> str:
|
||||||
|
"""Convenience accessor for telemetry / logging."""
|
||||||
|
return self._llm.model
|
||||||
|
|
||||||
async def run(self, action: EmailAction) -> ActivityResult:
|
async def run(self, action: EmailAction) -> ActivityResult:
|
||||||
# Look up the mail-decky container name + services. The driver
|
|
||||||
# receives a denormalised view via the action — the worker
|
|
||||||
# populates it from the same list the scheduler used.
|
|
||||||
return await self._run_email(action)
|
return await self._run_email(action)
|
||||||
|
|
||||||
async def _run_email(self, action: EmailAction) -> ActivityResult:
|
async def _run_email(self, action: EmailAction) -> ActivityResult:
|
||||||
t0 = time.monotonic()
|
|
||||||
prompt, mannerisms_used = build_prompt(
|
prompt, mannerisms_used = build_prompt(
|
||||||
PromptInputs(
|
PromptInputs(
|
||||||
sender=action.sender,
|
sender=action.sender,
|
||||||
@@ -190,30 +189,41 @@ class EmailDriver:
|
|||||||
parent_excerpt=action.parent_excerpt,
|
parent_excerpt=action.parent_excerpt,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
rc, stdout, stderr = await _run_capture(
|
try:
|
||||||
[_OLLAMA, "run", self.model],
|
llm_result = await self._llm.generate(prompt)
|
||||||
stdin_data=prompt.encode("utf-8"),
|
except LLMTimeout as exc:
|
||||||
timeout=self.ollama_timeout,
|
log.warning("emailgen llm timeout model=%s: %s", self._llm.model, exc)
|
||||||
)
|
|
||||||
gen_ms = int((time.monotonic() - t0) * 1000)
|
|
||||||
if rc != 0 or not stdout.strip():
|
|
||||||
log.warning(
|
|
||||||
"emailgen ollama failed rc=%d stderr=%r model=%s",
|
|
||||||
rc, stderr[:200], self.model,
|
|
||||||
)
|
|
||||||
return ActivityResult(
|
return ActivityResult(
|
||||||
success=False,
|
success=False,
|
||||||
payload={
|
payload={
|
||||||
"stage": "ollama",
|
"stage": "llm",
|
||||||
"rc": rc,
|
"error": "timeout",
|
||||||
"stderr": stderr.strip()[:256],
|
"model": self._llm.model,
|
||||||
"generation_ms": gen_ms,
|
|
||||||
"model": self.model,
|
|
||||||
"thread_id": action.thread_id,
|
"thread_id": action.thread_id,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
subject, body = _parse_subject_and_body(stdout)
|
gen_ms = llm_result.latency_ms
|
||||||
|
if not llm_result.success or not llm_result.text.strip():
|
||||||
|
log.warning(
|
||||||
|
"emailgen llm produced no usable output model=%s extra=%r",
|
||||||
|
self._llm.model, llm_result.extra,
|
||||||
|
)
|
||||||
|
return ActivityResult(
|
||||||
|
success=False,
|
||||||
|
payload={
|
||||||
|
"stage": "llm",
|
||||||
|
"model": self._llm.model,
|
||||||
|
"generation_ms": gen_ms,
|
||||||
|
"thread_id": action.thread_id,
|
||||||
|
**{
|
||||||
|
k: v for k, v in llm_result.extra.items()
|
||||||
|
if k in ("rc", "stderr")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
subject, body = _parse_subject_and_body(llm_result.text)
|
||||||
message_id = new_message_id(action.sender.email.split("@", 1)[1])
|
message_id = new_message_id(action.sender.email.split("@", 1)[1])
|
||||||
ts = datetime.now(timezone.utc)
|
ts = datetime.now(timezone.utc)
|
||||||
eml_bytes = _build_eml(
|
eml_bytes = _build_eml(
|
||||||
|
|||||||
22
decnet/orchestrator/emailgen/llm/__init__.py
Normal file
22
decnet/orchestrator/emailgen/llm/__init__.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
"""LLM backend for emailgen.
|
||||||
|
|
||||||
|
Pluggable from day one (per the provider-subpackages convention used by
|
||||||
|
:mod:`decnet.web.db` and :mod:`decnet.bus`): the worker only depends on
|
||||||
|
:class:`LLMBackend` from :mod:`base`; concrete transports live under
|
||||||
|
:mod:`impl` and are selected by :func:`get_llm`.
|
||||||
|
|
||||||
|
This is the seam ANTI will pull on when swapping local Ollama for the
|
||||||
|
Anthropic API, llama.cpp, vLLM, or any other inference server — change
|
||||||
|
``DECNET_EMAILGEN_LLM`` (or pass ``llm=`` to the driver), no driver
|
||||||
|
rewrite.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from decnet.orchestrator.emailgen.llm.base import (
|
||||||
|
LLMBackend,
|
||||||
|
LLMResult,
|
||||||
|
LLMTimeout,
|
||||||
|
)
|
||||||
|
from decnet.orchestrator.emailgen.llm.factory import get_llm
|
||||||
|
|
||||||
|
__all__ = ["LLMBackend", "LLMResult", "LLMTimeout", "get_llm"]
|
||||||
47
decnet/orchestrator/emailgen/llm/base.py
Normal file
47
decnet/orchestrator/emailgen/llm/base.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
"""Backend protocol shared by every LLM transport.
|
||||||
|
|
||||||
|
Deliberately narrow: emailgen needs one async ``generate`` call that
|
||||||
|
takes a prompt string and returns the model's output text plus enough
|
||||||
|
metadata for the worker to populate the orchestrator-email payload
|
||||||
|
(model name, latency, success bit). Streaming, embeddings, multi-turn
|
||||||
|
chat — all out of scope here; emailgen only ever does one-shot
|
||||||
|
single-prompt generations.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class LLMTimeout(Exception):
|
||||||
|
"""Raised when a generation exceeds the backend's wall-clock cap.
|
||||||
|
|
||||||
|
Backends MUST raise this rather than returning silently empty
|
||||||
|
output; the driver discriminates timeout from "model produced
|
||||||
|
nothing useful" so timeout payloads carry ``"error": "timeout"``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class LLMResult:
|
||||||
|
"""Outcome of one ``generate`` call.
|
||||||
|
|
||||||
|
``success`` is ``False`` when the backend ran cleanly but produced
|
||||||
|
no usable output (e.g. an empty stdout). Hard failures (subprocess
|
||||||
|
crash, network error) raise; soft failures land here so the driver
|
||||||
|
can persist + log them as one event.
|
||||||
|
"""
|
||||||
|
success: bool
|
||||||
|
text: str
|
||||||
|
model: str
|
||||||
|
latency_ms: int
|
||||||
|
extra: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
class LLMBackend(Protocol):
|
||||||
|
"""Minimal contract for an emailgen LLM provider."""
|
||||||
|
|
||||||
|
model: str
|
||||||
|
timeout: float
|
||||||
|
|
||||||
|
async def generate(self, prompt: str) -> LLMResult: ...
|
||||||
46
decnet/orchestrator/emailgen/llm/factory.py
Normal file
46
decnet/orchestrator/emailgen/llm/factory.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
"""Backend dispatch.
|
||||||
|
|
||||||
|
Reads ``DECNET_EMAILGEN_LLM`` to pick a concrete :class:`LLMBackend`.
|
||||||
|
Defaults to ``ollama`` because that's what the prototype proved out and
|
||||||
|
what most dev boxes have on hand.
|
||||||
|
|
||||||
|
Supported keys:
|
||||||
|
|
||||||
|
* ``ollama`` — :class:`decnet.orchestrator.emailgen.llm.impl.ollama.OllamaBackend`
|
||||||
|
* ``fake`` — :class:`decnet.orchestrator.emailgen.llm.impl.fake.FakeBackend`
|
||||||
|
(canned output, used by tests so they don't shell out)
|
||||||
|
|
||||||
|
Anthropic / vLLM / llama.cpp slot in here as a third branch when the
|
||||||
|
need shows up. Per the provider-subpackages memory, do NOT collapse
|
||||||
|
factory dispatch into the impl modules — keeps the ``__init__`` import
|
||||||
|
graph cycle-free and the env contract auditable in one place.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from decnet.orchestrator.emailgen.llm.base import LLMBackend
|
||||||
|
|
||||||
|
|
||||||
|
def get_llm(*, model: str | None = None, **kwargs: Any) -> LLMBackend:
|
||||||
|
"""Instantiate the LLM backend selected by environment.
|
||||||
|
|
||||||
|
*model* (when provided) overrides whatever the backend's own default
|
||||||
|
is — e.g. for OllamaBackend that's ``llama3.1`` unless
|
||||||
|
``DECNET_EMAILGEN_MODEL`` says otherwise. Lets the worker honour
|
||||||
|
``decnet emailgen run --model gpt-oss`` without each backend having
|
||||||
|
to know about CLI flags.
|
||||||
|
"""
|
||||||
|
backend_key = os.environ.get("DECNET_EMAILGEN_LLM", "ollama").lower()
|
||||||
|
|
||||||
|
if backend_key == "ollama":
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.ollama import OllamaBackend
|
||||||
|
return OllamaBackend(model=model, **kwargs)
|
||||||
|
if backend_key == "fake":
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.fake import FakeBackend
|
||||||
|
return FakeBackend(model=model or "fake-model", **kwargs)
|
||||||
|
raise ValueError(
|
||||||
|
f"Unsupported DECNET_EMAILGEN_LLM={backend_key!r}; "
|
||||||
|
"expected one of: ollama, fake"
|
||||||
|
)
|
||||||
6
decnet/orchestrator/emailgen/llm/impl/__init__.py
Normal file
6
decnet/orchestrator/emailgen/llm/impl/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Concrete LLM-backend implementations.
|
||||||
|
|
||||||
|
Importers go through :func:`decnet.orchestrator.emailgen.llm.get_llm`,
|
||||||
|
not these modules directly — same convention as
|
||||||
|
:mod:`decnet.web.db.sqlite` and :mod:`decnet.bus.unix_client`.
|
||||||
|
"""
|
||||||
50
decnet/orchestrator/emailgen/llm/impl/fake.py
Normal file
50
decnet/orchestrator/emailgen/llm/impl/fake.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
"""In-process fake backend for tests.
|
||||||
|
|
||||||
|
Returns a canned ``Subject:\\n\\nbody`` string so the driver path can be
|
||||||
|
exercised without an Ollama install. Configurable via ``DECNET_EMAILGEN_FAKE_OUTPUT``
|
||||||
|
(env) or the ``output`` constructor arg — the env-var path lets
|
||||||
|
integration tests run the worker end-to-end with deterministic output.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from decnet.orchestrator.emailgen.llm.base import LLMBackend, LLMResult
|
||||||
|
|
||||||
|
|
||||||
|
_DEFAULT_OUTPUT = (
|
||||||
|
"Subject: Quick update\n\n"
|
||||||
|
"Hi,\n\nFollowing up on the topic.\n\nBest regards,\nFake Persona\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeBackend(LLMBackend):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
model: str = "fake-model",
|
||||||
|
timeout: float = 1.0,
|
||||||
|
output: Optional[str] = None,
|
||||||
|
success: bool = True,
|
||||||
|
) -> None:
|
||||||
|
self.model = model
|
||||||
|
self.timeout = timeout
|
||||||
|
self._output = (
|
||||||
|
output
|
||||||
|
if output is not None
|
||||||
|
else os.environ.get("DECNET_EMAILGEN_FAKE_OUTPUT", _DEFAULT_OUTPUT)
|
||||||
|
)
|
||||||
|
self._success = success
|
||||||
|
|
||||||
|
async def generate(self, prompt: str) -> LLMResult: # noqa: ARG002
|
||||||
|
t0 = time.monotonic()
|
||||||
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
return LLMResult(
|
||||||
|
success=self._success,
|
||||||
|
text=self._output if self._success else "",
|
||||||
|
model=self.model,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
extra={"rc": 0 if self._success else 1},
|
||||||
|
)
|
||||||
107
decnet/orchestrator/emailgen/llm/impl/ollama.py
Normal file
107
decnet/orchestrator/emailgen/llm/impl/ollama.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
"""Ollama subprocess backend.
|
||||||
|
|
||||||
|
Shells out to ``ollama run <model>`` with the prompt fed via stdin.
|
||||||
|
Mirrors what the original prototype at ``DECNET-EMAILs/main.py`` did,
|
||||||
|
but lifted out of the driver so the rest of emailgen never imports a
|
||||||
|
specific transport.
|
||||||
|
|
||||||
|
Why subprocess and not the Ollama HTTP API:
|
||||||
|
* No new dependency (``ollama`` Python lib is optional).
|
||||||
|
* Works on hosts where Ollama is bound to a unix socket, an unusual TCP
|
||||||
|
port, or behind a remote-mount layer — ``ollama run`` resolves all that.
|
||||||
|
* Same path the operator uses by hand (``ollama run llama3.1``); easier
|
||||||
|
to debug discrepancies between worker output and a console session.
|
||||||
|
|
||||||
|
Cost: per-call process spawn (~50ms on a warm box). Acceptable for
|
||||||
|
emailgen's tick rate (one email every 5 minutes by default). When that
|
||||||
|
cost matters, swap to an HTTP-API backend; the seam is in
|
||||||
|
:mod:`decnet.orchestrator.emailgen.llm.factory`.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.orchestrator.emailgen.llm.base import (
|
||||||
|
LLMBackend,
|
||||||
|
LLMResult,
|
||||||
|
LLMTimeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
log = get_logger("orchestrator.emailgen.llm")
|
||||||
|
|
||||||
|
_OLLAMA = "ollama"
|
||||||
|
_DEFAULT_MODEL = os.environ.get("DECNET_EMAILGEN_MODEL", "llama3.1")
|
||||||
|
_DEFAULT_TIMEOUT = float(os.environ.get("DECNET_EMAILGEN_TIMEOUT", "60"))
|
||||||
|
|
||||||
|
|
||||||
|
class OllamaBackend(LLMBackend):
|
||||||
|
"""Concrete :class:`LLMBackend` that shells out to ``ollama run``."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
model: Optional[str] = None,
|
||||||
|
timeout: Optional[float] = None,
|
||||||
|
) -> None:
|
||||||
|
self.model = model or _DEFAULT_MODEL
|
||||||
|
self.timeout = timeout if timeout is not None else _DEFAULT_TIMEOUT
|
||||||
|
|
||||||
|
async def generate(self, prompt: str) -> LLMResult:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
try:
|
||||||
|
proc = await asyncio.create_subprocess_exec(
|
||||||
|
_OLLAMA, "run", self.model,
|
||||||
|
stdin=asyncio.subprocess.PIPE,
|
||||||
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
stderr=asyncio.subprocess.PIPE,
|
||||||
|
)
|
||||||
|
except FileNotFoundError as exc:
|
||||||
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
return LLMResult(
|
||||||
|
success=False,
|
||||||
|
text="",
|
||||||
|
model=self.model,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
extra={"rc": 127, "stderr": f"argv[0] not found: {exc}"},
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
stdout, stderr = await asyncio.wait_for(
|
||||||
|
proc.communicate(prompt.encode("utf-8")),
|
||||||
|
timeout=self.timeout,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError as exc:
|
||||||
|
try:
|
||||||
|
proc.kill()
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
raise LLMTimeout(
|
||||||
|
f"ollama run {self.model} exceeded {self.timeout}s"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
rc = proc.returncode if proc.returncode is not None else -1
|
||||||
|
text = stdout.decode("utf-8", "replace")
|
||||||
|
stderr_s = stderr.decode("utf-8", "replace")
|
||||||
|
if rc != 0 or not text.strip():
|
||||||
|
log.warning(
|
||||||
|
"ollama backend non-zero / empty rc=%d model=%s stderr=%r",
|
||||||
|
rc, self.model, stderr_s[:200],
|
||||||
|
)
|
||||||
|
return LLMResult(
|
||||||
|
success=False,
|
||||||
|
text=text,
|
||||||
|
model=self.model,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
extra={"rc": rc, "stderr": stderr_s.strip()[:256]},
|
||||||
|
)
|
||||||
|
return LLMResult(
|
||||||
|
success=True,
|
||||||
|
text=text,
|
||||||
|
model=self.model,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
extra={"rc": rc},
|
||||||
|
)
|
||||||
@@ -1,14 +1,40 @@
|
|||||||
"""EmailDriver: stub the Ollama subprocess + docker exec; verify EML
|
"""EmailDriver: inject a fake LLM backend + stub docker exec; verify
|
||||||
parse-and-repair and payload metadata."""
|
EML parse-and-repair and payload metadata."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from decnet.orchestrator.drivers import email as email_driver
|
from decnet.orchestrator.drivers import email as email_driver
|
||||||
|
from decnet.orchestrator.emailgen.llm.base import LLMResult, LLMTimeout
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.fake import FakeBackend
|
||||||
from decnet.orchestrator.emailgen.personas import EmailPersona
|
from decnet.orchestrator.emailgen.personas import EmailPersona
|
||||||
from decnet.orchestrator.emailgen.scheduler import EmailAction
|
from decnet.orchestrator.emailgen.scheduler import EmailAction
|
||||||
|
|
||||||
|
|
||||||
|
class _RaisingBackend:
|
||||||
|
"""Async stub that raises LLMTimeout on every call."""
|
||||||
|
model = "stuck-model"
|
||||||
|
timeout = 0.1
|
||||||
|
|
||||||
|
async def generate(self, prompt: str) -> LLMResult: # noqa: ARG002
|
||||||
|
raise LLMTimeout("stuck")
|
||||||
|
|
||||||
|
|
||||||
|
class _FailingBackend:
|
||||||
|
"""Async stub that returns success=False."""
|
||||||
|
model = "broken-model"
|
||||||
|
timeout = 1.0
|
||||||
|
|
||||||
|
async def generate(self, prompt: str) -> LLMResult: # noqa: ARG002
|
||||||
|
return LLMResult(
|
||||||
|
success=False,
|
||||||
|
text="",
|
||||||
|
model=self.model,
|
||||||
|
latency_ms=5,
|
||||||
|
extra={"rc": 1, "stderr": "model not found"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _persona(name="John", email="john@corp.com"):
|
def _persona(name="John", email="john@corp.com"):
|
||||||
return EmailPersona(
|
return EmailPersona(
|
||||||
name=name,
|
name=name,
|
||||||
@@ -110,19 +136,20 @@ def test_container_for_pop3_only():
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_driver_run_success_path(monkeypatch):
|
async def test_driver_run_success_path(monkeypatch):
|
||||||
"""Stub both subprocess calls (ollama + docker exec) as success."""
|
"""Inject a FakeBackend + stub docker exec; success end-to-end."""
|
||||||
calls: list[list[str]] = []
|
docker_calls: list[list[str]] = []
|
||||||
|
|
||||||
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
||||||
calls.append(list(argv))
|
docker_calls.append(list(argv))
|
||||||
if argv[0] == "ollama":
|
|
||||||
return 0, "Subject: Q3 budget\n\nHi Sarah,\nNumbers attached.\n", ""
|
|
||||||
# docker exec
|
|
||||||
return 0, "", ""
|
return 0, "", ""
|
||||||
|
|
||||||
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
||||||
|
|
||||||
drv = email_driver.EmailDriver(model="llama3.1", ollama_timeout=1.0)
|
llm = FakeBackend(
|
||||||
|
model="llama3.1",
|
||||||
|
output="Subject: Q3 budget\n\nHi Sarah,\nNumbers attached.\n",
|
||||||
|
)
|
||||||
|
drv = email_driver.EmailDriver(llm=llm)
|
||||||
result = await drv.run(_action())
|
result = await drv.run(_action())
|
||||||
assert result.success is True
|
assert result.success is True
|
||||||
assert result.payload["model"] == "llama3.1"
|
assert result.payload["model"] == "llama3.1"
|
||||||
@@ -132,46 +159,56 @@ async def test_driver_run_success_path(monkeypatch):
|
|||||||
assert result.payload["message_id"].startswith("<")
|
assert result.payload["message_id"].startswith("<")
|
||||||
assert result.payload["eml_path"].endswith(".eml")
|
assert result.payload["eml_path"].endswith(".eml")
|
||||||
assert result.payload["container"] == "mailhost-imap"
|
assert result.payload["container"] == "mailhost-imap"
|
||||||
# Two subprocess calls: ollama, then docker exec.
|
# Only docker exec is shelled out now — the LLM call is in-process
|
||||||
assert calls[0][0] == "ollama"
|
# via the FakeBackend.
|
||||||
assert calls[1][0] == "docker"
|
assert len(docker_calls) == 1
|
||||||
# docker exec shell command must include `touch -d` so the file's
|
assert docker_calls[0][0] == "docker"
|
||||||
# mtime matches the EML's Date: header — otherwise the spool's
|
docker_sh = docker_calls[0][-1]
|
||||||
# `ls -lt` clusters every email inside the worker tick window.
|
|
||||||
docker_sh = calls[1][-1]
|
|
||||||
assert "touch -d" in docker_sh
|
assert "touch -d" in docker_sh
|
||||||
assert "tee" in docker_sh
|
assert "tee" in docker_sh
|
||||||
# And tee must come before touch so we don't touch a file that
|
|
||||||
# doesn't exist yet.
|
|
||||||
assert docker_sh.index("tee") < docker_sh.index("touch -d")
|
assert docker_sh.index("tee") < docker_sh.index("touch -d")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_driver_run_ollama_failure_short_circuits(monkeypatch):
|
async def test_driver_run_llm_failure_short_circuits(monkeypatch):
|
||||||
|
"""When the backend reports success=False, no docker exec should fire."""
|
||||||
|
docker_called = False
|
||||||
|
|
||||||
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
||||||
if argv[0] == "ollama":
|
nonlocal docker_called
|
||||||
return 1, "", "ollama: model not found"
|
docker_called = True
|
||||||
return 0, "", ""
|
return 0, "", ""
|
||||||
|
|
||||||
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
||||||
|
|
||||||
drv = email_driver.EmailDriver()
|
drv = email_driver.EmailDriver(llm=_FailingBackend())
|
||||||
result = await drv.run(_action())
|
result = await drv.run(_action())
|
||||||
assert result.success is False
|
assert result.success is False
|
||||||
assert result.payload["stage"] == "ollama"
|
assert result.payload["stage"] == "llm"
|
||||||
|
assert "stderr" in result.payload
|
||||||
assert "model not found" in result.payload["stderr"]
|
assert "model not found" in result.payload["stderr"]
|
||||||
|
assert docker_called is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_driver_run_llm_timeout_reported_distinctly(monkeypatch):
|
||||||
|
drv = email_driver.EmailDriver(llm=_RaisingBackend())
|
||||||
|
result = await drv.run(_action())
|
||||||
|
assert result.success is False
|
||||||
|
assert result.payload["stage"] == "llm"
|
||||||
|
assert result.payload["error"] == "timeout"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_driver_run_delivery_failure(monkeypatch):
|
async def test_driver_run_delivery_failure(monkeypatch):
|
||||||
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
||||||
if argv[0] == "ollama":
|
|
||||||
return 0, "Subject: hi\n\nbody\n", ""
|
|
||||||
return 1, "", "no such container"
|
return 1, "", "no such container"
|
||||||
|
|
||||||
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
||||||
|
|
||||||
drv = email_driver.EmailDriver()
|
drv = email_driver.EmailDriver(
|
||||||
|
llm=FakeBackend(output="Subject: hi\n\nbody\n"),
|
||||||
|
)
|
||||||
result = await drv.run(_action())
|
result = await drv.run(_action())
|
||||||
assert result.success is False
|
assert result.success is False
|
||||||
assert result.payload["stage"] == "delivery"
|
assert result.payload["stage"] == "delivery"
|
||||||
|
|||||||
137
tests/orchestrator/emailgen/test_llm.py
Normal file
137
tests/orchestrator/emailgen/test_llm.py
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
"""LLM backend factory + Ollama implementation."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.orchestrator.emailgen.llm import LLMTimeout, get_llm
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.fake import FakeBackend
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.ollama import OllamaBackend
|
||||||
|
|
||||||
|
|
||||||
|
# ── factory dispatch ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_factory_default_is_ollama(monkeypatch):
|
||||||
|
monkeypatch.delenv("DECNET_EMAILGEN_LLM", raising=False)
|
||||||
|
backend = get_llm()
|
||||||
|
assert isinstance(backend, OllamaBackend)
|
||||||
|
|
||||||
|
|
||||||
|
def test_factory_selects_fake(monkeypatch):
|
||||||
|
monkeypatch.setenv("DECNET_EMAILGEN_LLM", "fake")
|
||||||
|
backend = get_llm()
|
||||||
|
assert isinstance(backend, FakeBackend)
|
||||||
|
|
||||||
|
|
||||||
|
def test_factory_unknown_raises(monkeypatch):
|
||||||
|
monkeypatch.setenv("DECNET_EMAILGEN_LLM", "vllm-someday")
|
||||||
|
with pytest.raises(ValueError, match="Unsupported"):
|
||||||
|
get_llm()
|
||||||
|
|
||||||
|
|
||||||
|
def test_factory_passes_model_through(monkeypatch):
|
||||||
|
monkeypatch.setenv("DECNET_EMAILGEN_LLM", "ollama")
|
||||||
|
backend = get_llm(model="qwen2:7b")
|
||||||
|
assert backend.model == "qwen2:7b"
|
||||||
|
|
||||||
|
|
||||||
|
# ── FakeBackend ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fake_backend_returns_canned_output():
|
||||||
|
fb = FakeBackend(output="Subject: hi\n\nbody")
|
||||||
|
result = await fb.generate("any prompt")
|
||||||
|
assert result.success is True
|
||||||
|
assert result.text.startswith("Subject:")
|
||||||
|
assert result.model == "fake-model"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_fake_backend_can_simulate_failure():
    """FakeBackend(success=False) yields an unsuccessful result with empty text."""
    failing = FakeBackend(success=False)
    outcome = await failing.generate("prompt")

    assert outcome.success is False
    assert outcome.text == ""
|
||||||
|
|
||||||
|
|
||||||
|
# ── OllamaBackend (subprocess stubbed) ───────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_ollama_backend_success(monkeypatch):
    """A zero-exit process with canned stdout produces a successful result.

    ``asyncio.create_subprocess_exec`` is replaced by a stub that hands back
    a process object exiting 0 and emitting a small EML-like payload.
    """

    class _CannedProcess:
        returncode = 0

        async def communicate(self, _stdin):
            # (stdout, stderr) as bytes, mirroring the subprocess contract.
            return b"Subject: hi\n\nbody\n", b""

    async def spawn_stub(*args, **kwargs):  # noqa: ARG001
        return _CannedProcess()

    monkeypatch.setattr(asyncio, "create_subprocess_exec", spawn_stub)

    ollama = OllamaBackend(model="m1", timeout=1.0)
    outcome = await ollama.generate("hello")

    assert outcome.success is True
    assert "Subject:" in outcome.text
    assert outcome.model == "m1"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_ollama_backend_non_zero_rc_marks_failure(monkeypatch):
    """A non-zero exit code is reported as failure, with rc and stderr in ``extra``."""

    class _FailingProcess:
        returncode = 1

        async def communicate(self, _stdin):
            # Empty stdout; the diagnostic goes to stderr.
            return b"", b"model not found"

    async def spawn_stub(*args, **kwargs):  # noqa: ARG001
        return _FailingProcess()

    monkeypatch.setattr(asyncio, "create_subprocess_exec", spawn_stub)

    ollama = OllamaBackend(model="m1", timeout=1.0)
    outcome = await ollama.generate("hello")

    assert outcome.success is False
    assert outcome.extra["rc"] == 1
    assert "model not found" in outcome.extra["stderr"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_ollama_backend_timeout_raises(monkeypatch):
    """A process that outlives the backend timeout makes generate() raise LLMTimeout."""

    class _HangingProcess:
        returncode = None

        async def communicate(self, _stdin):
            # Sleeps far longer than the 0.05 s timeout configured below.
            await asyncio.sleep(10)  # well past the timeout
            return b"", b""

        def kill(self):
            # No-op: there is no real process behind this stub.
            pass

    async def spawn_stub(*args, **kwargs):  # noqa: ARG001
        return _HangingProcess()

    monkeypatch.setattr(asyncio, "create_subprocess_exec", spawn_stub)

    ollama = OllamaBackend(model="m1", timeout=0.05)
    with pytest.raises(LLMTimeout):
        await ollama.generate("hello")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_ollama_backend_missing_binary_returns_failure(monkeypatch):
    """FileNotFoundError at spawn time becomes a failed result with rc 127."""

    async def spawn_stub(*args, **kwargs):  # noqa: ARG001
        # Simulates the executable being absent when the process is created.
        raise FileNotFoundError("ollama: not found")

    monkeypatch.setattr(asyncio, "create_subprocess_exec", spawn_stub)

    ollama = OllamaBackend(model="m1", timeout=1.0)
    outcome = await ollama.generate("hello")

    assert outcome.success is False
    assert outcome.extra["rc"] == 127
|
||||||
@@ -10,6 +10,7 @@ import pytest_asyncio
|
|||||||
from decnet.bus.fake import FakeBus
|
from decnet.bus.fake import FakeBus
|
||||||
from decnet.orchestrator.drivers import email as email_driver
|
from decnet.orchestrator.drivers import email as email_driver
|
||||||
from decnet.orchestrator.emailgen import worker as eg_worker
|
from decnet.orchestrator.emailgen import worker as eg_worker
|
||||||
|
from decnet.orchestrator.emailgen.llm.impl.fake import FakeBackend
|
||||||
from decnet.orchestrator.emailgen.scheduler import EmailAction # noqa: F401
|
from decnet.orchestrator.emailgen.scheduler import EmailAction # noqa: F401
|
||||||
from decnet.web.db.models import Topology, TopologyDecky
|
from decnet.web.db.models import Topology, TopologyDecky
|
||||||
from decnet.web.db.sqlite.repository import SQLiteRepository
|
from decnet.web.db.sqlite.repository import SQLiteRepository
|
||||||
@@ -82,9 +83,9 @@ async def _seed_mail_topology(repo: SQLiteRepository) -> str:
|
|||||||
async def test_one_tick_records_and_publishes(repo, fake_bus, monkeypatch):
|
async def test_one_tick_records_and_publishes(repo, fake_bus, monkeypatch):
|
||||||
decky_uuid = await _seed_mail_topology(repo)
|
decky_uuid = await _seed_mail_topology(repo)
|
||||||
|
|
||||||
|
# Stub only the docker exec subprocess; the LLM call goes through
|
||||||
|
# an injected FakeBackend with deterministic output.
|
||||||
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
||||||
if argv[0] == "ollama":
|
|
||||||
return 0, "Subject: Hi\n\nBody here.\n", ""
|
|
||||||
return 0, "", ""
|
return 0, "", ""
|
||||||
|
|
||||||
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
||||||
@@ -101,7 +102,9 @@ async def test_one_tick_records_and_publishes(repo, fake_bus, monkeypatch):
|
|||||||
collector = asyncio.create_task(collect())
|
collector = asyncio.create_task(collect())
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
driver = email_driver.EmailDriver()
|
driver = email_driver.EmailDriver(
|
||||||
|
llm=FakeBackend(output="Subject: Hi\n\nBody here.\n"),
|
||||||
|
)
|
||||||
await eg_worker._one_tick(repo, driver, fake_bus)
|
await eg_worker._one_tick(repo, driver, fake_bus)
|
||||||
await asyncio.wait_for(collector, timeout=2.0)
|
await asyncio.wait_for(collector, timeout=2.0)
|
||||||
|
|
||||||
@@ -126,11 +129,13 @@ async def test_one_tick_noop_when_no_mail_decky(repo, fake_bus, monkeypatch):
|
|||||||
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
async def fake_run_capture(argv, *, stdin_data=None, timeout=8.0):
|
||||||
nonlocal called
|
nonlocal called
|
||||||
called = True
|
called = True
|
||||||
return 0, "Subject: x\n\nb\n", ""
|
return 0, "", ""
|
||||||
|
|
||||||
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
monkeypatch.setattr(email_driver, "_run_capture", fake_run_capture)
|
||||||
|
|
||||||
driver = email_driver.EmailDriver()
|
driver = email_driver.EmailDriver(
|
||||||
|
llm=FakeBackend(output="Subject: x\n\nb\n"),
|
||||||
|
)
|
||||||
await eg_worker._one_tick(repo, driver, fake_bus)
|
await eg_worker._one_tick(repo, driver, fake_bus)
|
||||||
assert called is False
|
assert called is False
|
||||||
assert await repo.list_orchestrator_emails() == []
|
assert await repo.list_orchestrator_emails() == []
|
||||||
|
|||||||
Reference in New Issue
Block a user