tests: realism migration regression coverage

Closes four coverage gaps from the realism migration plan and fixes
one flaky test.

Added:

- tests/deploy/test_orchestrator_unit.py — replaces the dead
  test_emailgen_unit.py. Asserts:
  * decnet-orchestrator.service.j2 carries the DECNET_REALISM_*
    env block (LLM, MODEL, TIMEOUT, PERSONAS) so per-host tuning
    works without editing the .j2.
  * Legacy DECNET_EMAILGEN_* vars are NOT referenced — clean break
    contract from stage 5.
  * decnet.target wants orchestrator + canary, does NOT want
    decnet-emailgen.service. Anti-regression for service-collapse.
  * deploy/decnet-emailgen.service.j2 stays deleted.

- tests/orchestrator/test_worker_integration.py — new
  test_one_tick_email_branch_records_orchestrator_email. Pins the
  action-roll to email, seeds a topology with an IMAP mail decky +
  two personas, stubs LLM + docker-exec write paths, verifies an
  orchestrator_emails row + bus event land. Restores end-to-end
  email coverage that was lost when the pre-collapse
  test_worker_integration.py was deleted.

- tests/realism/test_synthetic_files_truncation.py — pins the 64KB
  last_body cap on create + edit, and documents the consequence:
  edit candidates carry a truncated snapshot of files that exceeded
  the cap. If a future change lifts the cap, _LIMIT in the test
  must lift with it.

Fixed flaky:

- tests/orchestrator/test_scheduler.py — two pick_file tests
  pinned to random.Random(1). Without a seed, the 3% canary gate
  (stage 7) and 10% leave-alone roll occasionally flaked the
  assertions because the _FakeRepo doesn't carry a
  create_canary_token method.

Note: the existing
test_realism_subprocess_import_personas_rejects_in_agent_mode
already covers agent-mode rejection of decnet realism
import-personas; no new gating test needed.
This commit is contained in:
2026-04-27 17:29:25 -04:00
parent a07fb3fe08
commit b86129e35e
5 changed files with 382 additions and 93 deletions

View File

@@ -1,93 +0,0 @@
"""Smoke tests for the emailgen systemd unit + decnet.target wiring.
These don't exercise systemd (the test host wouldn't have it), they
just assert the static contents of ``deploy/decnet-emailgen.service.j2``
and ``deploy/decnet.target`` match what ``decnet init`` will install.
A regression here would only surface on a fresh host install — cheap
to catch at CI time.
"""
from __future__ import annotations
from pathlib import Path
import pytest
REPO = Path(__file__).resolve().parent.parent.parent
DEPLOY = REPO / "deploy"
@pytest.fixture
def unit_text() -> str:
    """Return the raw text of ``deploy/decnet-emailgen.service.j2``.

    The template is decoded explicitly as UTF-8 so the result does not
    depend on the locale of the machine running the tests.
    """
    return (DEPLOY / "decnet-emailgen.service.j2").read_text(encoding="utf-8")
@pytest.fixture
def target_text() -> str:
    """Return the raw text of ``deploy/decnet.target``.

    Decoded explicitly as UTF-8 so the result does not depend on the
    locale of the machine running the tests.
    """
    return (DEPLOY / "decnet.target").read_text(encoding="utf-8")
# ── unit file ────────────────────────────────────────────────────────────────
def test_emailgen_unit_exists():
    """The emailgen unit template must be present under ``deploy/``."""
    template = DEPLOY / "decnet-emailgen.service.j2"
    assert template.exists()
def test_emailgen_unit_uses_run_subcommand(unit_text):
    """The unit must invoke the explicit ``run`` subcommand.

    ``decnet emailgen`` is a sub-app; calling it bare still works but
    relies on an implicit default that is fragile to future changes.
    """
    expected_invocation = "decnet emailgen run"
    assert expected_invocation in unit_text
def test_emailgen_unit_has_docker_supplementary_group(unit_text):
    """The unit must join the ``docker`` group.

    The driver shells ``docker exec`` to drop EMLs in the spool; without
    this group the worker cannot reach the docker socket.
    """
    group_directive = "SupplementaryGroups=docker"
    assert group_directive in unit_text
