feat(mail): operator-tunable IMAP/POP3 email seed (DEBT-026)

IMAP_EMAIL_SEED / POP3_EMAIL_SEED accept a directory (rglob *.eml +
*.json) or a single .json/.eml. Loaded entries CONCATENATE with the
hardcoded _BAIT_EMAILS — additive to the realism-engine emailgen
output rather than replacing it. JSON dicts require from_addr /
to_addr / subject / body; bare bodies are wrapped into RFC 5322 on
load. compose_fragment reads service_cfg["email_seed"] and bind-mounts
the host path read-only at /var/spool/decnet-emails/seed.
This commit is contained in:
2026-05-03 02:47:06 -04:00
parent e0b07651fd
commit b88d67794d
8 changed files with 444 additions and 133 deletions

View File

@@ -4,11 +4,18 @@ from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent / "templates" / "imap"
_SEED_CONTAINER_PATH = "/var/spool/decnet-emails/seed"
class IMAPService(BaseService):
name = "imap"
ports = [143, 993]
default_image = "build"
# config_schema: no user-tunable fields yet — TODO add when compose_fragment grows cfg reads
# Optional config:
# email_seed: host path to a directory of .eml/.json files OR a
# single .json/.eml. Mounted read-only into the
# container; entries concatenate with the hardcoded
# bait list (additive to realism-engine output).
def compose_fragment(self, decky_name: str, log_target: str | None = None, service_cfg: dict | None = None) -> dict:
fragment: dict = {
@@ -19,6 +26,14 @@ class IMAPService(BaseService):
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
if service_cfg:
seed = service_cfg.get("email_seed")
if seed:
host_path = str(Path(str(seed)).expanduser().resolve())
fragment["environment"]["IMAP_EMAIL_SEED"] = _SEED_CONTAINER_PATH
fragment.setdefault("volumes", []).append(
f"{host_path}:{_SEED_CONTAINER_PATH}:ro"
)
return fragment
def dockerfile_context(self) -> Path | None:

View File

@@ -4,11 +4,17 @@ from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent / "templates" / "pop3"
_SEED_CONTAINER_PATH = "/var/spool/decnet-emails/seed"
class POP3Service(BaseService):
name = "pop3"
ports = [110, 995]
default_image = "build"
# config_schema: no user-tunable fields yet — TODO add when compose_fragment grows cfg reads
# Optional config:
# email_seed: host path to a directory of .eml/.json files OR a
# single .json/.eml. Mounted read-only; entries
# concatenate with the hardcoded bait list.
def compose_fragment(self, decky_name: str, log_target: str | None = None, service_cfg: dict | None = None) -> dict:
fragment: dict = {
@@ -19,6 +25,14 @@ class POP3Service(BaseService):
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
if service_cfg:
seed = service_cfg.get("email_seed")
if seed:
host_path = str(Path(str(seed)).expanduser().resolve())
fragment["environment"]["POP3_EMAIL_SEED"] = _SEED_CONTAINER_PATH
fragment.setdefault("volumes", []).append(
f"{host_path}:{_SEED_CONTAINER_PATH}:ro"
)
return fragment
def dockerfile_context(self) -> Path | None:

View File

@@ -13,7 +13,9 @@ Banner advertises Dovecot so nmap fingerprints correctly.
import asyncio
import email
import email.policy
import json
import os
import sys
import time
from email.utils import getaddresses
from pathlib import Path
@@ -37,14 +39,14 @@ VALID_USERS: dict[str, str] = {
u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)]
}
# Path to a directory of ``*.eml`` files that the orchestrator emailgen
# worker drops into the container (``/var/spool/decnet-emails/`` by
# convention). When set AND the directory contains parseable EMLs,
# they replace the hardcoded ``_BAIT_EMAILS`` fallback below — meaning
# every mail an attacker reads is the LLM-generated, persona-driven,
# language-aware version, not the static credential-stuffed bait list.
# Empty / missing / unparseable: the fallback list still serves so a
# fresh deployment is never silent.
# Operator/realism-engine email seed source. Two shapes accepted:
# 1. Directory: walked recursively for ``*.eml`` (RFC 822 on disk —
# what the realism-engine emailgen worker drops in) and ``*.json``
# (operator-curated lists of dicts, see _load_seed_json).
# 2. Single ``*.json`` file: a list of dicts with the same shape.
# Loaded entries are CONCATENATED with ``_BAIT_EMAILS`` — never replace.
# The hardcoded list keeps a fresh deployment non-silent and serves as
# the deterministic baseline the persona output stacks on top of.
_EMAIL_SEED_PATH = os.environ.get("IMAP_EMAIL_SEED", "")
# Re-scan the seed directory at most this often. Cheap: walking a few
# dozen .eml files is sub-millisecond, but caching keeps an attacker's
@@ -256,20 +258,18 @@ _MAILBOXES = ["INBOX", "Sent", "Drafts", "Archive"]
# ── Spool-backed email loader ─────────────────────────────────────────────────
# When IMAP_EMAIL_SEED points at a directory of .eml files the
# orchestrator emailgen worker has dropped into the container, parse
# them on demand and serve them as the INBOX. Cached between requests
# with a short TTL + mtime check so a hot mailbox doesn't pay the parse
# cost on every IMAP command.
#
# Failure modes (missing dir, unparseable EMLs, empty dir) all return
# the hardcoded fallback rather than 0 messages — a silent INBOX is a
# stronger tell than a slightly-stale one.
# When IMAP_EMAIL_SEED points at a directory (or a single .json file) the
# realism-engine emailgen worker / operator has populated, parse it on
# demand and CONCATENATE the result with the hardcoded ``_BAIT_EMAILS``.
# Cached with a short TTL + mtime check so a hot mailbox doesn't pay the
# parse cost on every IMAP command.
_seed_cache: list[dict] | None = None
_seed_cache_dir_mtime: float = 0.0
_seed_cache_path_mtime: float = 0.0
_seed_cache_loaded_at: float = 0.0
_SEED_JSON_REQUIRED = ("from_addr", "to_addr", "subject", "body")
def _split_addr(value: str) -> tuple[str, str]:
"""Return (display_name, email) from a header value, falling back to
@@ -284,11 +284,12 @@ def _split_addr(value: str) -> tuple[str, str]:
return (name or "").strip(), (addr or value).strip()
def _eml_to_dict(path: Path, uid: int) -> dict | None:
def _eml_to_dict(path: Path) -> dict | None:
"""Parse one .eml into the dict shape the rest of this server uses.
Returns None when the file isn't parseable; callers skip + continue
so one corrupt EML does not kill the whole INBOX listing.
so one corrupt EML does not kill the whole INBOX listing. ``uid``
is assigned by the caller after concatenation.
"""
try:
raw = path.read_bytes()
@@ -300,71 +301,155 @@ def _eml_to_dict(path: Path, uid: int) -> dict | None:
subject = (msg.get("Subject") or "").strip()
date = msg.get("Date") or ""
return {
"uid": uid,
"uid": 0,
"flags": [], # never \Seen for spool emails — fresh delivery
"from_name": from_name or from_addr.split("@", 1)[0] if from_addr else "Unknown",
"from_name": from_name or (from_addr.split("@", 1)[0] if from_addr else "Unknown"),
"from_addr": from_addr or "unknown@localhost",
"to_addr": to_addr or "unknown@localhost",
"subject": subject or "(no subject)",
"date": date,
# The body field carries the full RFC 822 message — headers + body.
# That mirrors how the hardcoded _BAIT_EMAILS entries are shaped.
"body": raw.decode("utf-8", errors="replace"),
}
def _scan_seed_dir(path: Path) -> list[dict]:
"""Walk *path* recursively, parse every ``*.eml``, sort by mtime."""
eml_paths: list[Path] = []
def _seed_dict_to_entry(entry: dict) -> dict | None:
"""Validate and normalize a JSON-supplied dict into the bait shape.
Required keys: from_addr, to_addr, subject, body. Optional: date,
from_name, flags. Bad rows return None (caller skips + logs).
"""
if not isinstance(entry, dict):
return None
for key in _SEED_JSON_REQUIRED:
if not isinstance(entry.get(key), str) or not entry[key]:
return None
from_addr = entry["from_addr"]
from_name = str(entry.get("from_name") or from_addr.split("@", 1)[0])
date = str(entry.get("date") or "")
flags = entry.get("flags") or []
if not isinstance(flags, list):
flags = []
body = entry["body"]
# If body is a bare string (no headers), wrap it into RFC 822 so
# IMAP BODY[]/RFC822 fetches return a complete message — matches
# the hardcoded _BAIT_EMAILS shape.
if "\r\n\r\n" not in body and "\n\n" not in body:
headers = (
f"Date: {date}\r\n"
f"From: {from_name} <{from_addr}>\r\n"
f"To: {entry['to_addr']}\r\n"
f"Subject: {entry['subject']}\r\n"
"\r\n"
)
body = headers + body
return {
"uid": 0,
"flags": list(flags),
"from_name": from_name,
"from_addr": from_addr,
"to_addr": entry["to_addr"],
"subject": entry["subject"],
"date": date,
"body": body,
}
def _load_seed_json(path: Path) -> list[dict]:
"""Load a JSON list of dicts into entries. Bad rows logged + skipped."""
try:
for p in path.rglob("*.eml"):
if p.is_file():
eml_paths.append(p)
raw = path.read_text(encoding="utf-8")
data = json.loads(raw)
except (OSError, ValueError) as exc:
print(f"imap: seed json {path} unreadable: {exc}", file=sys.stderr)
return []
if not isinstance(data, list):
print(f"imap: seed json {path} must be a list", file=sys.stderr)
return []
out: list[dict] = []
for i, entry in enumerate(data):
normalized = _seed_dict_to_entry(entry)
if normalized is None:
print(f"imap: seed json {path}[{i}] missing required keys", file=sys.stderr)
continue
out.append(normalized)
return out
def _scan_seed(path: Path) -> list[dict]:
"""Resolve *path* into seed entries.
- Directory: rglob ``*.eml`` (mtime-sorted) + every ``*.json`` (each
a list of dicts).
- File ending in ``.json``: that JSON list.
- File ending in ``.eml``: that single EML.
"""
out: list[dict] = []
try:
if path.is_dir():
eml_paths = sorted(
(p for p in path.rglob("*.eml") if p.is_file()),
key=lambda p: p.stat().st_mtime,
)
for p in eml_paths:
d = _eml_to_dict(p)
if d is not None:
out.append(d)
for jp in sorted(p for p in path.rglob("*.json") if p.is_file()):
out.extend(_load_seed_json(jp))
elif path.suffix.lower() == ".json" and path.is_file():
out.extend(_load_seed_json(path))
elif path.suffix.lower() == ".eml" and path.is_file():
d = _eml_to_dict(path)
if d is not None:
out.append(d)
except OSError:
return []
eml_paths.sort(key=lambda p: p.stat().st_mtime)
out: list[dict] = []
for i, p in enumerate(eml_paths, start=1):
d = _eml_to_dict(p, uid=i)
if d is not None:
out.append(d)
return out
def _get_emails() -> list[dict]:
"""Return the active mailbox list.
"""Return the active mailbox list: ``_BAIT_EMAILS`` concatenated
with seed entries (directory of .eml/.json or a single .json/.eml).
Resolution order:
1. ``IMAP_EMAIL_SEED`` set + dir exists + at least one parseable EML
→ that list (rescan-throttled).
2. Else → the hardcoded ``_BAIT_EMAILS`` fallback.
UIDs are renumbered sequentially across the combined list so the
hardcoded baits keep their original UIDs (1..10) and seeded entries
pick up from len(_BAIT_EMAILS)+1.
"""
global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at
global _seed_cache, _seed_cache_path_mtime, _seed_cache_loaded_at
if not _EMAIL_SEED_PATH:
return _BAIT_EMAILS
seed_dir = Path(_EMAIL_SEED_PATH)
seed_path = Path(_EMAIL_SEED_PATH)
try:
dir_stat = seed_dir.stat()
path_stat = seed_path.stat()
except OSError:
return _BAIT_EMAILS
now = time.monotonic()
fresh_enough = (
_seed_cache is not None
and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL
and dir_stat.st_mtime == _seed_cache_dir_mtime
and path_stat.st_mtime == _seed_cache_path_mtime
)
if fresh_enough:
return _seed_cache or _BAIT_EMAILS
scanned = _scan_seed_dir(seed_dir)
if not scanned:
# Don't poison the cache with an empty list; a single early
# FETCH before emailgen has run would otherwise stick the
# mailbox at 0 for _SEED_RESCAN_INTERVAL seconds.
seed = _seed_cache or []
else:
seed = _scan_seed(seed_path)
_seed_cache = seed
_seed_cache_path_mtime = path_stat.st_mtime
_seed_cache_loaded_at = now
if not seed:
return _BAIT_EMAILS
_seed_cache = scanned
_seed_cache_dir_mtime = dir_stat.st_mtime
_seed_cache_loaded_at = now
return scanned
combined: list[dict] = list(_BAIT_EMAILS)
base_uid = len(_BAIT_EMAILS)
for i, entry in enumerate(seed, start=1):
renumbered = dict(entry)
renumbered["uid"] = base_uid + i
combined.append(renumbered)
return combined
# ── Logging ───────────────────────────────────────────────────────────────────

