Files
DECNET/decnet/engine/deployer.py
anti a58d42e492 feat(templates): wire SSH+Telnet to sessrec transcript recorder
Build login-session into both images as the swapped root shell, add a
quarantine bind mount for telnet (symmetric to SSH), seed transcripts/
dir and service discriminant at entrypoint. Deployer syncs sessrec.c +
Makefile into each build context alongside the existing syslog_bridge
helper. sessrec falls back to /etc/sessrec.service when env is stripped
(busybox /bin/login).
2026-04-21 23:03:42 -04:00

678 lines
25 KiB
Python

"""
Deploy, teardown, and status via Docker SDK + subprocess docker compose.
"""
import asyncio
import shutil
import subprocess # nosec B404
import time
from pathlib import Path
import docker
from rich.console import Console
from rich.table import Table
from decnet.topology.hashing import canonical_hash
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.config import DecnetConfig, clear_state, load_state, save_state
from decnet.composer import write_compose
from decnet.network import (
MACVLAN_NETWORK_NAME,
create_bridge_network,
create_ipvlan_network,
create_macvlan_network,
get_host_ip,
ips_to_range,
remove_bridge_network,
remove_macvlan_network,
setup_host_ipvlan,
setup_host_macvlan,
teardown_host_ipvlan,
teardown_host_macvlan,
)
from decnet.topology.compose import (
_network_name as _topology_network_name,
write_topology_compose,
)
from decnet.topology.persistence import hydrate, transition_status
from decnet.topology.status import TopologyStatus
from decnet.topology.validate import (
ValidationError,
check_no_host_port_collision,
errors as _validation_errors,
validate as _validate_topology,
)
# Structured logger for this module (registered under the name "engine") and
# the rich console used for operator-facing output.
log = get_logger("engine")
console = Console()
# Default compose file path; relative, so it resolves against the process's
# current working directory when used.
COMPOSE_FILE = Path("decnet-compose.yml")
# Canonical syslog_bridge.py that _sync_logging_helper copies into build contexts.
_CANONICAL_LOGGING = Path(__file__).parent.parent / "templates" / "syslog_bridge.py"
# Directory holding the canonical sessrec.c + Makefile used by _sync_sessrec_sources.
_CANONICAL_SESSREC_DIR = Path(__file__).parent.parent / "templates" / "_shared" / "sessrec"
# Service names whose build contexts receive the sessrec sources.
_SESSREC_SERVICES = {"ssh", "telnet"}
def _sync_logging_helper(config: DecnetConfig) -> None:
    """Refresh ``syslog_bridge.py`` in every active template build context.

    Walks each service of each decky, resolves the service's Dockerfile
    build context through the service registry, and copies the canonical
    helper in when the context's copy is missing or differs byte-for-byte.
    Each context is processed at most once; unknown services and services
    without a build context are skipped.
    """
    from decnet.services.registry import get_service

    visited: set[Path] = set()
    service_names = (name for decky in config.deckies for name in decky.services)
    for name in service_names:
        service = get_service(name)
        if service is None:
            continue
        context = service.dockerfile_context()
        if context is None or context in visited:
            continue
        visited.add(context)
        target = context / "syslog_bridge.py"
        up_to_date = (
            target.exists()
            and target.read_bytes() == _CANONICAL_LOGGING.read_bytes()
        )
        if not up_to_date:
            shutil.copy2(_CANONICAL_LOGGING, target)
def _sync_sessrec_sources(config: DecnetConfig) -> None:
    """Mirror ``sessrec.c`` and its ``Makefile`` into SSH/Telnet build contexts.

    Only services listed in ``_SESSREC_SERVICES`` are considered. For each
    distinct build context a ``sessrec/`` subdirectory is created if needed,
    and each canonical file is copied in when the existing copy is missing
    or its bytes differ.
    """
    from decnet.services.registry import get_service

    canonical_files = [
        _CANONICAL_SESSREC_DIR / filename for filename in ("sessrec.c", "Makefile")
    ]
    visited: set[Path] = set()
    for decky in config.deckies:
        for name in decky.services:
            if name not in _SESSREC_SERVICES:
                continue
            service = get_service(name)
            if service is None:
                continue
            context = service.dockerfile_context()
            if context is None or context in visited:
                continue
            visited.add(context)

            target_dir = context / "sessrec"
            target_dir.mkdir(exist_ok=True)
            for canonical in canonical_files:
                target = target_dir / canonical.name
                if target.exists() and target.read_bytes() == canonical.read_bytes():
                    continue
                shutil.copy2(canonical, target)
def _compose(*args: str, compose_file: Path = COMPOSE_FILE, env: dict | None = None) -> None:
    """Run a single ``docker compose`` command against ``compose_file``.

    Captured stdout is echoed to this process's stdout. ``env`` entries are
    layered on top of the current process environment.

    Raises:
        subprocess.CalledProcessError: when the command exits non-zero. Any
            captured stderr is written to the structured log first.
    """
    import os

    # -p decnet pins the compose project name. Without it, docker compose
    # derives the project from basename($PWD); when a daemon (systemd) runs
    # with WorkingDirectory=/ that basename is empty and compose aborts with
    # "project name must not be empty".
    command = ["docker", "compose", "-p", "decnet", "-f", str(compose_file), *args]
    environment = dict(os.environ)
    environment.update(env or {})
    proc = subprocess.run(command, capture_output=True, text=True, env=environment)  # nosec B603
    if proc.stdout:
        print(proc.stdout, end="")
    if proc.returncode == 0:
        return
    # Docker emits the useful detail ("Address already in use", which IP,
    # which port) on stderr. Surface it to the structured log so the
    # agent's journal carries it — without this the upstream traceback
    # just shows the exit code.
    if proc.stderr:
        log.error("docker compose %s failed: %s", " ".join(args), proc.stderr.strip())
    raise subprocess.CalledProcessError(
        proc.returncode, command, proc.stdout, proc.stderr
    )
# Lower-case substrings of docker compose stderr that _compose_with_retry
# treats as permanent failures: on a match it raises immediately rather than
# retrying. Matching is done against the lower-cased stderr text.
_PERMANENT_ERRORS = (
    "manifest unknown",
    "manifest for",
    "not found",
    "pull access denied",
    "repository does not exist",
)
@_traced("engine.compose_with_retry")
def _compose_with_retry(
    *args: str,
    compose_file: Path = COMPOSE_FILE,
    retries: int = 3,
    delay: float = 5.0,
    env: dict | None = None,
) -> None:
    """Run a docker compose command, retrying on transient failures.

    Args:
        *args: Arguments appended after ``docker compose -p decnet -f <file>``.
        compose_file: Compose file passed via ``-f``.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            the command always runs at least once.
        delay: Seconds to sleep before the second attempt; doubled after
            every failed attempt.
        env: Extra environment variables layered over the process environment.

    Raises:
        subprocess.CalledProcessError: immediately when stderr matches one of
            ``_PERMANENT_ERRORS``, otherwise after the final failed attempt.
    """
    import os

    # Guard against retries <= 0: previously the loop body never ran and the
    # function fell through to ``raise None`` (a TypeError) without ever
    # invoking docker.
    attempts = max(1, retries)
    # -p decnet pins the compose project name. Without it, docker compose
    # derives the project from basename($PWD); when a daemon (systemd) runs
    # with WorkingDirectory=/ that basename is empty and compose aborts with
    # "project name must not be empty".
    cmd = ["docker", "compose", "-p", "decnet", "-f", str(compose_file), *args]
    merged = {**os.environ, **(env or {})}
    for attempt in range(1, attempts + 1):
        result = subprocess.run(cmd, capture_output=True, text=True, env=merged)  # nosec B603
        if result.returncode == 0:
            if result.stdout:
                print(result.stdout, end="")
            return

        failure = subprocess.CalledProcessError(
            result.returncode, cmd, result.stdout, result.stderr
        )
        stderr_text = (result.stderr or "").strip()
        if any(pat in stderr_text.lower() for pat in _PERMANENT_ERRORS):
            console.print(f"[red]Permanent Docker error — not retrying:[/]\n{stderr_text}")
            raise failure

        if attempt == attempts:
            # Out of attempts: show the last stderr and record it in the log
            # before propagating the failure.
            if stderr_text:
                console.print(f"[red]{stderr_text}[/]")
            log.error("docker compose %s failed after %d attempts: %s",
                      " ".join(args), attempts, stderr_text)
            raise failure

        console.print(
            f"[yellow]docker compose {' '.join(args)} failed "
            f"(attempt {attempt}/{attempts}), retrying in {delay:.0f}s…[/]"
        )
        if stderr_text:
            console.print(f"[dim]{stderr_text}[/]")
        time.sleep(delay)
        delay *= 2  # exponential backoff between attempts
