feat(swarm): AgentClient topology apply/teardown/state methods

Three new RPCs mirroring the existing deploy/teardown/status pattern:

- apply_topology(hydrated, version_hash) — long-timeout (600s) for
  image pulls + compose up.
- teardown_topology(topology_id) — 300s timeout; enough for a
  stubborn compose-down without hanging a heartbeat.
- get_topology_state() — short control-plane read for reconcile.

The per-call timeout swap uses the same trick as .deploy().

Step 5 of the agent <-> topology integration.
This commit is contained in:
2026-04-21 01:26:21 -04:00
parent 13cb0ff38e
commit 5f8a746d6e
2 changed files with 172 additions and 0 deletions

View File

@@ -34,6 +34,11 @@ log = get_logger("swarm.client")
# later iteration if the default proves too short.
_TIMEOUT_DEPLOY = httpx.Timeout(connect=10.0, read=600.0, write=30.0, pool=5.0)
_TIMEOUT_CONTROL = httpx.Timeout(connect=5.0, read=15.0, write=5.0, pool=5.0)
# Topology apply pulls images + runs compose on the agent — same ball-park
# as a fleet deploy. Teardown is faster but still long enough we can't
# reuse the control timeout.
_TIMEOUT_TOPOLOGY_APPLY = httpx.Timeout(connect=10.0, read=600.0, write=30.0, pool=5.0)
_TIMEOUT_TOPOLOGY_TEARDOWN = httpx.Timeout(connect=10.0, read=300.0, write=30.0, pool=5.0)
@dataclass(frozen=True)
@@ -191,6 +196,51 @@ class AgentClient:
resp.raise_for_status()
return resp.json()
# ------------------------------------------------------------ topology
async def apply_topology(
self,
hydrated: dict[str, Any],
version_hash: str,
) -> dict[str, Any]:
"""Push a hydrated topology to the agent for local materialisation.
The agent independently computes ``canonical_hash(hydrated)`` and
returns 400 if it disagrees with *version_hash* — that's how we
catch serialisation drift before half-creating bridges.
"""
old = self._require_client().timeout
self._require_client().timeout = _TIMEOUT_TOPOLOGY_APPLY
try:
resp = await self._require_client().post(
"/topology/apply",
json={"hydrated": hydrated, "version_hash": version_hash},
)
finally:
self._require_client().timeout = old
resp.raise_for_status()
return resp.json()
async def teardown_topology(self, topology_id: str) -> dict[str, Any]:
"""Ask the agent to dismantle the named topology."""
old = self._require_client().timeout
self._require_client().timeout = _TIMEOUT_TOPOLOGY_TEARDOWN
try:
resp = await self._require_client().post(
"/topology/teardown",
json={"topology_id": topology_id},
)
finally:
self._require_client().timeout = old
resp.raise_for_status()
return resp.json()
async def get_topology_state(self) -> dict[str, Any]:
"""Snapshot of the agent's applied topology + live docker state."""
resp = await self._require_client().get("/topology/state")
resp.raise_for_status()
return resp.json()
# -------------------------------------------------------------- diagnostics
def __repr__(self) -> str: