refactor(enroll-bundle): extract bundle_builder and move DTOs to swarm models

Pure tarball construction (_build_tarball, _render_*, _iter_included,
_SYSTEMD_UNITS) moved to decnet/swarm/bundle_builder.py — no FastAPI
dependency, independently testable. EnrollBundleRequest/Response moved
to decnet/web/db/models/swarm.py alongside the other swarm DTOs.
Router drops from 504 to 260 lines; keeps only the in-memory token
registry, sweeper, and endpoints.
This commit is contained in:
2026-04-30 20:39:42 -04:00
parent e124f9e296
commit a5487eb55f
3 changed files with 252 additions and 256 deletions

View File

@@ -0,0 +1,209 @@
"""Tarball + bootstrap construction for agent-enrollment bundles.
Pure I/O, no FastAPI dependency — independently testable.
"""
from __future__ import annotations
import io
import os
import pathlib
import tarfile
from datetime import datetime, timezone
from typing import Optional
from decnet.swarm import pki
# ---------------------------------------------------------------------------
# Include / exclude manifest
# ---------------------------------------------------------------------------
# Explicit include list — fails closed. Stray files on the master
# (dev venvs, .env files, editor scratch) cannot leak into the bundle.
_INCLUDED_ROOT_FILES: tuple[str, ...] = ("pyproject.toml",)
_INCLUDED_DIRS: tuple[str, ...] = ("decnet",)
# Subtrees of _INCLUDED_DIRS that must NOT ship (relative to repo root).
# * decnet/web — FastAPI master app, unused on agents.
# * decnet/mutator — swarm-wide respawn scheduler, master-only.
# * decnet/profiler — rebuilds profiles against master DB, master-only.
_EXCLUDED_DECNET_SUBTREES: frozenset[str] = frozenset({
"decnet/web",
"decnet/mutator",
"decnet/profiler",
})
# Agent-side systemd units. Profiler stays master-side intentionally.
_SYSTEMD_UNITS = (
"decnet-agent", "decnet-forwarder", "decnet-engine", "decnet-updater",
"decnet-collector", "decnet-prober", "decnet-sniffer",
)
# ---------------------------------------------------------------------------
# Path helpers
# ---------------------------------------------------------------------------
def _repo_root() -> pathlib.Path:
# decnet/swarm/bundle_builder.py -> parents[2] = repo root.
return pathlib.Path(__file__).resolve().parents[2]
def _templates_dir() -> pathlib.Path:
return pathlib.Path(__file__).resolve().parents[1] / "web" / "templates"
# ---------------------------------------------------------------------------
# Filesystem walk
# ---------------------------------------------------------------------------
def _iter_included(root: pathlib.Path) -> list[tuple[pathlib.Path, str]]:
"""Return ``(full_path, arcname)`` pairs for every file the agent needs.
Walk is pruned in-place: ``__pycache__`` and master-only subtrees are
skipped at directory level so we never descend into them.
"""
found: list[tuple[pathlib.Path, str]] = []
for rel in _INCLUDED_ROOT_FILES:
p = root / rel
if p.is_file():
found.append((p, rel))
for top in _INCLUDED_DIRS:
start = root / top
if not start.is_dir():
continue
for dirpath, dirnames, filenames in os.walk(start, topdown=True, followlinks=False):
dir_path = pathlib.Path(dirpath)
rel_dir = dir_path.relative_to(root).as_posix()
dirnames[:] = [
d for d in dirnames
if d != "__pycache__"
and f"{rel_dir}/{d}" not in _EXCLUDED_DECNET_SUBTREES
]
for fn in filenames:
if fn.endswith((".pyc", ".pyo")):
continue
full = dir_path / fn
if full.is_symlink():
continue
found.append((full, f"{rel_dir}/{fn}"))
found.sort(key=lambda t: t[1])
return found
# ---------------------------------------------------------------------------
# Content renderers
# ---------------------------------------------------------------------------
def _render_decnet_ini(
master_host: str,
host_uuid: str,
use_ipvlan: bool = False,
swarmctl_port: int = 8770,
) -> bytes:
ipvlan_line = f"ipvlan = {'true' if use_ipvlan else 'false'}\n"
return (
"; Generated by DECNET agent-enrollment bundle.\n"
"[decnet]\n"
"mode = agent\n"
"disallow-master = true\n"
"log-directory = /var/log/decnet\n"
f"{ipvlan_line}"
"\n"
"[agent]\n"
f"master-host = {master_host}\n"
f"swarmctl-port = {swarmctl_port}\n"
"swarm-syslog-port = 6514\n"
"agent-port = 8765\n"
"agent-dir = /etc/decnet/agent\n"
"updater-dir = /etc/decnet/updater\n"
f"host-uuid = {host_uuid}\n"
).encode()
def _add_bytes(tar: tarfile.TarFile, name: str, data: bytes, mode: int = 0o644) -> None:
info = tarfile.TarInfo(name)
info.size = len(data)
info.mode = mode
info.mtime = int(datetime.now(timezone.utc).timestamp())
tar.addfile(info, io.BytesIO(data))
def _render_systemd_unit(name: str, agent_name: str, master_host: str) -> bytes:
tpl_path = _templates_dir() / f"{name}.service.j2"
tpl = tpl_path.read_text()
return (
tpl.replace("{{ agent_name }}", agent_name)
.replace("{{ master_host }}", master_host)
).encode()
def render_bootstrap(
agent_name: str,
master_host: str,
tarball_url: str,
expires_at: datetime,
with_updater: bool,
) -> bytes:
tpl_path = _templates_dir() / "enroll_bootstrap.sh.j2"
tpl = tpl_path.read_text()
now = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
rendered = (
tpl.replace("{{ agent_name }}", agent_name)
.replace("{{ master_host }}", master_host)
.replace("{{ tarball_url }}", tarball_url)
.replace("{{ generated_at }}", now)
.replace("{{ expires_at }}", expires_at.replace(microsecond=0).isoformat())
.replace("{{ with_updater }}", "true" if with_updater else "false")
)
return rendered.encode()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_tarball(
master_host: str,
agent_name: str,
host_uuid: str,
issued: pki.IssuedCert,
services_ini: Optional[str],
updater_issued: Optional[pki.IssuedCert] = None,
use_ipvlan: bool = False,
) -> bytes:
"""Return a gzipped tarball ready to be handed to the enrolling agent."""
root = _repo_root()
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
for path, arcname in _iter_included(root):
tar.add(path, arcname=arcname, recursive=False)
_add_bytes(
tar,
"etc/decnet/decnet.ini",
_render_decnet_ini(master_host, host_uuid, use_ipvlan),
)
for unit in _SYSTEMD_UNITS:
_add_bytes(
tar,
f"etc/systemd/system/{unit}.service",
_render_systemd_unit(unit, agent_name, master_host),
)
_add_bytes(tar, "home/.decnet/agent/ca.crt", issued.ca_cert_pem)
_add_bytes(tar, "home/.decnet/agent/worker.crt", issued.cert_pem)
_add_bytes(tar, "home/.decnet/agent/worker.key", issued.key_pem, mode=0o600)
if updater_issued is not None:
_add_bytes(tar, "home/.decnet/updater/ca.crt", updater_issued.ca_cert_pem)
_add_bytes(tar, "home/.decnet/updater/updater.crt", updater_issued.cert_pem)
_add_bytes(tar, "home/.decnet/updater/updater.key", updater_issued.key_pem, mode=0o600)
if services_ini:
_add_bytes(tar, "services.ini", services_ini.encode())
return buf.getvalue()

View File

@@ -198,3 +198,34 @@ class SwarmHostHealth(BaseModel):
class SwarmCheckResponse(BaseModel):
results: list[SwarmHostHealth]
class EnrollBundleRequest(BaseModel):
master_host: str = PydanticField(..., min_length=1, max_length=253,
description="IP/host the agent will reach back to")
agent_name: str = PydanticField(..., pattern=r"^[a-z0-9][a-z0-9-]{0,62}$",
description="Worker name (DNS-label safe)")
with_updater: bool = PydanticField(
default=True,
description="Include updater cert bundle and auto-start decnet updater on the agent",
)
use_ipvlan: bool = PydanticField(
default=False,
description=(
"Run deckies on this agent over IPvlan L2 instead of MACVLAN. "
"Required when the agent is a VirtualBox/VMware guest bridged over Wi-Fi — "
"Wi-Fi APs bind one MAC per station, so MACVLAN's extra container MACs "
"rotate the VM's DHCP lease. Safe no-op on wired/bare-metal hosts."
),
)
services_ini: Optional[str] = PydanticField(
default=None,
description="Optional INI text shipped to the agent as /etc/decnet/services.ini",
)
class EnrollBundleResponse(BaseModel):
token: str
command: str
expires_at: datetime
host_uuid: str

View File

@@ -18,20 +18,19 @@ the embedded payload. Two URLs, one paste.
from __future__ import annotations
import asyncio
import io
import os
import pathlib
import secrets
import tarfile
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from pydantic import BaseModel, Field
from decnet.logging import get_logger
from decnet.swarm import pki
from decnet.swarm.bundle_builder import build_tarball, render_bootstrap
from decnet.web.db.models.swarm import EnrollBundleRequest, EnrollBundleResponse
from decnet.web.db.repository import BaseRepository
from decnet.web.dependencies import get_repo, require_admin
@@ -43,72 +42,6 @@ BUNDLE_TTL = timedelta(minutes=5)
BUNDLE_DIR = pathlib.Path(os.environ.get("DECNET_ENROLL_BUNDLE_DIR", "/tmp/decnet-enroll")) # nosec B108 - short-lived 0600 bundle cache, env-overridable
SWEEP_INTERVAL_SECS = 30
# Include list — explicit set of paths that ship to the agent. An
# include list fails closed: anything new on the master (stray .env, dev
# venvs, data dumps, editor scratch dirs) cannot leak into the bundle
# just because we forgot to exclude it.
#
# What the agent actually needs:
# * pyproject.toml at the repo root, so ``pip install`` works against
# the bundle during enroll_bootstrap.sh.
# * the ``decnet/`` package, MINUS the master-only subtrees called out
# by _EXCLUDED_DECNET_SUBTREES — those never import on an agent host.
# Everything else the bootstrap needs (the INI, certs, systemd units) is
# synthesized in-memory by ``_build_tarball`` below — it never hits the
# filesystem walk.
# Top-level files shipped verbatim. Relative to the repo root.
_INCLUDED_ROOT_FILES: tuple[str, ...] = ("pyproject.toml",)
# Top-level directories walked into the bundle. Relative to the repo root.
_INCLUDED_DIRS: tuple[str, ...] = ("decnet",)
# Subtrees of an included directory that must NOT ship. Paths are
# relative to the repo root, forward-slash separated.
# * ``decnet/web`` — FastAPI master app, unused by agents.
# * ``decnet/mutator`` — schedules respawns swarm-wide; master-only.
# * ``decnet/profiler`` — rebuilds profiles against the master DB.
_EXCLUDED_DECNET_SUBTREES: frozenset[str] = frozenset({
"decnet/web",
"decnet/mutator",
"decnet/profiler",
})
# ---------------------------------------------------------------------------
# DTOs
# ---------------------------------------------------------------------------
class EnrollBundleRequest(BaseModel):
master_host: str = Field(..., min_length=1, max_length=253,
description="IP/host the agent will reach back to")
agent_name: str = Field(..., pattern=r"^[a-z0-9][a-z0-9-]{0,62}$",
description="Worker name (DNS-label safe)")
with_updater: bool = Field(
default=True,
description="Include updater cert bundle and auto-start decnet updater on the agent",
)
use_ipvlan: bool = Field(
default=False,
description=(
"Run deckies on this agent over IPvlan L2 instead of MACVLAN. "
"Required when the agent is a VirtualBox/VMware guest bridged over Wi-Fi — "
"Wi-Fi APs bind one MAC per station, so MACVLAN's extra container MACs "
"rotate the VM's DHCP lease. Safe no-op on wired/bare-metal hosts."
),
)
services_ini: Optional[str] = Field(
default=None,
description="Optional INI text shipped to the agent as /etc/decnet/services.ini",
)
class EnrollBundleResponse(BaseModel):
token: str
command: str
expires_at: datetime
host_uuid: str
# ---------------------------------------------------------------------------
# In-memory registry
@@ -156,181 +89,16 @@ def _ensure_sweeper() -> None:
_SWEEPER_TASK = asyncio.create_task(_sweep_loop())
# ---------------------------------------------------------------------------
# Tarball construction
# ---------------------------------------------------------------------------
def _repo_root() -> pathlib.Path:
# decnet/web/router/swarm_mgmt/api_enroll_bundle.py -> 4 parents = repo root.
return pathlib.Path(__file__).resolve().parents[4]
def _now() -> datetime:
# Indirection so tests can monkeypatch.
return datetime.now(timezone.utc)
def _iter_included(root: pathlib.Path) -> "list[tuple[pathlib.Path, str]]":
"""Return ``(full_path, arcname)`` pairs for every file the agent needs.
Walk is pruned in-place: ``__pycache__`` and the master-only subtrees
in :data:`_EXCLUDED_DECNET_SUBTREES` are skipped at the directory
level so we never descend into them (critical on dev boxes where
``decnet/web/`` pulls in a fat frontend tree via package-data).
"""
found: list[tuple[pathlib.Path, str]] = []
# Top-level files.
for rel in _INCLUDED_ROOT_FILES:
p = root / rel
if p.is_file():
found.append((p, rel))
# Top-level dirs, pruned.
for top in _INCLUDED_DIRS:
start = root / top
if not start.is_dir():
continue
for dirpath, dirnames, filenames in os.walk(start, topdown=True, followlinks=False):
dir_path = pathlib.Path(dirpath)
rel_dir = dir_path.relative_to(root).as_posix()
# Prune excluded subtrees + cache dirs BEFORE descending.
dirnames[:] = [
d for d in dirnames
if d != "__pycache__"
and f"{rel_dir}/{d}" not in _EXCLUDED_DECNET_SUBTREES
]
for fn in filenames:
if fn.endswith((".pyc", ".pyo")):
continue
full = dir_path / fn
if full.is_symlink():
continue
found.append((full, f"{rel_dir}/{fn}"))
# Deterministic tarball ordering.
found.sort(key=lambda t: t[1])
return found
def _render_decnet_ini(
master_host: str,
host_uuid: str,
use_ipvlan: bool = False,
swarmctl_port: int = 8770,
) -> bytes:
ipvlan_line = f"ipvlan = {'true' if use_ipvlan else 'false'}\n"
return (
"; Generated by DECNET agent-enrollment bundle.\n"
"[decnet]\n"
"mode = agent\n"
"disallow-master = true\n"
"log-directory = /var/log/decnet\n"
f"{ipvlan_line}"
"\n"
"[agent]\n"
f"master-host = {master_host}\n"
f"swarmctl-port = {swarmctl_port}\n"
"swarm-syslog-port = 6514\n"
"agent-port = 8765\n"
"agent-dir = /etc/decnet/agent\n"
"updater-dir = /etc/decnet/updater\n"
f"host-uuid = {host_uuid}\n"
).encode()
def _add_bytes(tar: tarfile.TarFile, name: str, data: bytes, mode: int = 0o644) -> None:
info = tarfile.TarInfo(name)
info.size = len(data)
info.mode = mode
info.mtime = int(datetime.now(timezone.utc).timestamp())
tar.addfile(info, io.BytesIO(data))
def _build_tarball(
master_host: str,
agent_name: str,
host_uuid: str,
issued: pki.IssuedCert,
services_ini: Optional[str],
updater_issued: Optional[pki.IssuedCert] = None,
use_ipvlan: bool = False,
) -> bytes:
"""Gzipped tarball with:
- agent-required source (see :data:`_INCLUDED_DIRS` /
:data:`_INCLUDED_ROOT_FILES`; master-only decnet/ subtrees
pruned)
- etc/decnet/decnet.ini (pre-baked for mode=agent)
- home/.decnet/agent/{ca.crt,worker.crt,worker.key}
- home/.decnet/updater/{ca.crt,updater.crt,updater.key} (if updater_issued)
- services.ini at root if provided
"""
root = _repo_root()
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
for path, arcname in _iter_included(root):
tar.add(path, arcname=arcname, recursive=False)
_add_bytes(
tar,
"etc/decnet/decnet.ini",
_render_decnet_ini(master_host, host_uuid, use_ipvlan),
)
for unit in _SYSTEMD_UNITS:
_add_bytes(
tar,
f"etc/systemd/system/{unit}.service",
_render_systemd_unit(unit, agent_name, master_host),
)
_add_bytes(tar, "home/.decnet/agent/ca.crt", issued.ca_cert_pem)
_add_bytes(tar, "home/.decnet/agent/worker.crt", issued.cert_pem)
_add_bytes(tar, "home/.decnet/agent/worker.key", issued.key_pem, mode=0o600)
if updater_issued is not None:
_add_bytes(tar, "home/.decnet/updater/ca.crt", updater_issued.ca_cert_pem)
_add_bytes(tar, "home/.decnet/updater/updater.crt", updater_issued.cert_pem)
_add_bytes(tar, "home/.decnet/updater/updater.key", updater_issued.key_pem, mode=0o600)
if services_ini:
_add_bytes(tar, "services.ini", services_ini.encode())
return buf.getvalue()
_SYSTEMD_UNITS = (
"decnet-agent", "decnet-forwarder", "decnet-engine", "decnet-updater",
# Per-host microservices — activated by enroll_bootstrap.sh. The
# profiler intentionally stays master-side: it rebuilds attacker
# profiles against the master DB, which workers don't share.
"decnet-collector", "decnet-prober", "decnet-sniffer",
)
def _render_systemd_unit(name: str, agent_name: str, master_host: str) -> bytes:
tpl_path = pathlib.Path(__file__).resolve().parents[1].parent / "templates" / f"{name}.service.j2"
tpl = tpl_path.read_text()
return (
tpl.replace("{{ agent_name }}", agent_name)
.replace("{{ master_host }}", master_host)
).encode()
def _render_bootstrap(
agent_name: str,
master_host: str,
tarball_url: str,
expires_at: datetime,
with_updater: bool,
) -> bytes:
tpl_path = pathlib.Path(__file__).resolve().parents[1].parent / "templates" / "enroll_bootstrap.sh.j2"
tpl = tpl_path.read_text()
now = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
rendered = (
tpl.replace("{{ agent_name }}", agent_name)
.replace("{{ master_host }}", master_host)
.replace("{{ tarball_url }}", tarball_url)
.replace("{{ generated_at }}", now)
.replace("{{ expires_at }}", expires_at.replace(microsecond=0).isoformat())
.replace("{{ with_updater }}", "true" if with_updater else "false")
)
return rendered.encode()
async def _lookup_live(token: str) -> _Bundle:
b = _BUNDLES.get(token)
if b is None or b.served or b.expires_at <= _now():
raise HTTPException(status_code=404, detail="bundle not found or expired")
return b
# ---------------------------------------------------------------------------
@@ -403,7 +171,7 @@ async def create_enroll_bundle(
)
# 3. Render payload + bootstrap.
tarball = _build_tarball(
tarball = build_tarball(
req.master_host, req.agent_name, host_uuid, issued, req.services_ini, updater_issued,
use_ipvlan=req.use_ipvlan,
)
@@ -423,7 +191,7 @@ async def create_enroll_bundle(
base = f"{scheme}://{netloc}"
tarball_url = f"{base}/api/v1/swarm/enroll-bundle/{token}.tgz"
bootstrap_url = f"{base}/api/v1/swarm/enroll-bundle/{token}.sh"
script = _render_bootstrap(req.agent_name, req.master_host, tarball_url, expires_at, req.with_updater)
script = render_bootstrap(req.agent_name, req.master_host, tarball_url, expires_at, req.with_updater)
tgz_path.write_bytes(tarball)
sh_path.write_bytes(script)
@@ -446,18 +214,6 @@ async def create_enroll_bundle(
)
def _now() -> datetime:
# Indirection so tests can monkeypatch.
return datetime.now(timezone.utc)
async def _lookup_live(token: str) -> _Bundle:
b = _BUNDLES.get(token)
if b is None or b.served or b.expires_at <= _now():
raise HTTPException(status_code=404, detail="bundle not found or expired")
return b
@router.get(
"/enroll-bundle/{token}.sh",
tags=["Swarm Management"],