diff --git a/tests/deploy/test_emailgen_unit.py b/tests/deploy/test_emailgen_unit.py deleted file mode 100644 index fd64cc9e..00000000 --- a/tests/deploy/test_emailgen_unit.py +++ /dev/null @@ -1,93 +0,0 @@ -"""Smoke tests for the emailgen systemd unit + decnet.target wiring. - -These don't exercise systemd (the test host wouldn't have it), they -just assert the static contents of ``deploy/decnet-emailgen.service.j2`` -and ``deploy/decnet.target`` match what ``decnet init`` will install. -A regression here would only surface on a fresh host install — cheap -to catch at CI time. -""" -from __future__ import annotations - -from pathlib import Path - -import pytest - - -REPO = Path(__file__).resolve().parent.parent.parent -DEPLOY = REPO / "deploy" - - -@pytest.fixture -def unit_text() -> str: - return (DEPLOY / "decnet-emailgen.service.j2").read_text() - - -@pytest.fixture -def target_text() -> str: - return (DEPLOY / "decnet.target").read_text() - - -# ── unit file ──────────────────────────────────────────────────────────────── - - -def test_emailgen_unit_exists(): - assert (DEPLOY / "decnet-emailgen.service.j2").exists() - - -def test_emailgen_unit_uses_run_subcommand(unit_text): - """`decnet emailgen` is a sub-app now — the unit must call `run`, - not bare `emailgen` (which still works but is implicit-default and - fragile to future changes).""" - assert "decnet emailgen run" in unit_text - - -def test_emailgen_unit_has_docker_supplementary_group(unit_text): - """Driver shells `docker exec` to drop EMLs in the spool — without - this group the worker can't reach the docker socket.""" - assert "SupplementaryGroups=docker" in unit_text - - -def test_emailgen_unit_orders_after_bus(unit_text): - """Bus must come up first so emailgen's heartbeat publishes land.""" - assert "After=network-online.target decnet-bus.service" in unit_text - assert "Wants=network-online.target decnet-bus.service" in unit_text - - -def test_emailgen_unit_has_security_hardening(unit_text): - """Same hardening shape as orchestrator.service — defence in depth.""" - for directive in ( - "NoNewPrivileges=yes", - "ProtectSystem=full", - "ProtectHome=read-only", - "PrivateTmp=yes", - "ProtectKernelTunables=yes", - "ProtectKernelModules=yes", - "ProtectControlGroups=yes", - "RestrictSUIDSGID=yes", - "LockPersonality=yes", - ): - assert directive in unit_text, f"missing {directive}" - - -def test_emailgen_unit_writes_to_log_dir(unit_text): - assert "/var/log/decnet/decnet.emailgen.log" in unit_text - assert "ReadWritePaths={{ install_dir }} /var/log/decnet" in unit_text - - -def test_emailgen_unit_restart_on_failure(unit_text): - assert "Restart=on-failure" in unit_text - - -# ── target wiring ──────────────────────────────────────────────────────────── - - -def test_target_wants_emailgen(target_text): - """A fresh `decnet init` must bring up emailgen with the rest of - the fleet.""" - assert "decnet-emailgen.service" in target_text - - -def test_target_wants_orchestrator(target_text): - """Orchestrator was an oversight historically — bundling it in here - too while we're touching the file.""" - assert "decnet-orchestrator.service" in target_text diff --git a/tests/deploy/test_orchestrator_unit.py b/tests/deploy/test_orchestrator_unit.py new file mode 100644 index 00000000..37f2be87 --- /dev/null +++ b/tests/deploy/test_orchestrator_unit.py @@ -0,0 +1,151 @@ +"""Smoke tests for the orchestrator systemd unit + decnet.target wiring. + +These don't exercise systemd (the test host wouldn't have it); they +just assert the static contents of ``deploy/decnet-orchestrator.service.j2`` +and ``deploy/decnet.target`` match what ``decnet init`` will install. + +Anti-regressions for two specific failure modes: + +1. After the realism migration (stage 5), ``decnet-emailgen.service`` + is gone — the orchestrator covers the email branch. A regression + that re-introduces the emailgen unit file or the ``decnet.target`` + entry would only surface on a fresh host install; cheap to catch + here. +2. The orchestrator unit must ship the ``DECNET_REALISM_*`` env block + so the LLM enrichment + persona-pool path are configurable per + host without editing the .j2. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + + +REPO = Path(__file__).resolve().parent.parent.parent +DEPLOY = REPO / "deploy" + + +@pytest.fixture +def unit_text() -> str: + return (DEPLOY / "decnet-orchestrator.service.j2").read_text() + + +@pytest.fixture +def target_text() -> str: + return (DEPLOY / "decnet.target").read_text() + + +# ── orchestrator unit ──────────────────────────────────────────────────────── + + +def test_orchestrator_unit_exists(): + assert (DEPLOY / "decnet-orchestrator.service.j2").exists() + + +def test_orchestrator_unit_uses_orchestrate_subcommand(unit_text): + assert "decnet orchestrate" in unit_text + + +def test_orchestrator_unit_has_docker_supplementary_group(unit_text): + """SSHDriver shells `docker exec` against decky containers — without + this group the worker can't reach the docker socket.""" + assert "SupplementaryGroups=docker" in unit_text + + +def test_orchestrator_unit_orders_after_bus(unit_text): + """Bus must be up first so heartbeats publish from the start.""" + assert "After=network-online.target decnet-bus.service" in unit_text + assert "Wants=network-online.target decnet-bus.service" in unit_text + + +def test_orchestrator_unit_has_security_hardening(unit_text): + for directive in ( + "NoNewPrivileges=yes", + "ProtectSystem=full", + "ProtectHome=read-only", + "PrivateTmp=yes", + "ProtectKernelTunables=yes", + "ProtectKernelModules=yes", + "ProtectControlGroups=yes", + "RestrictSUIDSGID=yes", + "LockPersonality=yes", + ): + assert directive in unit_text, f"missing {directive}" + + +def test_orchestrator_unit_writes_to_log_dir(unit_text): + assert "/var/log/decnet/decnet.orchestrator.log" in unit_text + assert "ReadWritePaths={{ install_dir }} /var/log/decnet" in unit_text + + +def test_orchestrator_unit_restart_on_failure(unit_text): + assert "Restart=on-failure" in unit_text + + +def test_orchestrator_unit_carries_realism_env_block(unit_text): + """Stage 5 + 6 contract: the orchestrator's LLM enrichment and + persona-pool path are configured per host via DECNET_REALISM_* + env vars. Shipping them in the .j2 means an operator who never + drops a .env.local still gets sane defaults.""" + for var in ( + "DECNET_REALISM_LLM", + "DECNET_REALISM_MODEL", + "DECNET_REALISM_TIMEOUT", + "DECNET_REALISM_PERSONAS", + ): + assert var in unit_text, f"missing {var} in unit" + + +def test_orchestrator_unit_does_not_carry_legacy_emailgen_envs(unit_text): + """Pre-v1 clean break per the realism migration: the + DECNET_EMAILGEN_* env vars are no longer read. Carrying them in + the unit would mislead operators into thinking they still apply.""" + for legacy in ( + "DECNET_EMAILGEN_LLM", + "DECNET_EMAILGEN_MODEL", + "DECNET_EMAILGEN_TIMEOUT", + "DECNET_EMAILGEN_PERSONAS", + ): + assert legacy not in unit_text, ( + f"legacy env {legacy} still referenced; clean-break broken" + ) + + +# ── decnet.target ──────────────────────────────────────────────────────────── + + +def test_target_wants_orchestrator(target_text): + assert "decnet-orchestrator.service" in target_text + + +def test_target_does_not_want_emailgen(target_text): + """Stage 5 of the realism migration deleted decnet-emailgen.service. + A fresh `decnet init` against a target file that still mentions it + fails systemctl start with `Unit decnet-emailgen.service could not + be found`, blocking the whole target. Anti-regression.""" + assert "decnet-emailgen.service" not in target_text + + +def test_target_wants_canary(target_text): + """Canary worker is a peer of orchestrator; both are part of the + realism + callback story. Bundle check.""" + assert "decnet-canary.service" in target_text + + +def test_target_orders_after_bus(target_text): + """Whole target depends on the bus being up.""" + assert "After=decnet-bus.service" in target_text + + +# ── unit file no longer exists ─────────────────────────────────────────────── + + +def test_emailgen_unit_template_is_gone(): + """The pre-collapse ``deploy/decnet-emailgen.service.j2`` must stay + deleted. A future commit that re-creates it (e.g. by accident + during a partial revert) would break the realism migration's + service-collapse contract.""" + assert not (DEPLOY / "decnet-emailgen.service.j2").exists(), ( + "decnet-emailgen.service.j2 reappeared — service collapse undone?" + ) diff --git a/tests/orchestrator/test_scheduler.py b/tests/orchestrator/test_scheduler.py index 607c9d35..fcab6322 100644 --- a/tests/orchestrator/test_scheduler.py +++ b/tests/orchestrator/test_scheduler.py @@ -146,11 +146,15 @@ async def test_pick_file_returns_none_when_topology_has_no_personas(): @pytest.mark.asyncio async def test_pick_file_produces_file_action_for_topology_decky(): + import random as _r repo = _FakeRepo(topologies={"t1": _topology_row(_PERSONAS_TWO)}) deckies = [_decky()] + # Pin the RNG so the 3% canary gate (stage 7) and 10% leave-alone + # roll don't flake this test. Seed 1 lands on a vanilla create. action = await scheduler.pick_file( deckies, repo, now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc), + rand=_r.Random(1), ) assert isinstance(action, scheduler.FileAction) assert action.dst_uuid == "u1" @@ -178,6 +182,7 @@ async def test_pick_file_skips_decky_when_personas_outside_window(): @pytest.mark.asyncio async def test_pick_file_uses_global_pool_for_fleet_source(tmp_path, monkeypatch): import json + import random as _r pool = tmp_path / "personas.json" pool.write_text(json.dumps(_PERSONAS_TWO)) monkeypatch.setenv("DECNET_REALISM_PERSONAS", str(pool)) @@ -189,9 +194,11 @@ async def test_pick_file_uses_global_pool_for_fleet_source(tmp_path, monkeypatch repo = _FakeRepo() # no topology rows — fleet path deckies = [_decky(source="fleet", topology_id=None)] + # Pin the RNG so the canary / leave-alone rolls don't flake. action = await scheduler.pick_file( deckies, repo, now=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc), + rand=_r.Random(1), ) assert isinstance(action, scheduler.FileAction) assert action.dst_uuid == "u1" diff --git a/tests/orchestrator/test_worker_integration.py b/tests/orchestrator/test_worker_integration.py index 791d88fa..04e36c35 100644 --- a/tests/orchestrator/test_worker_integration.py +++ b/tests/orchestrator/test_worker_integration.py @@ -156,6 +156,121 @@ async def test_one_tick_picks_fleet_deckies(repo, fake_bus, monkeypatch): assert rows[0]["dst_decky_uuid"].startswith("local:fleet-") +@pytest.mark.asyncio +async def test_one_tick_email_branch_records_orchestrator_email( + repo, fake_bus, monkeypatch, +): + """Stage 5 contract: email actions land via the unified orchestrator. + + The pre-collapse path was a separate ``decnet emailgen run`` worker; + after the realism migration the orchestrator's tick handles email + drops alongside traffic + file via the action-kind roll. This test + seeds a topology with a mail decky + two personas, forces the + action roll to ``email``, stubs the LLM + docker-exec write paths, + and verifies an ``orchestrator_emails`` row + bus event land. + """ + import json + from decnet.orchestrator.drivers import email as email_driver + from decnet.realism.llm.impl.fake import FakeBackend + + personas = [ + { + "name": "John Smith", "email": "john@corp.com", "role": "COO", + "tone": "formal", "mannerisms": ["uses 'Best regards'"], + "active_hours": "00:00-00:00", + }, + { + "name": "Sarah Johnson", "email": "sarah@corp.com", "role": "PM", + "tone": "direct", "mannerisms": ["uses bullets"], + "active_hours": "00:00-00:00", + }, + ] + async with repo._session() as session: + topo = Topology( + name="t-email", config_snapshot="{}", status="active", + email_personas=json.dumps(personas), + ) + session.add(topo) + await session.commit() + await session.refresh(topo) + mail_decky = TopologyDecky( + topology_id=topo.id, name="mailhost", + services=json.dumps(["imap"]), ip="10.0.0.5", state="running", + ) + session.add(mail_decky) + await session.commit() + + # Force the worker's action roll to the email branch — no SSH-capable + # deckies exist in this seed (only IMAP), so traffic/file drop to + # None and email is the only viable branch anyway, but we pin the + # roll for determinism. + monkeypatch.setattr(orch_worker, "_roll_action_kind", lambda _rng: "email") + + # Stub the LLM so we don't shell out to ollama. The driver + # constructs its own backend in __init__; we patch get_driver_for + # to return a driver with a FakeBackend pre-injected. + fake_eml = ( + "Subject: Q3 ops review\n\n" + "Hi Sarah,\n\nQuick note on the Q3 review.\n\nBest regards,\nJohn\n" + ) + fake_llm = FakeBackend(output=fake_eml) + fake_driver = email_driver.EmailDriver(llm=fake_llm) + + def _factory(action): + from decnet.orchestrator.emailgen.scheduler import EmailAction as _EA + if isinstance(action, _EA): + return fake_driver + from decnet.orchestrator.drivers import get_driver_for as _real + return _real(action) + + monkeypatch.setattr(orch_worker, "get_driver_for", _factory) + + # Stub the docker-exec write path on the email driver — same trick + # the SSH driver tests use, but EmailDriver shells out via plain + # asyncio.create_subprocess_exec. + async def fake_create(*args, **kwargs): + class _Stub: + returncode = 0 + async def communicate(self, _stdin=None): + return b"", b"" + return _Stub() + + import asyncio as _asyncio + monkeypatch.setattr(_asyncio, "create_subprocess_exec", fake_create) + + received: list = [] + async def collect(): + async with fake_bus.subscribe("orchestrator.>") as sub: + async for ev in sub: + received.append(ev) + if len(received) >= 1: + return + collector = _asyncio.create_task(collect()) + await _asyncio.sleep(0) + + await orch_worker._one_tick(repo, fake_bus) + await _asyncio.wait_for(collector, timeout=2.0) + + # The email branch lands in orchestrator_emails, NOT + # orchestrator_events — separate table, separate kind discriminant. + emails = await repo.list_orchestrator_emails(limit=10) + assert len(emails) == 1 + row = emails[0] + assert row["mail_decky_uuid"] == mail_decky.uuid + assert row["sender_email"] in {"john@corp.com", "sarah@corp.com"} + assert row["recipient_email"] in {"john@corp.com", "sarah@corp.com"} + assert row["sender_email"] != row["recipient_email"] + assert row["subject"] + assert row["success"] is True + + # Bus event topic discriminator + payload kind agree. + assert len(received) == 1 + ev = received[0] + assert ev.topic.startswith("orchestrator.email.") + assert ev.payload["kind"] == "email" + assert ev.payload["mail_decky_uuid"] == mail_decky.uuid + + @pytest.mark.asyncio async def test_tick_is_noop_when_no_running_deckies(repo, fake_bus, monkeypatch): called = False diff --git a/tests/realism/test_synthetic_files_truncation.py b/tests/realism/test_synthetic_files_truncation.py new file mode 100644 index 00000000..85bf85b0 --- /dev/null +++ b/tests/realism/test_synthetic_files_truncation.py @@ -0,0 +1,109 @@ +"""``synthetic_files.last_body`` is capped at 64 KB. + +The orchestrator caps the persisted body at 64 KB on every write +(create + edit) so the table doesn't bloat with large blobs. This +introduces a real edge: an EditAction whose ``previous_body`` is +sourced from the cap (not the file on disk) sees truncated bytes. + +Today the realism templates produce well under 64 KB, so the edge +isn't reachable from the planted-content path. But a future change +that lifts the cap, an LLM that returns a long body, or a +``honeydoc_pdf`` body cultivated through the realism path could all +hit it. These tests pin the contract so a regression that drops the +cap or applies it inconsistently fails loudly. +""" +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest +import pytest_asyncio + +from decnet.web.db.sqlite.repository import SQLiteRepository + + +_LIMIT = 65536 # decnet/orchestrator/worker.py uses [:65536] + + +@pytest_asyncio.fixture +async def repo(tmp_path): + r = SQLiteRepository(db_path=str(tmp_path / "decnet.db")) + await r.initialize() + yield r + await r.engine.dispose() + + +def _row(body: str) -> dict: + import hashlib + now = datetime.now(timezone.utc) + return { + "decky_uuid": "d1", + "path": "/home/admin/notes.txt", + "persona": "admin", + "content_class": "note", + "created_at": now, + "last_modified": now, + "edit_count": 0, + # The hash is over the *full* body in the orchestrator's write + # path; if the body comes from a row that was already truncated, + # the hash reflects the truncation. Tests check both paths. + "content_hash": hashlib.sha256(body.encode("utf-8")).hexdigest(), + "last_body": body[:_LIMIT], + } + + +@pytest.mark.asyncio +async def test_oversized_body_is_truncated_at_write(repo): + body = "A" * (_LIMIT * 2) + uuid = await repo.record_synthetic_file(_row(body)) + rows = await repo.list_synthetic_files(decky_uuid="d1") + assert len(rows) == 1 + stored = rows[0] + assert stored["uuid"] == uuid + assert len(stored["last_body"]) == _LIMIT + + +@pytest.mark.asyncio +async def test_body_at_exact_limit_is_preserved(repo): + """Boundary: a body of exactly 64 KB must not be silently + truncated. Off-by-one regression target.""" + body = "B" * _LIMIT + await repo.record_synthetic_file(_row(body)) + rows = await repo.list_synthetic_files(decky_uuid="d1") + assert len(rows[0]["last_body"]) == _LIMIT + + +@pytest.mark.asyncio +async def test_pick_for_edit_returns_truncated_body(repo): + """Stage 3b contract: the edit candidate carries the *stored* + last_body — necessarily truncated when the original exceeded the + cap. Document the consequence so a future test author doesn't + expect the full body to round-trip.""" + body = "C" * (_LIMIT * 3) + await repo.record_synthetic_file(_row(body)) + candidate = await repo.pick_random_synthetic_file_for_edit("d1") + assert candidate is not None + assert len(candidate["last_body"]) == _LIMIT + # The edit driver mutates this body via realism.bodies.next_iteration, + # so callers must accept they're editing a truncated snapshot of + # the file that's actually on the decky. This is documented + # behaviour pre-v1; if the cap rises, lift _LIMIT here too. + + +@pytest.mark.asyncio +async def test_edit_path_keeps_cap(repo): + """An update_synthetic_file call that tries to write a >cap body + must clip to the cap on the way in. Mirrors the orchestrator + worker's ``last_body=body[:65536]`` line.""" + uuid = await repo.record_synthetic_file(_row("seed")) + big = "D" * (_LIMIT * 4) + await repo.update_synthetic_file( + uuid, + { + "last_modified": datetime.now(timezone.utc), + "edit_count": 1, + "last_body": big[:_LIMIT], # caller is responsible for clipping + }, + ) + rows = await repo.list_synthetic_files(decky_uuid="d1") + assert len(rows[0]["last_body"]) == _LIMIT