def _emit_lifecycle_event(
    *,
    decky_name: str,
    old_services: list[str],
    new_services: list[str],
    trigger: str,
) -> None:
    """Emit a ``decky_mutated`` event from synchronous code.

    ``emit_decky_mutated`` is a coroutine, so it is driven to completion
    with ``asyncio.run``. ``bus`` is always passed as ``None`` from here.

    Any exception (including a failed import) is logged as a warning and
    swallowed so that event emission can never abort a deploy or teardown.
    The import is done lazily inside the function to avoid a circular
    dependency between ``decnet.mutator`` and this module.
    """
    try:
        from decnet.mutator.events import emit_decky_mutated

        emission = emit_decky_mutated(
            bus=None,
            decky=decky_name,
            old_services=old_services,
            new_services=new_services,
            trigger=trigger,  # type: ignore[arg-type]
        )
        asyncio.run(emission)
    except Exception as exc:  # noqa: BLE001
        log.warning(
            "lifecycle event emission failed decky=%s trigger=%s: %s",
            decky_name,
            trigger,
            exc,
        )
@_traced("engine.deploy")
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
    """Create the decky network, write the compose file and start all deckies.

    Args:
        config: Deployment description (deckies, interface, subnet, gateway,
            ipvlan flag).
        dry_run: When true, only sync the build helpers and write the compose
            file; no network is created and no container is started.
        no_cache: Build images with ``--no-cache``.
        parallel: Build all images in a separate ``build`` step with
            ``DOCKER_BUILDKIT=1`` before ``up -d``.
    """
    log.info(
        "deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s",
        len(config.deckies),
        config.interface,
        config.subnet,
        dry_run,
    )
    log.debug("deploy: deckies=%s", [d.name for d in config.deckies])

    client = docker.from_env()
    decky_range = ips_to_range([d.ip for d in config.deckies])
    host_ip = get_host_ip(config.interface)
    log.debug("deploy: ip_range=%s host_ip=%s", decky_range, host_ip)

    net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN"
    console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}")
    if not dry_run:
        # Pick the driver-specific pair once; both pairs share a call shape.
        if config.ipvlan:
            create_network, setup_host = create_ipvlan_network, setup_host_ipvlan
        else:
            create_network, setup_host = create_macvlan_network, setup_host_macvlan
        create_network(
            client,
            interface=config.interface,
            subnet=config.subnet,
            gateway=config.gateway,
            ip_range=decky_range,
        )
        setup_host(config.interface, host_ip, decky_range)

    _sync_logging_helper(config)
    _sync_sessrec_sources(config)

    compose_path = write_compose(config, COMPOSE_FILE)
    console.print(f"[bold cyan]Compose file written[/] → {compose_path}")

    if dry_run:
        log.info("deployment dry-run complete compose_path=%s", compose_path)
        console.print("[yellow]Dry run — no containers started.[/]")
        return

    save_state(config, compose_path)

    # Emit one creation event per decky so the correlation graph has a
    # well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
    # Bus is None here — the syslog line is what the correlator consumes.
    for decky in config.deckies:
        _emit_lifecycle_event(
            decky_name=decky.name,
            old_services=[],
            new_services=list(decky.services),
            trigger="creation",
        )

    # Pre-up cleanup: a prior half-failed `up` can leave containers still
    # holding the IPs/ports this run wants, which surfaces as the recurring
    # "Address already in use" from Docker's IPAM. Best-effort — ignore
    # failure (e.g. nothing to tear down on a clean host).
    try:
        _compose("down", "--remove-orphans", compose_file=compose_path)
    except subprocess.CalledProcessError:
        log.debug("pre-up cleanup: compose down failed (likely nothing to remove)")

    build_env = {"DOCKER_BUILDKIT": "1"} if parallel else {}
    console.print("[bold cyan]Building images and starting deckies...[/]")
    build_args = ["build", "--no-cache"] if no_cache else ["build"]

    if parallel:
        console.print("[bold cyan]Parallel build enabled — building all images concurrently...[/]")
        _compose_with_retry(*build_args, compose_file=compose_path, env=build_env)
        _compose_with_retry("up", "-d", compose_file=compose_path, env=build_env)
    else:
        if no_cache:
            _compose_with_retry(*build_args, compose_file=compose_path)
        _compose_with_retry("up", "--build", "-d", compose_file=compose_path)

    log.info("deployment complete n_deckies=%d", len(config.deckies))
    _print_status(config)