View File

@@ -10,7 +10,9 @@ Credentials via IMAP_USERS env var (shared with IMAP service).
"""
import asyncio
import json
import os
import sys
import time
from pathlib import Path
from typing import cast
@@ -33,11 +35,13 @@ VALID_USERS: dict[str, str] = {
u: p for part in _RAW_USERS.split(",") if ":" in part for u, p in [part.split(":", 1)]
}
# Path to a directory of ``*.eml`` files dropped by the orchestrator
# emailgen worker (``/var/spool/decnet-emails/`` by convention). When
# set and populated, those EMLs replace the hardcoded fallback list
# below — same semantics as the IMAP template. Empty / missing falls
# back so a fresh deployment is never silent.
# Operator/realism-engine email seed source. Two shapes accepted:
# 1. Directory: walked recursively for ``*.eml`` (RFC 822 on disk —
# what the realism-engine emailgen worker drops in) and ``*.json``
# (operator-curated lists of dicts; each dict formatted into RFC
# 5322 on load).
# 2. Single ``*.json`` or ``*.eml`` file.
# Loaded entries are CONCATENATED with ``_BAIT_EMAILS`` — never replace.
_EMAIL_SEED_PATH = os.environ.get("POP3_EMAIL_SEED", "")
_SEED_RESCAN_INTERVAL = float(os.environ.get("POP3_EMAIL_SEED_RESCAN", "5"))
@@ -172,60 +176,128 @@ _BAIT_EMAILS: list[str] = [
# ── Spool-backed email loader ─────────────────────────────────────────────────
# POP3 stores each message as a single str (full RFC 822 text); when the
# emailgen spool is configured, we read every *.eml in it and serve the
# raw bytes as the corpus. Same caching strategy as the IMAP template.
# POP3 stores each message as a single str (full RFC 822 text). Seeded
# entries CONCATENATE onto ``_BAIT_EMAILS`` (never replace). Both .eml
# and .json sources are accepted — JSON dicts are formatted into RFC
# 5322 on load. Caching strategy matches the IMAP template.
_seed_cache: list[str] | None = None
_seed_cache_dir_mtime: float = 0.0
_seed_cache_path_mtime: float = 0.0
_seed_cache_loaded_at: float = 0.0
_SEED_JSON_REQUIRED = ("from_addr", "to_addr", "subject", "body")
def _scan_seed_dir(path: Path) -> list[str]:
"""Walk *path* recursively and return each .eml's raw text content,
sorted by mtime so older threads get lower indices."""
eml_paths: list[Path] = []
def _seed_dict_to_rfc822(entry: dict) -> str | None:
"""Format a JSON-supplied dict into a full RFC 5322 message string.
Required keys: from_addr, to_addr, subject, body. Optional: date,
from_name. Returns None for malformed entries (caller skips + logs).
"""
if not isinstance(entry, dict):
return None
for key in _SEED_JSON_REQUIRED:
if not isinstance(entry.get(key), str) or not entry[key]:
return None
from_addr = entry["from_addr"]
from_name = str(entry.get("from_name") or from_addr.split("@", 1)[0])
date = str(entry.get("date") or "")
body = entry["body"]
if "\r\n\r\n" in body or "\n\n" in body:
return body # already a full RFC 822 message
return (
f"Date: {date}\r\n"
f"From: {from_name} <{from_addr}>\r\n"
f"To: {entry['to_addr']}\r\n"
f"Subject: {entry['subject']}\r\n"
"\r\n"
f"{body}"
)
def _load_seed_json(path: Path) -> list[str]:
"""Load a JSON list of dicts → list of RFC 822 strings."""
try:
for p in path.rglob("*.eml"):
if p.is_file():
eml_paths.append(p)
raw = path.read_text(encoding="utf-8")
data = json.loads(raw)
except (OSError, ValueError) as exc:
print(f"pop3: seed json {path} unreadable: {exc}", file=sys.stderr)
return []
if not isinstance(data, list):
print(f"pop3: seed json {path} must be a list", file=sys.stderr)
return []
out: list[str] = []
for i, entry in enumerate(data):
formatted = _seed_dict_to_rfc822(entry)
if formatted is None:
print(f"pop3: seed json {path}[{i}] missing required keys", file=sys.stderr)
continue
out.append(formatted)
return out
def _scan_seed(path: Path) -> list[str]:
"""Resolve *path* into RFC 822 strings (.eml direct, .json formatted)."""
out: list[str] = []
try:
if path.is_dir():
eml_paths = sorted(
(p for p in path.rglob("*.eml") if p.is_file()),
key=lambda p: p.stat().st_mtime,
)
for p in eml_paths:
try:
out.append(p.read_text(encoding="utf-8", errors="replace"))
except OSError:
continue
for jp in sorted(p for p in path.rglob("*.json") if p.is_file()):
out.extend(_load_seed_json(jp))
elif path.suffix.lower() == ".json" and path.is_file():
out.extend(_load_seed_json(path))
elif path.suffix.lower() == ".eml" and path.is_file():
try:
out.append(path.read_text(encoding="utf-8", errors="replace"))
except OSError:
pass
except OSError:
return []
eml_paths.sort(key=lambda p: p.stat().st_mtime)
out: list[str] = []
for p in eml_paths:
try:
out.append(p.read_text(encoding="utf-8", errors="replace"))
except OSError:
continue
return out
def _get_emails() -> list[str]:
"""Return the active corpus. Same fallback rules as IMAP template."""
global _seed_cache, _seed_cache_dir_mtime, _seed_cache_loaded_at
"""Return ``_BAIT_EMAILS`` concatenated with seed entries.
Empty / missing seed → just ``_BAIT_EMAILS``. Hardcoded baits keep
indices 1..10; seeded messages start at 11.
"""
global _seed_cache, _seed_cache_path_mtime, _seed_cache_loaded_at
if not _EMAIL_SEED_PATH:
return _BAIT_EMAILS
seed_dir = Path(_EMAIL_SEED_PATH)
seed_path = Path(_EMAIL_SEED_PATH)
try:
dir_stat = seed_dir.stat()
path_stat = seed_path.stat()
except OSError:
return _BAIT_EMAILS
now = time.monotonic()
fresh_enough = (
_seed_cache is not None
and (now - _seed_cache_loaded_at) < _SEED_RESCAN_INTERVAL
and dir_stat.st_mtime == _seed_cache_dir_mtime
and path_stat.st_mtime == _seed_cache_path_mtime
)
if fresh_enough:
return _seed_cache or _BAIT_EMAILS
scanned = _scan_seed_dir(seed_dir)
if not scanned:
seed = _seed_cache or []
else:
seed = _scan_seed(seed_path)
_seed_cache = seed
_seed_cache_path_mtime = path_stat.st_mtime
_seed_cache_loaded_at = now
if not seed:
return _BAIT_EMAILS
_seed_cache = scanned
_seed_cache_dir_mtime = dir_stat.st_mtime
_seed_cache_loaded_at = now
return scanned
return list(_BAIT_EMAILS) + seed
# ── Logging ───────────────────────────────────────────────────────────────────

View File

@@ -110,14 +110,9 @@ All route decorators now declare `responses={401: {"description": "Not authentic
~~**File:** `decnet/web/sqlite_repository.py` (~400 lines)~~
Fully refactored to `decnet/web/db/` modular layout: `models.py` (SQLModel schema), `repository.py` (abstract base), `sqlite/repository.py` (SQLite implementation), `sqlite/database.py` (engine/session factory). Commit `de84cc6`.
### DEBT-026 — IMAP/POP3 bait emails not configurable via service config
**Files:** `templates/imap/server.py`, `templates/pop3/server.py`, `decnet/services/imap.py`, `decnet/services/pop3.py`
Bait emails are hardcoded. A stub env var `IMAP_EMAIL_SEED` is read but currently ignored. Full implementation requires:
1. `IMAP_EMAIL_SEED` points to a JSON file with a list of `{from_, to, subject, date, body}` dicts.
2. `templates/imap/server.py` loads and merges/replaces `_BAIT_EMAILS` from that file at startup.
3. `decnet/services/imap.py` `compose_fragment()` reads `service_cfg["email_seed"]` and injects `IMAP_EMAIL_SEED` + a bind-mount for the seed file into the compose fragment.
4. Same pattern for POP3 (`POP3_EMAIL_SEED`).
**Status:** Stub in place — full wiring deferred to next session.
### ~~DEBT-026 — IMAP/POP3 bait emails not configurable via service config~~ ✅ RESOLVED
**Files:** `templates/imap/server.py`, `templates/pop3/server.py`, `decnet/services/imap.py`, `decnet/services/pop3.py`
Resolved 2026-05-03. `IMAP_EMAIL_SEED` / `POP3_EMAIL_SEED` now accept either a directory (rglob `*.eml` and `*.json`) or a single `.json` / `.eml` file. JSON entries are dicts with required keys `from_addr`, `to_addr`, `subject`, `body` (optional `from_name`, `date`, `flags`); bare-body entries are wrapped into RFC 5322 on load. Loaded entries CONCATENATE with `_BAIT_EMAILS` (additive to the realism-engine emailgen output — the hardcoded baits are no longer replaced). `compose_fragment()` reads `service_cfg["email_seed"]` and bind-mounts the host path read-only at `/var/spool/decnet-emails/seed`.
---
@@ -713,7 +708,7 @@ user who needs it.
| DEBT-023 | 🟢 Low | Infra | deferred (needs docker pull) |
| ~~DEBT-024~~ | ✅ | Infra | resolved |
| ~~DEBT-025~~ | ✅ | Build | resolved |
| DEBT-026 | 🟡 Medium | Features | deferred (out of scope) |
| ~~DEBT-026~~ | ✅ | Features | resolved 2026-05-03 |
| DEBT-027 | 🟡 Medium | Features | deferred (out of scope) |
| DEBT-028 | 🟡 Medium | Testing | deferred (needs DinD CI) |
| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved |
@@ -737,5 +732,5 @@ user who needs it.
| DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring |
| DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open |
**Remaining open:** DEBT-011 (Alembic), DEBT-023 (image pinning), DEBT-026 (modular mailboxes), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Remaining open:** DEBT-011 (Alembic), DEBT-023 (image pinning), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt.

View File

@@ -1,12 +1,15 @@
"""Spool-backed email loading for the IMAP template.
"""Seed-backed email loading for the IMAP template.
Verifies that when ``IMAP_EMAIL_SEED`` points at a directory of .eml
files, the IMAP server serves those (replacing the hardcoded
``_BAIT_EMAILS`` fallback). Empty / missing dir falls back gracefully.
Verifies that when ``IMAP_EMAIL_SEED`` points at a directory of .eml /
.json (or a single .json / .eml), the IMAP server CONCATENATES those
entries onto the hardcoded ``_BAIT_EMAILS`` baseline. Empty / missing
input falls back to the baseline alone — the realism-engine output and
operator-curated seeds are additive, never replacing.
"""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from types import ModuleType
@@ -14,6 +17,8 @@ from unittest.mock import MagicMock, patch
import pytest
_HARDCODED = 10 # length of templates/imap/server.py::_BAIT_EMAILS
_EML_TEMPLATE = (
"From: {from_name} <{from_addr}>\r\n"
@@ -78,41 +83,47 @@ def _seed(tmp_path: Path, n: int = 3) -> Path:
def test_falls_back_to_hardcoded_when_seed_unset(tmp_path):
mod = _load_imap({})
emails = mod._get_emails()
# The shipped fallback ships exactly 10 entries.
assert len(emails) == 10
# The shipped baseline is exactly 10 entries.
assert len(emails) == _HARDCODED
assert emails[0]["from_addr"] == "devops@company.internal"
def test_falls_back_when_seed_dir_missing(tmp_path):
mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "does-not-exist")})
emails = mod._get_emails()
assert len(emails) == 10 # fallback
assert len(emails) == _HARDCODED # baseline only
def test_falls_back_when_seed_dir_empty(tmp_path):
(tmp_path / "spool").mkdir()
mod = _load_imap({"IMAP_EMAIL_SEED": str(tmp_path / "spool")})
assert len(mod._get_emails()) == 10 # fallback (no .eml files)
assert len(mod._get_emails()) == _HARDCODED # baseline only
def test_loads_eml_files_from_spool(tmp_path):
def test_seed_concatenates_with_hardcoded(tmp_path):
spool = _seed(tmp_path, n=3)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 3
senders = {e["from_addr"] for e in emails}
# Hardcoded 10 + 3 spooled = 13.
assert len(emails) == _HARDCODED + 3
# Hardcoded baseline keeps original UIDs 1..10.
assert emails[0]["uid"] == 1
assert emails[0]["from_addr"] == "devops@company.internal"
assert emails[9]["uid"] == 10
# Seeded entries pick up at UID 11.
assert {e["uid"] for e in emails[10:]} == {11, 12, 13}
senders = {e["from_addr"] for e in emails[10:]}
assert senders == {"sender0@corp.com", "sender1@corp.com", "sender2@corp.com"}
# UIDs are 1-based and unique.
assert {e["uid"] for e in emails} == {1, 2, 3}
def test_loaded_eml_carries_full_rfc822_body(tmp_path):
spool = _seed(tmp_path, n=1)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert "From:" in emails[0]["body"]
assert "Subject: Topic 0" in emails[0]["body"]
assert "Body of message 0." in emails[0]["body"]
seeded = emails[_HARDCODED]
assert "From:" in seeded["body"]
assert "Subject: Topic 0" in seeded["body"]
assert "Body of message 0." in seeded["body"]
def test_corrupt_eml_skipped_not_fatal(tmp_path):
@@ -127,11 +138,56 @@ def test_corrupt_eml_skipped_not_fatal(tmp_path):
(spool / "broken.eml").mkdir()
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 1
assert emails[0]["from_addr"] == "good@corp.com"
assert len(emails) == _HARDCODED + 1
assert emails[-1]["from_addr"] == "good@corp.com"
def test_select_inbox_reflects_spool_count(tmp_path):
def test_json_seed_file_loaded(tmp_path):
seed = tmp_path / "seed.json"
seed.write_text(json.dumps([
{
"from_addr": "ceo@corp.com",
"from_name": "CEO",
"to_addr": "admin@corp.com",
"subject": "Q4 numbers",
"date": "Mon, 27 Apr 2026 09:00:00 +0000",
"body": "Please review attached.",
},
{
# Missing 'subject' — must be skipped, not crash.
"from_addr": "ghost@corp.com",
"to_addr": "admin@corp.com",
"body": "no subject",
},
]))
mod = _load_imap({"IMAP_EMAIL_SEED": str(seed)})
emails = mod._get_emails()
assert len(emails) == _HARDCODED + 1 # one valid, one dropped
seeded = emails[-1]
assert seeded["uid"] == _HARDCODED + 1
assert seeded["from_addr"] == "ceo@corp.com"
# JSON entry without RFC 822 headers gets wrapped into a full message.
assert "From: CEO <ceo@corp.com>" in seeded["body"]
assert "Subject: Q4 numbers" in seeded["body"]
def test_dir_with_eml_and_json_concatenated(tmp_path):
spool = _seed(tmp_path, n=2)
(spool / "extra.json").write_text(json.dumps([
{
"from_addr": "ops@corp.com",
"to_addr": "admin@corp.com",
"subject": "extra",
"body": "hi",
},
]))
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
# Hardcoded + 2 .eml + 1 .json
assert len(emails) == _HARDCODED + 3
def test_select_inbox_reflects_concatenated_count(tmp_path):
spool = _seed(tmp_path, n=4)
mod = _load_imap({"IMAP_EMAIL_SEED": str(spool)})
proto = mod.IMAPProtocol()
@@ -144,5 +200,6 @@ def test_select_inbox_reflects_spool_count(tmp_path):
written.clear()
proto.data_received(b"B0 SELECT INBOX\r\n")
out = b"".join(written)
assert b"* 4 EXISTS" in out
assert b"[UIDNEXT 5]" in out
expected_total = _HARDCODED + 4
assert f"* {expected_total} EXISTS".encode() in out
assert f"[UIDNEXT {expected_total + 1}]".encode() in out

