feat(fleet): reconciler converges JSON ↔ DB ↔ docker

Adds decnet.fleet.reconciler — a pure async function plus a long-lived
worker — that periodically reconciles the three sources of truth on a
DECNET host:

  1. decnet-state.json (CLI-canonical fleet record)
  2. fleet_deckies table (DB mirror, written by engine.deployer)
  3. docker inspect (actual per-container runtime state)

Drift handling:
  * JSON has X, DB doesn't       → INSERT (deploy ran with DB offline)
  * DB has X (this host), JSON doesn't → DELETE (teardown ran with DB offline)
  * Both have X, docker disagrees → flip state to running/failed/degraded
  * Docker socket unreachable    → leave existing state alone (don't
                                    torch every row to torn_down)

Cross-host safety: deletions are scoped to host_uuid for the local host;
a master that runs both a local fleet and swarm workers will never
clobber a peer's slice.

CLI:
  decnet reconcile --once            # one-shot, prints counts
  decnet reconcile [--interval N]    # long-lived worker, mirrors
                                     # orchestrator's lifecycle (control
                                     # listener + heartbeat + tick loop)

Promotes decnet/fleet.py → decnet/fleet/ package so the reconciler can
live alongside it without name collision (build_deckies_from_ini and
all_service_names re-exported unchanged via __init__.py).

14 new tests cover state aggregation rules, all four drift directions,
host_uuid scoping, docker-unreachable safety, and worker shutdown via
the bus control event.
This commit is contained in:
2026-04-26 21:14:48 -04:00
parent 8814902999
commit f775223a83
7 changed files with 676 additions and 1 deletions

View File

