Files
DECNET/decnet/orchestrator/worker.py
anti 32eeb0c813 refactor(orchestrator): collapse decnet-emailgen.service into orchestrator
Stage 5 of the realism migration. Email generation is no longer a
separate worker / systemd unit / CLI subcommand — the orchestrator's
single tick loop covers SSH traffic, file plants, and email drops.
Going from 21 services to 20.

Worker:
- _one_tick rolls between traffic / file / email (45/45/10 weights).
  The 10% email weight at a 60s orchestrator interval produces ~one
  email per 10 minutes, close to the pre-collapse 5-minute cadence.
- get_driver_for(action) (stage 4) handles SSH vs Email dispatch.
- Quiet branches fall through so a (decky-set, persona-pool,
  mail-decky) shape that silences one branch doesn't waste the tick.
- Periodic prune covers both orchestrator_events and
  orchestrator_emails tables.

Deletions:
- deploy/decnet-emailgen.service.j2
- decnet/orchestrator/emailgen/worker.py
- decnet/cli/emailgen.py
- tests/orchestrator/emailgen/test_worker_integration.py

Renames (history-preserving):
- decnet/web/router/emailgen/ -> decnet/web/router/realism/
- tests/api/emailgen/        -> tests/api/realism/
- tests/cli/test_emailgen_*  -> tests/cli/test_realism_*

Public surface changes (clean break, pre-v1):
- API URL /api/v1/emailgen/personas -> /api/v1/realism/personas
- CLI `decnet emailgen import-personas` -> `decnet realism
  import-personas`. `decnet emailgen run` is gone — the orchestrator
  covers it.
- gating.py: emailgen master-only group replaced by realism.
- decnet-orchestrator.service.j2: DECNET_REALISM_* env block added.
- decnet.target: decnet-emailgen.service entry removed.
- frontend: PersonaGeneration.tsx fetches /realism/personas.
2026-04-27 16:33:04 -04:00

334 lines
11 KiB
Python