View File

@@ -1,12 +1,19 @@
"""Spool-backed email loading for the POP3 template."""
"""Seed-backed email loading for the POP3 template.
Concat semantics: hardcoded ``_BAIT_EMAILS`` + .eml/.json from the seed
path. Mirrors the IMAP test file.
"""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from types import ModuleType
from unittest.mock import MagicMock, patch
_HARDCODED = 10 # length of templates/pop3/server.py::_BAIT_EMAILS
_EML_TEMPLATE = (
"From: Sender <sender@corp.com>\r\n"
@@ -61,25 +68,54 @@ def _seed(tmp_path: Path, n: int) -> Path:
def test_falls_back_when_seed_unset(tmp_path):
mod = _load_pop3({})
assert len(mod._get_emails()) == 10 # hardcoded fallback
assert len(mod._get_emails()) == _HARDCODED # baseline only
def test_falls_back_when_seed_dir_missing(tmp_path):
mod = _load_pop3({"POP3_EMAIL_SEED": str(tmp_path / "nope")})
assert len(mod._get_emails()) == 10
assert len(mod._get_emails()) == _HARDCODED
def test_loads_emls_from_spool(tmp_path):
def test_seed_concatenates_with_hardcoded(tmp_path):
spool = _seed(tmp_path, n=3)
mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)})
emails = mod._get_emails()
assert len(emails) == 3
# POP3 stores raw RFC 822 strings; verify content round-trips.
assert any("Topic 0" in e for e in emails)
assert all(e.startswith("From:") for e in emails)
# Hardcoded baseline + 3 spooled .eml.
assert len(emails) == _HARDCODED + 3
# Hardcoded entries unchanged at the head.
assert "AWS credentials rotation" in emails[0]
# Seeded entries at the tail.
assert any("Topic 0" in e for e in emails[_HARDCODED:])
assert all(e.startswith("From:") for e in emails[_HARDCODED:])
def test_stat_reflects_spool_size(tmp_path):
def test_json_seed_file_loaded(tmp_path):
seed = tmp_path / "seed.json"
seed.write_text(json.dumps([
{
"from_addr": "ceo@corp.com",
"from_name": "CEO",
"to_addr": "admin@corp.com",
"subject": "Q4 numbers",
"date": "Mon, 27 Apr 2026 09:00:00 +0000",
"body": "Please review attached.",
},
{
# Missing 'subject' — skipped, not fatal.
"from_addr": "ghost@corp.com",
"to_addr": "admin@corp.com",
"body": "no subject",
},
]))
mod = _load_pop3({"POP3_EMAIL_SEED": str(seed)})
emails = mod._get_emails()
assert len(emails) == _HARDCODED + 1
seeded = emails[-1]
assert "Subject: Q4 numbers" in seeded
assert "From: CEO <ceo@corp.com>" in seeded
def test_stat_reflects_concatenated_count(tmp_path):
spool = _seed(tmp_path, n=2)
mod = _load_pop3({"POP3_EMAIL_SEED": str(spool)})
proto = mod.POP3Protocol()
@@ -93,4 +129,5 @@ def test_stat_reflects_spool_size(tmp_path):
written.clear()
proto.data_received(b"STAT\r\n")
out = b"".join(written)
assert out.startswith(b"+OK 2 ")
expected = _HARDCODED + 2
assert out.startswith(f"+OK {expected} ".encode())

