feat(fleet): systemd unit + bus signal for fleet reconciler
Two pieces, one PR because they share a deployment surface: 1. systemd. decnet-reconciler.service.j2 mirrors the orchestrator unit shape (docker group, hardened sandbox, append-logs). Read-only /var/lib/decnet so it can read decnet-state.json without write access. Auto-discovered by `decnet init` via the existing decnet-*.service.j2 glob — no init.py change needed. Added to decnet.target so `systemctl start decnet.target` brings it up alongside collector / sniffer / mutator / etc. Also added to the agent reaper script so self-destruct cleans it up on workers. 2. Bus signal. reconcile_once now publishes `decky.<host_uuid:name>.state` on every insert / delete / state-changed transition. Reuses the existing DECKY_STATE topic family (no bus/topics.py change → no wiki update needed per the bus-signals doc rule). Composite host_uuid:name segment keeps fleet rows distinguishable from MazeNET TopologyDecky rows whose ids are bare UUIDs. Quiet ticks publish nothing — convergence means silence. Bus is plumbed through the worker, defaults to None for unit-test callers. publish_safely keeps the source-of-truth contract: DB write is authoritative, the publish is best-effort notification. Captures previous_state into a local before update_fleet_decky_state runs — a fake repo that mutates rows in-place would otherwise see the post-update state and report previous == current. Real repos don't have this concern but the fix is cheap and makes the function less order-dependent.
This commit is contained in:
@@ -132,7 +132,7 @@ if command -v docker >/dev/null 2>&1; then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# Stop+disable every systemd unit the installer may have dropped.
|
# Stop+disable every systemd unit the installer may have dropped.
|
||||||
for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-sniffer decnet-updater; do
|
for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-reconciler decnet-sniffer decnet-updater; do
|
||||||
systemctl stop "$unit" 2>/dev/null
|
systemctl stop "$unit" 2>/dev/null
|
||||||
systemctl disable "$unit" 2>/dev/null
|
systemctl disable "$unit" 2>/dev/null
|
||||||
done
|
done
|
||||||
|
|||||||
@@ -37,6 +37,9 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any, Callable, Optional
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.base import BaseBus
|
||||||
|
from decnet.bus.publish import publish_safely
|
||||||
from decnet.config import DecnetConfig, load_state as _real_load_state
|
from decnet.config import DecnetConfig, load_state as _real_load_state
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||||
@@ -105,8 +108,15 @@ async def reconcile_once(
|
|||||||
host_uuid: str = LOCAL_HOST_SENTINEL,
|
host_uuid: str = LOCAL_HOST_SENTINEL,
|
||||||
load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state,
|
load_state_fn: Callable[[], Optional[tuple[DecnetConfig, Any]]] = _real_load_state,
|
||||||
docker_client_factory: Optional[Callable[[], Any]] = None,
|
docker_client_factory: Optional[Callable[[], Any]] = None,
|
||||||
|
bus: Optional[BaseBus] = None,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Single reconciliation pass. Returns counts of work done."""
|
"""Single reconciliation pass. Returns counts of work done.
|
||||||
|
|
||||||
|
When *bus* is provided, fires ``decky.<host_uuid:name>.state`` on every
|
||||||
|
insert / delete / state transition. The DB write is the source of
|
||||||
|
truth — bus publish is best-effort notification; a dropped event is at
|
||||||
|
most one tick of UI latency.
|
||||||
|
"""
|
||||||
counts = {"inserted": 0, "deleted": 0, "state_updated": 0}
|
counts = {"inserted": 0, "deleted": 0, "state_updated": 0}
|
||||||
|
|
||||||
state = await asyncio.to_thread(load_state_fn)
|
state = await asyncio.to_thread(load_state_fn)
|
||||||
@@ -130,8 +140,9 @@ async def reconcile_once(
|
|||||||
_aggregate_decky_state(d.name, list(d.services), container_states)
|
_aggregate_decky_state(d.name, list(d.services), container_states)
|
||||||
if docker_known else "running"
|
if docker_known else "running"
|
||||||
)
|
)
|
||||||
|
row_host = d.host_uuid or host_uuid
|
||||||
await repo.upsert_fleet_decky({
|
await repo.upsert_fleet_decky({
|
||||||
"host_uuid": d.host_uuid or host_uuid,
|
"host_uuid": row_host,
|
||||||
"name": d.name,
|
"name": d.name,
|
||||||
"services": list(d.services),
|
"services": list(d.services),
|
||||||
"decky_config": d.model_dump(mode="json"),
|
"decky_config": d.model_dump(mode="json"),
|
||||||
@@ -139,6 +150,7 @@ async def reconcile_once(
|
|||||||
"state": new_state,
|
"state": new_state,
|
||||||
})
|
})
|
||||||
counts["inserted"] += 1
|
counts["inserted"] += 1
|
||||||
|
await _emit_state(bus, row_host, d.name, new_state, transition="inserted")
|
||||||
|
|
||||||
# 2. DELETE: present in DB (this host), absent from JSON.
|
# 2. DELETE: present in DB (this host), absent from JSON.
|
||||||
# Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above —
|
# Scoped to host_uuid by list_fleet_deckies(host_uuid=...) call above —
|
||||||
@@ -150,6 +162,10 @@ async def reconcile_once(
|
|||||||
host_uuid=r["host_uuid"], name=r["name"],
|
host_uuid=r["host_uuid"], name=r["name"],
|
||||||
)
|
)
|
||||||
counts["deleted"] += 1
|
counts["deleted"] += 1
|
||||||
|
await _emit_state(
|
||||||
|
bus, r["host_uuid"], r["name"], "torn_down",
|
||||||
|
transition="deleted",
|
||||||
|
)
|
||||||
|
|
||||||
# 3. STATE: present in both, docker says something fresh.
|
# 3. STATE: present in both, docker says something fresh.
|
||||||
if docker_known:
|
if docker_known:
|
||||||
@@ -160,12 +176,56 @@ async def reconcile_once(
|
|||||||
new_state = _aggregate_decky_state(
|
new_state = _aggregate_decky_state(
|
||||||
d.name, list(d.services), container_states,
|
d.name, list(d.services), container_states,
|
||||||
)
|
)
|
||||||
if existing.get("state") != new_state:
|
previous_state = existing.get("state")
|
||||||
|
if previous_state != new_state:
|
||||||
await repo.update_fleet_decky_state(
|
await repo.update_fleet_decky_state(
|
||||||
host_uuid=existing["host_uuid"],
|
host_uuid=existing["host_uuid"],
|
||||||
name=d.name,
|
name=d.name,
|
||||||
state=new_state,
|
state=new_state,
|
||||||
)
|
)
|
||||||
counts["state_updated"] += 1
|
counts["state_updated"] += 1
|
||||||
|
await _emit_state(
|
||||||
|
bus, existing["host_uuid"], d.name, new_state,
|
||||||
|
transition="state_changed",
|
||||||
|
previous=previous_state,
|
||||||
|
)
|
||||||
|
|
||||||
return counts
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
async def _emit_state(
|
||||||
|
bus: Optional[BaseBus],
|
||||||
|
host_uuid: str,
|
||||||
|
name: str,
|
||||||
|
state: str,
|
||||||
|
*,
|
||||||
|
transition: str,
|
||||||
|
previous: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
"""Publish ``decky.<host_uuid:name>.state`` on a fleet row transition.
|
||||||
|
|
||||||
|
Topic uses an existing topic family (``DECKY_STATE``) — no
|
||||||
|
bus/topics.py change required. The composite ``host_uuid:name`` keeps
|
||||||
|
fleet rows distinguishable from MazeNET TopologyDecky rows (whose ids
|
||||||
|
are bare UUIDs). A ``:`` is a legal token character; ``.``, ``*``,
|
||||||
|
``>``, and whitespace are the only banned ones (see
|
||||||
|
``bus.topics._reject_tokens``).
|
||||||
|
"""
|
||||||
|
if bus is None:
|
||||||
|
return
|
||||||
|
decky_id = f"{host_uuid}:{name}"
|
||||||
|
payload: dict[str, Any] = {
|
||||||
|
"host_uuid": host_uuid,
|
||||||
|
"name": name,
|
||||||
|
"state": state,
|
||||||
|
"transition": transition,
|
||||||
|
"source": "fleet",
|
||||||
|
}
|
||||||
|
if previous is not None:
|
||||||
|
payload["previous_state"] = previous
|
||||||
|
await publish_safely(
|
||||||
|
bus,
|
||||||
|
_topics.decky(decky_id, _topics.DECKY_STATE),
|
||||||
|
payload,
|
||||||
|
event_type=_topics.DECKY_STATE,
|
||||||
|
)
|
||||||
|
|||||||
@@ -65,7 +65,9 @@ async def fleet_reconciler_worker(
|
|||||||
if shutdown.is_set():
|
if shutdown.is_set():
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
counts = await reconcile_once(repo, host_uuid=host_uuid)
|
counts = await reconcile_once(
|
||||||
|
repo, host_uuid=host_uuid, bus=bus,
|
||||||
|
)
|
||||||
if any(counts.values()):
|
if any(counts.values()):
|
||||||
logger.info(
|
logger.info(
|
||||||
"reconcile inserted=%d deleted=%d state_updated=%d",
|
"reconcile inserted=%d deleted=%d state_updated=%d",
|
||||||
|
|||||||
47
deploy/decnet-reconciler.service.j2
Normal file
47
deploy/decnet-reconciler.service.j2
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=DECNET Fleet Reconciler (converges decnet-state.json ↔ fleet_deckies DB ↔ docker)
|
||||||
|
Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#reconciler
|
||||||
|
After=network-online.target decnet-bus.service
|
||||||
|
Wants=network-online.target decnet-bus.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User={{ user }}
|
||||||
|
Group={{ group }}
|
||||||
|
WorkingDirectory={{ install_dir }}
|
||||||
|
EnvironmentFile=-{{ install_dir }}/.env.local
|
||||||
|
Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.reconciler.log
|
||||||
|
ExecStart={{ venv_dir }}/bin/decnet reconcile
|
||||||
|
StandardOutput=append:/var/log/decnet/decnet.reconciler.log
|
||||||
|
StandardError=append:/var/log/decnet/decnet.reconciler.log
|
||||||
|
|
||||||
|
# The reconciler queries the docker daemon (via `docker.from_env()`) to
|
||||||
|
# observe per-container state. Membership in the docker group lets it
|
||||||
|
# read /var/run/docker.sock without root. It does NOT exec into
|
||||||
|
# containers, bind to the network, or spawn new containers.
|
||||||
|
SupplementaryGroups=docker
|
||||||
|
|
||||||
|
CapabilityBoundingSet=
|
||||||
|
AmbientCapabilities=
|
||||||
|
|
||||||
|
# Security Hardening
|
||||||
|
NoNewPrivileges=yes
|
||||||
|
ProtectSystem=full
|
||||||
|
ProtectHome=read-only
|
||||||
|
PrivateTmp=yes
|
||||||
|
ProtectKernelTunables=yes
|
||||||
|
ProtectKernelModules=yes
|
||||||
|
ProtectControlGroups=yes
|
||||||
|
RestrictSUIDSGID=yes
|
||||||
|
LockPersonality=yes
|
||||||
|
# Read-only access to /var/lib/decnet so we can read decnet-state.json.
|
||||||
|
# Read-write access only to install_dir + log dir.
|
||||||
|
ReadOnlyPaths=/var/lib/decnet
|
||||||
|
ReadWritePaths={{ install_dir }} /var/log/decnet
|
||||||
|
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5
|
||||||
|
TimeoutStopSec=15
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
@@ -13,6 +13,7 @@ Wants=decnet-bus.service \
|
|||||||
decnet-sniffer.service \
|
decnet-sniffer.service \
|
||||||
decnet-prober.service \
|
decnet-prober.service \
|
||||||
decnet-mutator.service \
|
decnet-mutator.service \
|
||||||
|
decnet-reconciler.service \
|
||||||
decnet-reuse-correlator.service \
|
decnet-reuse-correlator.service \
|
||||||
decnet-enrich.service \
|
decnet-enrich.service \
|
||||||
decnet-clusterer.service \
|
decnet-clusterer.service \
|
||||||
|
|||||||
@@ -255,6 +255,103 @@ class TestReconcileOnce:
|
|||||||
# host-b's row survives
|
# host-b's row survives
|
||||||
assert any(r["host_uuid"] == "host-b" for r in repo.rows)
|
assert any(r["host_uuid"] == "host-b" for r in repo.rows)
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_publishes_decky_state_on_transitions(self):
|
||||||
|
"""When *bus* is provided, every insert/delete/state-change must
|
||||||
|
publish on ``decky.<host_uuid:name>.state``."""
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
|
bus = FakeBus()
|
||||||
|
await bus.connect()
|
||||||
|
|
||||||
|
published: list = []
|
||||||
|
|
||||||
|
async def collect():
|
||||||
|
async with bus.subscribe("decky.>") as sub:
|
||||||
|
async for ev in sub:
|
||||||
|
published.append(ev)
|
||||||
|
if len(published) >= 3:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
collector = asyncio.create_task(collect())
|
||||||
|
await asyncio.sleep(0) # let subscription register
|
||||||
|
|
||||||
|
repo = FakeRepo([
|
||||||
|
# An existing row that will be deleted (not in JSON).
|
||||||
|
{"host_uuid": "local", "name": "ghost", "services": ["ssh"],
|
||||||
|
"state": "running", "decky_ip": "10.0.0.99"},
|
||||||
|
# An existing row whose state will flip running → failed.
|
||||||
|
{"host_uuid": "local", "name": "d-flip", "services": ["ssh"],
|
||||||
|
"state": "running", "decky_ip": "10.0.0.20"},
|
||||||
|
])
|
||||||
|
json_deckies = [
|
||||||
|
_decky(name="d-new", ip="10.0.0.30", services=["http"]),
|
||||||
|
_decky(name="d-flip", ip="10.0.0.20", services=["ssh"]),
|
||||||
|
]
|
||||||
|
await reconcile_once(
|
||||||
|
repo,
|
||||||
|
load_state_fn=_state_loader(json_deckies),
|
||||||
|
docker_client_factory=_docker_factory({
|
||||||
|
"d-new-http": "running",
|
||||||
|
"d-flip-ssh": "exited",
|
||||||
|
}),
|
||||||
|
bus=bus,
|
||||||
|
)
|
||||||
|
await asyncio.wait_for(collector, timeout=2.0)
|
||||||
|
finally:
|
||||||
|
await bus.close()
|
||||||
|
|
||||||
|
topics = sorted(ev.topic for ev in published)
|
||||||
|
assert topics == [
|
||||||
|
"decky.local:d-flip.state",
|
||||||
|
"decky.local:d-new.state",
|
||||||
|
"decky.local:ghost.state",
|
||||||
|
]
|
||||||
|
by_name = {ev.payload["name"]: ev.payload for ev in published}
|
||||||
|
assert by_name["d-new"]["transition"] == "inserted"
|
||||||
|
assert by_name["d-new"]["state"] == "running"
|
||||||
|
assert by_name["ghost"]["transition"] == "deleted"
|
||||||
|
assert by_name["ghost"]["state"] == "torn_down"
|
||||||
|
assert by_name["d-flip"]["transition"] == "state_changed"
|
||||||
|
assert by_name["d-flip"]["state"] == "failed"
|
||||||
|
assert by_name["d-flip"]["previous_state"] == "running"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_no_bus_publish_when_already_converged(self):
|
||||||
|
"""Quiet ticks must not publish — otherwise every 30s the bus
|
||||||
|
floods with no-op events."""
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
|
bus = FakeBus()
|
||||||
|
await bus.connect()
|
||||||
|
try:
|
||||||
|
published: list = []
|
||||||
|
|
||||||
|
async def collect():
|
||||||
|
async with bus.subscribe("decky.>") as sub:
|
||||||
|
async for ev in sub:
|
||||||
|
published.append(ev)
|
||||||
|
|
||||||
|
collector = asyncio.create_task(collect())
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
repo = FakeRepo([
|
||||||
|
{"host_uuid": "local", "name": "d1", "services": ["ssh"],
|
||||||
|
"state": "running", "decky_ip": "10.0.0.10"},
|
||||||
|
])
|
||||||
|
d = _decky(name="d1", services=["ssh"])
|
||||||
|
await reconcile_once(
|
||||||
|
repo,
|
||||||
|
load_state_fn=_state_loader([d]),
|
||||||
|
docker_client_factory=_docker_factory({"d1-ssh": "running"}),
|
||||||
|
bus=bus,
|
||||||
|
)
|
||||||
|
await asyncio.sleep(0.1) # give the bus a window to deliver
|
||||||
|
collector.cancel()
|
||||||
|
finally:
|
||||||
|
await bus.close()
|
||||||
|
|
||||||
|
assert published == []
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_combined_drift_in_one_pass(self):
|
async def test_combined_drift_in_one_pass(self):
|
||||||
"""JSON has new decky AND DB has stale decky AND third decky's
|
"""JSON has new decky AND DB has stale decky AND third decky's
|
||||||
|
|||||||
Reference in New Issue
Block a user