@_traced("engine.teardown")
def teardown(decky_id: str | None = None) -> None:
    """Tear down one decky, or the whole deployment when ``decky_id`` is falsy.

    With a ``decky_id`` only that decky's compose services are stopped and
    removed; network and saved state are left alone. Without one, compose is
    brought down, the host-side network plumbing and Docker network are
    removed, and the saved state is cleared. A retirement lifecycle event is
    emitted for every affected decky before its containers are touched.
    """
    log.info("teardown requested decky_id=%s", decky_id or "all")

    loaded = load_state()
    if loaded is None:
        log.warning("teardown: no active deployment found")
        console.print("[red]No active deployment found (no decnet-state.json).[/]")
        return
    config, compose_path = loaded
    client = docker.from_env()

    if decky_id:
        # Single-decky path: find the first decky with a matching name.
        target = None
        for candidate in config.deckies:
            if candidate.name == decky_id:
                target = candidate
                break
        if target is None:
            console.print(f"[red]Decky '{decky_id}' not found in current deployment.[/]")
            return

        svc_names = [f"{decky_id}-{svc}" for svc in target.services]
        if not svc_names:
            log.warning("teardown: decky %s has no services to stop", decky_id)
            return

        _emit_lifecycle_event(
            decky_name=target.name,
            old_services=list(target.services),
            new_services=[],
            trigger="retirement",
        )
        _compose("stop", *svc_names, compose_file=compose_path)
        _compose("rm", "-f", *svc_names, compose_file=compose_path)
        return

    # Full teardown path.
    for decky in config.deckies:
        _emit_lifecycle_event(
            decky_name=decky.name,
            old_services=list(decky.services),
            new_services=[],
            trigger="retirement",
        )
    _compose("down", compose_file=compose_path)

    decky_range = ips_to_range([d.ip for d in config.deckies])
    remove_host_side = teardown_host_ipvlan if config.ipvlan else teardown_host_macvlan
    remove_host_side(decky_range)
    remove_macvlan_network(client)
    clear_state()

    net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
    log.info("teardown complete all deckies removed network_driver=%s", net_driver)
    console.print(f"[green]All deckies torn down. {net_driver} network removed.[/]")
def status() -> None:
    """Print a table with the container state of every service of every decky.

    Container states come from the Docker daemon (all containers, including
    stopped ones). A service whose container is not listed is shown as
    ``absent``. A decky is reported ``up`` only when every one of its
    service cells contains ``running``; otherwise ``degraded``.
    """
    loaded = load_state()
    if loaded is None:
        console.print("[yellow]No active deployment.[/]")
        return
    config, _ = loaded
    client = docker.from_env()

    table = Table(title="DECNET Deckies", show_lines=True)
    table.add_column("Decky", style="bold")
    for heading in ("IP", "Services", "Hostname", "Status"):
        table.add_column(heading)

    container_states = {}
    for container in client.containers.list(all=True, ignore_removed=True):
        container_states[container.name] = container.status

    for decky in config.deckies:
        cells = []
        for svc in decky.services:
            # Container names use dashes where service names use underscores.
            container_name = f"{decky.name}-{svc.replace('_', '-')}"
            state = container_states.get(container_name, "absent")
            colour = "green" if state == "running" else "red"
            cells.append(f"[{colour}]{svc}({state})[/{colour}]")

        if all("running" in cell for cell in cells):
            overall = "[green]up[/]"
        else:
            overall = "[red]degraded[/]"
        table.add_row(decky.name, decky.ip, " ".join(cells), decky.hostname, overall)

    console.print(table)
