diff --git a/decnet/agent/executor.py b/decnet/agent/executor.py index 69851439..9e1c31bb 100644 --- a/decnet/agent/executor.py +++ b/decnet/agent/executor.py @@ -132,7 +132,7 @@ if command -v docker >/dev/null 2>&1; then fi # Stop+disable every systemd unit the installer may have dropped. -for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-sniffer decnet-updater; do +for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-reconciler decnet-sniffer decnet-updater; do systemctl stop "$unit" 2>/dev/null systemctl disable "$unit" 2>/dev/null done diff --git a/decnet/fleet/reconciler.py b/decnet/fleet/reconciler.py index e54d2e99..70e8fe5b 100644 --- a/decnet/fleet/reconciler.py +++ b/decnet/fleet/reconciler.py @@ -37,6 +37,9 @@ from __future__ import annotations import asyncio from typing import Any, Callable, Optional +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.publish import publish_safely from decnet.config import DecnetConfig, load_state as _real_load_state from decnet.logging import get_logger from decnet.web.db.models import LOCAL_HOST_SENTINEL @@ -105,8 +108,15 @@ async def reconcile_once( host_uuid: str = LOCAL_HOST_SENTINEL, load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state, docker_client_factory: Optional[Callable[[], Any]] = None, + bus: Optional[BaseBus] = None, ) -> dict[str, int]: - """Single reconciliation pass. Returns counts of work done.""" + """Single reconciliation pass. Returns counts of work done. + + When *bus* is provided, fires ``decky..state`` on every + insert / delete / state transition. The DB write is the source of + truth — bus publish is best-effort notification; a dropped event is at + most one tick of UI latency. 
+ """ counts = {"inserted": 0, "deleted": 0, "state_updated": 0} state = await asyncio.to_thread(load_state_fn) @@ -130,8 +140,9 @@ async def reconcile_once( _aggregate_decky_state(d.name, list(d.services), container_states) if docker_known else "running" ) + row_host = d.host_uuid or host_uuid await repo.upsert_fleet_decky({ - "host_uuid": d.host_uuid or host_uuid, + "host_uuid": row_host, "name": d.name, "services": list(d.services), "decky_config": d.model_dump(mode="json"), @@ -139,6 +150,7 @@ async def reconcile_once( "state": new_state, }) counts["inserted"] += 1 + await _emit_state(bus, row_host, d.name, new_state, transition="inserted") # 2. DELETE: present in DB (this host), absent from JSON. # Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above — @@ -150,6 +162,10 @@ async def reconcile_once( host_uuid=r["host_uuid"], name=r["name"], ) counts["deleted"] += 1 + await _emit_state( + bus, r["host_uuid"], r["name"], "torn_down", + transition="deleted", + ) # 3. STATE: present in both, docker says something fresh. if docker_known: @@ -160,12 +176,56 @@ async def reconcile_once( new_state = _aggregate_decky_state( d.name, list(d.services), container_states, ) - if existing.get("state") != new_state: + previous_state = existing.get("state") + if previous_state != new_state: await repo.update_fleet_decky_state( host_uuid=existing["host_uuid"], name=d.name, state=new_state, ) counts["state_updated"] += 1 + await _emit_state( + bus, existing["host_uuid"], d.name, new_state, + transition="state_changed", + previous=previous_state, + ) return counts + + +async def _emit_state( + bus: Optional[BaseBus], + host_uuid: str, + name: str, + state: str, + *, + transition: str, + previous: Optional[str] = None, +) -> None: + """Publish ``decky..state`` on a fleet row transition. + + Topic uses an existing topic family (``DECKY_STATE``) — no + bus/topics.py change required. 
The composite ``host_uuid:name`` keeps + fleet rows distinguishable from MazeNET TopologyDecky rows (whose ids + are bare UUIDs). A ``:`` is a legal token character; ``.``, ``*``, + ``>``, and whitespace are the only banned ones (see + ``bus.topics._reject_tokens``). + """ + if bus is None: + return + decky_id = f"{host_uuid}:{name}" + payload: dict[str, Any] = { + "host_uuid": host_uuid, + "name": name, + "state": state, + "transition": transition, + "source": "fleet", + } + if previous is not None: + payload["previous_state"] = previous + await publish_safely( + bus, + _topics.decky(decky_id, _topics.DECKY_STATE), + payload, + event_type=_topics.DECKY_STATE, + ) diff --git a/decnet/fleet/reconciler_worker.py b/decnet/fleet/reconciler_worker.py index e2aeae80..863072ae 100644 --- a/decnet/fleet/reconciler_worker.py +++ b/decnet/fleet/reconciler_worker.py @@ -65,7 +65,9 @@ async def fleet_reconciler_worker( if shutdown.is_set(): break try: - counts = await reconcile_once(repo, host_uuid=host_uuid) + counts = await reconcile_once( + repo, host_uuid=host_uuid, bus=bus, + ) if any(counts.values()): logger.info( "reconcile inserted=%d deleted=%d state_updated=%d", diff --git a/deploy/decnet-reconciler.service.j2 b/deploy/decnet-reconciler.service.j2 new file mode 100644 index 00000000..47663183 --- /dev/null +++ b/deploy/decnet-reconciler.service.j2 @@ -0,0 +1,47 @@ +[Unit] +Description=DECNET Fleet Reconciler (converges decnet-state.json ↔ fleet_deckies DB ↔ docker) +Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#reconciler +After=network-online.target decnet-bus.service +Wants=network-online.target decnet-bus.service + +[Service] +Type=simple +User={{ user }} +Group={{ group }} +WorkingDirectory={{ install_dir }} +EnvironmentFile=-{{ install_dir }}/.env.local +Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.reconciler.log +ExecStart={{ venv_dir }}/bin/decnet reconcile +StandardOutput=append:/var/log/decnet/decnet.reconciler.log 
+StandardError=append:/var/log/decnet/decnet.reconciler.log + +# The reconciler queries the docker daemon (via `docker.from_env()`) to +# observe per-container state. Membership in the docker group lets it +# read /var/run/docker.sock without root. It does NOT exec into +# containers, bind to the network, or spawn new containers. +SupplementaryGroups=docker + +CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +# Read-only access to /var/lib/decnet so we can read decnet-state.json. +# Read-write access only to install_dir + log dir. +ReadOnlyPaths=/var/lib/decnet +ReadWritePaths={{ install_dir }} /var/log/decnet + +Restart=on-failure +RestartSec=5 +TimeoutStopSec=15 + +[Install] +WantedBy=multi-user.target diff --git a/deploy/decnet.target b/deploy/decnet.target index d4fb8e33..cf3a50f8 100644 --- a/deploy/decnet.target +++ b/deploy/decnet.target @@ -13,6 +13,7 @@ Wants=decnet-bus.service \ decnet-sniffer.service \ decnet-prober.service \ decnet-mutator.service \ + decnet-reconciler.service \ decnet-reuse-correlator.service \ decnet-enrich.service \ decnet-clusterer.service \ diff --git a/tests/fleet/test_reconciler.py b/tests/fleet/test_reconciler.py index abfdf53e..53ac24ec 100644 --- a/tests/fleet/test_reconciler.py +++ b/tests/fleet/test_reconciler.py @@ -255,6 +255,103 @@ class TestReconcileOnce: # host-b's row survives assert any(r["host_uuid"] == "host-b" for r in repo.rows) + @pytest.mark.anyio + async def test_publishes_decky_state_on_transitions(self): + """When *bus* is provided, every insert/delete/state-change must + publish on ``decky..state``.""" + from decnet.bus.fake import FakeBus + bus = FakeBus() + await bus.connect() + + published: list = [] + + async def collect(): + async with bus.subscribe("decky.>") as sub: + async for ev in 
@pytest.mark.anyio
async def test_no_bus_publish_when_already_converged(self):
    """Quiet ticks must not publish — otherwise every 30s the bus
    floods with no-op events.

    The DB row and the docker-reported state already agree
    (``running`` on both sides), so ``reconcile_once`` has nothing to
    insert, delete or update and the subscriber must see no events.
    """
    from decnet.bus.fake import FakeBus
    bus = FakeBus()
    await bus.connect()
    published: list = []

    async def collect():
        async with bus.subscribe("decky.>") as sub:
            async for ev in sub:
                published.append(ev)

    collector = asyncio.create_task(collect())
    try:
        await asyncio.sleep(0)  # let subscription register

        repo = FakeRepo([
            {"host_uuid": "local", "name": "d1", "services": ["ssh"],
             "state": "running", "decky_ip": "10.0.0.10"},
        ])
        d = _decky(name="d1", services=["ssh"])
        await reconcile_once(
            repo,
            load_state_fn=_state_loader([d]),
            docker_client_factory=_docker_factory({"d1-ssh": "running"}),
            bus=bus,
        )
        await asyncio.sleep(0.1)  # give the bus a window to deliver
    finally:
        # Always stop the subscriber, even when reconcile_once raised, and
        # wait for the cancellation to finish so no pending task is left
        # behind when the bus is closed. return_exceptions=True turns the
        # task's CancelledError into a plain result instead of re-raising.
        collector.cancel()
        await asyncio.gather(collector, return_exceptions=True)
        await bus.close()

    assert published == []