def test_emailgen_unit_orders_after_bus(unit_text):
    """The bus must start first so emailgen's heartbeat publishes land."""
    ordering_directives = (
        "After=network-online.target decnet-bus.service",
        "Wants=network-online.target decnet-bus.service",
    )
    for directive in ordering_directives:
        assert directive in unit_text
def test_emailgen_unit_has_security_hardening(unit_text):
    """Same hardening shape as orchestrator.service — defence in depth.

    Every directive is checked before failing, so one run names all the
    missing directives instead of only the first.
    """
    required = (
        "NoNewPrivileges=yes",
        "ProtectSystem=full",
        "ProtectHome=read-only",
        "PrivateTmp=yes",
        "ProtectKernelTunables=yes",
        "ProtectKernelModules=yes",
        "ProtectControlGroups=yes",
        "RestrictSUIDSGID=yes",
        "LockPersonality=yes",
    )
    missing = [directive for directive in required if directive not in unit_text]
    assert not missing, f"missing {', '.join(missing)}"
def test_emailgen_unit_writes_to_log_dir(unit_text):
    """The unit must name its log file and list the log directory as writable."""
    expected_fragments = (
        "/var/log/decnet/decnet.emailgen.log",
        "ReadWritePaths={{ install_dir }} /var/log/decnet",
    )
    for fragment in expected_fragments:
        assert fragment in unit_text
def test_emailgen_unit_restart_on_failure(unit_text):
    """The unit must carry the ``Restart=on-failure`` policy."""
    restart_policy = "Restart=on-failure"
    assert restart_policy in unit_text
# ── target wiring ────────────────────────────────────────────────────────────
def test_target_wants_emailgen(target_text):
    """A fresh ``decnet init`` must bring up emailgen with the rest of
    the fleet, so the target has to mention its service unit."""
    service_unit = "decnet-emailgen.service"
    assert service_unit in target_text
def test_target_wants_orchestrator(target_text):
    """The target must mention the orchestrator unit.

    Historically the orchestrator was missing from the target by
    oversight; it is checked here alongside the emailgen entry.
    """
    service_unit = "decnet-orchestrator.service"
    assert service_unit in target_text

View File

@@ -0,0 +1,151 @@
"""Smoke tests for the orchestrator systemd unit + decnet.target wiring.
These don't exercise systemd (the test host wouldn't have it); they
just assert the static contents of ``deploy/decnet-orchestrator.service.j2``
and ``deploy/decnet.target`` match what ``decnet init`` will install.
Anti-regressions for two specific failure modes:
1. After the realism migration (stage 5), ``decnet-emailgen.service``
is gone — the orchestrator covers the email branch. A regression
that re-introduces the emailgen unit file or the ``decnet.target``
entry would only surface on a fresh host install; cheap to catch
here.
2. The orchestrator unit must ship the ``DECNET_REALISM_*`` env block
so the LLM enrichment + persona-pool path are configurable per
host without editing the .j2.
"""
from __future__ import annotations
from pathlib import Path
import pytest
REPO = Path(__file__).resolve().parent.parent.parent
DEPLOY = REPO / "deploy"
@pytest.fixture
def unit_text() -> str:
    """Return the raw text of ``deploy/decnet-orchestrator.service.j2``.

    The template is decoded explicitly as UTF-8 so the result does not
    depend on the locale of the machine running the tests.
    """
    return (DEPLOY / "decnet-orchestrator.service.j2").read_text(encoding="utf-8")
@pytest.fixture
def target_text() -> str:
    """Return the raw text of ``deploy/decnet.target``.

    Decoded explicitly as UTF-8 so the result does not depend on the
    locale of the machine running the tests.
    """
    return (DEPLOY / "decnet.target").read_text(encoding="utf-8")
# ── orchestrator unit ────────────────────────────────────────────────────────
def test_orchestrator_unit_exists():
    """The orchestrator unit template must be present under ``deploy/``."""
    template = DEPLOY / "decnet-orchestrator.service.j2"
    assert template.exists()
def test_orchestrator_unit_uses_orchestrate_subcommand(unit_text):
    """The unit must invoke ``decnet orchestrate``."""
    expected_invocation = "decnet orchestrate"
    assert expected_invocation in unit_text