def _teardown_order(lans: list[dict]) -> list[str]:
    """Return LAN names in leaf-first (DMZ-last) teardown order.

    The generator names LANs in BFS order (``LAN-00`` = DMZ root, then
    children, then grandchildren), so descending name order is a correct
    leaf-first topological sort for the tree. Cross-edges are
    membership-only — they don't introduce parent/child relationships, so
    the BFS numbering remains valid.

    Names are compared with a natural (numeric-aware) key rather than as
    plain strings, so the order stays correct once a numeric suffix outgrows
    its zero padding (``LAN-100`` is torn down before ``LAN-99``). For
    uniformly zero-padded names the result is identical to a plain reverse
    string sort.

    Args:
        lans: LAN records; only the ``"name"`` key of each is read.

    Returns:
        The LAN names, highest BFS index first.
    """
    import re

    def _natural_key(name: str) -> tuple:
        # re.split with a capture group alternates text / digit-run / text…,
        # so odd positions are always digit runs. Same-index chunks of any two
        # names therefore have the same type and compare without TypeError.
        chunks = re.split(r"([0-9]+)", name)
        parsed = tuple(
            int(chunk) if index % 2 else chunk
            for index, chunk in enumerate(chunks)
        )
        # The raw name breaks ties between names that differ only in padding.
        return parsed, name

    return sorted((lan["name"] for lan in lans), key=_natural_key, reverse=True)
def _topology_compose_path(topology_id: str) -> Path:
    """Return the relative compose-file path for a topology.

    The file name embeds the first eight characters of ``topology_id``
    (or the whole id when it is shorter).
    """
    short_id = topology_id[:8]
    filename = "decnet-topology-" + short_id + "-compose.yml"
    return Path(filename)
async def _resolve_swarm_host(repo, host_uuid: str) -> dict:
    """Look up a swarm host record by UUID.

    Returns whatever ``repo.get_swarm_host_by_uuid`` yields for the UUID.

    Raises:
        ValueError: when the repository returns ``None`` for ``host_uuid``.
    """
    record = await repo.get_swarm_host_by_uuid(host_uuid)
    if record is not None:
        return record
    raise ValueError(
        f"topology pinned to unknown swarm host {host_uuid!r}"
    )
async def _deploy_on_agent(repo, topology_id: str, hydrated: dict) -> None:
    """Apply a topology on the agent named by its ``target_host_uuid``.

    Status goes to ``DEPLOYING`` before the agent call, then to ``ACTIVE``
    on success. If the agent call raises, status is set to ``FAILED`` with
    the exception text as the reason and the exception is re-raised.

    ``AgentClient`` is imported locally to avoid a circular dependency:
    decnet.swarm.client already pulls decnet.engine indirectly via
    decnet.config.
    """
    from decnet.swarm.client import AgentClient

    host = await _resolve_swarm_host(repo, hydrated["topology"]["target_host_uuid"])
    version_hash = canonical_hash(hydrated)

    await transition_status(repo, topology_id, TopologyStatus.DEPLOYING)
    try:
        async with AgentClient(host=host) as agent:
            await agent.apply_topology(hydrated, version_hash)
    except Exception as exc:
        log.error(
            "topology %s agent-apply failed on %s: %s",
            topology_id, host.get("name"), exc,
        )
        await transition_status(
            repo, topology_id, TopologyStatus.FAILED, reason=str(exc)
        )
        raise
    else:
        await transition_status(repo, topology_id, TopologyStatus.ACTIVE)
        log.info(
            "topology %s deployed on agent %s (hash=%s)",
            topology_id, host.get("name"), version_hash[:12],
        )
async def resync_agent_topology(repo, topology_id: str) -> None:
    """Re-push an agent-targeted topology without touching its status.

    Used by the mutator reconcile loop when an agent's reported
    applied_version_hash drifts from what master expects. Unlike the
    initial deploy, no status transition is made — the freshly hydrated
    topology is simply applied on the agent again.

    Raises:
        ValueError: when the topology does not exist, has no
            ``target_host_uuid``, or is pinned to an unknown swarm host.
    """
    from decnet.swarm.client import AgentClient

    hydrated = await hydrate(repo, topology_id)
    if hydrated is None:
        raise ValueError(f"topology {topology_id!r} not found")

    pinned_uuid = hydrated["topology"].get("target_host_uuid")
    if not pinned_uuid:
        raise ValueError(
            f"topology {topology_id!r} has no target_host_uuid; "
            "resync is agent-only"
        )

    host = await _resolve_swarm_host(repo, pinned_uuid)
    version_hash = canonical_hash(hydrated)
    async with AgentClient(host=host) as agent:
        await agent.apply_topology(hydrated, version_hash)

    log.info(
        "topology %s resynced to agent %s (hash=%s)",
        topology_id, host.get("name"), version_hash[:12],
    )
async def _teardown_on_agent(repo, topology_id: str, hydrated: dict) -> None:
    """Tear a topology down on the agent named by its ``target_host_uuid``.

    Status goes to ``TEARING_DOWN`` before the agent call. A failing agent
    call is logged as a warning and otherwise ignored; status is set to
    ``TORN_DOWN`` either way.
    """
    from decnet.swarm.client import AgentClient

    pinned_uuid = hydrated["topology"]["target_host_uuid"]
    host = await _resolve_swarm_host(repo, pinned_uuid)

    await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN)
    try:
        async with AgentClient(host=host) as agent:
            await agent.teardown_topology(topology_id)
    except Exception as exc:
        # Deliberately best-effort: fall through to TORN_DOWN below.
        log.warning(
            "topology %s agent-teardown failed on %s (continuing): %s",
            topology_id, host.get("name"), exc,
        )

    await transition_status(repo, topology_id, TopologyStatus.TORN_DOWN)
    log.info("topology %s torn down on agent %s", topology_id, host.get("name"))
def _warn_if_userland_proxy_enabled(hydrated: dict) -> None:
    """Log a soft warning when docker-proxy would mask attacker source IPs.

    The daemon is only queried when at least one decky has a truthy
    ``forwards_l3`` in its ``decky_config`` — no point scaring operators on
    port-less topologies. Best-effort: any failure talking to the daemon is
    silently ignored.
    """
    for decky in hydrated.get("deckies", []):
        decky_config = decky.get("decky_config") or {}
        if decky_config.get("forwards_l3"):
            break
    else:
        # No decky publishes ports; nothing to warn about.
        return

    try:
        daemon_info = docker.from_env().info()
    except Exception:
        return

    proxy_enabled = daemon_info.get("UserlandProxy") or daemon_info.get("Userland Proxy")
    if not proxy_enabled:
        return
    log.warning(
        "[USERLAND_PROXY] docker-proxy is enabled; attacker source IPs "
        "will appear as the bridge gateway. Set "
        '"userland-proxy": false in /etc/docker/daemon.json to preserve '
        "real source IPs."
    )
def _rollback_topology_deploy(
    client,
    topology_id: str,
    compose_path: Path,
    compose_started: bool,
    created_networks: list[str],
) -> None:
    """Undo the Docker state created by a failed ``deploy_topology`` attempt.

    Best-effort: compose is brought down, the networks created in this
    attempt are removed in reverse creation order, and the compose file is
    deleted. Rollback errors are logged (or ignored for the unlink) so they
    cannot mask the original deploy failure.
    """
    if compose_started or compose_path.exists():
        try:
            _compose("down", "--remove-orphans", compose_file=compose_path)
        except Exception as rb_exc:  # pragma: no cover
            log.warning(
                "topology %s rollback compose-down failed: %s",
                topology_id, rb_exc,
            )

    for net_name in reversed(created_networks):
        try:
            remove_bridge_network(client, net_name)
        except Exception as rb_exc:  # pragma: no cover
            log.warning(
                "topology %s rollback network %s removal failed: %s",
                topology_id, net_name, rb_exc,
            )

    if compose_path.exists():
        try:
            compose_path.unlink()
        except OSError:  # pragma: no cover
            pass