@@ -32,6 +32,7 @@ from . import (
listener,
orchestrator,
profiler,
reconciler,
sniffer,
swarm,
swarmctl,
@@ -55,7 +56,7 @@ for _mod in (
api, swarmctl, agent, updater, listener, forwarder,
swarm,
deploy, lifecycle, workers, inventory,
web, profiler, orchestrator, sniffer, db,
web, profiler, orchestrator, reconciler, sniffer, db,
topology, bus, geoip, init, webhook,
):
_mod.register(app)

62
decnet/cli/reconciler.py Normal file
View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import typer
from . import utils as _utils
from .utils import console, log
def register(app: typer.Typer) -> None:
@app.command(name="reconcile")
def reconcile_cmd(
once: bool = typer.Option(
False, "--once",
help="Run a single reconcile pass and exit (no daemon loop).",
),
interval: int = typer.Option(
30, "--interval", "-i",
help="Seconds between reconcile passes (ignored with --once).",
),
daemon: bool = typer.Option(
False, "--daemon", "-d",
help="Detach to background as a daemon process (long-lived only).",
),
) -> None:
"""Converge fleet state across decnet-state.json, the DB, and docker."""
import asyncio
from decnet.web.dependencies import repo
if once:
from decnet.fleet.reconciler import reconcile_once
async def _one() -> None:
await repo.initialize()
counts = await reconcile_once(repo)
console.print(
f"[bold cyan]reconcile:[/] "
f"inserted={counts['inserted']} "
f"deleted={counts['deleted']} "
f"state_updated={counts['state_updated']}"
)
asyncio.run(_one())
return
from decnet.fleet.reconciler_worker import fleet_reconciler_worker
if daemon:
log.info("reconciler daemonizing interval=%d", interval)
_utils._daemonize()
log.info("reconciler starting interval=%d", interval)
console.print(
f"[bold cyan]Fleet reconciler starting[/] (interval: {interval}s)"
)
async def _run() -> None:
await repo.initialize()
await fleet_reconciler_worker(repo, interval=interval)
try:
asyncio.run(_run())
except KeyboardInterrupt:
console.print("\n[yellow]Reconciler stopped.[/]")

171
decnet/fleet/reconciler.py Normal file
View File

@@ -0,0 +1,171 @@
"""Fleet reconciler — converges JSON ↔ DB ↔ docker.
Three sources of truth on a DECNET host can disagree:
1. ``decnet-state.json`` — written by ``engine.deployer.deploy/teardown``;
the canonical record for offline / no-API consumers (``decnet status``,
``decnet teardown``, sniffer, collector).
2. ``fleet_deckies`` table — DB mirror written by the same deployer; what
the orchestrator, web dashboard, and REST API see.
3. ``docker inspect`` — actual per-container runtime state.
Drift sources we accept and correct:
* CLI deploy on a host whose DB was unreachable → JSON ahead of DB.
* CLI teardown on a host whose DB was unreachable → DB ahead of JSON.
* Operator hand-edited ``decnet-state.json`` → JSON ahead of DB.
* Container crashed / was killed externally → DB state stale until docker
is observed.
Resolution:
* JSON has X, DB doesn't → INSERT.
* DB has X (this host), JSON doesn't → DELETE.
* Both have X → state := docker-aggregated state.
Cross-host safety: deletions are scoped to ``host_uuid == this host``.
A multi-host master that runs swarm workers (each with their own
reconciler) must never delete a peer's rows.
The reconciler intentionally does NOT publish bus events for state
changes today — the dashboard reads the DB on every render. A
``fleet.{name}.state`` topic is a natural follow-up if SSE consumers
appear, but is out of scope for this PR.
"""
from __future__ import annotations
import asyncio
from typing import Any, Callable, Optional
from decnet.config import DecnetConfig, load_state as _real_load_state
from decnet.logging import get_logger
from decnet.web.db.models import LOCAL_HOST_SENTINEL
from decnet.web.db.repository import BaseRepository
logger = get_logger("fleet.reconciler")
# ── docker observation ────────────────────────────────────────────────────────
def _collect_container_states(
docker_client_factory: Optional[Callable[[], Any]] = None,
) -> Optional[dict[str, str]]:
"""Return ``{container_name: status}`` or ``None`` if docker is unreachable.
``None`` is the explicit "unknown" signal — callers must NOT treat
docker failure as "every container is gone" (that would torch every
fleet row to ``torn_down`` whenever the docker socket is busy).
"""
if docker_client_factory is None:
try:
import docker # local import — keeps tests import-clean
docker_client_factory = docker.from_env
except ImportError:
return None
try:
client = docker_client_factory()
return {
c.name: c.status
for c in client.containers.list(all=True, ignore_removed=True)
}
except Exception as exc: # noqa: BLE001
logger.debug("reconciler: docker query failed: %s", exc)
return None
def _aggregate_decky_state(
decky_name: str,
services: list[str],
container_states: dict[str, str],
) -> str:
"""Aggregate per-decky state from per-service container statuses.
``running`` — every expected service container is ``running``.
``failed`` — every observed container is non-running.
``degraded`` — partial: some running, some not (or some missing).
``torn_down`` — no expected container observed at all.
"""
expected = {f"{decky_name}-{svc.replace('_', '-')}" for svc in services}
seen = {n: s for n, s in container_states.items() if n in expected}
if not seen:
return "torn_down"
statuses = set(seen.values())
if statuses == {"running"} and len(seen) == len(expected):
return "running"
if "running" not in statuses:
return "failed"
return "degraded"
# ── reconcile pass ────────────────────────────────────────────────────────────
async def reconcile_once(
repo: BaseRepository,
*,
host_uuid: str = LOCAL_HOST_SENTINEL,
load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state,
docker_client_factory: Optional[Callable[[], Any]] = None,
) -> dict[str, int]:
"""Single reconciliation pass. Returns counts of work done."""
counts = {"inserted": 0, "deleted": 0, "state_updated": 0}
state = await asyncio.to_thread(load_state_fn)
json_deckies: list[Any] = list(state[0].deckies) if state else []
db_rows = await repo.list_fleet_deckies(host_uuid=host_uuid)
db_by_name = {r["name"]: r for r in db_rows}
container_states = await asyncio.to_thread(
_collect_container_states, docker_client_factory,
)
docker_known = container_states is not None
json_names = {d.name for d in json_deckies}
# 1. INSERT: present in JSON, absent from DB.
for d in json_deckies:
if d.name in db_by_name:
continue
new_state = (
_aggregate_decky_state(d.name, list(d.services), container_states)
if docker_known else "running"
)
await repo.upsert_fleet_decky({
"host_uuid": d.host_uuid or host_uuid,
"name": d.name,
"services": list(d.services),
"decky_config": d.model_dump(mode="json"),
"decky_ip": d.ip,
"state": new_state,
})
counts["inserted"] += 1
# 2. DELETE: present in DB (this host), absent from JSON.
# Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above —
# peer-host rows are never visible here, so we can't accidentally
# clobber another worker's slice.
for r in db_rows:
if r["name"] not in json_names:
await repo.delete_fleet_decky(
host_uuid=r["host_uuid"], name=r["name"],
)
counts["deleted"] += 1
# 3. STATE: present in both, docker says something fresh.
if docker_known:
for d in json_deckies:
existing = db_by_name.get(d.name)
if existing is None:
continue # already handled in step 1
new_state = _aggregate_decky_state(
d.name, list(d.services), container_states,
)
if existing.get("state") != new_state:
await repo.update_fleet_decky_state(
host_uuid=existing["host_uuid"],
name=d.name,
state=new_state,
)
counts["state_updated"] += 1
return counts

View File

@@ -0,0 +1,84 @@
"""Long-lived periodic reconciler worker.
Modeled on :mod:`decnet.orchestrator.worker`: same control listener, same
heartbeat helper, same shutdown semantics. One tick = one
:func:`reconcile_once` pass.
Default interval is short (30s) because reconciliation is cheap when
nothing has drifted (three reads, no writes), and a short cadence keeps
the dashboard's view of crashed containers fresh.
"""
from __future__ import annotations
import asyncio
import contextlib
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
run_control_listener,
run_health_heartbeat,
)
from decnet.fleet.reconciler import reconcile_once
from decnet.logging import get_logger
from decnet.web.db.models import LOCAL_HOST_SENTINEL
from decnet.web.db.repository import BaseRepository
logger = get_logger("fleet.reconciler")
async def fleet_reconciler_worker(
repo: BaseRepository,
*,
interval: int = 30,
host_uuid: str = LOCAL_HOST_SENTINEL,
) -> None:
"""Periodically converge JSON ↔ DB ↔ docker for the local host.
Honours the bus control topic (``system.reconciler.control``) for
graceful shutdown — same lifecycle contract as every other DECNET
worker.
"""
logger.info("fleet reconciler started interval=%ds host=%s", interval, host_uuid)
bus = None
try:
bus = get_bus(client_name="reconciler")
await bus.connect()
except Exception as exc: # noqa: BLE001
logger.warning(
"reconciler: bus unavailable, continuing without publish: %s", exc,
)
bus = None
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "reconciler"))
control_task = asyncio.create_task(
run_control_listener(bus, "reconciler", shutdown),
)
try:
while not shutdown.is_set():
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass # normal tick
if shutdown.is_set():
break
try:
counts = await reconcile_once(repo, host_uuid=host_uuid)
if any(counts.values()):
logger.info(
"reconcile inserted=%d deleted=%d state_updated=%d",
counts["inserted"], counts["deleted"],
counts["state_updated"],
)
except Exception as exc: # noqa: BLE001
logger.error("reconcile tick failed: %s", exc)
finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()