def test_orchestrator_unit_has_docker_supplementary_group(unit_text):
    """The unit must join the ``docker`` group.

    SSHDriver shells ``docker exec`` against decky containers; without
    this group the worker cannot reach the docker socket.
    """
    group_directive = "SupplementaryGroups=docker"
    assert group_directive in unit_text
def test_orchestrator_unit_orders_after_bus(unit_text):
    """The bus must be up first so heartbeats publish from the start."""
    ordering_directives = (
        "After=network-online.target decnet-bus.service",
        "Wants=network-online.target decnet-bus.service",
    )
    for directive in ordering_directives:
        assert directive in unit_text
def test_orchestrator_unit_has_security_hardening(unit_text):
    """The unit must carry the full set of systemd hardening directives.

    Every directive is checked before failing, so one run names all the
    missing directives instead of only the first.
    """
    required = (
        "NoNewPrivileges=yes",
        "ProtectSystem=full",
        "ProtectHome=read-only",
        "PrivateTmp=yes",
        "ProtectKernelTunables=yes",
        "ProtectKernelModules=yes",
        "ProtectControlGroups=yes",
        "RestrictSUIDSGID=yes",
        "LockPersonality=yes",
    )
    missing = [directive for directive in required if directive not in unit_text]
    assert not missing, f"missing {', '.join(missing)}"
def test_orchestrator_unit_writes_to_log_dir(unit_text):
    """The unit must name its log file and list the log directory as writable."""
    expected_fragments = (
        "/var/log/decnet/decnet.orchestrator.log",
        "ReadWritePaths={{ install_dir }} /var/log/decnet",
    )
    for fragment in expected_fragments:
        assert fragment in unit_text
def test_orchestrator_unit_restart_on_failure(unit_text):
    """The unit must carry the ``Restart=on-failure`` policy."""
    restart_policy = "Restart=on-failure"
    assert restart_policy in unit_text
def test_orchestrator_unit_carries_realism_env_block(unit_text):
    """Stage 5 + 6 contract: the orchestrator's LLM enrichment and
    persona-pool path are configured per host via DECNET_REALISM_*
    env vars. Shipping them in the .j2 means an operator who never
    drops a .env.local still gets sane defaults.

    All variables are checked before failing so one run lists every
    missing name.
    """
    required = (
        "DECNET_REALISM_LLM",
        "DECNET_REALISM_MODEL",
        "DECNET_REALISM_TIMEOUT",
        "DECNET_REALISM_PERSONAS",
    )
    missing = [var for var in required if var not in unit_text]
    assert not missing, f"missing {', '.join(missing)} in unit"
def test_orchestrator_unit_does_not_carry_legacy_emailgen_envs(unit_text):
    """Pre-v1 clean break per the realism migration: the
    DECNET_EMAILGEN_* env vars are no longer read. Carrying them in
    the unit would mislead operators into thinking they still apply.

    All legacy names are checked before failing so one run lists every
    offender.
    """
    legacy_vars = (
        "DECNET_EMAILGEN_LLM",
        "DECNET_EMAILGEN_MODEL",
        "DECNET_EMAILGEN_TIMEOUT",
        "DECNET_EMAILGEN_PERSONAS",
    )
    still_present = [var for var in legacy_vars if var in unit_text]
    assert not still_present, (
        f"legacy env {', '.join(still_present)} still referenced; "
        "clean-break broken"
    )
# ── decnet.target ────────────────────────────────────────────────────────────
def test_target_wants_orchestrator(target_text):
    """The target must mention the orchestrator service unit."""
    service_unit = "decnet-orchestrator.service"
    assert service_unit in target_text
def test_target_does_not_want_emailgen(target_text):
    """Anti-regression for the stage 5 removal of decnet-emailgen.service.

    A fresh `decnet init` against a target file that still mentions the
    unit fails systemctl start with `Unit decnet-emailgen.service could
    not be found`, blocking the whole target.
    """
    removed_unit = "decnet-emailgen.service"
    assert removed_unit not in target_text
def test_target_wants_canary(target_text):
    """Bundle check: the canary worker is a peer of the orchestrator
    (both belong to the realism + callback story), so the target must
    mention its unit too."""
    service_unit = "decnet-canary.service"
    assert service_unit in target_text
