fix(engine): offload blocking compose to a worker thread

deploy_topology and teardown_topology are async, but every
_compose_with_retry / _compose call inside them ran directly on the
main event loop via subprocess.run — so a multi-minute
docker compose --build froze the entire API: other endpoints,
mutator events, SSE streams, status polls. The user noticed when a
2-decky deploy blocked everything else for the duration of the build.

Wrap both calls in anyio.to_thread.run_sync. This is the same pattern
the mutator engine has long been using at engine.py:104.

Per-LAN bridge create/remove docker SDK calls still run synchronously
on the event loop — they're individually fast (~50-200ms per LAN) and
the per-LAN loops are bounded by topology size, so they don't dominate.
Worth revisiting if a 200-LAN deploy turns out to stall noticeably.
This commit is contained in:
2026-04-24 22:14:08 -04:00
parent f8ef0a5cf1
commit 99bc9a8b6d

View File

@@ -8,6 +8,7 @@ import subprocess # nosec B404
import time import time
from pathlib import Path from pathlib import Path
import anyio
import docker import docker
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
@@ -744,7 +745,14 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N
console.print( console.print(
f"[bold cyan]Topology compose file written[/] → {compose_path}" f"[bold cyan]Topology compose file written[/] → {compose_path}"
) )
_compose_with_retry("up", "--build", "-d", compose_file=compose_path) # Offload to a worker thread so the API event loop stays
# responsive during the build — otherwise every other request
# (mutator events, SSE, status polls) waits behind compose.
await anyio.to_thread.run_sync(
lambda: _compose_with_retry(
"up", "--build", "-d", compose_file=compose_path,
),
)
compose_started = True compose_started = True
except Exception as exc: except Exception as exc:
log.error("topology %s deploy failed: %s", topology_id, exc) log.error("topology %s deploy failed: %s", topology_id, exc)
@@ -808,7 +816,11 @@ async def teardown_topology(repo, topology_id: str) -> None:
if compose_path.exists(): if compose_path.exists():
try: try:
_compose("down", "--remove-orphans", compose_file=compose_path) await anyio.to_thread.run_sync(
lambda: _compose(
"down", "--remove-orphans", compose_file=compose_path,
),
)
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
log.warning( log.warning(
"topology %s compose down failed (continuing): %s", "topology %s compose down failed (continuing): %s",