DECNET/tests/fleet/test_reconciler_worker.py
anti f775223a83 feat(fleet): reconciler converges JSON ↔ DB ↔ docker
Adds decnet.fleet.reconciler — a pure async function plus a long-lived
worker — that periodically reconciles the three sources of truth on a
DECNET host:

  1. decnet-state.json (CLI-canonical fleet record)
  2. fleet_deckies table (DB mirror, written by engine.deployer)
  3. docker inspect (actual per-container runtime state)

Drift handling:
  * JSON has X, DB doesn't       → INSERT (deploy ran with DB offline)
  * DB has X (this host), JSON doesn't → DELETE (teardown ran with DB offline)
  * Both have X, docker disagrees → flip state to running/failed/degraded
  * Docker socket unreachable    → leave existing state alone (don't
                                    torch every row to torn_down)
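
The drift rules above can be sketched as a pure planning function. This is only an illustration under stated assumptions: `DriftPlan`, `plan_drift`, and the shape of the inputs (a set of names from the JSON, a name-to-state mapping from the DB, and an optional name-to-state mapping observed from docker) are hypothetical and are not taken from decnet.fleet.reconciler.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DriftPlan:
    """Hypothetical container for the actions one reconcile pass would take."""
    inserts: list = field(default_factory=list)
    deletes: list = field(default_factory=list)
    state_updates: dict = field(default_factory=dict)


def plan_drift(json_names, db_states, observed: Optional[dict]) -> DriftPlan:
    """Compare JSON, DB and docker views without touching any of them.

    json_names: set of decky names present in the JSON record.
    db_states:  {name: state} for this host's rows in the DB.
    observed:   {name: state} from docker, or None if docker is unreachable.
    """
    db_names = set(db_states)
    plan = DriftPlan()
    # JSON has X, DB doesn't -> INSERT
    plan.inserts = sorted(json_names - db_names)
    # DB has X, JSON doesn't -> DELETE
    plan.deletes = sorted(db_names - json_names)
    # Docker unreachable -> leave every existing state alone
    if observed is None:
        return plan
    # Both have X, docker disagrees -> record the observed state
    for name in sorted(json_names & db_names):
        actual = observed.get(name)
        if actual is not None and actual != db_states[name]:
            plan.state_updates[name] = actual
    return plan
```

Keeping the comparison free of I/O is what makes each drift direction testable with plain dictionaries; the caller applies the plan afterwards.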

Cross-host safety: deletions are scoped to host_uuid for the local host;
a master that runs both a local fleet and swarm workers will never
clobber a peer's slice.
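
A minimal sketch of the host_uuid scoping, assuming DB rows are dictionaries with `name` and `host_uuid` keys. The row shape and the helper name `scoped_deletes` are illustrative assumptions, not the reconciler's actual code.

```python
def scoped_deletes(db_rows, json_names, local_host_uuid):
    """Return names that may be deleted: local-host rows missing from the JSON.

    Rows owned by any other host_uuid are never candidates, even when their
    names are absent from the local JSON record.
    """
    return sorted(
        row["name"]
        for row in db_rows
        if row["host_uuid"] == local_host_uuid and row["name"] not in json_names
    )
```

For example, with rows for a local host and a peer, only the stale local row is returned; the peer's row survives regardless of what the local JSON contains.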

CLI:
  decnet reconcile --once            # one-shot, prints counts
  decnet reconcile [--interval N]    # long-lived worker, mirrors
                                     # orchestrator's lifecycle (control
                                     # listener + heartbeat + tick loop)
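
The long-lived mode can be pictured as a tick loop that sleeps on a shutdown event rather than on a plain timer, so a control message ends the wait immediately. The sketch below uses only asyncio; `tick_loop` and its parameters are hypothetical names, and the real worker also runs a control listener and a heartbeat that are omitted here.

```python
import asyncio


async def tick_loop(tick, shutdown: asyncio.Event, interval: float) -> int:
    """Run `tick` repeatedly until `shutdown` is set; return the tick count.

    Waiting on the event (with a timeout) instead of asyncio.sleep means a
    shutdown signal interrupts the pause between ticks right away.
    """
    ticks = 0
    while not shutdown.is_set():
        await tick()
        ticks += 1
        try:
            # Wakes early if shutdown fires, otherwise after `interval` seconds.
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed with no shutdown: run the next tick
    return ticks
```

With a long interval and a shutdown fired shortly after start, the loop runs one tick and returns without waiting out the interval.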

Promotes decnet/fleet.py → decnet/fleet/ package so the reconciler can
live alongside it without name collision (build_deckies_from_ini and
all_service_names re-exported unchanged via __init__.py).

14 new tests cover state aggregation rules, all four drift directions,
host_uuid scoping, docker-unreachable safety, and worker shutdown via
the bus control event.
2026-04-26 21:14:48 -04:00

73 lines
2.3 KiB
Python

"""Worker shutdown smoke test for fleet_reconciler_worker.
The reconcile logic itself is exercised in test_reconciler.py. This file
just verifies the worker's lifecycle wrapper (control listener + heartbeat
+ tick loop) exits cleanly when the bus shutdown signal fires.
"""
from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, patch

import pytest

from decnet.fleet.reconciler_worker import fleet_reconciler_worker


class _FakeRepo:
    async def list_fleet_deckies(self, *, host_uuid=None):
        return []

    async def upsert_fleet_decky(self, data): pass
    async def delete_fleet_decky(self, **kw): pass
    async def update_fleet_decky_state(self, **kw): pass


@pytest.mark.anyio
async def test_worker_exits_on_shutdown_event(monkeypatch):
    # Patch the bus + control listener so the worker doesn't try to bind
    # to a real socket. The control_task will set `shutdown` once we fire it.
    fake_bus = AsyncMock()
    monkeypatch.setattr(
        "decnet.fleet.reconciler_worker.get_bus",
        lambda **kw: fake_bus,
    )

    captured: dict = {}

    async def _capturing_control_listener(bus, name, shutdown_event):
        captured["shutdown_event"] = shutdown_event
        # Hold the event loop briefly so the worker enters its tick wait,
        # then trigger shutdown.
        await asyncio.sleep(0.05)
        shutdown_event.set()

    async def _noop_heartbeat(bus, name):
        await asyncio.sleep(3600)  # never returns naturally

    monkeypatch.setattr(
        "decnet.fleet.reconciler_worker.run_control_listener",
        _capturing_control_listener,
    )
    monkeypatch.setattr(
        "decnet.fleet.reconciler_worker.run_health_heartbeat",
        _noop_heartbeat,
    )

    # Skip docker observation entirely; we just need the loop to exit.
    monkeypatch.setattr(
        "decnet.fleet.reconciler._real_load_state",
        lambda: None,
    )
    with patch("decnet.fleet.reconciler._collect_container_states",
               return_value=None):
        # interval=10 (long) so we exit via shutdown, not via tick completion
        await asyncio.wait_for(
            fleet_reconciler_worker(_FakeRepo(), interval=10),
            timeout=2.0,
        )

    assert captured["shutdown_event"].is_set()


@pytest.fixture
def anyio_backend():
    return "asyncio"