@_traced("engine.deploy_topology")
async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> None:
    """Deploy a persisted MazeNET topology.

    Assumes ``repo`` has the topology in ``pending`` state. Creates one
    Docker bridge network per LAN, writes a per-topology compose file, and
    brings all deckies up. Marks ``active`` on success; on exception the
    Docker state created by this attempt is rolled back, the topology is
    marked ``failed`` and the exception is re-raised.

    Topologies with a ``target_host_uuid`` are handed off to the pinned
    agent instead of being deployed locally.

    Raises:
        ValueError: when the topology cannot be hydrated.
        ValidationError: when validation reports errors; raised before any
            status transition or Docker call.
    """
    hydrated = await hydrate(repo, topology_id)
    if hydrated is None:
        raise ValueError(f"topology {topology_id!r} not found")

    # Precondition: validate before any status transition or Docker call.
    # Errors bubble up as ValidationError and leave status untouched.
    issues = _validate_topology(hydrated)
    if _validation_errors(issues):
        raise ValidationError(issues)

    lans = hydrated["lans"]
    compose_path = _topology_compose_path(topology_id)

    if dry_run:
        # Plan-only: don't touch repo status or Docker — write the compose
        # so operators can diff it, nothing else.
        write_topology_compose(hydrated, compose_path)
        console.print(
            f"[bold cyan]Dry run — topology compose file written[/] → {compose_path}"
        )
        log.info("topology %s dry-run complete", topology_id)
        return

    # Host-state precheck: PORT_COLLISION is a warning (docker-compose
    # will hard-fail if the port is actually unavailable; we just want
    # the clearer log line up-front). Only runs at live deploy.
    for warning in check_no_host_port_collision(hydrated):
        log.warning("[%s] %s", warning.code, warning.message)
    _warn_if_userland_proxy_enabled(hydrated)

    # Pinned to an agent? Hand off to the mTLS path. Everything below
    # this line is the master-local deploy.
    if hydrated["topology"].get("target_host_uuid"):
        await _deploy_on_agent(repo, topology_id, hydrated)
        return

    await transition_status(repo, topology_id, TopologyStatus.DEPLOYING)
    client = docker.from_env()
    created_networks: list[str] = []
    compose_started = False
    try:
        for lan in lans:
            net_name = _topology_network_name(topology_id, lan["name"])
            # DMZ LAN is publicly routable; internal LANs are isolated
            # from the host's default egress.
            create_bridge_network(
                client, net_name, lan["subnet"], internal=not lan["is_dmz"]
            )
            created_networks.append(net_name)

        write_topology_compose(hydrated, compose_path)
        console.print(
            f"[bold cyan]Topology compose file written[/] → {compose_path}"
        )
        _compose_with_retry("up", "--build", "-d", compose_file=compose_path)
        compose_started = True
    except Exception as exc:
        log.error("topology %s deploy failed: %s", topology_id, exc)
        # Roll back any Docker state we created in this attempt so the
        # next deploy doesn't trip over orphan networks or half-started
        # containers.
        _rollback_topology_deploy(
            client, topology_id, compose_path, compose_started, created_networks
        )
        await transition_status(
            repo, topology_id, TopologyStatus.FAILED, reason=str(exc)
        )
        raise

    await transition_status(repo, topology_id, TopologyStatus.ACTIVE)
    log.info("topology %s deployed n_lans=%d", topology_id, len(lans))
@_traced("engine.teardown_topology")
async def teardown_topology(repo, topology_id: str) -> None:
    """Tear down a persisted MazeNET topology.

    Legal from ``active|degraded|failed|deploying``. Brings compose down
    (a failing ``down`` is logged and ignored), removes each LAN's Docker
    bridge network in leaf-first order, deletes the compose file and marks
    the topology ``torn_down``. Topologies with a ``target_host_uuid`` are
    handed off to the pinned agent instead.

    Raises:
        ValueError: when the topology cannot be hydrated.
    """
    hydrated = await hydrate(repo, topology_id)
    if hydrated is None:
        raise ValueError(f"topology {topology_id!r} not found")

    if hydrated["topology"].get("target_host_uuid"):
        await _teardown_on_agent(repo, topology_id, hydrated)
        return

    await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN)
    client = docker.from_env()
    compose_path = _topology_compose_path(topology_id)

    if compose_path.exists():
        try:
            _compose("down", "--remove-orphans", compose_file=compose_path)
        except subprocess.CalledProcessError as exc:
            log.warning(
                "topology %s compose down failed (continuing): %s",
                topology_id, exc,
            )

    network_names = [
        _topology_network_name(topology_id, lan_name)
        for lan_name in _teardown_order(hydrated["lans"])
    ]
    for network_name in network_names:
        remove_bridge_network(client, network_name)

    if compose_path.exists():
        compose_path.unlink()

    await transition_status(repo, topology_id, TopologyStatus.TORN_DOWN)
    log.info("topology %s torn down", topology_id)
def _print_status(config: DecnetConfig) -> None:
    """Print a table of deployed deckies: name, IP and comma-joined services."""
    summary = Table(title="Deployed Deckies", show_lines=True)
    for heading in ("Decky", "IP", "Services"):
        summary.add_column(heading)
    for decky in config.deckies:
        services = ", ".join(decky.services)
        summary.add_row(decky.name, decky.ip, services)
    console.print(summary)