feat(engine,api): add orphan topology resource reaper
Topology rows deleted without a proper teardown leave Docker containers and bridge networks behind, holding IPAM pools that cause 403 "Pool overlaps" on the next deploy at the same subnet. - engine/reaper.py walks the local Docker daemon, extracts the 8-char topology prefix from every decnet_t_* resource, and force-removes containers + networks whose prefix is not in the repo. - POST /api/v1/topologies/reap-orphans (admin-only) returns a report of live/orphan prefixes and what was removed. - Resources belonging to live topologies are never touched; per-resource errors are captured without aborting the sweep.
This commit is contained in:
171
decnet/engine/reaper.py
Normal file
171
decnet/engine/reaper.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Orphan Docker resource reaper for MazeNET topologies.
|
||||
|
||||
Every topology's Docker resources carry the fixed prefix
|
||||
``decnet_t_<first-8-of-topology-uuid>_`` (see
|
||||
:func:`decnet.topology.compose._network_name`). When a topology row is
|
||||
deleted from the DB without a proper teardown — operator error, crashed
|
||||
master, straight ``DELETE FROM topologies`` — the containers and
|
||||
networks linger and steal IPAM pools.
|
||||
|
||||
This module walks the local Docker daemon, extracts the 8-char prefix
|
||||
from every matching container/network, compares against the set of
|
||||
prefixes that *do* map to a known topology, and removes the rest.
|
||||
|
||||
It never touches resources whose prefix matches a live topology, and it
|
||||
never touches non-DECNET resources.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Iterable, Optional
|
||||
|
||||
import docker
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.network import remove_bridge_network
|
||||
|
||||
log = get_logger("engine.reaper")
|
||||
|
||||
# decnet_t_<8hex>_<anything>. The 8-char prefix is sliced from the
|
||||
# topology UUID in decnet.topology.compose._network_name. Tolerate any
|
||||
# suffix (network name, decky name) after the second underscore.
|
||||
_RESOURCE_NAME_RE = re.compile(r"^decnet_t_([0-9a-f]{8})_")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReapReport:
|
||||
"""Outcome of a reap pass — what was found and what was removed."""
|
||||
|
||||
live_prefixes: list[str] = field(default_factory=list)
|
||||
orphan_prefixes: list[str] = field(default_factory=list)
|
||||
containers_removed: list[str] = field(default_factory=list)
|
||||
networks_removed: list[str] = field(default_factory=list)
|
||||
errors: list[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"live_prefixes": self.live_prefixes,
|
||||
"orphan_prefixes": self.orphan_prefixes,
|
||||
"containers_removed": self.containers_removed,
|
||||
"networks_removed": self.networks_removed,
|
||||
"errors": self.errors,
|
||||
}
|
||||
|
||||
|
||||
def _prefix_of(name: str) -> Optional[str]:
|
||||
m = _RESOURCE_NAME_RE.match(name)
|
||||
return m.group(1) if m else None
|
||||
|
||||
|
||||
async def _live_prefixes(repo: Any) -> set[str]:
|
||||
"""Every topology-id prefix the DB still knows about.
|
||||
|
||||
Tearing down only marks ``torn_down``; the row stays around for
|
||||
audit. We consider *every* persisted topology to be live for the
|
||||
reaper so we never race a concurrent teardown / redeploy by nuking
|
||||
its networks mid-flight. Operators who want those resources gone
|
||||
should delete the topology row (which cascades) or run teardown.
|
||||
"""
|
||||
rows = await repo.list_topologies()
|
||||
return {r["id"][:8] for r in rows}
|
||||
|
||||
|
||||
def _orphan_prefixes(
|
||||
container_names: Iterable[str],
|
||||
network_names: Iterable[str],
|
||||
live: set[str],
|
||||
) -> tuple[set[str], list[str], list[str]]:
|
||||
"""Return (orphan_prefixes, decnet_containers, decnet_networks).
|
||||
|
||||
Pure function — no Docker / repo I/O. Kept separate so the test
|
||||
suite can drive it without mocking the docker SDK."""
|
||||
c_decnet = [n for n in container_names if _prefix_of(n) is not None]
|
||||
n_decnet = [n for n in network_names if _prefix_of(n) is not None]
|
||||
orphans = {
|
||||
_prefix_of(n) for n in (*c_decnet, *n_decnet)
|
||||
} - live
|
||||
orphans.discard(None)
|
||||
return orphans, c_decnet, n_decnet # type: ignore[return-value]
|
||||
|
||||
|
||||
async def reap_orphan_topology_resources(
|
||||
repo: Any,
|
||||
client: Optional[docker.DockerClient] = None,
|
||||
) -> ReapReport:
|
||||
"""Remove Docker containers + networks whose topology id is gone.
|
||||
|
||||
* Enumerates every container and network whose name matches the
|
||||
DECNET topology pattern.
|
||||
* Computes the set of prefixes still referenced in the DB.
|
||||
* Force-removes every container (so networks can drop their
|
||||
endpoints), then removes the networks in a second pass.
|
||||
* Errors on any single resource are captured into the report but
|
||||
never abort the sweep — one stuck container should not block the
|
||||
other nineteen from being cleaned up.
|
||||
"""
|
||||
if client is None:
|
||||
client = docker.from_env()
|
||||
|
||||
live = await _live_prefixes(repo)
|
||||
report = ReapReport(live_prefixes=sorted(live))
|
||||
|
||||
try:
|
||||
containers = client.containers.list(all=True)
|
||||
networks = client.networks.list()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
report.errors.append(f"docker list failed: {exc}")
|
||||
return report
|
||||
|
||||
container_names = [c.name for c in containers]
|
||||
network_names = [n.name for n in networks]
|
||||
orphans, decnet_containers, decnet_networks = _orphan_prefixes(
|
||||
container_names, network_names, live
|
||||
)
|
||||
report.orphan_prefixes = sorted(orphans)
|
||||
|
||||
if not orphans:
|
||||
log.info(
|
||||
"reaper: no orphans (decnet containers=%d, networks=%d, live=%d)",
|
||||
len(decnet_containers), len(decnet_networks), len(live),
|
||||
)
|
||||
return report
|
||||
|
||||
# Pass 1: containers. Force-remove so we don't hang on a stopped
|
||||
# container whose network is about to be killed.
|
||||
for c in containers:
|
||||
prefix = _prefix_of(c.name)
|
||||
if prefix is None or prefix not in orphans:
|
||||
continue
|
||||
try:
|
||||
c.remove(force=True)
|
||||
report.containers_removed.append(c.name)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
report.errors.append(f"container {c.name!r}: {exc}")
|
||||
log.warning("reaper: container %s remove failed: %s", c.name, exc)
|
||||
|
||||
# Pass 2: networks.
|
||||
for n in networks:
|
||||
prefix = _prefix_of(n.name)
|
||||
if prefix is None or prefix not in orphans:
|
||||
continue
|
||||
try:
|
||||
remove_bridge_network(client, n.name)
|
||||
report.networks_removed.append(n.name)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
report.errors.append(f"network {n.name!r}: {exc}")
|
||||
log.warning("reaper: network %s remove failed: %s", n.name, exc)
|
||||
|
||||
log.info(
|
||||
"reaper: removed %d container(s), %d network(s) across %d orphan prefix(es)",
|
||||
len(report.containers_removed),
|
||||
len(report.networks_removed),
|
||||
len(report.orphan_prefixes),
|
||||
)
|
||||
return report
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ReapReport",
|
||||
"reap_orphan_topology_resources",
|
||||
]
|
||||
@@ -21,6 +21,7 @@ from .api_get_topology import router as _get_router
|
||||
from .api_lan_crud import router as _lan_router
|
||||
from .api_list_topologies import router as _list_router
|
||||
from .api_mutations import router as _mutations_router
|
||||
from .api_reap_orphans import router as _reap_router
|
||||
from .api_teardown_topology import router as _teardown_router
|
||||
|
||||
topology_router = APIRouter(prefix="/topologies", tags=["topologies"])
|
||||
@@ -34,6 +35,7 @@ topology_router.include_router(_catalog_router)
|
||||
topology_router.include_router(_list_router)
|
||||
topology_router.include_router(_create_blank_router)
|
||||
topology_router.include_router(_create_router)
|
||||
topology_router.include_router(_reap_router)
|
||||
topology_router.include_router(_deploy_router)
|
||||
topology_router.include_router(_teardown_router)
|
||||
topology_router.include_router(_delete_router)
|
||||
|
||||
46
decnet/web/router/topology/api_reap_orphans.py
Normal file
46
decnet/web/router/topology/api_reap_orphans.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""POST /topologies/reap-orphans — remove Docker resources for topology
|
||||
ids the DB no longer knows about.
|
||||
|
||||
A topology row deleted outside the teardown flow (operator error,
|
||||
crashed master, direct DB edit) leaves its containers and bridge
|
||||
networks behind. The orphan networks keep their IPAM pools, so the
|
||||
next deploy at the same subnet hits a 403 ``Pool overlaps`` from the
|
||||
Docker daemon.
|
||||
|
||||
This endpoint walks the local Docker daemon, computes the set of
|
||||
topology prefixes still known to the repo, and force-removes every
|
||||
container + network whose prefix is orphaned. Resources belonging to
|
||||
live topologies are never touched.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from decnet.engine.reaper import reap_orphan_topology_resources
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.web.dependencies import repo, require_admin
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post(
|
||||
"/reap-orphans",
|
||||
tags=["MazeNET Topologies"],
|
||||
responses={
|
||||
401: {"description": "Missing or invalid credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
},
|
||||
)
|
||||
@_traced("api.topology.reap_orphans")
|
||||
async def api_reap_orphans(
|
||||
_admin: dict = Depends(require_admin),
|
||||
) -> dict:
|
||||
"""Reap Docker resources whose topology id is absent from the DB.
|
||||
|
||||
Returns a report with the live prefixes, the orphan prefixes that
|
||||
were identified, every container + network actually removed, and
|
||||
any per-resource errors encountered. Errors are non-fatal — a
|
||||
single stuck resource does not abort the sweep.
|
||||
"""
|
||||
report = await reap_orphan_topology_resources(repo)
|
||||
return report.to_dict()
|
||||
228
tests/topology/test_reaper.py
Normal file
228
tests/topology/test_reaper.py
Normal file
@@ -0,0 +1,228 @@
|
||||
"""Tests for the orphan topology-resource reaper."""
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.engine.reaper import (
|
||||
ReapReport,
|
||||
_orphan_prefixes,
|
||||
_prefix_of,
|
||||
reap_orphan_topology_resources,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------- pure helpers
|
||||
|
||||
|
||||
def test_prefix_of_matches_decnet_convention():
|
||||
assert _prefix_of("decnet_t_abcd1234_dmz") == "abcd1234"
|
||||
assert _prefix_of("decnet_t_abcd1234_subnet-01") == "abcd1234"
|
||||
assert _prefix_of("decnet_t_abcd1234_decky-631b") == "abcd1234"
|
||||
|
||||
|
||||
def test_prefix_of_rejects_non_decnet_names():
|
||||
assert _prefix_of("bridge") is None
|
||||
assert _prefix_of("host") is None
|
||||
assert _prefix_of("development_default") is None
|
||||
# Prefix must be 8 hex chars exactly.
|
||||
assert _prefix_of("decnet_t_abcd_dmz") is None
|
||||
assert _prefix_of("decnet_t_abcd1234_") == "abcd1234" # trailing edge
|
||||
|
||||
|
||||
def test_orphan_prefixes_flags_only_unknowns():
|
||||
live = {"aaaa1111", "bbbb2222"}
|
||||
containers = [
|
||||
"decnet_t_aaaa1111_decky-01", # live
|
||||
"decnet_t_cccc3333_dmz-gateway", # orphan
|
||||
"bridge", # not DECNET
|
||||
]
|
||||
networks = [
|
||||
"decnet_t_bbbb2222_subnet-01", # live
|
||||
"decnet_t_cccc3333_dmz", # orphan
|
||||
"decnet_t_dddd4444_subnet-01", # orphan
|
||||
]
|
||||
orphans, decnet_cs, decnet_ns = _orphan_prefixes(containers, networks, live)
|
||||
assert orphans == {"cccc3333", "dddd4444"}
|
||||
assert "bridge" not in decnet_cs
|
||||
assert len(decnet_ns) == 3
|
||||
|
||||
|
||||
def test_orphan_prefixes_empty_when_all_live():
|
||||
live = {"aaaa1111"}
|
||||
containers = ["decnet_t_aaaa1111_decky"]
|
||||
networks = ["decnet_t_aaaa1111_dmz"]
|
||||
orphans, *_ = _orphan_prefixes(containers, networks, live)
|
||||
assert orphans == set()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------- integration
|
||||
|
||||
|
||||
class _FakeContainer:
|
||||
def __init__(self, name, remove_raises=None):
|
||||
self.name = name
|
||||
self._raises = remove_raises
|
||||
self.removed = False
|
||||
def remove(self, force=False): # noqa: ARG002
|
||||
if self._raises:
|
||||
raise self._raises
|
||||
self.removed = True
|
||||
|
||||
|
||||
class _FakeNetwork:
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
self.id = f"id-{name}"
|
||||
self.attrs = {"Containers": {}}
|
||||
self.removed = False
|
||||
def remove(self):
|
||||
self.removed = True
|
||||
def disconnect(self, cid, force=False): # pragma: no cover
|
||||
pass
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
def __init__(self, containers, networks):
|
||||
self._cs = containers
|
||||
self._ns = networks
|
||||
self.containers = SimpleNamespace(list=lambda all=False: list(self._cs))
|
||||
self.networks = self
|
||||
|
||||
def list(self, names=None, filters=None): # noqa: ARG002
|
||||
if names is None:
|
||||
return list(self._ns)
|
||||
return [n for n in self._ns if n.name in set(names)]
|
||||
|
||||
|
||||
class _StubRepo:
|
||||
def __init__(self, topology_ids):
|
||||
self._ids = topology_ids
|
||||
async def list_topologies(self):
|
||||
return [{"id": tid} for tid in self._ids]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reap_removes_only_orphans():
|
||||
live_tid = "aaaa1111-1111-1111-1111-111111111111"
|
||||
repo = _StubRepo([live_tid])
|
||||
|
||||
containers = [
|
||||
_FakeContainer("decnet_t_aaaa1111_decky"), # live — keep
|
||||
_FakeContainer("decnet_t_dead0000_dmz-gateway"), # orphan
|
||||
_FakeContainer("decnet_t_dead0000_decky-1"), # orphan
|
||||
_FakeContainer("bridge"), # non-DECNET
|
||||
]
|
||||
networks = [
|
||||
_FakeNetwork("decnet_t_aaaa1111_dmz"), # live — keep
|
||||
_FakeNetwork("decnet_t_dead0000_dmz"), # orphan
|
||||
_FakeNetwork("decnet_t_dead0000_subnet-01"), # orphan
|
||||
_FakeNetwork("host"), # non-DECNET
|
||||
]
|
||||
client = _FakeClient(containers, networks)
|
||||
|
||||
report = await reap_orphan_topology_resources(repo, client=client)
|
||||
|
||||
assert report.live_prefixes == ["aaaa1111"]
|
||||
assert report.orphan_prefixes == ["dead0000"]
|
||||
assert set(report.containers_removed) == {
|
||||
"decnet_t_dead0000_dmz-gateway",
|
||||
"decnet_t_dead0000_decky-1",
|
||||
}
|
||||
assert set(report.networks_removed) == {
|
||||
"decnet_t_dead0000_dmz",
|
||||
"decnet_t_dead0000_subnet-01",
|
||||
}
|
||||
assert report.errors == []
|
||||
# Live resources must survive.
|
||||
assert all(c.removed is False for c in containers if "aaaa1111" in c.name)
|
||||
assert all(n.removed is False for n in networks if "aaaa1111" in n.name)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reap_is_noop_when_no_orphans():
|
||||
repo = _StubRepo(["aaaa1111-xxx"])
|
||||
containers = [_FakeContainer("decnet_t_aaaa1111_d")]
|
||||
networks = [_FakeNetwork("decnet_t_aaaa1111_net")]
|
||||
client = _FakeClient(containers, networks)
|
||||
|
||||
report = await reap_orphan_topology_resources(repo, client=client)
|
||||
|
||||
assert report.orphan_prefixes == []
|
||||
assert report.containers_removed == []
|
||||
assert report.networks_removed == []
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reap_captures_per_resource_errors_without_aborting():
|
||||
repo = _StubRepo([])
|
||||
containers = [
|
||||
_FakeContainer("decnet_t_dead0000_c1", remove_raises=RuntimeError("stuck")),
|
||||
_FakeContainer("decnet_t_dead0000_c2"),
|
||||
]
|
||||
networks = [_FakeNetwork("decnet_t_dead0000_net")]
|
||||
client = _FakeClient(containers, networks)
|
||||
|
||||
report = await reap_orphan_topology_resources(repo, client=client)
|
||||
|
||||
# The failing container is reported; the next one still gets removed.
|
||||
assert any("c1" in e for e in report.errors)
|
||||
assert "decnet_t_dead0000_c2" in report.containers_removed
|
||||
assert "decnet_t_dead0000_net" in report.networks_removed
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_reap_handles_docker_list_failure():
|
||||
repo = _StubRepo(["aaaa1111"])
|
||||
client = MagicMock()
|
||||
client.containers.list.side_effect = RuntimeError("docker down")
|
||||
client.networks.list.return_value = []
|
||||
report = await reap_orphan_topology_resources(repo, client=client)
|
||||
assert any("docker list failed" in e for e in report.errors)
|
||||
assert report.containers_removed == []
|
||||
assert report.networks_removed == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- report
|
||||
|
||||
|
||||
def test_reap_report_to_dict_is_serialisable():
|
||||
r = ReapReport(
|
||||
live_prefixes=["aa"], orphan_prefixes=["bb"],
|
||||
containers_removed=["c"], networks_removed=["n"], errors=[],
|
||||
)
|
||||
d = r.to_dict()
|
||||
assert d == {
|
||||
"live_prefixes": ["aa"],
|
||||
"orphan_prefixes": ["bb"],
|
||||
"containers_removed": ["c"],
|
||||
"networks_removed": ["n"],
|
||||
"errors": [],
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- API
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_api_reap_orphans_requires_admin(monkeypatch):
|
||||
"""POST /topologies/reap-orphans returns the report dict."""
|
||||
from decnet.web.router.topology.api_reap_orphans import api_reap_orphans
|
||||
|
||||
with patch(
|
||||
"decnet.web.router.topology.api_reap_orphans.reap_orphan_topology_resources"
|
||||
) as mock_reap:
|
||||
mock_reap.return_value = ReapReport(
|
||||
live_prefixes=["aaaa1111"],
|
||||
orphan_prefixes=["dead0000"],
|
||||
containers_removed=["decnet_t_dead0000_c"],
|
||||
networks_removed=["decnet_t_dead0000_n"],
|
||||
)
|
||||
result = await api_reap_orphans(_admin={"role": "admin"})
|
||||
|
||||
assert result["orphan_prefixes"] == ["dead0000"]
|
||||
assert result["containers_removed"] == ["decnet_t_dead0000_c"]
|
||||
assert result["networks_removed"] == ["decnet_t_dead0000_n"]
|
||||
assert result["errors"] == []
|
||||
Reference in New Issue
Block a user