def test_target_orders_after_bus(target_text):
    """The whole target depends on the bus being up."""
    ordering_directive = "After=decnet-bus.service"
    assert ordering_directive in target_text
# ── unit file no longer exists ───────────────────────────────────────────────
def test_emailgen_unit_template_is_gone():
    """``deploy/decnet-emailgen.service.j2`` must stay deleted.

    Re-creating the pre-collapse template (for example by accident during
    a partial revert) would break the realism migration's
    service-collapse contract.
    """
    legacy_template = DEPLOY / "decnet-emailgen.service.j2"
    reappeared = legacy_template.exists()
    assert not reappeared, (
        "decnet-emailgen.service.j2 reappeared — service collapse undone?"
    )

View File

@@ -146,11 +146,15 @@ async def test_pick_file_returns_none_when_topology_has_no_personas():
@pytest.mark.asyncio
async def test_pick_file_produces_file_action_for_topology_decky():
import random as _r
repo = _FakeRepo(topologies={"t1": _topology_row(_PERSONAS_TWO)})
deckies = [_decky()]
# Pin the RNG so the 3% canary gate (stage 7) and 10% leave-alone
# roll don't flake this test. Seed 1 lands on a vanilla create.
action = await scheduler.pick_file(
deckies, repo,
now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc),
rand=_r.Random(1),
)
assert isinstance(action, scheduler.FileAction)
assert action.dst_uuid == "u1"
@@ -178,6 +182,7 @@ async def test_pick_file_skips_decky_when_personas_outside_window():
@pytest.mark.asyncio
async def test_pick_file_uses_global_pool_for_fleet_source(tmp_path, monkeypatch):
import json
import random as _r
pool = tmp_path / "personas.json"
pool.write_text(json.dumps(_PERSONAS_TWO))
monkeypatch.setenv("DECNET_REALISM_PERSONAS", str(pool))
@@ -189,9 +194,11 @@ async def test_pick_file_uses_global_pool_for_fleet_source(tmp_path, monkeypatch
repo = _FakeRepo() # no topology rows — fleet path
deckies = [_decky(source="fleet", topology_id=None)]
# Pin the RNG so the canary / leave-alone rolls don't flake.
action = await scheduler.pick_file(
deckies, repo,
now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc),
rand=_r.Random(1),
)
assert isinstance(action, scheduler.FileAction)
assert action.dst_uuid == "u1"

View File

