Two pieces, one PR, because they share a deployment surface:

1. systemd. `decnet-reconciler.service.j2` mirrors the orchestrator unit's shape (docker group, hardened sandbox, append-logs). /var/lib/decnet is read-only for the service, so it can read decnet-state.json without write access. `decnet init` auto-discovers the unit through the existing `decnet-*.service.j2` glob, so no init.py change is needed. The unit is added to decnet.target, so `systemctl start decnet.target` brings it up alongside the collector, sniffer, mutator, etc. It is also added to the agent reaper script, so self-destruct cleans it up on workers.

2. Bus signal. `reconcile_once` now publishes `decky.<host_uuid:name>.state` on every insert, delete, and state-changed transition.

   It reuses the existing DECKY_STATE topic family: there is no bus/topics.py change, so no wiki update is needed under the bus-signals doc rule. The composite `host_uuid:name` segment keeps fleet rows distinguishable from MazeNET TopologyDecky rows, whose ids are bare UUIDs. Quiet ticks publish nothing: convergence means silence.

   The bus is plumbed through the worker and defaults to None for unit-test callers. `publish_safely` keeps the source-of-truth contract: the DB write is authoritative, and the publish is a best-effort notification.

   `previous_state` is captured into a local before `update_fleet_decky_state` runs. Otherwise, a fake repo that mutates rows in place would see the post-update state and report previous == current. Real repos don't have this concern, but the fix is cheap and makes the function less order-dependent.
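The state-changed half of the bus-signal behaviour can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not the `reconcile_once` code from this PR: `FakeRow`, `FakeRepo`, `RecordingBus`, `state_topic`, `publish_best_effort`, and `apply_observed_state` are hypothetical names, the payload shape is assumed, and insert/delete transitions are not modelled. It shows three properties from the description: `previous_state` is captured before an in-place update, a converged row publishes nothing, and a failing or absent bus never undoes the authoritative state write.

```python
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger("reconcile_sketch")


@dataclass
class FakeRow:
    """Hypothetical stand-in for a fleet decky row."""
    host_uuid: str
    name: str
    state: str


class FakeRepo:
    """Hypothetical in-memory repo that mutates rows in place."""

    async def update_state(self, row, new_state):
        row.state = new_state  # the caller's `row` object changes too


class RecordingBus:
    """Hypothetical bus that records publishes, or fails if asked to."""

    def __init__(self, fail=False):
        self.fail = fail
        self.published = []

    async def publish(self, topic, payload):
        if self.fail:
            raise ConnectionError("bus unavailable")
        self.published.append((topic, payload))


def state_topic(host_uuid, name):
    # Composite "<host_uuid>:<name>" segment (assumed string format) keeps
    # fleet rows apart from rows keyed by a bare UUID.
    return f"decky.{host_uuid}:{name}.state"


async def publish_best_effort(bus, topic, payload):
    """Hypothetical analogue of publish_safely: failures are logged, never raised."""
    if bus is None:  # e.g. unit-test callers that pass no bus
        return False
    try:
        await bus.publish(topic, payload)
        return True
    except Exception as exc:
        logger.warning("publish failed topic=%s: %s", topic, exc)
        return False


async def apply_observed_state(repo, bus, row, observed_state):
    """Write a state transition, then announce it. Returns True if state changed."""
    # Capture BEFORE the write: an in-place repo would otherwise overwrite it.
    previous_state = row.state
    if previous_state == observed_state:
        return False  # converged: quiet tick, nothing published
    await repo.update_state(row, observed_state)  # authoritative write first
    await publish_best_effort(
        bus,
        state_topic(row.host_uuid, row.name),
        {"previous": previous_state, "current": observed_state},
    )
    return True
```

Reading `row.state` for the payload after the `update_state` call instead would make the first publish report `{"previous": "exited", "current": "exited"}` with this in-place repo, which is the order-dependence the description calls out.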
"""Long-lived periodic reconciler worker.

Modeled on :mod:`decnet.orchestrator.worker`: same control listener, same
heartbeat helper, same shutdown semantics. One tick = one
:func:`reconcile_once` pass.

Default interval is short (30s) because reconciliation is cheap when
nothing has drifted (three reads, no writes), and a short cadence keeps
the dashboard's view of crashed containers fresh.
"""

from __future__ import annotations

import asyncio
import contextlib

from decnet.bus.factory import get_bus
from decnet.bus.publish import (
    run_control_listener,
    run_health_heartbeat,
)
from decnet.fleet.reconciler import reconcile_once
from decnet.logging import get_logger
from decnet.web.db.models import LOCAL_HOST_SENTINEL
from decnet.web.db.repository import BaseRepository

logger = get_logger("fleet.reconciler")


async def fleet_reconciler_worker(
    repo: BaseRepository,
    *,
    interval: int = 30,
    host_uuid: str = LOCAL_HOST_SENTINEL,
) -> None:
    """Periodically converge JSON ↔ DB ↔ docker for the local host.

    Honours the bus control topic (``system.reconciler.control``) for
    graceful shutdown, with the same lifecycle contract as every other
    DECNET worker.
    """
    logger.info("fleet reconciler started interval=%ds host=%s", interval, host_uuid)

    bus = None
    try:
        bus = get_bus(client_name="reconciler")
        await bus.connect()
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "reconciler: bus unavailable, continuing without publish: %s", exc,
        )
        bus = None

    shutdown = asyncio.Event()
    heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "reconciler"))
    control_task = asyncio.create_task(
        run_control_listener(bus, "reconciler", shutdown),
    )

    try:
        while not shutdown.is_set():
            try:
                await asyncio.wait_for(shutdown.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass  # normal tick
            if shutdown.is_set():
                break
            try:
                counts = await reconcile_once(
                    repo, host_uuid=host_uuid, bus=bus,
                )
                if any(counts.values()):
                    logger.info(
                        "reconcile inserted=%d deleted=%d state_updated=%d",
                        counts["inserted"], counts["deleted"],
                        counts["state_updated"],
                    )
            except Exception as exc:  # noqa: BLE001
                logger.error("reconcile tick failed: %s", exc)
    finally:
        for t in (heartbeat_task, control_task):
            t.cancel()
            with contextlib.suppress(Exception, asyncio.CancelledError):
                await t
        if bus is not None:
            with contextlib.suppress(Exception):
                await bus.close()
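The tick loop in `fleet_reconciler_worker` uses `asyncio.wait_for(shutdown.wait(), timeout=interval)` as an interruptible sleep. The standalone sketch below isolates that pattern with the DECNET pieces stripped out; `periodic_worker`, `demo_failing_tick`, and `demo_prompt_shutdown` are hypothetical names, and `print` stands in for the module's logger. It shows two properties of the loop: a tick that raises does not stop later ticks, and setting the shutdown event ends the wait immediately instead of after the full interval.

```python
import asyncio
import time


async def periodic_worker(tick, shutdown, interval):
    """Call ``await tick()`` every ``interval`` seconds until ``shutdown`` is set.

    Returns the number of ticks that completed without raising.
    """
    ok_ticks = 0
    while not shutdown.is_set():
        try:
            # Interruptible sleep: times out after `interval`, or returns
            # as soon as the shutdown event is set.
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed: run a normal tick
        if shutdown.is_set():
            break
        try:
            await tick()
            ok_ticks += 1
        except Exception as exc:  # a failing tick is reported, not fatal
            print(f"tick failed: {exc}")
    return ok_ticks


async def demo_failing_tick():
    shutdown = asyncio.Event()
    calls = []

    async def tick():
        calls.append(len(calls) + 1)
        if len(calls) == 2:
            raise RuntimeError("transient error")  # simulated bad tick
        if len(calls) == 3:
            shutdown.set()  # stop after the third tick

    ok = await periodic_worker(tick, shutdown, interval=0.01)
    return ok, calls


async def demo_prompt_shutdown():
    shutdown = asyncio.Event()
    calls = []

    async def tick():
        calls.append("tick")

    # Set the event shortly after start; the 60 s interval must not delay exit.
    asyncio.get_running_loop().call_later(0.05, shutdown.set)
    start = time.monotonic()
    ok = await periodic_worker(tick, shutdown, interval=60)
    return ok, calls, time.monotonic() - start
```

This is why the 30 s default interval does not slow down shutdown: with a plain `asyncio.sleep(interval)`, a stop request arriving on the control topic could wait up to a full interval before the loop noticed it.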