"""Orchestrator main loop.
One tick = one action pick + one driver invocation + one DB write +
one fire-and-forget bus publish. Intentionally serial — MVP honesty:
a wedged docker exec stalls only this worker, never another.
Three action shapes are folded into the single tick after stage 5 of
the realism migration: SSH traffic between deckies, file plants on
deckies (driven by :func:`decnet.realism.planner.pick`), and email
drops into mail-decky maildirs (driven by
:func:`decnet.orchestrator.emailgen.scheduler.pick`). ``decnet
emailgen`` and ``decnet-emailgen.service`` are gone; this worker
covers all three.
Modeled after :mod:`decnet.profiler.worker` for consistency: same
control listener, same heartbeat helper, same shutdown semantics.
"""
from __future__ import annotations
import asyncio
import contextlib
import hashlib
import secrets
from datetime import datetime, timezone
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
publish_safely,
run_control_listener,
run_health_heartbeat,
)
from decnet.logging import get_logger
from decnet.orchestrator import events, scheduler
from decnet.orchestrator.drivers import get_driver_for
from decnet.orchestrator.emailgen import (
events as email_events,
scheduler as email_scheduler,
)
from decnet.orchestrator.emailgen.scheduler import EmailAction
from decnet.web.db.repository import BaseRepository
logger = get_logger("orchestrator")
# Periodic-prune knobs. Trim per-decky history every _PRUNE_EVERY_TICKS
# to keep orchestrator_events / orchestrator_emails from unbounded
# growth on long-running fleets. Cheap on the write path (zero overhead
# per tick); the cost pays in once every ~100 ticks.
_PRUNE_EVERY_TICKS = 100
_PRUNE_PER_DST_CAP = 10000
_PRUNE_PER_MAIL_DECKY_CAP = 5000
# Action-kind weights for the per-tick roll. Email is rare because
# each LLM round-trip is expensive (~seconds) and the prior emailgen
# worker only ticked every 5 minutes. At a 60s orchestrator interval,
# a 10% email weight produces ~one email every ~10 minutes — close
# enough to the pre-collapse cadence.
_ACTION_WEIGHTS: tuple[tuple[str, int], ...] = (
("traffic", 45),
("file", 45),
("email", 10),
)
async def orchestrator_worker(
repo: BaseRepository,
*,
interval: int = 60,
) -> None:
"""Periodically inject synthetic activity into the running fleet.
Runs as a long-lived asyncio task. Honours the bus control topic
(``system.orchestrator.control``) for graceful shutdown.
"""
logger.info("orchestrator worker started interval=%ds", interval)
bus = None
try:
bus = get_bus(client_name="orchestrator")
await bus.connect()
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: bus unavailable, continuing without publish: %s", exc
)
bus = None
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "orchestrator"))
control_task = asyncio.create_task(
run_control_listener(bus, "orchestrator", shutdown),
)
tick_n = 0
try:
while not shutdown.is_set():
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass # normal tick
if shutdown.is_set():
break
try:
await _one_tick(repo, bus)
except Exception as exc: # noqa: BLE001
logger.error("orchestrator tick failed: %s", exc)
tick_n += 1
if tick_n % _PRUNE_EVERY_TICKS == 0:
await _periodic_prune(repo)
finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
async def _periodic_prune(repo: BaseRepository) -> None:
try:
deleted = await repo.prune_orchestrator_events(per_dst_cap=_PRUNE_PER_DST_CAP)
if deleted:
logger.info(
"orchestrator events prune deleted=%d cap=%d",
deleted, _PRUNE_PER_DST_CAP,
)
except Exception as exc: # noqa: BLE001
logger.error("orchestrator events prune failed: %s", exc)
try:
deleted = await repo.prune_orchestrator_emails(
per_decky_cap=_PRUNE_PER_MAIL_DECKY_CAP,
)
if deleted:
logger.info(
"orchestrator emails prune deleted=%d cap=%d",
deleted, _PRUNE_PER_MAIL_DECKY_CAP,
)
except Exception as exc: # noqa: BLE001
logger.error("orchestrator emails prune failed: %s", exc)
def _roll_action_kind(rng: secrets.SystemRandom) -> str:
total = sum(w for _, w in _ACTION_WEIGHTS)
target = rng.randint(1, total)
running = 0
for kind, w in _ACTION_WEIGHTS:
running += w
if target <= running:
return kind
return _ACTION_WEIGHTS[-1][0] # unreachable, satisfy mypy
async def _pick_action(
repo: BaseRepository,
deckies: list[dict],
rng: secrets.SystemRandom,
):
"""Roll an action-kind, then pick the matching action.
Quiet branches fall through to the other two so a (decky-set,
persona-pool, mail-decky) shape that would silence one branch
doesn't waste the whole tick.
"""
kinds_in_priority_order = [_roll_action_kind(rng)]
for kind, _ in _ACTION_WEIGHTS:
if kind not in kinds_in_priority_order:
kinds_in_priority_order.append(kind)
for kind in kinds_in_priority_order:
if kind == "traffic":
action = scheduler.pick(deckies, rand=rng)
elif kind == "file":
action = await scheduler.pick_file(deckies, repo, rand=rng)
elif kind == "email":
try:
action = await email_scheduler.pick(repo, rand=rng)
except Exception as exc: # noqa: BLE001
logger.debug("orchestrator: email pick failed: %s", exc)
action = None
else:
action = None
if action is not None:
return action
return None
async def _one_tick(repo: BaseRepository, bus) -> None:
deckies = await repo.list_running_deckies()
rng = secrets.SystemRandom()
action = await _pick_action(repo, deckies, rng)
if action is None:
ssh_eligible = sum(
1 for d in deckies
if isinstance(d.get("services"), list)
and "ssh" in d["services"]
and d.get("ip")
)
by_source: dict[str, int] = {}
for d in deckies:
src = d.get("source", "unknown")
by_source[src] = by_source.get(src, 0) + 1
logger.debug(
"orchestrator: no actionable deckies "
"(running=%d ssh_eligible=%d sources=%s)",
len(deckies), ssh_eligible, by_source,
)
return
driver = get_driver_for(action)
result = await driver.run(action)
if isinstance(action, EmailAction):
await _persist_email(repo, action, result, bus)
else:
await _persist_event(repo, action, result, bus)
if isinstance(action, scheduler.FileAction) and result.success:
try:
await _record_synthetic_file(repo, action)
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: synthetic_files write failed dst=%s path=%s: %s",
action.dst_uuid, action.path, exc,
)
async def _persist_event(repo, action, result, bus) -> None:
row = events.to_row(action, result)
await repo.record_orchestrator_event(row)
if bus is not None:
topic = events.topic_for(action)
bus_payload = {
"kind": row["kind"],
"protocol": row["protocol"],
"action": row["action"],
"src_decky_uuid": row.get("src_decky_uuid"),
"dst_decky_uuid": row["dst_decky_uuid"],
"success": row["success"],
"payload": result.payload,
"ts": row["ts"].isoformat(),
}
await publish_safely(
bus, topic, bus_payload, event_type=events.event_type_for(action),
)
logger.info(
"orchestrator tick kind=%s success=%s dst=%s",
row["kind"], row["success"], row["dst_decky_uuid"],
)
async def _persist_email(repo, action: EmailAction, result, bus) -> None:
"""Persist + publish an email tick result.
Mirrors the pre-collapse emailgen worker payload exactly so SSE
subscribers and dashboards keep working without a breaking change
to the on-the-wire shape.
"""
row = email_events.to_row(action, result)
await repo.record_orchestrator_email(row)
if bus is not None:
topic = email_events.topic_for(action)
bus_payload = {
"kind": "email",
"mail_decky_uuid": row["mail_decky_uuid"],
"thread_id": row["thread_id"],
"message_id": row["message_id"],
"in_reply_to": row["in_reply_to"],
"sender_email": row["sender_email"],
"recipient_email": row["recipient_email"],
"subject": row["subject"],
"language": row["language"],
"success": row["success"],
"ts": row["ts"].isoformat(),
}
await publish_safely(
bus, topic, bus_payload,
event_type=email_events.event_type_for(action),
)
logger.info(
"orchestrator tick kind=email mail_decky=%s thread=%s success=%s reply=%s",
row["mail_decky_uuid"], row["thread_id"], row["success"], action.is_reply,
)
async def _record_synthetic_file(repo, action) -> None:
"""Persist (or patch) a synthetic_files row after a FileAction plant.
Idempotent on ``(decky_uuid, path)``: when the unique constraint
fires (the file existed already), we patch the existing row's
``last_modified`` / ``content_hash`` / ``last_body`` / bump
``edit_count`` so the dashboard's "files this decky has grown"
view stays accurate even when the orchestrator re-plants the same
location.
"""
body = action.content or ""
content_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
now = datetime.now(timezone.utc)
row = {
"decky_uuid": action.dst_uuid,
"path": action.path,
"persona": action.persona,
"content_class": action.content_class,
"created_at": now,
"last_modified": now,
"edit_count": 0,
"content_hash": content_hash,
# Cap the persisted body — large blobs (DOCX/PDF/canary
# artifacts in stage 7) are wasted disk on this side; the
# decky filesystem holds the canonical bytes.
"last_body": body[:65536],
}
try:
await repo.record_synthetic_file(row)
except Exception: # noqa: BLE001
existing = await repo.list_synthetic_files(
decky_uuid=action.dst_uuid, limit=200,
)
match = next(
(r for r in existing if r.get("path") == action.path), None,
)
if match is None:
raise
await repo.update_synthetic_file(
match["uuid"],
{
"last_modified": now,
"content_hash": content_hash,
"last_body": body[:65536],
"edit_count": int(match.get("edit_count", 0)) + 1,
},
)