@@ -156,6 +156,121 @@ async def test_one_tick_picks_fleet_deckies(repo, fake_bus, monkeypatch):
assert rows[0]["dst_decky_uuid"].startswith("local:fleet-")
@pytest.mark.asyncio
async def test_one_tick_email_branch_records_orchestrator_email(
    repo, fake_bus, monkeypatch,
):
    """Stage 5 contract: email actions land via the unified orchestrator.

    The pre-collapse path was a separate ``decnet emailgen run`` worker;
    after the realism migration the orchestrator's tick handles email
    drops alongside traffic + file via the action-kind roll. This test
    seeds a topology with a mail decky + two personas, forces the
    action roll to ``email``, stubs the LLM + docker-exec write paths,
    and verifies an ``orchestrator_emails`` row + bus event land.
    """
    import asyncio as _asyncio
    import json

    from decnet.orchestrator.drivers import email as email_driver
    from decnet.realism.llm.impl.fake import FakeBackend

    personas = [
        {
            "name": "John Smith", "email": "john@corp.com", "role": "COO",
            "tone": "formal", "mannerisms": ["uses 'Best regards'"],
            "active_hours": "00:00-00:00",
        },
        {
            "name": "Sarah Johnson", "email": "sarah@corp.com", "role": "PM",
            "tone": "direct", "mannerisms": ["uses bullets"],
            "active_hours": "00:00-00:00",
        },
    ]

    # Seed one topology carrying the personas and one IMAP-only decky.
    async with repo._session() as session:
        topo = Topology(
            name="t-email", config_snapshot="{}", status="active",
            email_personas=json.dumps(personas),
        )
        session.add(topo)
        await session.commit()
        await session.refresh(topo)
        mail_decky = TopologyDecky(
            topology_id=topo.id, name="mailhost",
            services=json.dumps(["imap"]), ip="10.0.0.5", state="running",
        )
        session.add(mail_decky)
        await session.commit()

    # Force the worker's action roll to the email branch — no SSH-capable
    # deckies exist in this seed (only IMAP), so traffic/file drop to
    # None and email is the only viable branch anyway, but we pin the
    # roll for determinism.
    monkeypatch.setattr(orch_worker, "_roll_action_kind", lambda _rng: "email")

    # Stub the LLM so we don't shell out to ollama. The driver
    # constructs its own backend in __init__; we patch get_driver_for
    # to return a driver with a FakeBackend pre-injected.
    fake_eml = (
        "Subject: Q3 ops review\n\n"
        "Hi Sarah,\n\nQuick note on the Q3 review.\n\nBest regards,\nJohn\n"
    )
    fake_llm = FakeBackend(output=fake_eml)
    fake_driver = email_driver.EmailDriver(llm=fake_llm)

    def _factory(action):
        # Only email actions get the stubbed driver; everything else is
        # delegated to the real factory.
        from decnet.orchestrator.emailgen.scheduler import EmailAction as _EA
        if isinstance(action, _EA):
            return fake_driver
        from decnet.orchestrator.drivers import get_driver_for as _real
        return _real(action)

    monkeypatch.setattr(orch_worker, "get_driver_for", _factory)

    # Stub the docker-exec write path on the email driver — same trick
    # the SSH driver tests use, but EmailDriver shells out via plain
    # asyncio.create_subprocess_exec.
    async def fake_create(*args, **kwargs):
        class _Stub:
            returncode = 0

            async def communicate(self, _stdin=None):
                return b"", b""

        return _Stub()

    monkeypatch.setattr(_asyncio, "create_subprocess_exec", fake_create)

    received: list = []

    async def collect():
        # Stop after the first orchestrator event so the task finishes.
        async with fake_bus.subscribe("orchestrator.>") as sub:
            async for ev in sub:
                received.append(ev)
                if len(received) >= 1:
                    return

    collector = _asyncio.create_task(collect())
    try:
        # Yield once so the collector can start before the tick publishes.
        await _asyncio.sleep(0)
        await orch_worker._one_tick(repo, fake_bus)
        await _asyncio.wait_for(collector, timeout=2.0)
    finally:
        # If the tick raised, do not leave the subscriber task pending.
        if not collector.done():
            collector.cancel()

    # The email branch lands in orchestrator_emails, NOT
    # orchestrator_events — separate table, separate kind discriminant.
    emails = await repo.list_orchestrator_emails(limit=10)
    assert len(emails) == 1
    row = emails[0]
    assert row["mail_decky_uuid"] == mail_decky.uuid
    assert row["sender_email"] in {"john@corp.com", "sarah@corp.com"}
    assert row["recipient_email"] in {"john@corp.com", "sarah@corp.com"}
    assert row["sender_email"] != row["recipient_email"]
    assert row["subject"]
    assert row["success"] is True

    # Bus event topic discriminator + payload kind agree.
    assert len(received) == 1
    ev = received[0]
    assert ev.topic.startswith("orchestrator.email.")
    assert ev.payload["kind"] == "email"
    assert ev.payload["mail_decky_uuid"] == mail_decky.uuid
@pytest.mark.asyncio
async def test_tick_is_noop_when_no_running_deckies(repo, fake_bus, monkeypatch):
called = False

View File

@@ -0,0 +1,109 @@
"""``synthetic_files.last_body`` is capped at 64 KB.
The orchestrator caps the persisted body at 64 KB on every write
(create + edit) so the table doesn't bloat with large blobs. This
introduces a real edge: an EditAction whose ``previous_body`` is
sourced from the capped row (not the file on disk) sees truncated bytes.
Today the realism templates produce well under 64 KB, so the edge
isn't reachable from the planted-content path. But a future change
that lifts the cap, an LLM that returns a long body, or a
``honeydoc_pdf`` body cultivated through the realism path could all
hit it. These tests pin the contract so a regression that drops the
cap or applies it inconsistently fails loudly.
"""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from decnet.web.db.sqlite.repository import SQLiteRepository
_LIMIT = 65536 # decnet/orchestrator/worker.py uses [:65536]
@pytest_asyncio.fixture
async def repo(tmp_path):
    """Yield an initialised SQLite repository backed by a file in ``tmp_path``.

    The engine is disposed once the test using the fixture has finished.
    """
    db_file = tmp_path / "decnet.db"
    repository = SQLiteRepository(db_path=str(db_file))
    await repository.initialize()
    yield repository
    await repository.engine.dispose()