View File

@@ -361,3 +361,39 @@ def test_telnet_no_cowrie_env_vars():
"""Ensure no Cowrie env vars bleed into the real telnet service."""
env = _fragment("telnet").get("environment", {})
assert not any(k.startswith("COWRIE_") for k in env)
# IMAP / POP3 email_seed -----------------------------------------------------
def test_imap_no_email_seed_by_default():
fragment = _fragment("imap")
assert "IMAP_EMAIL_SEED" not in fragment.get("environment", {})
assert "volumes" not in fragment
def test_imap_email_seed_wires_env_and_volume(tmp_path):
seed_dir = tmp_path / "seed"
seed_dir.mkdir()
fragment = _fragment("imap", service_cfg={"email_seed": str(seed_dir)})
assert fragment["environment"]["IMAP_EMAIL_SEED"] == "/var/spool/decnet-emails/seed"
volumes = fragment.get("volumes") or []
assert len(volumes) == 1
assert volumes[0].endswith(":/var/spool/decnet-emails/seed:ro")
assert volumes[0].startswith(str(seed_dir))
def test_pop3_no_email_seed_by_default():
fragment = _fragment("pop3")
assert "POP3_EMAIL_SEED" not in fragment.get("environment", {})
assert "volumes" not in fragment
def test_pop3_email_seed_wires_env_and_volume(tmp_path):
seed_file = tmp_path / "seed.json"
seed_file.write_text("[]")
fragment = _fragment("pop3", service_cfg={"email_seed": str(seed_file)})
assert fragment["environment"]["POP3_EMAIL_SEED"] == "/var/spool/decnet-emails/seed"
volumes = fragment.get("volumes") or []
assert len(volumes) == 1
assert volumes[0].endswith(":/var/spool/decnet-emails/seed:ro")
assert volumes[0].startswith(str(seed_file))