feat(engine): route agent-pinned topologies via AgentClient

deploy_topology and teardown_topology now branch on
target_host_uuid.  When set:

- Hydrate the topology locally (validator runs exactly as before).
- Compute canonical_hash; push {hydrated, version_hash} to the
  pinned agent through AgentClient.apply_topology.
- Status machine still moves PENDING -> DEPLOYING -> ACTIVE on 2xx,
  PENDING -> DEPLOYING -> FAILED on error; master remains the sole
  owner of the row.

Teardown flips to TEARING_DOWN, fires /topology/teardown, then
TORN_DOWN — we log a warning on agent error but still settle to
TORN_DOWN so operators can delete the row (agent garbage is cleaned
on the next re-enroll).

Unihost deploys are unchanged — the field defaults to NULL so every
existing flow takes the local path.

Step 6 of the agent <-> topology integration.
This commit is contained in:
2026-04-21 01:27:59 -04:00
parent 5f8a746d6e
commit 05d1ebbaaa
2 changed files with 243 additions and 0 deletions

View File

@@ -11,6 +11,8 @@ import docker
from rich.console import Console
from rich.table import Table
from decnet.topology.hashing import canonical_hash
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.config import DecnetConfig, clear_state, load_state, save_state
@@ -311,6 +313,69 @@ def _topology_compose_path(topology_id: str) -> Path:
return Path(f"decnet-topology-{topology_id[:8]}-compose.yml")
async def _resolve_swarm_host(repo, host_uuid: str) -> dict:
host = await repo.get_swarm_host_by_uuid(host_uuid)
if host is None:
raise ValueError(
f"topology pinned to unknown swarm host {host_uuid!r}"
)
return host
async def _deploy_on_agent(repo, topology_id: str, hydrated: dict) -> None:
"""Route a topology apply to the agent pinned by ``target_host_uuid``.
Local imports avoid a circular dependency: decnet.swarm.client already
pulls decnet.engine indirectly via decnet.config.
"""
from decnet.swarm.client import AgentClient
target_host_uuid = hydrated["topology"]["target_host_uuid"]
host = await _resolve_swarm_host(repo, target_host_uuid)
version_hash = canonical_hash(hydrated)
await transition_status(repo, topology_id, TopologyStatus.DEPLOYING)
try:
async with AgentClient(host=host) as agent:
await agent.apply_topology(hydrated, version_hash)
except Exception as exc:
log.error(
"topology %s agent-apply failed on %s: %s",
topology_id, host.get("name"), exc,
)
await transition_status(
repo, topology_id, TopologyStatus.FAILED, reason=str(exc)
)
raise
await transition_status(repo, topology_id, TopologyStatus.ACTIVE)
log.info(
"topology %s deployed on agent %s (hash=%s)",
topology_id, host.get("name"), version_hash[:12],
)
async def _teardown_on_agent(repo, topology_id: str, hydrated: dict) -> None:
"""Route a topology teardown to the pinned agent."""
from decnet.swarm.client import AgentClient
target_host_uuid = hydrated["topology"]["target_host_uuid"]
host = await _resolve_swarm_host(repo, target_host_uuid)
await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN)
try:
async with AgentClient(host=host) as agent:
await agent.teardown_topology(topology_id)
except Exception as exc:
log.warning(
"topology %s agent-teardown failed on %s (continuing): %s",
topology_id, host.get("name"), exc,
)
await transition_status(repo, topology_id, TopologyStatus.TORN_DOWN)
log.info("topology %s torn down on agent %s", topology_id, host.get("name"))
def _warn_if_userland_proxy_enabled(hydrated: dict) -> None:
"""Soft warning: docker-proxy masks attacker source IPs.
@@ -378,6 +443,12 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N
_warn_if_userland_proxy_enabled(hydrated)
# Pinned to an agent? Hand off to the mTLS path. Everything below
# this line is the master-local deploy.
if hydrated["topology"].get("target_host_uuid"):
await _deploy_on_agent(repo, topology_id, hydrated)
return
await transition_status(repo, topology_id, TopologyStatus.DEPLOYING)
client = docker.from_env()
@@ -418,6 +489,10 @@ async def teardown_topology(repo, topology_id: str) -> None:
if hydrated is None:
raise ValueError(f"topology {topology_id!r} not found")
if hydrated["topology"].get("target_host_uuid"):
await _teardown_on_agent(repo, topology_id, hydrated)
return
await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN)
client = docker.from_env()