feat(fleet): systemd unit + bus signal for fleet reconciler
Two pieces, one PR because they share a deployment surface: 1. systemd. decnet-reconciler.service.j2 mirrors the orchestrator unit shape (docker group, hardened sandbox, append-logs). Read-only /var/lib/decnet so it can read decnet-state.json without write access. Auto-discovered by `decnet init` via the existing decnet-*.service.j2 glob — no init.py change needed. Added to decnet.target so `systemctl start decnet.target` brings it up alongside collector / sniffer / mutator / etc. Also added to the agent reaper script so self-destruct cleans it up on workers. 2. Bus signal. reconcile_once now publishes `decky.<host_uuid:name>.state` on every insert / delete / state-changed transition. Reuses the existing DECKY_STATE topic family (no bus/topics.py change → no wiki update needed per the bus-signals doc rule). Composite host_uuid:name segment keeps fleet rows distinguishable from MazeNET TopologyDecky rows whose ids are bare UUIDs. Quiet ticks publish nothing — convergence means silence. Bus is plumbed through the worker, defaults to None for unit-test callers. publish_safely keeps the source-of-truth contract: DB write is authoritative, the publish is best-effort notification. Captures previous_state into a local before update_fleet_decky_state runs — a fake repo that mutates rows in-place would otherwise see the post-update state and report previous == current. Real repos don't have this concern but the fix is cheap and makes the function less order-dependent.
This commit is contained in:
@@ -37,6 +37,9 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from decnet.bus import topics as _topics
|
||||
from decnet.bus.base import BaseBus
|
||||
from decnet.bus.publish import publish_safely
|
||||
from decnet.config import DecnetConfig, load_state as _real_load_state
|
||||
from decnet.logging import get_logger
|
||||
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||
@@ -105,8 +108,15 @@ async def reconcile_once(
|
||||
host_uuid: str = LOCAL_HOST_SENTINEL,
|
||||
load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state,
|
||||
docker_client_factory: Optional[Callable[[], Any]] = None,
|
||||
bus: Optional[BaseBus] = None,
|
||||
) -> dict[str, int]:
|
||||
"""Single reconciliation pass. Returns counts of work done."""
|
||||
"""Single reconciliation pass. Returns counts of work done.
|
||||
|
||||
When *bus* is provided, fires ``decky.<host_uuid:name>.state`` on every
|
||||
insert / delete / state transition. The DB write is the source of
|
||||
truth — bus publish is best-effort notification; a dropped event is at
|
||||
most one tick of UI latency.
|
||||
"""
|
||||
counts = {"inserted": 0, "deleted": 0, "state_updated": 0}
|
||||
|
||||
state = await asyncio.to_thread(load_state_fn)
|
||||
@@ -130,8 +140,9 @@ async def reconcile_once(
|
||||
_aggregate_decky_state(d.name, list(d.services), container_states)
|
||||
if docker_known else "running"
|
||||
)
|
||||
row_host = d.host_uuid or host_uuid
|
||||
await repo.upsert_fleet_decky({
|
||||
"host_uuid": d.host_uuid or host_uuid,
|
||||
"host_uuid": row_host,
|
||||
"name": d.name,
|
||||
"services": list(d.services),
|
||||
"decky_config": d.model_dump(mode="json"),
|
||||
@@ -139,6 +150,7 @@ async def reconcile_once(
|
||||
"state": new_state,
|
||||
})
|
||||
counts["inserted"] += 1
|
||||
await _emit_state(bus, row_host, d.name, new_state, transition="inserted")
|
||||
|
||||
# 2. DELETE: present in DB (this host), absent from JSON.
|
||||
# Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above —
|
||||
@@ -150,6 +162,10 @@ async def reconcile_once(
|
||||
host_uuid=r["host_uuid"], name=r["name"],
|
||||
)
|
||||
counts["deleted"] += 1
|
||||
await _emit_state(
|
||||
bus, r["host_uuid"], r["name"], "torn_down",
|
||||
transition="deleted",
|
||||
)
|
||||
|
||||
# 3. STATE: present in both, docker says something fresh.
|
||||
if docker_known:
|
||||
@@ -160,12 +176,56 @@ async def reconcile_once(
|
||||
new_state = _aggregate_decky_state(
|
||||
d.name, list(d.services), container_states,
|
||||
)
|
||||
if existing.get("state") != new_state:
|
||||
previous_state = existing.get("state")
|
||||
if previous_state != new_state:
|
||||
await repo.update_fleet_decky_state(
|
||||
host_uuid=existing["host_uuid"],
|
||||
name=d.name,
|
||||
state=new_state,
|
||||
)
|
||||
counts["state_updated"] += 1
|
||||
await _emit_state(
|
||||
bus, existing["host_uuid"], d.name, new_state,
|
||||
transition="state_changed",
|
||||
previous=previous_state,
|
||||
)
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
async def _emit_state(
|
||||
bus: Optional[BaseBus],
|
||||
host_uuid: str,
|
||||
name: str,
|
||||
state: str,
|
||||
*,
|
||||
transition: str,
|
||||
previous: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Publish ``decky.<host_uuid:name>.state`` on a fleet row transition.
|
||||
|
||||
Topic uses an existing topic family (``DECKY_STATE``) — no
|
||||
bus/topics.py change required. The composite ``host_uuid:name`` keeps
|
||||
fleet rows distinguishable from MazeNET TopologyDecky rows (whose ids
|
||||
are bare UUIDs). A ``:`` is a legal token character; ``.``, ``*``,
|
||||
``>``, and whitespace are the only banned ones (see
|
||||
``bus.topics._reject_tokens``).
|
||||
"""
|
||||
if bus is None:
|
||||
return
|
||||
decky_id = f"{host_uuid}:{name}"
|
||||
payload: dict[str, Any] = {
|
||||
"host_uuid": host_uuid,
|
||||
"name": name,
|
||||
"state": state,
|
||||
"transition": transition,
|
||||
"source": "fleet",
|
||||
}
|
||||
if previous is not None:
|
||||
payload["previous_state"] = previous
|
||||
await publish_safely(
|
||||
bus,
|
||||
_topics.decky(decky_id, _topics.DECKY_STATE),
|
||||
payload,
|
||||
event_type=_topics.DECKY_STATE,
|
||||
)
|
||||
|
||||
@@ -65,7 +65,9 @@ async def fleet_reconciler_worker(
|
||||
if shutdown.is_set():
|
||||
break
|
||||
try:
|
||||
counts = await reconcile_once(repo, host_uuid=host_uuid)
|
||||
counts = await reconcile_once(
|
||||
repo, host_uuid=host_uuid, bus=bus,
|
||||
)
|
||||
if any(counts.values()):
|
||||
logger.info(
|
||||
"reconcile inserted=%d deleted=%d state_updated=%d",
|
||||
|
||||
Reference in New Issue
Block a user