def _row(body: str) -> dict:
    """Build a synthetic-file row for ``body``.

    ``content_hash`` is the SHA-256 of the full ``body``, while
    ``last_body`` is clipped to ``_LIMIT`` characters here, in the helper.
    """
    import hashlib

    timestamp = datetime.now(timezone.utc)
    # The hash is over the *full* body in the orchestrator's write
    # path; if the body comes from a row that was already truncated,
    # the hash reflects the truncation. Tests check both paths.
    digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
    clipped = body[:_LIMIT]
    return {
        "decky_uuid": "d1",
        "path": "/home/admin/notes.txt",
        "persona": "admin",
        "content_class": "note",
        "created_at": timestamp,
        "last_modified": timestamp,
        "edit_count": 0,
        "content_hash": digest,
        "last_body": clipped,
    }
@pytest.mark.asyncio
async def test_oversized_body_is_truncated_at_write(repo):
    """A body twice the cap is stored as exactly its first ``_LIMIT`` chars.

    The clip itself is applied by ``_row``; this test pins that the
    capped prefix round-trips through the repository unchanged.
    """
    body = "A" * (_LIMIT * 2)
    uuid = await repo.record_synthetic_file(_row(body))
    rows = await repo.list_synthetic_files(decky_uuid="d1")
    assert len(rows) == 1
    stored = rows[0]
    assert stored["uuid"] == uuid
    assert len(stored["last_body"]) == _LIMIT
    # Length alone would also pass for a tail slice; pin the prefix.
    assert stored["last_body"] == body[:_LIMIT]
@pytest.mark.asyncio
async def test_body_at_exact_limit_is_preserved(repo):
    """Boundary: a body of exactly 64 KB must not be silently
    truncated. Off-by-one regression target."""
    body = "B" * _LIMIT
    await repo.record_synthetic_file(_row(body))
    rows = await repo.list_synthetic_files(decky_uuid="d1")
    stored_body = rows[0]["last_body"]
    assert len(stored_body) == _LIMIT
    # "Preserved" means identical content, not just identical length.
    assert stored_body == body
@pytest.mark.asyncio
async def test_pick_for_edit_returns_truncated_body(repo):
    """Stage 3b contract: an edit candidate carries the *stored* last_body.

    When the original exceeded the cap the stored body is necessarily
    truncated, so the full body is not expected to round-trip.
    """
    oversized = "C" * (_LIMIT * 3)
    await repo.record_synthetic_file(_row(oversized))
    picked = await repo.pick_random_synthetic_file_for_edit("d1")
    assert picked is not None
    assert len(picked["last_body"]) == _LIMIT
    # The edit driver mutates this body via realism.bodies.next_iteration,
    # so callers must accept they're editing a truncated snapshot of
    # the file that's actually on the decky. This is documented
    # behaviour pre-v1; if the cap rises, lift _LIMIT here too.
@pytest.mark.asyncio
async def test_edit_path_keeps_cap(repo):
    """A caller-clipped body round-trips through ``update_synthetic_file``
    at exactly the cap.

    The clip is applied here by the caller, mirroring the orchestrator
    worker's ``last_body=body[:65536]`` line. This test does not check
    whether the repository itself clips an oversized body.
    """
    uuid = await repo.record_synthetic_file(_row("seed"))
    big = "D" * (_LIMIT * 4)
    await repo.update_synthetic_file(
        uuid,
        {
            "last_modified": datetime.now(timezone.utc),
            "edit_count": 1,
            "last_body": big[:_LIMIT],  # caller is responsible for clipping
        },
    )
    rows = await repo.list_synthetic_files(decky_uuid="d1")
    assert len(rows[0]["last_body"]) == _LIMIT